I want actors running on various processes (or nodes) to send messages to other actors running on different processes (or nodes), all while maintaining fault tolerance and load balancing. I am currently attempting to use Akka.Cluster's Sharding feature to accomplish this.


However, I am not sure how to accomplish this...


The following code runs my seed node (localhost:2551) along with two more nodes in the same process:


let configurePort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    // return the parsed config, falling back to the cluster singleton defaults (the shard coordinator runs as a singleton)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored

// spawn three separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2551)
let shardRegion1 = spawnSharded id system1 "shardRegion1" <| props (actorOf2 consumer)

let system2 = System.create "cluster-system" (configurePort 2552)
let shardRegion2 = spawnSharded id system2 "shardRegion2" <| props (actorOf2 consumer)

let system3 = System.create "cluster-system" (configurePort 2553)
let shardRegion3 = spawnSharded id system3 "shardRegion3" <| props (actorOf2 consumer)

// NOTE: Even though we sent all messages through a single shard region,
//       some of them will be executed on the second and third ones thanks to shard balancing
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-1", "entity-2", "hello world 2")
shardRegion1 <! ("shard-2", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")


let printShards shardRegion =
    async {
        let! (reply:AskResult<ShardRegionStats>) = (retype shardRegion) <? GetShardRegionStats.Instance
        let (stats: ShardRegionStats) = reply.Value
        for kv in stats.Stats do
            printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
    } |> Async.RunSynchronously

let printNodes() =
    printfn "\nShards active on node 'localhost:2551':"
    printShards shardRegion1
    printfn "\nShards active on node 'localhost:2552':"
    printShards shardRegion2
    printfn "\nShards active on node 'localhost:2553':"
    printShards shardRegion3


Calling printNodes() prints something like this:


Shards active on node 'localhost:2551':
    Shard 'shard-1' has 2 entities on it
    Shard 'shard-2' has 2 entities on it

Shards active on node 'localhost:2552':


I then have a separate process that executes the following code:


let configurePort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = "0"
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    // return the parsed config, falling back to the cluster singleton defaults (the shard coordinator runs as a singleton)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored

// spawn three separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2554)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)

let system2 = System.create "cluster-system" (configurePort 2555)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)

let system3 = System.create "cluster-system" (configurePort 2556)
let shardRegion3 = spawnSharded id system3 "printer" <| props (actorOf2 consumer)

My cluster system (running in a separate process) recognizes the new nodes as they join:


[INFO][3/15/2017 9:12:13 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52953] is JOINING, roles []
[INFO][3/15/2017 9:12:14 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52956] is JOINING, roles []
[INFO][3/15/2017 9:12:15 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52961] is JOINING, roles []
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52953] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52956] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52961] to [Up]



In conclusion, I want actors running on various processes (or nodes) to send messages to actors running on different processes (or nodes) while maintaining fault tolerance and load balancing, and I am trying to use Akka.Cluster's Sharding feature to do so.




The script references and opens used by the code above:

open System
open System.IO
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r @"C:\Users\Snimrod\Documents\Visual Studio 2015\Projects\Temp\packages\Akka.FSharp.1.1.3\lib\net45\Akka.FSharp.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"

open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion

To maintain a consistent view of the shards and their locations, the persistence backend used by Akka.Cluster.Sharding must point to a database that is visible to all of the processes. In your configuration, you're using akka.persistence.journal.inmem, which is an in-memory data store (meant only for tests and development). It won't be visible from other processes.


You'll need to configure a persistent backend for shards to be visible between nodes living on different machines/processes. You can do that, e.g., by using Akka.Persistence.SqlServer or any other plugin. This is the most basic configuration of a persistence backend used only by sharding:


akka.persistence {
    journal {
        plugin = "akka.persistence.journal.sql-server"
        sql-server {
            connection-string = "<connection-string>"
            auto-initialize = on
        }
    }
    snapshot-store {
        plugin = "akka.persistence.snapshot-store.sql-server"
        sql-server {
            connection-string = "<connection-string>"
            auto-initialize = on
        }
    }
}
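To fold that block into a per-node configuration like the question's, one option is to generate the persistence section from a single connection string and also point sharding at those plugins explicitly. The sketch below only builds the HOCON text, so it runs without any Akka assembly. `hoconQuote` and `persistenceHocon` are hypothetical helper names, and the `class` entries and the `akka.cluster.sharding.journal-plugin-id` / `snapshot-plugin-id` keys are my reading of the Akka.Persistence.SqlServer and Akka.Cluster.Sharding reference configuration, so check them against the package versions you install.

```fsharp
// Hypothetical helper: wraps a value in a HOCON quoted string, escaping
// backslashes and double quotes (a connection string such as
// "Data Source=.\SQLEXPRESS" would otherwise contain an invalid "\S" escape).
let hoconQuote (s: string) =
    "\"" + s.Replace("\\", "\\\\").Replace("\"", "\\\"") + "\""

// Hypothetical helper: HOCON for a SQL Server journal and snapshot store that
// every node shares, plus explicit plugin ids for Akka.Cluster.Sharding.
let persistenceHocon (connectionString: string) =
    let template = """
akka.persistence {
  journal {
    plugin = "akka.persistence.journal.sql-server"
    sql-server {
      # assumed class name; verify against the installed plugin version
      class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
      connection-string = __CONNECTION_STRING__
      auto-initialize = on
    }
  }
  snapshot-store {
    plugin = "akka.persistence.snapshot-store.sql-server"
    sql-server {
      # assumed class name; verify against the installed plugin version
      class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
      connection-string = __CONNECTION_STRING__
      auto-initialize = on
    }
  }
}
akka.cluster.sharding {
  # assumed keys: make the shard coordinator persist to the shared store
  journal-plugin-id = "akka.persistence.journal.sql-server"
  snapshot-plugin-id = "akka.persistence.snapshot-store.sql-server"
}
"""
    // substitute the (quoted, escaped) connection string into both plugin sections
    template.Replace("__CONNECTION_STRING__", hoconQuote connectionString)
```

Each node would then parse this text and layer it over the rest of its settings, for example `(Configuration.parse (persistenceHocon cs)).WithFallback(configurePort 2551)`, assuming `configurePort` returns the parsed `Config`. Keys in the config that `WithFallback` is called on take precedence, so the SQL Server settings replace the `inmem`/`local` plugin choices while everything else still comes from `configurePort`. Every node has to use the same connection string for the shard state to be shared.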

For something more practical, please refer to this article.


Also keep in mind that both the Akka.Cluster.Sharding and Akka.Persistence plugins are available only as prerelease packages (so you need to run Install-Package with the -pre flag).




