Scala并发编程【进阶】

时间:2020-12-04 17:59:48
 1 package com.dingxin.entrance
 2 
 3 import java.text.SimpleDateFormat
 4 import java.util.Date
 5 
 6 import scala.actors.Actor
 7 import scala.actors.Actor._
 8 /**
 9   * Created by zhen on 2019/1/24.
10   */
11 object My_Actor_Receive extends Actor{
12   def act(){
13     while(true){
14       receive{
15         case str : String => print(str + " ") // 模式匹配
16         case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
17         case _ => println("My heart will go on !")
18       }
19     }
20   }
21 }
22 object Actor_Receive {
23   def main(args: Array[String]) {
24     val getMessage = actor{
25       while(true){
26         receive{
27           case str : String => print(str) // 模式匹配
28           case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
29           case _ => My_Actor_Receive ! null // 消息转发
30         }
31       }
32     }
33     val sendMessage = actor{
34       while(true){
35         receive{
36           case str : String => getMessage ! str + " " // 消息转发
37           case dat : Date => getMessage ! dat
38           case _ => getMessage ! null
39         }
40       }
41     }
42     sendMessage ! "Scala"
43     sendMessage ! new Date()
44     sendMessage ! 2020
45 
46     // 这种方式必须执行start开启,且都是并行执行,不确定先后顺序
47     My_Actor_Receive.start()
48     My_Actor_Receive ! "Spark"
49   }
50 }

结果1:

  Scala并发编程【进阶】

结果2:

  Scala并发编程【进阶】

 信息交互

 1 package big.data.analyse.scala
 2 
 3 import scala.actors.Actor
 4 import scala.actors.Actor._
 5 /**
 6   * 消息发送与接收,可用于流计算测试的输入
 7   * Created by zhen on 2018/4/15.
 8   */
 9 object ActorTest {
10   def main(args: Array[String]) {
11     val actor = new HelloActor
12     actor.start//启动actor消息机制
13     var counter = 0
14     while(counter<10){
15       actor ! "Step " + counter //发送消息
16       counter += 1
17       Thread.sleep(2000)
18       self.receive{case msg => println("返回结果:"+msg)} // 获取子线程的消息
19     }
20   }
21 }
22 class HelloActor extends Actor{
23   def act(): Unit ={
24     while(true){
25       receive{
26         case content : String => println("Message : " + content)
27           sender ! content.split(" ")(1) // 向主线程发送消息
28       }
29     }
30   }
31 }

结果3:

  Scala并发编程【进阶】

 Actor详解

  1.Actor是一个通信模型,Scala提供了Actor的实现

  2.Spark1.6之前集群节点之间通信使用的是Akka,Akka底层是Actor实现的。Spark1.6之后,节点的通信变成Netty

  3.Actor相当于我们理解的Thread,Actor的出现主要解决的是代码锁的问题

  4.Actor底层通信实现用到了模式匹配