官方文档地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html
Spark Streaming是spark api的扩展
能实现可扩展,高吞吐,可容错,的流式处理
从外接数据源接受数据流,处理数据流使用的是复杂的高度抽象的算法函数map reduce join window等
输出的数据可以存储到文件系统和数据库甚至是直接展示在命令行
也可以应用ml 和graph processing在这些数据流上
spark streaming本质还是spark只是实现了所谓的微批量
spark streaming中连续数据流用DStream表示,DStream可以从输入数据创建,也可以从其他的DStream转化来
本质上DStream是一组RDD组成的序列
一个迅速上手的例子:
# coding: utf-8
# In[ ]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# In[ ]:
#创建两个工作线程,将这两个线程喂给StreamingContext,时间间隔是1秒
#这里有个错误Cannot run multiple SparkContexts at once
#参考:http://*.com/questions/28259756/how-to-create-multiple-sparkcontexts-in-a-console
#先要尝试关闭sc才能创建多个SparkContext
try:
sc.stop()
except:
pass
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
#sc.stop()
# In[ ]:
#创建一个DStream,从本机的这个端口取数据
lines = ssc.socketTextStream("localhost", 9999)
# In[ ]:
#lines中的数据记录是一行行的文本,下面将文本切割成字
words = lines.flatMap(lambda line: line.split(" "))
#这里的flatMap是一对多DStream操作,生成一个新的DStream,就是变成了字流了
# In[ ]:
#下面数一数每一批次的字的个数
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# In[ ]:
# 打印DStream中的每个RDD的前十个元素
wordCounts.pprint()
# In[ ]:
ssc.start() # 开始计算
ssc.awaitTermination() #等待计算停止
# In[ ]:
#将这个文件的py脚本提交计算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
#在命令行输入nc -lk 9999 然后模拟输入字符串文本,那么在pyspark命令行会打印出每秒钟输入的数据的统计结果
基本概念
要想写自己的streaming程序,首先要添加maven或者sbt的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
对于外部输入流的依赖现在不在核心api中了,需要单独添加依赖。
初始化StreamingContext
可以从SparkContext对象中创建
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
appName是程序名字可以在UI中显示
master是Spark,Mesos,YARN cluster URL 或者是 声明的Local[*]字符串使得运行在本地模式
当运行在集群上的时候,不要写死在代码里面,而是要从spark-submit启动,传递进去。
对于本地测试或者单元测试,可以传递local[*]
在context被定义之后,要做下面的事情:
1. 通过创建DStream去定义输入资源
2. 通过对DStream的转换和输出操作定义流的计算
3. 使用streamingContext.start()开始接收数据并处理数据
4. 使用streamingContext.awaitTermination()等待处理过程停止(手动或者因为错误)
5. 可以使用streamingContext.stop()手动停止处理过程
要点:
1. 一旦一个context被启动,不能再添加任何新的流进去了
2. 一旦context被停止,就不能重启了
3. 在JVM中只能有一个StreamingContext被激活
4. 在streamingContext上使用stop()也会停止SparkContext(),要想单独停止前者,设置stop()的可选的参数 stopSparkContext()参数为false
5. 一个sparkContext可以被重复使用,去创建多个StreamingContext,只要前一个StreamingContext被单独停止,下一个就可以接着创建。
Discretized Streams(DStreams)
是Spark Streaming的基本的抽象,代表了一个连续的数据流
可以是从数据源接受的输入数据流,也可以是转换输入数据流得到的数据流
一个DStream代表一串RDD,RDD是不可分割的基本数据单元抽象。
每一个在DStream中的RDD包含特定时间间隔的数据
如果时间是1秒的话,从0-1秒的很多RDD,与从1-2的RDD等,组成了DStream
任何对DStream的操作,都会被翻译成对底层的RDD的操作,例如,将Lines转换成words的操作
对DStream的操作,隐藏了很多细节,给开发者提供高度抽象的API
输入DStreams和Receivers
每一个输出DStream(除了file strem)都关联一个Receiver对象,这个对象从数据源接受数据存储在spark的内存中等待处理。
Spark Streaming支持两种类型的内建数据源
1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections
2. Advances Sources:从外部工具类中调过来的例如Kafka,Flume,Kinesis等,需要链接一些外部依赖。
关键点:
1. 注意在本地运行SparkStreaming的时候,不要使用local或者local[1]作为主机的URL,因为这些都是意味着开一个线程,因为如果只是输入一个数据源,那么这个单一的线程会用来运行receiver,那么没有线程去处理接收到的数据了。所以本地运行的时候,参数local[n]中的n最好大于运行中的receiver。
2. 相应的在集群上运行的时候,分配的核心数要比接收者的数目多,否则的话系统能接收数据,但是不能处理。
Basic Sources 基础数据源
在基础例子中已经看到过ssc.socketTextStream(),下面看file streams
streamingContext.textFileStream(dataDirectory)
可以创建一个DStream
spark会监控这个路径,处理在那个路径中的任何文件
注意:
1. 路径中的文件要有相同的数据格式
2. 文件必须通过自动专业或者重命名进入到这个路径的
3. 一旦进入,这些文件不能被改变,所以如果文件被连续附加,那么新的数据不能被读取的
针对简单的文本文件,有个简单的方法streamingContext.textFileStream(dataDirectory)
因为file stream不需要运行receiver所以不需要分配核心或者线程去处理
Python API不支持fileStream只是支持textFileStream
可以使用RDD的queue去创建DStream,使用streamingContext.queueStream(queueofRDDs)
Advanced Souces 高级数据源
As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.
因为这些高级的数据源的支持比较复杂,需要依赖单独的包,现在被转移出了核心的API,所以不能再shell中使用,也就不能在shell中测试这些数据源。如果非要的话,需要下载对应的maven jar包,和对应的依赖,然后添加到classpath
Custom Sources自定义数据源
现在python还不支持,但是要想从自定义的数据源创建DStream,就要自己实现用户定义的receiver,这可以接受自定义的数据,并且发送到spark中
Receiver Reliability 接收者的可靠性
按照可靠性可以把数据分为两种,有的数据源例如Kafka和Flume运行传送被回复的数据、
如果系统正确接受到这些要被确认的数据,可以保证不会因为某种失败而导致数据丢失。这导致两种类型的接收者。
1. 可靠的接收者:当数据被接受并存储到spark之后,必须回复确认消息给可靠数据源。
2. 不可靠的接收者:不用回复确认,针对没有确认机制的数据源,或者有确认机制但是不需要执行复杂确认机制的数据源。
Transformations on DStreams DStream的转换
map(fun) | 这个函数将输入的DStream的每一个元素传递给func得到一个新的DStream |
flatMap(func) | 同上,只是每个输入可以map到多个输出项 |
filter(func) | 选择func返回结果为true的DStream中的记录组成新的DStream |
reparitition(numPartitions) | 通过改变划分去改变DStream的并行水平 |
union(otherStream) | 合并 |
count() | 返回一个新的DStream,是原始的DStream中的每个RDD的元素的数目 |
reduce(func) | 使用函数func聚合原始数据汇总的每个RDD得到一个新的单一元素RDD组成的DStream |
countByValue() | 调用类型K的DStream时候返回一个新的DStream有(K,long)对,其中long是k在每个RDD中出现的频率 |
reduceByKey(func,[numTasks]) | 将(k,v)中的v按照k使用func进行聚合 |
join(otherStream,[numTasks]) | (k,v)(k,w)得到(k,(v,w)) |
cogroup(otherStream,[numTasks]) | (k,v)(k,w)得到(k,Seq[V],Seq[W]) |
tansform(func) | 作用在每个RDD上得到新的RDD组成的DStream |
updateStateByKey(func) | 每个键都是通过将给定的函数作用在其值上得到的新的DStream |
下面是对某些转换的详细的讨论
UpdateStateByKey Operation
允许当使用新的信息连续更新的时候,维护任意的状态
1. 定义状态,这个状态可以是任意的数据类型
2. 定义状态的更新函数
不管有没有数据,spark都会更新状态,如果更新函数返回为none那么键值对就会被消除
假设想要维持一个运行时数目,那么运行时数目就是一个状态,是个整数,下面是一个更新函数
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
假设使用前面的paris DStream包含(word,1)对
runningCounts = pairs.updateStateByKey(updateFunction)
转换操作:
tranform操作允许任意的RDD-to-RDD函数应用到DStream,下面是一个例子:
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
窗口操作:
允许我们应用transformation到一个滑动窗口的数据上
上面的例子说明了每个窗口操作要声明下面的两个参数
windows length:窗口的长度,上面的例子是3
sliding interval:窗口被执行的时间间隔,例子中的书2
上面的两个参数都应该是元素DStream批间隔(上面的间隔是1)的整数倍
下面是一个窗口操作的例子,假设我们想生成过去的30秒的数据的wordcounts,每10秒钟一次
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
下面是一个常见的窗口操作的描述,所有的操作都传递两个参数,窗口的长度和时间间隔
window(长度,间隔) | 原来的DStream按照新的指定窗口进行切分返回新的DStream |
countByWindow(长度,间隔) | 返回滑动窗口的元素个数 |
reduceByWindow(func, windowLength, slideInterval) |
读原来的DStream数据进行聚合得到新的DStream |
reduceByKeyAndWindow(func,长度,间隔,[numtasks]) | (k,v)中的k被函数合并得到新的DStream |
reduceByKeyAndWindow(func,invFunc,长度,间隔,[numtasks]) | 比上面的更高效,对窗口内的数据增量聚合和逐步移去得到聚合后新的DStream |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 根据窗口计算每个元素的频次 |
Join Operations
下面是简单的流的join
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin
下面是基于窗口的流的Join
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
sream-dataset的join
流和数据集的join操作是使用lambda表达式实现的
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
DStream的输出操作
print() |
前十个元素打印出来 |
saveAsTextFiles(prefix, [suffix]) |
将DStream中的内容以文本方式保存成文件,每次批处理间隔内产生的文件按照prefix-TIME_IN_MS[.suffix]命名 |
saveAsObjectFiles(prefix, [suffix]) |
将DStream中的内容按对象序列化并且以SequenceFile格式保存,每次批处理间隔文件按照上面的命名 |
saveAsHadoopFiles(prefix, [suffix]) |
将DStream中的内容按对象序列化并且以hadoop格式保存,每次批处理间隔文件按照上面的命名 |
foreachRDD(func) |
对每个RDD应用这个函数,将RDD保存在外部文件中 |
Design Patterns for using foreachRDD
foreachRDD的设计模式
dstream.foreachRDD非常强大,但是容易出错
将数据写到外部系统需要创建一个连接对象,使用这个对象例如Tcp Connection发送数据到远程的系统
开发者可能会错误的连接到Spark Driver,然后试图在worker中使用将数据保存到RDD中
例如:
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
这是错误的,因为这要求连接对象序列化并且从driver发送到worker
这样的连接对象很少能跨机器转让
正确的做法是在worker中创建连接对象
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
通常的,创建一个对象需要时间和资源的管理费用,因此,为每个记录创建和摧毁连接对象可能会带来不必要的管理费用,这可能会显著降低系统的吞吐量,一个更好的解决方案是使用rdd.foreachPartition去创造唯一连接对象,并且用这个对象发送所有的RDD。
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
最终的优化是,跨RDD或者批次,重用连接对象
程序员可以维护一个连接对象的静态的池。
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))