近期做目需要用到Spark的流处理框架,故进行了官方文档的阅读,顺手翻译在此。
概要
Spark流是对于Spark核心API的拓展,从而支持对于实时数据流的可拓展,高吞吐量和容错性流处理。数据可以由多个源取得,例如:Kafka,Flume,Twitter,ZeroMQ,Kinesis或者TCP接口,同时可以使用由如map,reduce,join和window这样的高层接口描述的复杂算法进行处理。最终,处理过的数据可以被推送到文件系统,数据库和实时面板。实际上,我们可以在数据流上直接应用Spark的机器学习算法和图处理算法
在内部,其按如下方式运行。Spark Streaming接收到实时数据流同时将其划分为分批,这些数据的分批将会被Spark的引擎所处理从而生成同样按批次形式的最终流。
Spark Streaming提供了被称为离散化流或者DStream的高层抽象,这个高层抽象用于表示数据的连续流。
创建DStream的两种方式:
1. 由Kafka,Flume和Kinesis取得的数据作为输入数据流。
2. 在其他DStream进行的高层操作。
在内部,DStream被表达为RDDs的一个序列。
这个指南会指引你如何利用DStreams编写Spark Streaming的程序。你可以使用诸如Scala,Java或者Python来编写Spark Streaming的程序。文中的标签可以让你在不同编程语言间切换。
注意:少量的API在Python中要么是不可用的,要么是和其他有差异的。在本文中,这些点将会被高亮显示。
译者注:为方便起见,Spark Streaming 直接缩写为SS形式。
一个简单的例子
在深入了解如何编写你自己的SS程序之前,让我们先迅速浏览下基本的SS程序是什么样的。假设我们想统计文本数据中单词个数(数据来自于监听一个TCP接口的数据服务器)。你只需要这样做:
第一步,加载入StreamingContext,这个是所有流功能函数的主要访问点,我们使用两个执行线程和1s的批次间隔来创建本地的StreamingContext:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建一个本地的StreamingContext,两个工作线程,批次间间隔1s,
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# 使用这个Context,就可以创建一个DStream来展现来自于TCP源的流形数据了,
# 指定 主机名 和 相应的端口即可
# 创建一个和 hostname:port相连接的DStream,形如 local:9999
lines = ssc.socketTextStream("localhost", 9999)
# 这个行的DStream表示来自数据服务器的数据流。DStream中的每个记录就是text中的一行。下来利用空格" " 将所有行分割为单词的组合
words = lines.flapMap(lambda line: line.split(" "))
# flapMap 是一个一对多映射的DStream操作,其从原DStream的每一条记录里产生相应的多个新记录。这样,每一行被分割成为多个单词,同时分割后的单词流以words DStream的形式展现。下来,将这些单词进行计数:
# 在各批次中计数单词
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# 将在这个DStream中产生的每个RDD的前十个元素打印到终端
wordCounts.pprint()
words DStream 进一步被映射成(一对一的转换)(word,1)对的DStream形式,这个“对”形式的DStream将会被reduced(一个Spark操作)以取得数据各个批次中单词的统计。最后,wordCounts.pprint()打印处每一秒所获得的少量计数值。
注意:这么多行代码被执行后,SS仅仅设置了其若开始运行将要进行的运算,但是并没有开始真正意义上的处理。在所有的转换都部署完毕后,我们需要调用下面两个操作来真正启动处理:
ssc.start() # 开始计算
ssc.awaitTermination() # 等待计算终止
完整代码见:SS的案例 NetworkWordCount
如果你已经下载并编译了Spark,就可以按如下讲解来运行这个例子。首先你需要运行Netcat(大多数类Unix系统都有的工具)作为数据服务器:
$ nc -lk 9999
接下来,在不同的终端,可以使用如下方式启动历程:
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
运行netcat终端上的任何键入的行将会被计算并打印到屏幕上。
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
(hello,1)
(world,1)
基本概念
下来,我们超越简单历程的局限,阐述SS的基本知识。
Linking
和Spark相似,SS通过Maven中心也可用。为了编写你自己的SS程序,你需要将下面的依赖加进你的SBT或者Maven项目。
# Maven:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.1</version>
</dependency>
#SBT:
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
为了从像Kafka,Flume,和Kinesis这些没有出现在SS核心API的源来摄取数据,你需要将相应的artifact: spark-streaming-xyz_2.10 加进依赖中。下面列举一些常见的:
Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10
对于最新的列表,请参阅 Maven仓库 以获得完整的支持源和artifacts。
初始化StreamingContext
为初始化一个SS程序,必须创建一个StreaingContext对象,此对象为所有Spark Streaming功能函数的主要入口。
1. 可以由一个SparkContext对象来创建一个StreamingContext对象
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# appName: 在集群UI上显示的应用标签。
# master: 一个 Spark, Mesos, or YARN集群URL, or 一个特定的
# 在"loacl[*]"字符串以运行于本地。
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
在实际中,运行在集群上时,你或许不希望在程序中将 master 写死,而是使用 spark-submit来发射应用并在这里接收。但是,为了本地测试和单元测试,你可以写入”local[*]”来在进程中启动SS(监测本地系统中核的数量)。
在定义了一个context后,你需要做:
1. 通过创建DStream来定义输入源
2. 通过对DStream使用转换和输出操作来定义流计算
3. 使用streamingContext.start()来接收并处理数据
4. 使用streamingContext.awaitTermination()等待处理的停止(手动或者因为任何出错).
5. 处理进程可以使用streamingContext.step()来手动停止。
需要记住的几点:
1. 一旦一个Context被启动了,任何形式的流计算都不能建立或者加入其中。
2. 一旦一个context停止了,就不能再次启动了。
3. 一个JVM一次只能活跃一个StreamingContext
4. 停止StreamingContext同时也会停止SparkContext。为了只停止StreamingContext,将stopSparkContext参数置为false。
5. 只要上一个StreamingConetxt在下一个StreamingContext创建前停止了,那么一个SparkContext就可以重用来创建多个StreamingContext。
离散数据流(DStreams)
离散数据流或者DStream是SS提供的基本抽象。其表现数据的连续流,这个输入数据流可以来自于源,也可以来自于转换输入流产生的已处理数据流。内部而言,一个DStream以一系列连续的RDDs所展现,这些RDD是Spark对于不变的,分布式数据集的抽象(详见Spark编程指南)。一个DStream中的每个RDD都包含来自一定间隔的数据,如下图:
在DStream上使用的任何操作都会转换为针对底层RDD的操作。例如:之前那个将行的流转变为词流的例子中,flatMap操作应用于行DStream的每个RDD上 从而产生words DStream的RDD。如下图:
这些底层的RDD转换是通过Spark引擎计算的。DStream操作隐藏了大多数细节,同时为了方便为开发者提供了一个高层的API。这里的一些操作会在下文中详述。
输入DStream和接收器
输入DStream是用于表示来自于源流的输入数据流的。在上边讲到的例子中,lines就是这样一个输入DStream,因为其表示了来自于netcat服务器的数据流。除了文件流(下文中会讲到)之外的所有输入流都会和接收器对象(Scala文档,Java文档)相关联,其接收来自源的数据并为了之后的处理将数据保存在Spark的内存中。
SS提供了两种类型的内置流源:
1. 基本源:StreamingContext API直接可用的源。例如:文件系统,套接字连接。
2. 高级源:类似Kafks,Flume,Kinesis,Twitter等等这样的源可以通过额外的工具类来使用。这要求链接到额外的依赖(如linking节所讲)。
这两种类型的源我们都会涉及到一些。
注意,如果你想在自己的流应用中并行接收多数据流,你可以创建多个输入DStream(这在性能优化章节会有所涉及)。这会创建创建多个接收器以同时接收多数据流。但是要注意,Spark工作/执行者是一个跨度很长的任务,因此其占用分配给Spark流应用中的一个。因而,需要给Spark Streaming应用分配足够多的核(若是本地模式就线程)从而保证处理接收到的数据并运行接收器。
铭记:
当以本地模式运行一个Spark Streaming程序时,不要使用”local”或者”local[1]”作为master URL。这些都意味着仅仅只有一个线程被用于在本地运行任务。如果你基于一个接收器(如套接字,Kafka,Flume等等)来运行输入DStream,那么仅有一个单线程来跑接收器,而没有余下线程来处理接收到的数据。因此,当运行于本地时,必须使用”local[n]”作为master URL,其中n应大于你要跑的接收器的数目(参阅Spark Peoperties 来学习如何设置master)。
Extending the logic to running on a cluster,分配给Spark Streaming应用的核的数量一定要多于接收器的数量。否则系统将只能接收数据,但不能进行处理。
基本源
如上讲解,我们已经通过一个简单的例子了解了如何从文本数据来创建一个DStream(文本数据来自于TCP套接字的连接)。除了套接字,StreamingContext API也提供了以文件和Akka actors作为输入源来创建DStream的途径。
1. 文件流:从兼容HDFS API的任何文件系统文件中读取的数据,可按如下方式创建DStream:
streamingContext.textFileStream(dataDirectory)
SS将监听目录”dataDirectory”,同时处理任何创建在此目录中的文件(不包括此目录下嵌套目录中的文件)。
注意:
1. 文件中数据格式必须一致。
2. 文件必须以原子方式移动或者重命名进入目录的方式来创建。
3. 一旦移动完毕,文件就不能在改动了。所以即使继续加入文件内容,新加入数据也不会被读取。
对于简单的文本文件,有一个更加简易的模式streamingContext.textFileStream(dataDirectroy)。文件流不需要运行一个接收器,因此不需要分配核。
Note:fileStream在Python API中没有实现,仅仅textFileStream是可用的。
2. 基于Custom Actors的流:使用streamingContext.actorStream(actorProps,actor-name)可以用接收自Akka actors的数据流创建DStreams。细则见 Custom Receiver Guide
Note:因为actors仅在Java和Scala库中可用,故actorStream不能被Python API调用。
3:RDD队列作为流:使用测试数据测试Spark Streaming应用,也可以使用streamingContext.queueStream(queueOfRdds)来基于一个RDD的队列创建一个DStream。每个在队列中的RDD都会在DStream中被当做一匹数据,同时以流方式进行处理。
更多细则关于来自sockets,files,和actors的流,请看相应的API文档。Scala:StreamingContext; Java:JavaStreamingContext;Python:StreamingContext。
高级源
Note:这些源中, Kafka,Kinesis,Flume和MQTT在Python API中可用。
这一类源需要非Spark库的外部接口,其中一些需要复杂的依赖(如Kafka和Flume)。因此,为了尽可能减少版本之间的依赖冲突问题,从这些源中创建DStreams的功能函数被移至单独的库从而可以在需要的时候显式链接。例如,若你想使用来自于Twitter的推特流数据来创建一个DStream,你需要做如下的工作:
1. 链接:将 spark-streaming-twitter_2.10加入SBT/Maven项目依赖中。
2. 编程:导入TwitterUtils类并用TwitterUtils.createStreams创建一个DStream
3. 部署:生成一个包含所有依赖的超级JAR(含有依赖 spark-streaming-twitter_2.10和其所需依赖)并部署应用。在部署那章细讲。
import org.apache.spark.streaming.twitter.*
TwitterUtils.createStream(jssc);
注意:这些高级源不能在Spark shell上使用,因此基于这些源的应用程序不能在shell上测试。如果你非要在Spark shell上使用他们,那就自己去下载Maven artifact的JAR以及其相应源并将其添加到路径中。
其中一些高级源列举如下:
Kafka:SS 1.6.1和Kafka的 0.8.2.1兼容。见Kafka集成
Flume:SS 1.6.1 和 Flume的 1.6.0兼容。见Flume集成
Kinesis:SS1.6.1和Kinesis客户端库1.2.1兼容。见Kinesis集成
Twitter:。。。略
自定义源
Note:目前还不支持Python。
完全可以抛开一般的数据源来创建输入DStream。你只需实现一个自己定义的接收器(下一章会细讲)来从自定义源接收数据并将其导入Spark。详见自定义源指导
接收器可靠性
基于稳定性可以将数据源分为两类。像Kafka和Flume的源允许对所传送的数据进行确认。若接收到来自于这些稳定源数据的系统正确确认了接收到的数据,就可以确保任何形式的故障都不会使数据丢失。这个机制引出以下两种接收器:
1. 可靠接收器 - 当数据被接收并将副本存储到Spark后,可靠接收器会给可靠源发送确认信息。
2. 不可靠接收器 - 不可靠接收器不会给源发送确认。此可用于不支持确认的源,或者是不想额外引入确认复杂度的可靠源。
如何写一个可靠接收器的讲解在:自定义接收器指引
对DStream的转换
和RDD一样,使用转换可以修改从输入DStream获取的数据。DStreams支持许多用在一般Spark RDD上的转换。其中一些常用的如下:
map(func):将源DStream中的每个元素通过一个函数func从而得到新的DStreams。
flatMap(func):和map类似,但是每个输入的项可以被映射为0或更多项。
filter(func):选择源DStream中函数func判为true的记录作为新DStreams
repartition(numPartitions):通过创建更多或者更少的partition来改变此DStream的并行级别。
union(otherStream):联合源DStreams和其他DStreams来得到新DStream
count:统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。
reduce(func):通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStreams。这个函数需要关联从而可以被并行计算。
countByValue:对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的频率。
reduceByKey(func, [numTasks]):对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来。Note:默认情况下,这个操作使用Spark默认数量的并行任务(本地模式为2,集群模式中的数量取决于配置参数spark.default.parallelism)。你也可以传入可选的参数numTaska来设置不同数量的任务。
join(otherStream,[numTasks]):两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。
cogroup(otherStream,[numTasks]):两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams
transform(func):将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。
updateStateByKey(func):得到”状态”DStream,其中每个key状态的更新是通过将给定函数用于此key的上一个状态和新值而得到。这个可用于保存每个key值的任意状态数据。
这其中的一些转换操作值得详细讨论。
UpdateStateByKey Operation
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
1. 定义状态,状态可以是一个任意的数据类型。
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
在每一批次,无论在批次中有没有新数据,Spark都将更新函数用于所有存在的keys。如果更新函数返回None,那么key-value对将会被消除。
看一个示例:你想要保持对出现在text数据流中的单词进行计数。这里,运行计数是一个状态同时它是整数。如下定义更新函数:
def updateFunction(newValues, runningCOunt):
if runningCount is None:
runningCount = 0
# 将最新值加入到之前运行的计数中以获取新计数
return sum(newValues, runningCount)
这个思想用于DStream对单词的保留(即前例中pairs DStream包含(word,1)对)
更新函数会被每个单词所调用,其中newValues有一个1的序列(来自于(word, 1)对)和runningCount保留之前的计数。完整的Python代码,请见例子 state_network_wordcount.py
。
Note: 使用updateStateByKey需要对检查点目录进行配置,详细讨论见 checkpointing章节.
transform操作
转换操作(连同其参数 诸如 transformWith)允许在DStream上使用任意的RDD到RDD函数。其可以用于实现甚至DStream API中没有展露的应用。例如,将数据流中每个批次数据和其他数据集接合的功能并没有用DStream API展露出来。然而,你可以用 transform轻易实现。这提供了诸多可能性。例如,你可以通过结合预先计算的垃圾信息来做实时的数据清理???( one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.)
# 包含 垃圾信息的RDD
spamInfoRDD = sc.pickleFile(...)
# 通过将数据流结合垃圾信息来做数据清理??
cleandDStream = wordCount.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
Note: 在每个批次间隔中提供的函数都会被调用。这允许你做随时间变化的RDD操作,这也意味着RDD操作,分区个数,广播变量可以随着批次而改变。
Window Operations
Spark Streaming也提供窗口化计算,这允许将转换用于滑动窗口数据。下图说明了这类滑动窗口:
如上图所示,随着窗口沿着源DStream滑动,落到窗口中的源RDD会被合并,并进行操作从而产生属于窗口DStream类型的RDD。如上图,操作作用在三个单位时间数据上同时每次滑动2时间单位。这意味着任何窗口操作都需要指定两个参数。
窗长: 窗口持续时间(图中是3)
滑动间隔: 窗操作执行的间隔(图中是2)
这个两个参数必须乘以源DStream中的匹次间隔(图中为1)
让我们用示例对窗操作进行说明。
假设,你想拓展前例从而每隔十秒对持续30秒的数据生成word count。为做到这个,我们需要在持续30秒数据的(word,1)对DStream上应用reduceByKey。使用操作reduceByKeyAndWindow.
# reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 10)
一般窗操作如下。所有这些操作有如上述的两个参数 - 窗长和滑动间隔。
window(窗长,滑动间隔): 基于对源DStream窗化的批次进行计算返回一个新的DStream。
countByWindow(窗长,滑动间隔):返回一个滑动窗口计数流中的元素。
reduceByWindow(自定义函数,窗长,滑动间隔):通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
reduceByKeyAndWindow(自定义函数,窗长,滑动间隔,[numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。
reduceByKeyAndWindow(自定义函数,invFunc,窗长,滑动间隔,[numTasks]):这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。
countByValueAndWindow(窗长,滑动间隔,[numTasks]):对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。
连接操作
最终,值得注意的是在Spark Stream中执行不同类型的连接多么简单。
Stream-stream 连接
Stream可以很容易地与其他流连接。
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
在每一批次间隔,stream1产生的RDD会和stream2产生的RDD相连接。你也可以用leftOuterJoin, rightOuterJoin,fullOuterJoin。进而,在流窗口中做连接非常有用,当然也很简单。
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
在介绍DStream.transform操作时已经对这个进行了介绍。这里再介绍一个将数据集和窗化的流join例子。
dataset = … # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
实际上,你也可以动态改变你想join的数据集。如上代码中提供给transform的函数是估计每一批次的间隔,因此可以使用dataset 引用指向的当前数据集。
完整的DStreams transformation列表在API文件中可见。对于Scala API,见DStreams和PairDStreamFunctions.对于Python API,见DStream.
基于DStream的输出操作
输出操作使得DStream数据可以被推送到外部的系统,如:数据库或者文件系统。因为输出操作的确允许外部系统来使用转换的数据,故而其触发所有DStream转换的真实执行(对于RDD同样如此)。现在,下边是定义的输出操作:
print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫pprint()。
saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”.
saveAsObjectFiles(prefix, [suffix]):Python API中不可用。译者是写Python的,所以..。
saveAsHadoopFiles(prefix, [suffix]):同上。
foreachRDD(func):这是最通用的输出操作,即将函数func用于产生于stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。
使用foreachRDD的设计模式
dstream.foreachRDD是强大的原始操作,其允许数据被传送到外部系统。然而,理解如何正确有效地使用这个原始显得至关重要。需要避开一些如下所示的常见错误。
通常将数据写入外部系统需要创建一个连接对象(如连接到远程服务器TCP),同时用此连接对象将数据传送到远程系统。为达到这个目的,开发者会尝试在Spark驱动上创建一个连接,然后在Spark工作结点使用从而存储RDD中的记录。例如(在Scala):
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record)
connection.close()
dstream.foreachRDD(sendRecord)
上述代码是不对的,因为这需要连接对象序列化再由驱动送至工作结点。这样的连接对象极少在机器之间转移。这个问题会表现为序列化错误(连接对象不能序列化),初始化错误(连接对象需要在工作结点初始化)等等。正确的解决方法是在工作结点创建一个连接对象。
然而,这又会引出另一个常见的问题 - 为每个记录创建一个新的连接。例如:
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
通常,创建一个连接对象有时间和资源上的花费。因此,为每一个record都创建和销毁一个连接对象会引起不必要的高花费,同时会显著减少整个系统的吞吐量。更好的解决方案是使用rdd.foreachPartition - 创建一个单独的连接对象并使用此连接为一个RDD分区中所有records做传送工作。<即针对分区创建连接>
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
为分区创建连接就避免了为records创建带来的高创建量。
最后,可以在多RDDs/批次间重复使用连接对象来进行进一步的优化。使用者可以创建一个连接对象状态池,从而在多batch里的RDD被推送到外部系统后重复使用,这个可以进一步减少开销。
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
注意: 池中的连接应该按需懒创建并在一段时间无用后超时。这个操作高效地实现了将数据传送到外部系统。
其他需要记住的点
DStreams被输出操作延迟执行,同样的思想体现在RDD操作对RDDs的延迟执行。具体而言,DStream中的RDD操作输出操作迫使对接收数据的处理。因此,如果你的应用没有任何输出操作或者像dstream.foreachRDD()这样的其中没有任何RDD操作的输出操作,那么没有什么会执行。这个系统将简单地接收数据之后丢弃。
默认情况下,一次执行一个输出操作。同时,他们会以在应用中的定义顺序来执行。
累加器和广播变量
累加器(Accumulators)和广播变量(Broadcast variables)不能从Spark Streaming的检查点中恢复。如果你启用检查并也使用了累加器和广播变量,那么你必须创建累加器和广播变量的延迟单实例从而在驱动因失效重启后他们可以被重新实例化。如下例述:
def getWordBlacklist(sparkContext):
if ('wordBlacklist' not in globals()):
globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
return globals()["wordBlacklist"]
def getDroppedWordsCounter(sparkContext):
if ('droppedWordsCounter' not in globals()):
globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
return globals()['droppedWordsCounter']
def echo(time, rdd):
# Get or register the blacklist Broadcast
blacklist = getWordBlacklist(rdd.context)
# Get or register the droppedWordsCounter Accumulator
droppedWordsCounter = getDroppedWordsCounter(rdd.countext)
# Use blacklist to drop words and use droppedWordsCounter to count them
def filterFunc(wordCount):
if wordCount[0] in blacklist.value:
droppedWordsCounter.add(wordCount[1])
False
else:
True
counts = "Counts as time %s %s" % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
DataFrame ans SQL Operations
你可以很容易地在流数据上使用DataFrames和SQL。你必须使用SparkContext来创建StreamingContext要用的SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一个实例化的SQLContext单实例来实现这个工作。如下例所示。我们对前例word count进行修改从而使用DataFrames和SQL来产生word counts。每个RDD被转换为DataFrame,以临时表格配置并用SQL进行查询。
# lazily instantiated global instance of SQLContext
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("======== %s =======" % str(time))
try:
# Get the singleton instance of SQLContext
sqlContext = getSqlContextInstance(rdd.context)
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word = w))
wordsDataFrame = sqlContext.createDataFrame(rowRdd)
# Register as table
wordsDataFrame.registerTempTable("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = sqlContext.sql("select word, count(*)as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
你也可以从不同的线程在定义于流数据的表上运行SQL查询(也就是说,异步运行StreamingContext)。仅确定你设置StreamingContext记住了足够数量的流数据以使得查询操作可以运行。否则,StreamingContext不会意识到任何异步的SQL查询操作,那么其就会在查询完成之后删除旧的数据。例如,如果你要查询最后一批次,但是你的查询会运行5分钟,那么你需要调用streamingContext.remrember(Minutes(5))(in Scala, 或者其他语言的等价操作)。
查阅DataFrames and SQL来了解更多的DataFrames。
MLlib Operations
你也可以使用MLlib提供的机器学习算法。首先,有流机器学习算法(如,流线性回归,流聚类等等),这些算法在从流数据中学习的同时也将模型用到了流数据上。除了这些,对于更多类型的机器学习算法,你可以离线学习一个模型(如在历史数据上进行学习)然后将其用在在线的流数据上。详见MLlib。
Caching / Persistence
和RDDs类似,DStreams同样允许开发者将流数据保存在内存中。也就是说,在DStream上使用persist()方法将会自动把DStreams中的每个RDD保存在内存中。当DStream中的数据要被多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像reduceByWindowred和reduceByKeyAndWindow以及基于状态的(updateStateByKey)这种操作,保存是隐含默认的。因此,即使开发者没有调用persist(),由基于窗操作产生的DStreams会自动保存在内存中。
对于接收自网络(如Kafka,Flume,sockets等等)的数据作为输入流,默认的持久化是将数据备份到两个结点以保证容错。
注意,不同于RDDs,DStreams的默认持久化层级是将数据序列化保存到内存中。在Performance Tuning章进行进一步探讨。对于不同持久化层级的讨论见Spark Programming Guide.
检查点
一个流应用必须操作24/7??因此必须对与应用程序逻辑无关的错误(即系统错位,JVM崩溃等)有迅速恢复的能力。为了实现这个,Spark Streaming需要为容错存储系统checkpoint足够的信息从而使得其可以从失败中恢复过来。有两种类型的数据设置检查点。
Metadata checkpointing:将定义流计算的信息存入容错的系统如HDFS。元数据包括:
配置 – 用于创建流应用的配置。
DStreams操作 – 定义流应用的DStreams操作集合。
不完整批次 – 批次的工作已进行排队但是并未完成。
Data checkpointing: 将产生的RDDs存入可靠的存储空间。对于在多批次间合并数据的状态转换,这个很有必要。在这样的转换中,RDDs的产生基于之前批次的RDDs,这样依赖链长度随着时间递增。为了避免在恢复期这种无限的时间增长(和链长度成比例),状态转换中间的RDDs周期性写入可靠地存储空间(如HDFS)从而切短依赖链。
总而言之,元数据检查点在由驱动失效中恢复是首要需要的。而数据或者RDD检查点甚至在使用了状态转换的基础函数中也是必要的。
When to enable Checkpointing
应用程序的检查点在以下需求中是必须设置为可用的:
使用状态转换的操作 - 如果updateStateByKey和reduceByKeyAndWindow(和inverse function)用于应用中,那么必须为周期性的RDD检查点提供检查点目录。
从运行应用的驱动中恢复 - 元数据检查点用于恢复进度信息。
注意没有上述状态转换的简单流应用可以任意(不启用检查点)运行。由驱动失效中恢复也是这个情况的一部分(一些接收但还没有处理的数据可能会丢失)。这通常是可接受的,很多时候以这个方式来运行Spark Streaming应用。对于非Hadoop环境的支持以后会开发。
How to configure Checkpointing
通过在一个容错可靠的文件系统(如HDFS)设置一个存储检查点信息的目录来配置检查点可用。使用streamingContext.checkpoint(checkpointDirectory)。这允许你使用前述的状态转换。另外,如果你希望使得应用程序可以由驱动失效中恢复,你应该重写你的流应用从而拥有如下的性能。
1. 当程序首次启动,其将创建一个新的StreamingContext,设置所有的流并调用start()。
2. 当程序在失效后重启,其将依据检查点目录的检查点数据重新创建一个StreamingContext。
通过使用StraemingContext.getOrCreate很容易获得这个性能。使用方式如下:
# 创建和设置一个新的StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # 设置检查点目录
return ssc
# 从检查点数据中获取StreamingContext或者重新创建一个
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# 在需要完成的context上做额外的配置
# 无论其有没有启动
context ...
# 启动context
context.start()
contaxt.awaitTermination()
如果检查点目录(checkpointDirectory)存在,那么context将会由检查点数据重新创建。如果目录不存在(首次运行),那么函数functionToCreateContext将会被调用来创建一个新的context并设置DStreams。阅读一个Python的实例recoverable_network_wordound.py。这个实例将对网络数据的计数追加到文件中。
你也可以显示地由检查点数据中创建一个StreamingContext,并使用StreamingContext.getOrCreate(checkpointDirectory, None)来开启计算。
另外使用getOrCreate,你也需要确保驱动处理从失效中自动恢复。只有通过部署运行应用程序的基础设施来实现这个。在基础设施章节会有进一步的讨论。
注意RDDs的检查点引起存入可靠内存的开销。在RDDs需要检查点的批次里,处理的时间会因此而延长。所以,检查点的间隔需要很仔细地设置。在小尺寸批次(1秒钟)。每一批次检查点会显著减少操作吞吐量。反之,检查点设置的过于频繁导致“血统”和任务尺寸增长,这会有很不好的影响对于需要RDD检查点设置的状态转换,默认间隔是批次间隔的乘数一般至少为10秒钟。可以通过dstream.checkpoint(checkpointInterval)。通常,检查点设置间隔是5-10个DStream的滑动间隔。
Deploying Applications
这节讨论部署一个Spark Streaming应用的步骤。
所需
为运行一个Spark Streaming应用,你需要做如下的工作:
1. 使用集群管理运行一个集群 - 一般所有的Spark应用都需要这个。我们在deplayment指引中再进行详细说明。
2. 打包应用程序JAR - 你必须将你的streaming应用编译成JAR。如果你使用spark-submit来启动应用,那你将不必在JAR中提供Spark和SparkStreaming。然而,如果你的应用使用了高级源(如 Kafka,Flume,Twitter),然后你就必须将他们链接的人工生成物和依赖打包进用于部署应用的JAR中。例如,一个使用TwitterUtils的应用必须将spark-streaming-twitter_2.10和其所有的转换依赖包含进应用的JAR。
3. 为执行器配置足够的内存 - 因为接收到的数据必须被存储进内存,执行器必须配置足够的内存来保留接收到的数据。注意如果你在做10分钟的窗操作,那么系统必须将数据保存于内存至少10分钟。所以应用程序所需要的内存取决于在其上的操作。
4. 配置检查点 - 若流应用需要这个,那么Hadoop容错内存中一个目录必须被配置成检查点目录,同时应用程序以检查点信息可用于错误恢复的形式撰写。详见checkpointing章节。
5. 配置应用驱动的重启 - 为了自动从驱动的失效中恢复,那么运行流程序的基础架构必须监视驱动进程并在驱动失效后将其重启。不同的集群管理有不同的工具来达到这个目标。
- Spark Standalone - 一个Spark应用可以被提交到Spark Standalone集群上运行,也就是说应用程序驱动自己并运行在其中一个工作结点。此外,Standalone集群管理可以被架构来监察驱动并在驱动因non-zero exit码或者运行运行驱动结点崩溃而导致的驱动失效后及时恢复。详见Saprk Standalone指引的驱动模式和监察。
- YARN - Yarn支持一个相似的自动重启应用机制。详见YARN文档。
- Mesos - Marathon运用Mesos来达到这个目标。
6. 配置提前写日志 - 因为Spark1.2,我们为达到强的容错保证而引入了提前写日志。如果启用,