1 package com.dingxin.entrance 2 3 import java.text.SimpleDateFormat 4 import java.util.Date 5 6 import scala.actors.Actor 7 import scala.actors.Actor._ 8 /** 9 * Created by zhen on 2019/1/24. 10 */ 11 object My_Actor_Receive extends Actor{ 12 def act(){ 13 while(true){ 14 receive{ 15 case str : String => print(str + " ") // 模式匹配 16 case dat : Date => println(new SimpleDateFormat("yyyy").format(dat)) 17 case _ => println("My heart will go on !") 18 } 19 } 20 } 21 } 22 object Actor_Receive { 23 def main(args: Array[String]) { 24 val getMessage = actor{ 25 while(true){ 26 receive{ 27 case str : String => print(str) // 模式匹配 28 case dat : Date => println(new SimpleDateFormat("yyyy").format(dat)) 29 case _ => My_Actor_Receive ! null // 消息转发 30 } 31 } 32 } 33 val sendMessage = actor{ 34 while(true){ 35 receive{ 36 case str : String => getMessage ! str + " " // 消息转发 37 case dat : Date => getMessage ! dat 38 case _ => getMessage ! null 39 } 40 } 41 } 42 sendMessage ! "Scala" 43 sendMessage ! new Date() 44 sendMessage ! 2020 45 46 // 这种方式必须执行start开启,且都是并行执行,不确定先后顺序 47 My_Actor_Receive.start() 48 My_Actor_Receive ! "Spark" 49 } 50 }
结果1:
结果2:
信息交互
1 package big.data.analyse.scala 2 3 import scala.actors.Actor 4 import scala.actors.Actor._ 5 /** 6 * 消息发送与接收,可用于流计算测试的输入 7 * Created by zhen on 2018/4/15. 8 */ 9 object ActorTest { 10 def main(args: Array[String]) { 11 val actor = new HelloActor 12 actor.start//启动actor消息机制 13 var counter = 0 14 while(counter<10){ 15 actor ! "Step " + counter //发送消息 16 counter += 1 17 Thread.sleep(2000) 18 self.receive{case msg => println("返回结果:"+msg)} // 获取子线程的消息 19 } 20 } 21 } 22 class HelloActor extends Actor{ 23 def act(): Unit ={ 24 while(true){ 25 receive{ 26 case content : String => println("Message : " + content) 27 sender ! content.split(" ")(1) // 向主线程发送消息 28 } 29 } 30 } 31 }
结果3:
Actor详解
1.Actor是一个通信模型,Scala提供了Actor的实现
2.Spark1.6之前集群节点之间通信使用的是Akka,Akka底层是Actor实现的。Spark1.6之后,节点的通信变成Netty
3.Actor相当于我们理解的Thread,Actor的出现主要解决的是代码锁的问题
4.Actor底层通信实现用到了模式匹配