Scala进阶之路-并发编程模型Akka入门篇

时间:2023-12-21 23:37:26

               Scala进阶之路-并发编程模型Akka入门篇

                                  作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.Akka Actor介绍

1>.Akka介绍

  写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口。

2>.Akka 中 中 Actor  模型

  Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。

3>.Akaka的特点

  第一:它是对并发模型进行了更高的抽象;
  第二:它是异步、非阻塞、高性能的事件驱动编程模型;
  第三:它是轻量级事件处理(1GB 内存可容纳百万级别个 Actor);

Scala进阶之路-并发编程模型Akka入门篇

4>.为什么 Actor 模型是一种处理并发问题的解决方案?

  处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。

5>.Maven依赖

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>cn.org.yinzhengjie</groupId>
<artifactId>MyActor</artifactId>
<version>1.0-SNAPSHOT</version> <!-- 定义一下常量 -->
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties> <dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency> <!-- 添加akka的actor依赖 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency> <!-- 多进程之间的Actor通信 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies> <!-- 指定插件-->
<build>
<!-- 指定源码包和测试包的位置 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin> </plugins>
</build>
</project>

  自定义默认的源代码包和测试包的位置,需要手动穿件Source Root目录哟,如下图:

Scala进阶之路-并发编程模型Akka入门篇

二.编写HelloActor

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.actor import akka.actor.{Actor, ActorSystem, Props} import scala.io.StdIn class HelloActor extends Actor{
// 重写接受消息的偏函数,其功能是接受消息并处理
override def receive: Receive = {
case "你好帅" => println("竟说实话,我喜欢你这种人!")
case "丑八怪" => println("滚犊子 !")
case "stop" => {
context.stop(self) // 停止自己的actorRef
context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
}
}
} object HelloActor {
/**
* 创建线程池对象MyFactory,用来创建actor的对象的
*/
private val MyFactory = ActorSystem("myFactory") //里面的"myFactory"参数为线程池的名称
/**
* 通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类,
* 第二个参数是给actor起个名字
*/
private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor") def main(args: Array[String]): Unit = {
var flag = true
while(flag){
/**
* 接受用户输入的字符串
*/
print("请输入您想发送的消息:")
val consoleLine:String = StdIn.readLine()
/**
* 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息
*/
helloActorRef ! consoleLine
if (consoleLine.equals("stop")){
flag = false
println("程序即将结束!")
}
/**
* 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒
*/
Thread.sleep(100)
}
}
}

  以上代码执行结果如下:

Scala进阶之路-并发编程模型Akka入门篇

三.两个actor通信案例-模拟下棋对话

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.actor import akka.actor.{ActorSystem, Props} import akka.actor.{Actor, ActorRef} /**
* 定义玩家1
*/
class player1Actor(val p2: ActorRef) extends Actor{
// receive方法是负责处理消息的
override def receive: Receive = {
case "start" => {
println("棋圣:I'm OK !")
p2 ! "该你了"
}
case "将军" => {
println("棋圣:你真猛!")
Thread.sleep(1000)
p2 ! "该你了"
}
}
} /**
* 定义玩家2
*/
class player2Actor extends Actor{ override def receive: Receive = {
case "start" => println("棋仙说:I'm OK !")
case "该你了" => {
println("棋仙:那必须滴!")
Thread.sleep(1000)
/**
* 注意,这个“sender()”,其实就是对ActorRef的一个引用。它指的是给发送"该你了"的这个对象本身!
*/
sender() ! "将军"
}
}
} object ChineseChess extends App{
// 创建 actorSystem的工厂,用来生产ActorRef对象!
private val ChineseChessActorSystem = ActorSystem("Chinese-chess")
/**
* 通过actorSystem创建ActorRef
*/
private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2") //创建player2Actor对象
private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1") //创建player1Actor对象 p2 ! "start"
p1 ! "start"
}

  运行以上代码输出结果如下:

Scala进阶之路-并发编程模型Akka入门篇

四.服务端和客户端交互的小程序

1>.服务端代码

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.robot import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory class ServerActor extends Actor{
/**
* receive方法是用来处理客户端发送过来的问题的
*/
override def receive: Receive = {
case "start" => println("天猫系统已启动...") case ClientMessage(msg) => {
println(s"收到客户端消息:$msg")
msg match {
/**
* sender()发送端的代理对象, 发送到客户端的mailbox中 -> 客户端的receive
*/
case "你叫啥" =>
sender() ! ServerMessage("本宝宝是天猫精灵")
case "你是男是女" =>
sender() ! ServerMessage("本宝宝非男非女")
case "你有男票吗" =>
sender() ! ServerMessage("本宝宝还小哟")
case "stop" =>
context.stop(self) // 停止自己的actorRef
context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
println("天猫系统已停止...")
case _ =>
sender() ! ServerMessage("对不起,主人,我不知道你在说什么.......")
}
}
}
} object ServerActor {
def main(args: Array[String]): Unit = {
//定义服务端的ip和端口
val host = "127.0.0.1"
val port = 8088
/**
* 使用ConfigFactory的parseString方法解析字符串,指定服务端IP和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
/**
* 将config对象传递给ActorSystem并起名为"Server",为了是创建服务端工厂对象(ServerActorSystem)。
*/
val ServerActorSystem = ActorSystem("Server", config)
/**
* 通过工厂对象创建服务端的ActorRef
*/
val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao")
/**
* 到自己的mailbox -》 receive方法
*/
serverActorRef ! "start"
}
}

2>.客户端代码

 /*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.robot import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory import scala.io.StdIn class ClientActor(host: String, port: Int) extends Actor{ var serverActorRef: ActorSelection = _ // 服务端的代理对象 // 在receive方法之前调用
override def preStart(): Unit = {
// akka.tcp://Server@127.0.0.1:8088
serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao")
}
// mailbox ->receive
override def receive: Receive = { // shit
case "start" => println("2018天猫精灵为您服务!")
case msg: String => { // shit
serverActorRef ! ClientMessage(msg) // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive
}
case ServerMessage(msg) => println(s"收到服务端消息:$msg")
}
} object ClientActor {
def main(args: Array[String]): Unit = { //指定客户端的IP和端口
val host = "127.0.0.1"
val port = 8089 //指定服务端的IP和端口
val serverHost = "127.0.0.1"
val serverPort = 8088 /**
* 使用ConfigFactory的parseString方法解析字符串,指定客户端IP和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin) /**
* 将config对象传递给ActorSystem并起名为"Server",为了是创建客户端工厂对象(clientActorSystem)。
*/
val clientActorSystem = ActorSystem("client", config) // 创建dispatch | mailbox
val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client") clientActorRef ! "start" // 自己给自己发送了一条消息 到自己的mailbox => receive /**
* 接受用户的输入信息并传送给服务端
*/
while (true) {
Thread.sleep(500)
/**
* StdIn.readLine方法是同步阻塞的
*/
val question = StdIn.readLine("请问有什么我可以帮你的吗?>>>")
clientActorRef ! question
if (question.equals("stop")){
Thread.sleep(500)
println("程序即将结束")
System.exit(0)
}
}
}
}

3>.先执行服务端再执行客户端并输入相应信息测试结果如下:

  客户端运行结果如下:

Scala进阶之路-并发编程模型Akka入门篇

  服务端运行结果如下:

Scala进阶之路-并发编程模型Akka入门篇