Quick Start Guide: Reactive Tweets
快速入门指南: Reactive Tweets
(reactive tweets 大概可以理解为“响应式推文”,在此可以测试下GFW是否还在正常工作 Twitter)
A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
流处理的一个典型用例是从一个活动数据流中提取或者聚合我们想要的数据。在这个例子中,我们将会消费一个推文流并且从中获取跟Akka相关的信息。
We will also consider the problem inherent to all non-blocking streaming solutions: "What if the subscriber is too slow to consume the live stream of data?". Traditionally the solution is often to buffer the elements, but this can—and usually will—cause eventual buffer overflows and instability of such systems. Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios.
我们还会考虑所有非阻塞的流解决方案都有固有的一个问题:“如果采集者太慢而不能消费实时数据流的话该怎么办?”。通常采用的方案是把流的元素缓存起来,但是这样可能——并且经常会——最终导致缓冲溢出以及系统不稳定。与此不同的是,Akka Streams依靠一个内部的反向压力(backpressure)信号,使得我们可以控制在这种情况下该怎么做。
Here's the data model we'll be working with throughout the quickstart examples:
在这个快速开始示例中,我们将使用下面的数据模型。
final case class Author(handle: String) final case class Hashtag(name: String) final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
} val akka = Hashtag("#akka")
Transforming and consuming simple streams
转化和消费简单的流
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorFlowMaterializer()
The ActorFlowMaterializer can optionally take ActorFlowMaterializerSettings which can be used to define materialization properties, such as default buffer sizes (see also Buffers in Akka Streams), the dispatcher to be used by the pipeline etc. These can be overridden withAttributes on Flow, Source, Sink and Graph.
ActorFlowMaterializer可选地接受ActorFlowMaterializerSetting作为参数,这个参为用来决定物化相关的属性,比如默认的buffer大小(参见Buffers in Akka Streams),管道(pipeline)所使用的分配器(dispatcher)。这些可以通过Flow, Source, Sink, Graph的withAttributes覆盖。
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a Source[Out, M]:
假设我们有一个已准备好的推文流,在Akka中应该这样表达Source[Out, M]:
val tweets: Source[Tweet, Unit]
Streams always start flowing from a Source[Out,M1] then can continue through Flow[In,Out,M2] elements or more advanced graph elements to finally be consumed by a Sink[In,M3] (ignore the type parameters M1, M2 and M3 for now, they are not relevant to the types of the elements produced/consumed by these classes). Both Sources and Flows provide stream operations that can be used to transform the flowing data, a Sink however does not since its the "end of stream" and its behavior depends on the type of Sink used.
流总是以一个Source[Out, M1]开始,然后经过Flow[In, Out, M2]元素,或者更加高级的graph元素,最终被Sink[In, M3]消费(先忽略类型参数M1, M2和M3, 他们与这些类所生产/消费的元素类型无关)。Sources和Flows都提供了流操作,可以用来转换流动的数据,但是Sink就没有这样的功能,因为它是“流的末端”,Sink的行为取决于所使用的Sink的类型。
In our case let's say we want to find all twitter handles of users which tweet about #akka, the operations should look familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data:
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
Finally in order to materialize and run the stream computation we need to attach the Flow to a Sink that will get the flow running. The simplest way to do this is to call runWith(or by using the shorthand version (which are defined only for the most popular sinks such as FoldSink andForeachSink):sink) on a Source. For convenience a number of common Sinks are predefined and collected as methods on the Sink companion object. For now let's simply print each author:
最后,为了物化并且运行上边的流计算,我们需要把这个Flow挂在一个使它运行起来的Sink上。最简单的作法就是在Source上调用runWith(sink)。为了方便,有一些Sink已经被预先定义好了,并且在Sink的companion object里被作为方法收集了起来。让我们先简单地打印出每个作者:
authors.runWith(Sink.foreach(println))
or by using the shorthand version (which are defined only for the most popular sinks such as FoldSink andForeachSink):
或者使用简化版(只在最流行的sink,比如FoldSink和ForeachSink里定义了):
authors.runForeach(println)
Materializing and running a stream always requires a FlowMaterializer to be in implicit scope (or passed in explicitly, like this: .run(materializer)).
物化和运行一个流总要求在隐式作用域里有一个FlowMaterializer(或者显示地传递,像这样:.run(materializer)).
Flattening sequences in streams
使流中的序列扁平化
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like flatMap works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the mapConcat combinator:
上一节中我们处理的元素是1:1的关系,这也是最常见的情况,但是有时候我们想要把一个元素映射成一些元素,得到一个“扁平化”的流,就像Scala集合中的 flatMap .为了从我们的推文流中获得一个扁平化的hashtag流,我们使用 mapConcat 这个连接器:
val hashtags: Source[Hashtag, Unit] = tweets.mapConcat(_.hashtags.toList)
Note
The name flatMap was consciously avoided due to its proximity with for-comprehensions and monadic composition. It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for our implementation of flatMap (due to the liveness issues).
Please note that the mapConcat requires the supplied function to return a strict collection (f:Out=>immutable.Seq[T]), whereas flatMap would have to operate on streams all the way through.
Broadcasting a stream
广播一个流
Now let's say we want to persist all hashtags, as well as all author names from this one live stream. For example we'd like to write all author handles into one file, and all hashtags into another file on disk. This means we have to split the source stream into 2 streams which will handle the writing to these different files.
假如我们想要持久化这个实时流中的所有hashtag,以及所有的作者名字。比如,我们想要把所有的用户名写到磁盘里的一个文件里,把所有的hashtag写到另一个文件里。这意味着我们必须把这个作为源的流分成两个流,分到不同的文件里。
Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. One of these that we'll be using in this example is called Broadcast, and it simply emits elements from its input port to all of its output ports.
能够用于实现这种“扇出”结构的元素在Akka Streams里边被称为"交叉点"。我们这个例子里用到的一种交叉点被称为Broadcast(广播),它单纯地把元素从输入端发射到所有输出端。
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation graphs as Flows, Sinks or Sources, which will be explained in detail in Constructing Sources, Sinks and Flows from Partial Graphs. FlowGraphs are constructed like this:
Akka Streams有意地把线性的流结构(Flows)和非线性的、分支的流结构(FlowGraphs)分开,以便于为这两种情况提供最方便的API。图(Graph)可以表示任意复杂的流,但是就不像集合转换那样流起来很熟悉了。也可以把复杂的计算图包装成Flows, Sinks和Sources,这将在Constructing Sources, Sinks and Flows from Partial Graphs 里详细描述。FlowGraphs像这样构造:
val writeAuthors: Sink[Author, Unit] = ???
val writeHashtags: Sink[Hashtag, Unit] = ???
val g = FlowGraph.closed() { implicit b =>
import FlowGraph.Implicits._ val bcast = b.add(Broadcast[Tweet](2))
tweets ~> bcast.in
bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
}
g.run()
Note
The ~> (read as "edge", "via" or "to") operator is only available if FlowGraph.Implicits._ are imported. Without this import you can still construct graphs using the builder.addEdge(from,[through,]to) method.
As you can see, inside the FlowGraph we use an implicit graph builder to mutably construct the graph using the ~>"edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value g it is immutable, thread-safe, and freely shareable. A graph can can be run() directly - assuming all ports (sinks/sources) within a flow have been connected properly. It is possible to construct partial graphs where this is not required but this will be covered in detail in Constructing and combining Partial Flow Graphs.
你可以看到,在FlowGraph里我们可以用一个隐式的图构建器,使用~>这个“边操作符”(edge operator),构造图。一旦FlowGraph被放在g中,那么它就是不可变的、线程安全的以及可以*共享的。一个graph可以被直接run()运行——假如流中的所有端(sinks/sources)都被正确地连接起来。也可以构造不完全图(partial graph),这不是必须的,其细节将在 Constructing and combining Partial Flow Graphs 中详细阐述。
As all Akka Streams elements, Broadcast will properly propagate back-pressure to its upstream element.
就像所有的Akka Streams元素一样, Broadcast可以正确地向它的上流元素施加back-pressure.
Back-pressure in action
Back-pressure实战
One of the main advantages of Akka Streams is that they always propagate back-pressure information from stream Sinks (Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read Back-pressure explained.
Akka Streams的一个主要优势就是它们总是从流的Sinks(收集者)传播back-pressure信息到Sources(发布者)。这不是一个可选的特性,而是总是开启的。可以查看Back-pressure explained.来了解更多Akka Streams和其它Reactive Streams实现所采用的back-pressure协议。
A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting in either OutOfMemoryError s or other severe degradations of service responsiveness. With Akka Streams buffering can and must be handled explicitly. For example, if we are only interested in the "most recent tweets, with a buffer of 10 elements" this can be expressed using the buffer element:
tweets
.buffer(10, OverflowStrategy.dropHead)
.map(slowComputation)
.runWith(Sink.ignore)
The buffer element takes an explicit and required OverflowStrategy, which defines how the buffer should react when it receives another element element while it is full. Strategies provided include dropping the oldest element (dropHead), dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
Buffer这种元素接受一个显式的以及必须的溢出策略(OverFlowStrategy),这个策略决定当buffer己经满的时候它接受到另外的element(译注:指流中的数据元素)时该怎么办。已提供的策略包括丢弃最旧的元素(dropHead),丢弃整个buffer,报告发生错误等。一定要选择最适于你的实际情况的策略。
Materialized values
物化的值
先插播一段Akka Streams文档中对于materialize的描述,不然下边说的东西不好理解。
Stream Materialization
流的物化
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing, but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs.
当在Akka Streams中构建流(flow)和图(graph)时,可以把它们当作正在准备一个蓝图,一个执行计划。流的物化就是获取一个流的描述(就是流程图),然后分配它运行时需要的资源。在Akka Streams的例子中,这意味着起动驱动这个流处理过程的actor,但是不仅限于此——也可能意味着打开文件或者socket连接,等——取决于这个流需要什么。
So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing values or storing them in some external system. However sometimes we may be interested in some value that can be obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed. While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer this question in a streaming setting would to create a stream of counts described as "up until now, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
到目前为止我们只是使用Flow来处理数据,并且把数据消费到一些外部的Sink中——或者是打印出值,或者是存储到外部系统中。(译注:意思是不在我们使用这个流的程序的内部获取结果)但是,有时候我们感兴趣的是一些从物化后的处理管道中获取的值(译注:指我们想要从这个流获取一些值,赋给变量)。比如,我们想要知道我们处理过了多少推文。在流是无穷的情况下,这个问题的答案不是那么明显(一种解答方法是创建一个计数流,描述“到目前为止,我们已经处理了N条推文),但是在有限的流中,这个问题是可以解决的,可以得到一个很好的答案,比如元素的总数。
First, let's write such an element counter using FoldSink and see how the types look like:
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) val sum: Future[Int] = counter.run() sum.foreach(c => println(s"Total tweets processed: $c"))
First, we prepare the FoldSink which will be used to sum all Int elements of the stream. Next we connect thetweets stream though a map step which converts each tweet into the number 1, finally we connect the flow usingtoMat the previously prepared Sink. Remember those mysterious type parameters on Source Flow and Sink? They represent the type of values these processing parts return when materialized. When you chain these together, you can explicitly combine their materialized values: in our example we used the Keep.right predefined function, which tells the implementation to only care about the materialized type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is Future[Int] and because of using Keep.right, the resultingRunnableFlow has also a type parameter of Future[Int].
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableFlow: RunnableFlow[Future[Int]] =
tweetsInMinuteFromNow
.filter(_.hashtags contains akka)
.map(t => 1)
.toMat(sumSink)(Keep.right) // materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableFlow.run()
// and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableFlow.run()
Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or steering these elements which will be discussed in detail in Stream Materialization. Summing up this section, now we know what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
Note
runWith() is a convenience method that automatically ignores the materialized value of any other stages except those appended by the runWith() itself. In the above example it translates to using Keep.right as the combiner for materialized values.
runWith()是一个例利方法,它自动的忽略了其它阶段的物化值,而只保留通过runWith()附加的那个阶段的值。在上边的例子中,它自动地使用Keep.right作为物化值的组合器。