六,用Akka实现Spark的Master和worker之间相互通信
正文
一,Akka简介
写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口
二,Akka中的Actor模型
Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。
1)对并发模型进行了更高的抽象
2)异步、非阻塞、高性能的事件驱动编程模型
3)轻量级事件处理(1GB 内存可容纳百万级别个 Actor)
为什么 Actor 模型是一种处理并发问题的解决方案?
处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。
三,Akka实战案例之HelloActor
Akka的几个重要的角色:
MailBox:用来存储Actor之间收发的消息,是一个队列,所以收发到的消息是按顺序解析的。
Dispatcher Message:消息分发器,Actor之间发送的消息先发送到分发器,再发送到MailBox
Actor的Ref:用来发送消息。
实例:
package helloActor import akka.actor.{Actor, ActorRef, ActorSystem, Props} class HelloActor extends Actor { // 接收消息并处理 override def receive: Receive = { case "hellow" => print("en ,hellow") case _ =>{ context.stop(self) //停止自己的actorRef context.system.terminate()// 关闭ActorSystem } } } object HelloActor { private val nBfactory = ActorSystem("NBfactory") // 工厂 // 创建自己的ActorRef:通过Ref进行数据发送 private val helloActor: ActorRef = nBfactory.actorOf(Props[HelloActor], "helloActor") def main(args: Array[String]): Unit = { helloActor ! "hellow" helloActor ! "老王八" } }
四,Akka实战案例之PingPong
两个人进行pingPong进行数据发送。
第一个人的Actor:
package pingPangActor import akka.actor.Actor class FengActor extends Actor{ override def receive: Receive = { case "start" => print("峰峰说:I am ok") case "啪" =>{ println("峰峰:那必须滴!") Thread.sleep(1000) sender() ! "啪啪" // 向发送者发送数据 } } }
第二个人的Actor:
package pingPangActor import akka.actor.{Actor, ActorRef} class LongActor(fg: ActorRef) extends Actor{ override def receive: Receive = { case "start" =>{ print("龙龙:I am ok!") fg ! "啪" // 向ff发送数据 } case "啪啪" =>{ print("你真猛") Thread.sleep(1000) fg ! "啪" } } }
启动:
package pingPangActor import akka.actor.{ActorRef, ActorSystem, Props} object PingPangApp extends App{ // Actor工厂用来参数Ref private val pingPangActorSystem = ActorSystem("pingPangActorSystem") // 产生FF 的Ref private val ff: ActorRef = pingPangActorSystem.actorOf(Props[FengActor], "ff") // 参数ll 的Ref private val ll: ActorRef = pingPangActorSystem.actorOf(Props(new LongActor(ff)), "ll") ff ! "start" // 向自己的Mail发送start ll ! "start" // 向自己的Mail发送start }
五,案例基于 Actor 的聊天模型
如下实例:
智能机器人回复系统实现:
1) 创建一个 Server 端用于服务客户端发送过来的问题,并作处理并返回信息给客户
端!
2) 创建一个 Client 端,用于向服务端发送问题,并接收服务端发送过来的消息
server端:
package robot import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory class RobotServer extends Actor{ override def receive: Receive = { case "start" =>print("开始。。。") case ClientMessage(msg) => { println(s"收到客户端消息: $msg") msg match { case "你叫啥" => sender() ! ServerMessage("铁扇公主") case "你是男是女" => sender() ! ServerMessage("女") case "你又男票吗" => sender() ! ServerMessage("没有") case _ => sender() ! ServerMessage("What do you say") } } } } object RobotServer{ def main(args: Array[String]): Unit = { var host = "127.0.0.1" var port = 8808 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin ) val server = ActorSystem("Server", config) val shanshan = server.actorOf(Props[RobotServer], "shanshan") shanshan ! "start" } }
client端:
package robot import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.io.StdIn class PeopleClient(host: String, port: Int) extends Actor{ var serverActorRef: ActorSelection = _ // 服务端的代理对象 override def preStart(): Unit = { serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/shanshan") } override def receive: Receive = { case "start" =>print("老王系列启动....") case msg: String=>{ // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive serverActorRef ! ClientMessage(msg) } case ServerMessage(msg) =>println(s"搜到服务端消息:$msg") } } object PeopleClient{ def main(args: Array[String]): Unit = { val host = "127.0.0.1" val port = 8809 val serverHost = "127.0.0.1" val serverPort = 8808 var config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin ) val client = ActorSystem("client", config) // 创建dispatch | mailbox val actorRef = client.actorOf(Props(new PeopleClient(serverHost, serverPort.toInt)), "001") // 自己给自己发送了一条消息 到自己的mailbox => receive actorRef ! "start" while (true){ val question = StdIn.readLine() // 同步阻塞的, shit actorRef ! question // mailbox -> receive } } }
参数序列化:
package robot // 服务端发送客户端的消息格式 case class ServerMessage(msg: String) // 客户端发送到服务单的消息格式 实现了serilize接口,可以用于网络传输 case class ClientMessage(msg: String)
六,用Akka实现Spark的Master和worker之间相互通信
如下图所示:
实现方式:
Master:
package cn.sheep.spark import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ class SparkMaster extends Actor{ // 存储worker的信息的 val id2WorkerInfo = collection.mutable.HashMap[String, WorkerInfo]() // override def preStart(): Unit = { // context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker) // } override def receive: Receive = { // 收到worker注册过来的信息 case RegisterWorkerInfo(wkId, core, ram) => { // 将worker的信息存储起来,存储到HashMap if (!id2WorkerInfo.contains(wkId)) { val workerInfo = new WorkerInfo(wkId, core, ram) id2WorkerInfo += ((wkId, workerInfo)) print(s"$wkId 我过来注册了------") // master存储完worker注册的数据之后,要告诉worker说你已经注册成功 sender() ! RegisteredWorkerInfo // 此时worker会收到注册成功消息 } } case HearBeat(wkId) => { // master收到worker的心跳消息之后,更新woker的上一次心跳时间 val workerInfo = id2WorkerInfo(wkId) // 更改心跳时间 val currentTime = System.currentTimeMillis() workerInfo.lastHeartBeatTime = currentTime } case CheckTimeOutWorker => { import context.dispatcher // 使用调度器时候必须导入dispatcher context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker) } case RemoveTimeOutWorker => { // 将hashMap中的所有的value都拿出来,查看当前时间和上一次心跳时间的差 3000 val workerInfos = id2WorkerInfo.values val currentTime = System.currentTimeMillis() // 过滤超时的worker workerInfos .filter(wkInfo => currentTime - wkInfo.lastHeartBeatTime > 3000) .foreach(wk => id2WorkerInfo.remove(wk.id)) println(s"-----还剩 ${id2WorkerInfo.size} 存活的Worker-----") } } } object SparkMaster { def main(args: Array[String]): Unit = { // 检验参数 if(args.length != 3) { println( """ |请输入参数:<host> <port> <masterName> """.stripMargin) sys.exit() // 退出程序 } val host = args(0) val port = args(1) val masterName = args(2) val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) val actorSystem = ActorSystem("sparkMaster", config) val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName) // 自己给自己发送一个消息,去启动一个调度器,定期的检测HashMap中超时的worker masterActorRef ! CheckTimeOutWorker } }
worker:
package cn.sheep.spark import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ // 导入时间单位 class SparkWorker(masterUrl: String) extends Actor{ // master的 actorRef var masterProxy: ActorSelection = _ val workId = UUID.randomUUID().toString override def preStart(): Unit = { masterProxy = context.actorSelection(masterUrl) } override def receive: Receive = { case "started" => { // 自己已就绪 // 向master注册自己的信息,id, core, ram masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // 此时master会收到该条信息 } case RegisteredWorkerInfo => { // master发送给自己的注册成功消息 // worker 启动一个定时器,定时向master 发送心跳 import context.dispatcher context.system.scheduler.schedule(0 millis, 1500 millis, self, SendHeartBeat) } case SendHeartBeat => { // 开始向master发送心跳了 println(s"------- $workId 发送心跳-------") masterProxy ! HearBeat(workId) // 此时master将会收到心跳信息 } } } object SparkWorker { def main(args: Array[String]): Unit = { // 检验参数 if(args.length != 4) { println( """ |请输入参数:<host> <port> <workName> <masterURL> """.stripMargin) sys.exit() // 退出程序 } val host = args(0) val port = args(1) val workName = args(2) val masterURL = args(3) val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) val actorSystem = ActorSystem("sparkWorker", config) // 创建自己的actorRef val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName) // 给自己发送一个以启动的消息,标识自己已经就绪了 workerActorRef ! "started" } }
参数传输封装:
package cn.sheep.spark /** * worker -> master */ // worker向master注册自己(信息) case class RegisterWorkerInfo(id: String, core: Int, ram: Int) // worker给master发送心跳信息 case class HearBeat(id: String) /** * master -> worker */ // master向worker发送注册成功消息 case object RegisteredWorkerInfo // worker 发送发送给自己的消息,告诉自己说要开始周期性的向master发送心跳消息 case object SendHeartBeat //master自己给自己发送一个检查超时worker的信息,并启动一个调度器,周期新检测删除超时worker case object CheckTimeOutWorker // master发送给自己的消息,删除超时的worker case object RemoveTimeOutWorker // 存储worker信息的类 class WorkerInfo(val id: String, core: Int, ram: Int) { var lastHeartBeatTime: Long = _ }