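Overview
- Spark Streaming provides scalable, high-throughput, fault-tolerant stream processing of live data streams.
- Architecture diagram: [figure]
- Features
- Scales linearly to hundreds of nodes and beyond;
- Achieves sub-second latency;
- Integrates seamlessly with Spark batch and interactive processing;
- Provides a simple API for implementing complex algorithms;
- Supports many streaming sources, including Kafka, Flume, Kinesis, Twitter, ZeroMQ, etc.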
How it works
After receiving a live input data stream, Spark Streaming divides the data into batches and hands them to the Spark engine, which processes them and generates the final stream of results in batches.
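To make the batching idea concrete, here is a minimal plain-Scala sketch (it does not use Spark) that assigns each record to the batch whose interval contains its arrival time; each resulting group plays the role of one batch RDD. The object `MicroBatching`, the method `toBatches` and the (arrivalTimeMs, record) input shape are hypothetical illustrations, not Spark APIs; Spark's receivers perform this grouping internally.

```scala
object MicroBatching {
  // Assigns each (arrivalTimeMs, record) pair to batch number arrivalTimeMs / batchIntervalMs
  // and returns the batches in time order; each batch corresponds to one RDD of a DStream.
  def toBatches[A](events: Seq[(Long, A)], batchIntervalMs: Long): Seq[(Long, Seq[A])] =
    events
      .groupBy { case (t, _) => t / batchIntervalMs }
      .toSeq
      .sortBy(_._1)
      .map { case (batch, evs) => (batch, evs.map(_._2)) }
}
```

With a 1000 ms batch interval, records arriving at 100 ms and 900 ms land in batch 0, a record at 1200 ms in batch 1, and so on.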
API
DStream
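- DStream (Discretized Stream) is the high-level abstraction provided by Spark Streaming to represent a continuous data stream.
- Composition: a DStream can be viewed as a sequence of RDDs.
- Core idea: treat the computation as a series of stateless, deterministic batch tasks over short time intervals; the input data received in each interval is stored reliably in the cluster and forms one input dataset.
- Properties: a high-level functional programming API, strong consistency, and efficient fault recovery.
- Application template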
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    val sc = new SparkContext(conf)
    // Build the StreamingContext from an existing SparkContext, with a 1-second batch interval
    val ssc = new StreamingContext(sc, Seconds(1))
    // ...
  }
}
* Template 2
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    // The StreamingContext creates its own SparkContext from the SparkConf
    val ssc = new StreamingContext(conf, Seconds(1))
    // ...
  }
}
WordCount example
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    val ssc = new StreamingContext(conf, Seconds(1))
    // args(0) = hostname, args(1) = port of a text socket source (e.g. one started with `nc -lk 9999`)
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Split each line into words and count every word within the batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Nothing runs until start(); awaitTermination() blocks until the streaming job stops
    ssc.start()
    ssc.awaitTermination()
  }
}
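To try the example without a socket source, a sketch like the following feeds pre-built RDDs through `StreamingContext.queueStream`, which is mainly intended for testing. It assumes `spark-streaming` is on the classpath and uses a `local[2]` master; `LocalWordCount` and its `run` method are hypothetical names, and the 5-second timeout is an arbitrary choice that lets a few 1-second batches complete.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalWordCount {
  // Runs WordCount over two pre-built batches and returns the non-empty per-batch results
  def run(): Seq[Map[String, Int]] = {
    // local[2]: two local threads, no cluster needed
    val conf = new SparkConf().setAppName("LocalWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // queueStream consumes one queued RDD per batch (oneAtATime = true by default)
    val rddQueue = mutable.Queue[RDD[String]]()
    rddQueue += ssc.sparkContext.parallelize(Seq("a b a", "b c"))
    rddQueue += ssc.sparkContext.parallelize(Seq("c c"))
    val wordCounts = ssc.queueStream(rddQueue).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // The foreachRDD closure runs on the driver, so a driver-side buffer can collect results
    val results = mutable.ArrayBuffer[Map[String, Int]]()
    wordCounts.foreachRDD { rdd => results += rdd.collect().toMap }
    ssc.start()
    // Let a few batches run, then stop gracefully so in-flight batches finish
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    results.filter(_.nonEmpty).toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because the results are collected to the driver, this pattern only suits small test data.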
Input DStream
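- An input DStream represents the raw data stream received from a streaming source. Sources fall into basic sources (file systems, sockets, Akka actors, custom sources) and advanced sources (Kafka, Flume, etc.).
- Receiver
- Every input DStream (except file streams) is associated with a single Receiver object, which receives data from the source and stores it in Spark's memory for processing. An application can create multiple input DStreams to receive several data streams in parallel.
- Each Receiver is a long-running task on a worker/executor, so it occupies one of the cores allocated to the application. If the number of cores allocated to the Spark Streaming application is less than or equal to the number of input DStreams (i.e. the number of Receivers), the application can receive data but has no cores left to process it (file streams are the exception, since they need no Receiver). When running locally, this means using a local[n] master with n greater than the number of Receivers.
- Spark Streaming ships with connectors for many sources; consult the official documentation when needed.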
Transformation Operation
- Common transformations
Name | Description
map(func) | Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherStream.
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) | Return a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
- updateStateByKey(func)
- updateStateByKey reduces the data of a DStream by key and accumulates the result across batches, keeping the running value as per-key state.
- WordCount, updateStateByKey version
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test")
    val ssc = new StreamingContext(conf, Seconds(1))
    // updateStateByKey requires a checkpoint directory to be set first
    ssc.checkpoint("/spark/checkpoint")
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // Total count of this word in the current batch
      val currCount = currValues.sum
      // Count accumulated so far (0 if the word has not been seen before)
      val prevCount = prevValueState.getOrElse(0)
      // Return the new accumulated count as an Option[Int]
      Some(currCount + prevCount)
    }
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](addFunc)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
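The accumulation performed by `addFunc` can be simulated without Spark. This plain-Scala sketch (`UpdateStateSketch` is a hypothetical name) applies the same function batch by batch. Like updateStateByKey, it also calls the function for keys that already have state but no new values in the current batch, passing an empty `Seq`, so their counts are carried forward.

```scala
object UpdateStateSketch {
  // Same shape as addFunc: (values of this key in the current batch, previous state) => new state
  val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) =>
    Some(currValues.sum + prevValueState.getOrElse(0))

  // Applies addFunc batch by batch and returns the state after each batch
  def run(batches: Seq[Seq[String]]): Seq[Map[String, Int]] =
    batches.scanLeft(Map.empty[String, Int]) { (state, words) =>
      // (word, Seq(1, 1, ...)) for the words of the current batch
      val current = words.groupBy(identity).map { case (w, ws) => (w, ws.map(_ => 1)) }
      // Every key with old state or new values is passed to the update function
      (state.keySet ++ current.keySet).flatMap { k =>
        addFunc(current.getOrElse(k, Seq.empty), state.get(k)).map(v => (k, v))
      }.toMap
    }.tail
}
```

After batches `a b a`, `b` and an empty batch, the state is `a -> 2, b -> 1`, then `a -> 2, b -> 2`, and stays there.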
- transform(func)
- Creates a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream.
- Example from the official documentation
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})
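The official snippet elides how the spam information is loaded. A self-contained variant, assuming the blacklist is an `RDD[(String, Boolean)]` of words to drop, keeps the per-batch logic in a plain RDD function and lets `transform` apply it to every batch. `TransformExample`, `cleanBatch` and `clean` are hypothetical names, and `leftOuterJoin` plus a filter is used here in place of the official `join(...).filter(...)`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object TransformExample {
  // Per-batch cleaning on plain RDDs: drop every (word, count) whose word is in the blacklist
  def cleanBatch(counts: RDD[(String, Int)], blacklist: RDD[(String, Boolean)]): RDD[(String, Int)] =
    counts.leftOuterJoin(blacklist)
      .filter { case (_, (_, flagged)) => flagged.isEmpty }
      .map { case (word, (count, _)) => (word, count) }

  // transform applies the same RDD-to-RDD function to the RDD of every batch
  def clean(wordCounts: DStream[(String, Int)], blacklist: RDD[(String, Boolean)]): DStream[(String, Int)] =
    wordCounts.transform(rdd => cleanBatch(rdd, blacklist))
}
```

Keeping the logic in `cleanBatch` means it can be unit-tested with an ordinary SparkContext, without starting a stream.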
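- Window operations
- Windowed operations apply transformations over a sliding window of data (in my view similar to Storm's tick tuples, but more powerful).
- Parameters: the window length and the slide interval, both of which must be multiples of the batch interval of the source DStream.
- Example: window length 3 and slide interval 2 (both in units of the batch interval), i.e. every 2 batches the operation is applied to the data of the last 3 batches. In the original diagram (not reproduced here), the upper row is the source DStream and the lower row is the windowed DStream.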
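Which batches fall into each window can be worked out with a plain-Scala sketch (no Spark). It numbers batches from 1 in units of the batch interval and emits a window every `slide` batches; `WindowSketch` is a hypothetical name, and the exact time at which Spark emits its first window may differ from this simplification.

```scala
object WindowSketch {
  // For each emission time t (a multiple of slide), list the batches covered by the window:
  // the last `length` batches ending at t, ignoring batch numbers below 1.
  def windows(totalBatches: Int, length: Int, slide: Int): Seq[(Int, List[Int])] =
    (slide to totalBatches by slide).map { t =>
      (t, (math.max(1, t - length + 1) to t).toList)
    }
}
```

With length 3 and slide 2, consecutive windows overlap by one batch: (2: 1-2), (4: 2-4), (6: 4-6).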
* Common window operations
Name | Description
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and "inverse reducing" the old data that leaves the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
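* Example from the official documentation
// Checkpointing is mandatory for updateStateByKey and for window operations that use an inverse function
// (reduceByKeyAndWindow with invFunc, countByWindow, countByValueAndWindow); setting it here is harmless
ssc.checkpoint("/spark/checkpoint")
// pairs: a DStream[(String, Int)] of (word, 1) pairs
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))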
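The invertible form of the same call would be `pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))`, which requires checkpointing. Why the incremental update gives the same answer can be checked with a plain-Scala sketch (no Spark; `IncrementalWindow` is a hypothetical name). It simplifies to a slide of one batch and to per-batch totals instead of per-key values.

```scala
object IncrementalWindow {
  // Full recomputation: sum of the last `length` batch totals at every batch
  def fullSums(batches: Seq[Int], length: Int): Seq[Int] =
    batches.indices.map(i => batches.slice(math.max(0, i - length + 1), i + 1).sum)

  // Incremental version: previous window value + entering batch - leaving batch,
  // i.e. reduce function (_ + _) with inverse function (_ - _)
  def incrementalSums(batches: Seq[Int], length: Int): Seq[Int] =
    batches.indices.scanLeft(0) { (prev, i) =>
      val leaving = if (i >= length) batches(i - length) else 0
      prev + batches(i) - leaving
    }.tail
}
```

For batch totals 1, 2, 3, 4, 5 and a window of 3 batches, both versions produce 1, 3, 6, 9, 12, but the incremental one touches only two batches per step regardless of window length.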
- join(otherStream, [numTasks])
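val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
// In each batch interval, the RDD generated by stream1 is joined with the RDD generated by stream2
val joinedStream = stream1.join(stream2)
* Example 2 from the official documentation: joining windowed streams
// A 20-second window of stream1 joined with a 1-minute window of stream2
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)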
Output Operation
Name | Description
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
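The split between driver and executors inside `foreachRDD` is easy to get wrong: a connection created on the driver cannot simply be shipped to the executors. A commonly used pattern is to open one connection per partition inside `foreachPartition`. In the sketch below, `Connection`, `writePartition` and `writeOut` are hypothetical; a real application would substitute its own client, ideally with connection pooling.

```scala
import org.apache.spark.streaming.dstream.DStream

object ForeachRDDExample {
  // Hypothetical sink interface standing in for a real database or network client
  trait Connection {
    def send(record: String): Unit
    def close(): Unit
  }

  // Runs on an executor: writes one partition through one connection, always closing it
  def writePartition(records: Iterator[String], conn: Connection): Unit =
    try records.foreach(conn.send) finally conn.close()

  // The foreachRDD body runs on the driver once per batch; the foreachPartition body
  // is shipped to the executors, so each partition opens its own connection there
  def writeOut(stream: DStream[String], newConnection: () => Connection): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition(records => writePartition(records, newConnection()))
    }
}
```

The `newConnection` factory is captured by the executor-side closure, so it must be serializable (a plain function literal that captures no driver-only objects is).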
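Caching and persistence
- persist() stores every RDD of a DStream in memory (by default in serialized form).
- Window operations and stateful operations such as updateStateByKey are persisted in memory automatically, so there is no need to call persist() explicitly.
- For input streams received over the network (e.g. Kafka, Flume, sockets, ZeroMQ, RocketMQ), the default persistence level replicates the serialized data to two nodes for fault tolerance.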
Checkpoint
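- Purpose: save enough information to a fault-tolerant storage system (e.g. HDFS, S3) for Spark Streaming to recover from failures.
- Types
- Metadata checkpointing: saves the information that defines the streaming computation, used to recover from failures of the node running the driver. It includes the configuration used to create the application, the DStream operations defined by the application, and batches that are queued but not yet completed.
- Data checkpointing: saves the generated RDDs. Stateful transformations combine data across multiple batches, so each generated RDD depends on the RDDs of previous batches (a dependency chain that keeps growing). To keep this chain short and thus reduce recovery time, intermediate RDDs are periodically saved to reliable storage (e.g. HDFS).
- When to use:
- Stateful transformations: updateStateByKey() and window operations that use an inverse reduce function (e.g. reduceByKeyAndWindow with invFunc).
- Applications that must recover from driver failures.
- How to use
streamingContext.checkpoint(checkpointDirectory)
* Applications that need driver failure recovery (WordCount as an example): if the checkpoint directory exists, the StreamingContext is recreated from the checkpoint data; otherwise (e.g. on the first run) a new StreamingContext is created. All DStream setup must happen inside the creating function, because on recovery that function is not called and the DStream graph is restored from the checkpoint instead.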
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
object Test {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/spark/checkpoint"
    // Called only when no checkpoint exists: build the context AND the whole DStream graph here
    def createContextFunc(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
      val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      wordCounts.print()
      ssc
    }
    // Recover from checkpointDir if it holds checkpoint data, otherwise call createContextFunc
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContextFunc _)
    ssc.start()
    ssc.awaitTermination()
  }
}
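- Checkpoint interval
dstream.checkpoint(checkpointInterval)
* Rule of thumb: set it to 5-10 times the slide interval of the DStream.
* Rationale: checkpointing adds storage cost and increases the processing time of the batches in which RDDs are checkpointed. With a small batch interval (e.g. 1 second), checkpointing every batch can significantly reduce operation throughput; conversely, checkpointing too infrequently lets the lineage and task sizes grow.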
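The rule of thumb can be wrapped in a small helper. A sketch, assuming the stream passed in is a stateful DStream such as the output of updateStateByKey; `CheckpointInterval`, `intervalFor` and `withCheckpoint` are hypothetical names, and 10 is simply the upper end of the 5-10 range.

```scala
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream

object CheckpointInterval {
  // Rule of thumb from the text: checkpoint every 5-10 slide intervals (default here: 10),
  // which keeps the checkpoint interval a whole multiple of the slide interval
  def intervalFor(slideInterval: Duration, multiple: Int = 10): Duration =
    slideInterval * multiple

  // Sets the data-checkpoint interval of a stateful stream based on its own slide duration
  def withCheckpoint[T](stateStream: DStream[T]): DStream[T] =
    stateStream.checkpoint(intervalFor(stateStream.slideDuration))
}
```

With a 1-second batch interval and a state stream sliding every batch, this checkpoints every 10 seconds.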
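Performance tuning
Reducing batch processing time
- Parallelism in data receiving
- Create more input DStreams: data received over the network (e.g. Kafka, Flume, sockets) is deserialized and then stored in Spark. Because each input DStream has only one Receiver, receiving can become the bottleneck; in that case, create several input DStreams (each with its own Receiver) and union them into one DStream.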
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
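* Tune the "spark.streaming.blockInterval" parameter: before being stored in Spark's memory, received data is grouped into blocks, and the number of blocks determines the number of tasks. The number of tasks per receiver per batch is roughly batch interval / block interval; for example, a 2-second batch interval with a 200 ms block interval yields about 10 tasks. If the number of tasks is too low (e.g. fewer than the cores per machine), CPU resources are wasted. The recommended minimum block interval is about 50 ms.
* Explicitly repartition the input DStream: redistribute the received data across partitions before further processing.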
inputStream.repartition(<number of partitions>)
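- Parallelism in data processing: for operations such as reduceByKey and reduceByKeyAndWindow, parallelism is controlled by the "spark.default.parallelism" property or by passing the optional parallelism argument (numTasks) explicitly.
- Data serialization: configure the more efficient Kryo serializer.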
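The settings above can be collected in one SparkConf. A sketch with example values only: the parallelism of 16 is arbitrary, `TuningConf` and `tasksPerReceiverPerBatch` are hypothetical names, and the "200ms" time-string form assumes a Spark version that parses durations (assumption: Spark 1.4+; older 1.x releases expect a plain millisecond number such as "200").

```scala
import org.apache.spark.SparkConf

object TuningConf {
  // Approximate tasks per receiver per batch = batch interval / block interval
  def tasksPerReceiverPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long =
    batchIntervalMs / blockIntervalMs

  def conf(): SparkConf = new SparkConf()
    .setAppName("Test")
    // Block interval for received data (example value; recommended minimum about 50 ms)
    .set("spark.streaming.blockInterval", "200ms")
    // Default number of partitions for shuffles such as reduceByKey (example value)
    .set("spark.default.parallelism", "16")
    // Use Kryo instead of Java serialization
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```

`tasksPerReceiverPerBatch(2000, 200)` reproduces the 10 tasks from the example in the text.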
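Setting the right batch interval
- Principle: data must be processed at least as fast as it arrives, i.e. the batch processing time must be less than or equal to the batch interval.
- Method:
- Start with a conservative batch interval (e.g. 5-10 seconds) and a low input data rate;
- Then check "Total delay" in the log4j logs and adjust the batch interval step by step, keeping "Total delay" below the batch interval.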
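Besides reading the log4j output, the same delays are available programmatically through a `StreamingListener`, whose `BatchInfo` exposes `processingDelay` and `totalDelay` as `Option[Long]` milliseconds. A sketch, with `DelayLogger` and `keepsUp` as hypothetical names; it would be registered with `ssc.addStreamingListener(new DelayLogger(1000))` before `ssc.start()`.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Prints each completed batch's delays and flags batches slower than the batch interval
class DelayLogger(batchIntervalMs: Long) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processing = info.processingDelay.getOrElse(-1L)
    val total = info.totalDelay.getOrElse(-1L)
    val status = if (DelayLogger.keepsUp(processing, batchIntervalMs)) "OK" else "FALLING BEHIND"
    println(s"batch ${info.batchTime}: processing=${processing}ms total=${total}ms [$status]")
  }
}

object DelayLogger {
  // The stream keeps up if each batch is processed within the batch interval
  def keepsUp(processingMs: Long, batchIntervalMs: Long): Boolean =
    processingMs >= 0 && processingMs <= batchIntervalMs
}
```

If "FALLING BEHIND" keeps appearing and the total delay grows batch after batch, the batch interval is too short for the current load.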
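Memory tuning
- Persistence level: use serialized storage levels and enable compression with the "spark.rdd.compress" parameter (compression applies to serialized partitions).
- GC strategy: enable the CMS (Concurrent Mark-Sweep) collector on the driver and the executors.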
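Both memory settings can be expressed as configuration. A sketch (`MemoryTuning` is a hypothetical name) that sets the executor side in SparkConf. In client mode, driver JVM options cannot be set through SparkConf because the driver JVM has already started; they go on the command line instead, e.g. `spark-submit --driver-java-options "-XX:+UseConcMarkSweepGC"`. CMS was deprecated in JDK 9 and removed in JDK 14, so this flag only applies to older JVMs.

```scala
import org.apache.spark.SparkConf

object MemoryTuning {
  def conf(): SparkConf = new SparkConf()
    .setAppName("Test")
    // Compress serialized RDD partitions (affects serialized storage levels such as MEMORY_ONLY_SER)
    .set("spark.rdd.compress", "true")
    // Use the Concurrent Mark-Sweep collector in the executor JVMs
    .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
}
```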