Scala 学习 (七) 并发编程模型Akka

时间:2021-11-21 18:01:33

一,Akka简介

二,Akka中的Actor模型

三,Akka实战案例之HelloActor

四,Akka实战案例之PingPong

五,案例基于 Actor 的聊天模型

六,用Akka实现Spark的Master和worker之间相互通信

 

 

 

 

正文

一,Akka简介

  写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口

二,Akka中的Actor模型  

  Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。

  1)对并发模型进行了更高的抽象
  2)异步、非阻塞、高性能的事件驱动编程模型
  3)轻量级事件处理(1GB 内存可容纳百万级别个 Actor)

为什么 Actor 模型是一种处理并发问题的解决方案?
  处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。

三,Akka实战案例之HelloActor

  Akka的几个重要的角色:

  MailBox:用来存储Actor之间收发的消息,是一个队列,所以收发到的消息是按顺序解析的。

  Dispatcher Message:消息分发器,Actor之间发送的消息先发送到分发器,再发送到MailBox

  Actor的Ref:用来发送消息。

实例:

package helloActor

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

class HelloActor extends Actor {
    // 接收消息并处理
    override def receive: Receive = {
        case "hellow" => print("en ,hellow")
        case _ =>{
            context.stop(self)  //停止自己的actorRef
            context.system.terminate()// 关闭ActorSystem
        }
    }
}

object HelloActor {
    private val nBfactory = ActorSystem("NBfactory")  // 工厂
    // 创建自己的ActorRef:通过Ref进行数据发送
    private val helloActor: ActorRef = nBfactory.actorOf(Props[HelloActor], "helloActor")
    def main(args: Array[String]): Unit = {
        helloActor ! "hellow"
        helloActor ! "老王八"
    }
}

四,Akka实战案例之PingPong

  两个人进行pingPong进行数据发送。

  第一个人的Actor:

package pingPangActor

import akka.actor.Actor

class FengActor extends Actor{
    override def receive: Receive = {
        case "start" => print("峰峰说:I am ok")
        case "啪" =>{
            println("峰峰:那必须滴!")
            Thread.sleep(1000)
            sender() ! "啪啪" // 向发送者发送数据
        }
    }
}

  第二个人的Actor:

package pingPangActor

import akka.actor.{Actor, ActorRef}

class LongActor(fg: ActorRef) extends Actor{
    override def receive: Receive = {
        case "start" =>{
            print("龙龙:I am ok!")
            fg ! "啪" // 向ff发送数据
        }
        case "啪啪" =>{
            print("你真猛")
            Thread.sleep(1000)
            fg ! "啪"
        }
    }
}

  启动:

package pingPangActor

import akka.actor.{ActorRef, ActorSystem, Props}

object PingPangApp extends App{

    // Actor工厂用来参数Ref
    private val pingPangActorSystem = ActorSystem("pingPangActorSystem")
    // 产生FF 的Ref
    private val ff: ActorRef = pingPangActorSystem.actorOf(Props[FengActor], "ff")
    // 参数ll 的Ref
    private val ll: ActorRef = pingPangActorSystem.actorOf(Props(new LongActor(ff)), "ll")

    ff ! "start" // 向自己的Mail发送start
    ll ! "start" // 向自己的Mail发送start
}

五,案例基于 Actor 的聊天模型

  如下实例:

  Scala 学习 (七) 并发编程模型Akka

  智能机器人回复系统实现:

  1) 创建一个 Server 端用于服务客户端发送过来的问题,并作处理并返回信息给客户
端!
  2) 创建一个 Client 端,用于向服务端发送问题,并接收服务端发送过来的消息

  server端:

package robot
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class RobotServer extends Actor{
    override def receive: Receive = {
        case "start" =>print("开始。。。")
        case ClientMessage(msg) => {
            println(s"收到客户端消息:  $msg")
            msg match {
                case "你叫啥" => sender() ! ServerMessage("铁扇公主")
                case "你是男是女" => sender() ! ServerMessage("女")
                case "你又男票吗" => sender() ! ServerMessage("没有")
                case _ => sender() ! ServerMessage("What do you say")
            }
        }
    }
}

object RobotServer{
    def main(args: Array[String]): Unit = {
        var host = "127.0.0.1"
        var port = 8808
        val config = ConfigFactory.parseString(
            s"""
              |akka.actor.provider="akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname=$host
              |akka.remote.netty.tcp.port=$port
            """.stripMargin
        )
        val server = ActorSystem("Server", config)
        val shanshan = server.actorOf(Props[RobotServer], "shanshan")
        shanshan ! "start"

    }
}

  client端:

package robot

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

class PeopleClient(host: String, port: Int) extends Actor{

    var serverActorRef: ActorSelection = _ // 服务端的代理对象
    override def preStart(): Unit = {
        serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/shanshan")
    }

    override def receive: Receive = {
        case "start" =>print("老王系列启动....")
        case msg: String=>{
            // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive
            serverActorRef ! ClientMessage(msg)
        }
        case ServerMessage(msg) =>println(s"搜到服务端消息:$msg")
    }
}

object PeopleClient{
    def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val port  = 8809
        val serverHost = "127.0.0.1"
        val serverPort = 8808
        var config = ConfigFactory.parseString(
            s"""
               |akka.actor.provider="akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname=$host
               |akka.remote.netty.tcp.port=$port
            """.stripMargin
        )
        val client = ActorSystem("client", config)
        // 创建dispatch | mailbox
        val actorRef = client.actorOf(Props(new PeopleClient(serverHost, serverPort.toInt)), "001")
        // 自己给自己发送了一条消息 到自己的mailbox => receive
        actorRef ! "start"

