package test
import java.io.File
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.actor.Props
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
object Sender {
def main(args: Array[String]): Unit = {
val file =new File("D:/calculator.conf");
System.out.println(file);
val system = ActorSystem("Sys", ConfigFactory.parseFile(file))
val remoteHostPort = if (args.length >= 1) args(0) else "127.0.0.1:2553"
val remotePath = s"akka.tcp://Sys@$remoteHostPort/user/rcv"
val totalMessages = if (args.length >= 2) args(1).toInt else 500000
val burstSize = if (args.length >= 3) args(2).toInt else 5000
val payloadSize = if (args.length >= 4) args(3).toInt else 100
system.actorOf(Sender.props(remotePath, totalMessages, burstSize, payloadSize), "snd")
}
def props(path: String, totalMessages: Int, burstSize: Int, payloadSize: Int): Props =
Props(new Sender(path, totalMessages, burstSize, payloadSize))
private case object Warmup
case object Shutdown
sealed trait Echo
case object Start extends Echo
case object Done extends Echo
case class Continue(remaining: Int, startTime: Long, burstStartTime: Long, n: Int)
extends Echo
}
class Sender(path: String, totalMessages: Int, burstSize: Int, payloadSize: Int) extends Actor {
import Sender._
val payload: Array[Byte] = Vector.fill(payloadSize)("a").mkString.getBytes
var startTime = 0L
var maxRoundTripMillis = 0L
context.setReceiveTimeout(3.seconds)
sendIdentifyRequest()
def sendIdentifyRequest(): Unit =
context.actorSelection(path) ! Identify(path)
def receive = identifying
def identifying: Receive = {
case ActorIdentity(`path`, Some(actor)) =>
context.watch(actor)
context.become(active(actor))
context.setReceiveTimeout(Duration.Undefined)
self ! Warmup
case ActorIdentity(`path`, None) => println(s"Remote actor not available: $path")
case ReceiveTimeout => sendIdentifyRequest()
}
def active(actor: ActorRef): Receive = {
case Warmup =>
sendBatch(actor, burstSize)
actor ! Start
case Start =>
println(s"Starting benchmark of $totalMessages messages with burst size $burstSize and payload size $payloadSize")
startTime = System.nanoTime
val remaining = sendBatch(actor, totalMessages)
if (remaining == 0)
actor ! Done
else
actor ! Continue(remaining, startTime, startTime, burstSize)
case c @ Continue(remaining, t0, t1, n) =>
val now = System.nanoTime()
val duration = (now - t0).nanos.toMillis
val roundTripMillis = (now - t1).nanos.toMillis
maxRoundTripMillis = math.max(maxRoundTripMillis, roundTripMillis)
if (duration >= 500) {
val throughtput = (n * 1000.0 / duration).toInt
println(s"It took $duration ms to deliver $n messages, throughtput $throughtput msg/s, " +
s"latest round-trip $roundTripMillis ms, remaining $remaining of $totalMessages")
}
val nextRemaining = sendBatch(actor, remaining)
if (nextRemaining == 0)
actor ! Done
else if (duration >= 500)
actor ! Continue(nextRemaining, now, now, burstSize)
else
actor ! c.copy(remaining = nextRemaining, burstStartTime = now, n = n + burstSize)
case Done =>
val took = (System.nanoTime - startTime).nanos.toMillis
val throughtput = (totalMessages * 1000.0 / took).toInt
println(s"== It took $took ms to deliver $totalMessages messages, throughtput $throughtput msg/s, " +
s"max round-trip $maxRoundTripMillis ms, burst size $burstSize, " +
s"payload size $payloadSize")
actor ! Shutdown
case Terminated(`actor`) =>
println("Receiver terminated")
}
/** * @return remaining messages after sending the batch */
def sendBatch(actor: ActorRef, remaining: Int): Int = {
val batchSize = math.min(remaining, burstSize)
(1 to batchSize) foreach { x => actor ! payload }
remaining - batchSize
}
}
package test
import java.io.File
import akka.actor.Actor
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
object Receiver {
def main(args: Array[String]): Unit = {
val file =new File("D:/remotelookup.conf");
System.out.println(file);
val system = ActorSystem("Sys", ConfigFactory.parseFile(file))
system.actorOf(Props[Receiver], "rcv")
}
}
class Receiver extends Actor {
import Sender._
def receive = {
case m: Echo => sender() ! m
case _ =>
}
}
calculator.conf
include "common"
akka {
# LISTEN on tcp port 2552
remote.netty.tcp.port = 2552
}
common.conf
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
}
}
}
remotelookup.conf
include "common"
akka {
remote.netty.tcp.port = 2553
}
全部从官网同一个akka版本里的
akka-actor_2.10-2.3.11.jar
akka-remote_2.10-2.3.11.jar
config-1.2.1.jar
netty-3.8.0.Final.jar
protobuf-java-2.5.0.jar