        while (true){
            val question = StdIn.readLine() // 同步阻塞的, shit
            actorRef ! question // mailbox -> receive
        }
    }
}

  参数序列化:

package robot

// 服务端发送客户端的消息格式
case class ServerMessage(msg: String)

// 客户端发送到服务单的消息格式 实现了serilize接口,可以用于网络传输
case class ClientMessage(msg: String)

 六,用Akka实现Spark的Master和worker之间相互通信

  如下图所示:

Scala 学习 (七) 并发编程模型Akka

  实现方式:

  Master:

package cn.sheep.spark

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class SparkMaster extends Actor{

    // 存储worker的信息的
    val id2WorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

//    override def preStart(): Unit = {
//        context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
//    }

    override def receive: Receive = {

        // 收到worker注册过来的信息
        case RegisterWorkerInfo(wkId, core, ram) => {
            // 将worker的信息存储起来,存储到HashMap
            if (!id2WorkerInfo.contains(wkId)) {
                val workerInfo = new WorkerInfo(wkId, core, ram)
                id2WorkerInfo += ((wkId, workerInfo))
                print(s"$wkId 我过来注册了------")
                // master存储完worker注册的数据之后,要告诉worker说你已经注册成功
                sender() ! RegisteredWorkerInfo // 此时worker会收到注册成功消息
            }
        }
        case HearBeat(wkId) => {
            // master收到worker的心跳消息之后,更新woker的上一次心跳时间
            val workerInfo = id2WorkerInfo(wkId)
            // 更改心跳时间
            val currentTime = System.currentTimeMillis()
            workerInfo.lastHeartBeatTime = currentTime
        }
        case CheckTimeOutWorker => {
            import context.dispatcher // 使用调度器时候必须导入dispatcher
            context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
        }
        case RemoveTimeOutWorker => {
            // 将hashMap中的所有的value都拿出来,查看当前时间和上一次心跳时间的差 3000
            val workerInfos = id2WorkerInfo.values
            val currentTime = System.currentTimeMillis()

            //  过滤超时的worker
            workerInfos
              .filter(wkInfo => currentTime - wkInfo.lastHeartBeatTime > 3000)
              .foreach(wk => id2WorkerInfo.remove(wk.id))
            println(s"-----还剩 ${id2WorkerInfo.size} 存活的Worker-----")
        }
    }
}

object SparkMaster {

    def main(args: Array[String]): Unit = {

        // 检验参数
        if(args.length != 3) {
            println(
                """
                  |请输入参数:<host> <port> <masterName>
                """.stripMargin)
            sys.exit() // 退出程序
        }
        val host = args(0)
        val port = args(1)
        val masterName = args(2)

        val config = ConfigFactory.parseString(
            s"""
               |akka.actor.provider="akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname=$host
               |akka.remote.netty.tcp.port=$port
            """.stripMargin)

        val actorSystem = ActorSystem("sparkMaster", config)

        val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)

        // 自己给自己发送一个消息,去启动一个调度器,定期的检测HashMap中超时的worker
        masterActorRef ! CheckTimeOutWorker
    }
}

  worker:

package cn.sheep.spark

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ // 导入时间单位

class SparkWorker(masterUrl: String) extends Actor{

    // master的 actorRef
    var masterProxy: ActorSelection = _
    val workId = UUID.randomUUID().toString

    override def preStart(): Unit = {
        masterProxy = context.actorSelection(masterUrl)
    }

    override def receive: Receive = {

        case "started" => { // 自己已就绪
            // 向master注册自己的信息,id, core, ram
            masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // 此时master会收到该条信息
        }
        case RegisteredWorkerInfo => { // master发送给自己的注册成功消息
            // worker 启动一个定时器,定时向master 发送心跳
            import context.dispatcher
            context.system.scheduler.schedule(0 millis, 1500 millis, self, SendHeartBeat)
        }
        case SendHeartBeat => {
            // 开始向master发送心跳了
            println(s"------- $workId 发送心跳-------")
            masterProxy ! HearBeat(workId) // 此时master将会收到心跳信息
        }
    }
}


object SparkWorker {
    def main(args: Array[String]): Unit = {
        // 检验参数
        if(args.length != 4) {
            println(
                """
                  |请输入参数:<host> <port> <workName> <masterURL>
                """.stripMargin)
            sys.exit() // 退出程序
        }

        val host = args(0)
        val port = args(1)
        val workName = args(2)
        val masterURL = args(3)

        val config = ConfigFactory.parseString(
            s"""
              |akka.actor.provider="akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname=$host
              |akka.remote.netty.tcp.port=$port
            """.stripMargin)

        val actorSystem = ActorSystem("sparkWorker", config)

        // 创建自己的actorRef
        val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)

        // 给自己发送一个以启动的消息,标识自己已经就绪了
        workerActorRef ! "started"
    }


}

  参数传输封装:

package cn.sheep.spark


/**
  * worker -> master
  */

// worker向master注册自己(信息)
case class RegisterWorkerInfo(id: String, core: Int, ram: Int)

// worker给master发送心跳信息
case class HearBeat(id: String)


/**
  * master -> worker
  */
// master向worker发送注册成功消息
case object RegisteredWorkerInfo


// worker 发送发送给自己的消息,告诉自己说要开始周期性的向master发送心跳消息
case object SendHeartBeat

//master自己给自己发送一个检查超时worker的信息,并启动一个调度器,周期新检测删除超时worker
case object CheckTimeOutWorker

// master发送给自己的消息,删除超时的worker
case object RemoveTimeOutWorker

// 存储worker信息的类
class WorkerInfo(val id: String, core: Int, ram: Int) {
    var lastHeartBeatTime: Long = _
}