Spark Streaming官方文档学习--上

时间:2021-12-14 08:30:26
官方文档地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html

Spark Streaming是spark api的扩展

能实现可扩展,高吞吐,可容错,的流式处理
从外接数据源接受数据流,处理数据流使用的是复杂的高度抽象的算法函数map reduce join window等

输出的数据可以存储到文件系统和数据库甚至是直接展示在命令行
也可以应用ml 和graph processing在这些数据流上

spark streaming本质还是spark只是实现了所谓的微批量
Spark Streaming官方文档学习--上
 spark streaming中连续数据流用DStream表示,DStream可以从输入数据创建,也可以从其他的DStream转化来
本质上DStream是一组RDD组成的序列

一个迅速上手的例子:
  1. # coding: utf-8
  2. # In[ ]:
  3. from pyspark import SparkContext
  4. from pyspark.streaming import StreamingContext
  5. # In[ ]:
  6. #创建两个工作线程,将这两个线程喂给StreamingContext,时间间隔是1秒
  7. #这里有个错误Cannot run multiple SparkContexts at once
  8. #参考:http://*.com/questions/28259756/how-to-create-multiple-sparkcontexts-in-a-console
  9. #先要尝试关闭sc才能创建多个SparkContext
  10. try:
  11. sc.stop()
  12. except:
  13. pass
  14. sc = SparkContext("local[2]", "NetworkWordCount")
  15. ssc = StreamingContext(sc, 1)
  16. #sc.stop()
  17. # In[ ]:
  18. #创建一个DStream,从本机的这个端口取数据
  19. lines = ssc.socketTextStream("localhost", 9999)
  20. # In[ ]:
  21. #lines中的数据记录是一行行的文本,下面将文本切割成字
  22. words = lines.flatMap(lambda line: line.split(" "))
  23. #这里的flatMap是一对多DStream操作,生成一个新的DStream,就是变成了字流了
  24. # In[ ]:
  25. #下面数一数每一批次的字的个数
  26. # Count each word in each batch
  27. pairs = words.map(lambda word: (word, 1))
  28. wordCounts = pairs.reduceByKey(lambda x, y: x + y)
  29. # In[ ]:
  30. # 打印DStream中的每个RDD的前十个元素
  31. wordCounts.pprint()
  32. # In[ ]:
  33. ssc.start() # 开始计算
  34. ssc.awaitTermination() #等待计算停止
  35. # In[ ]:
  36. #将这个文件的py脚本提交计算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
  37. #在命令行输入nc -lk 9999 然后模拟输入字符串文本,那么在pyspark命令行会打印出每秒钟输入的数据的统计结果


基本概念

    要想写自己的streaming程序,首先要添加maven或者sbt的依赖
    
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.11</artifactId>
  4. <version>2.0.0</version>
  5. </dependency>

对于外部输入流的依赖现在不在核心api中了,需要单独添加依赖。

初始化StreamingContext
可以从SparkContext对象中创建
  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. sc = SparkContext(master, appName)
  4. ssc = StreamingContext(sc, 1)
appName是程序名字可以在UI中显示
master是Spark,Mesos,YARN cluster URL    或者是 声明的Local[*]字符串使得运行在本地模式
当运行在集群上的时候,不要写死在代码里面,而是要从spark-submit启动,传递进去。
对于本地测试或者单元测试,可以传递local[*]

在context被定义之后,要做下面的事情:
1. 通过创建DStream去定义输入资源
2. 通过对DStream的转换和输出操作定义流的计算
3. 使用streamingContext.start()开始接收数据并处理数据
4. 使用streamingContext.awaitTermination()等待处理过程停止(手动或者因为错误)
5. 可以使用streamingContext.stop()手动停止处理过程

要点:
1. 一旦一个context被启动,不能再添加任何新的流进去了
2. 一旦context被停止,就不能重启了
3. 在JVM中只能有一个StreamingContext被激活
4. 在streamingContext上使用stop()也会停止SparkContext(),要想单独停止前者,设置stop()的可选的参数 stopSparkContext()参数为false
5. 一个sparkContext可以被重复使用,去创建多个StreamingContext,只要前一个StreamingContext被单独停止,下一个就可以接着创建。

Discretized Streams(DStreams)

是Spark Streaming的基本的抽象,代表了一个连续的数据流
可以是从数据源接受的输入数据流,也可以是转换输入数据流得到的数据流
一个DStream代表一串RDD,RDD是不可分割的基本数据单元抽象。
每一个在DStream中的RDD包含特定时间间隔的数据
Spark Streaming官方文档学习--上
 
如果时间是1秒的话,从0-1秒的很多RDD,与从1-2的RDD等,组成了DStream

任何对DStream的操作,都会被翻译成对底层的RDD的操作,例如,将Lines转换成words的操作
Spark Streaming官方文档学习--上
 
对DStream的操作,隐藏了很多细节,给开发者提供高度抽象的API

输入DStreams和Receivers
每一个输出DStream(除了file strem)都关联一个Receiver对象,这个对象从数据源接受数据存储在spark的内存中等待处理。
Spark Streaming支持两种类型的内建数据源
    1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections
    2. Advances Sources:从外部工具类中调过来的例如Kafka,Flume,Kinesis等,需要链接一些外部依赖。

关键点:
    1. 注意在本地运行SparkStreaming的时候,不要使用local或者local[1]作为主机的URL,因为这些都是意味着开一个线程,因为如果只是输入一个数据源,那么这个单一的线程会用来运行receiver,那么没有线程去处理接收到的数据了。所以本地运行的时候,参数local[n]中的n最好大于运行中的receiver。
    2. 相应的在集群上运行的时候,分配的核心数要比接收者的数目多,否则的话系统能接收数据,但是不能处理。

Basic Sources 基础数据源

在基础例子中已经看到过ssc.socketTextStream(),下面看file streams
  1. streamingContext.textFileStream(dataDirectory)
可以创建一个DStream
spark会监控这个路径,处理在那个路径中的任何文件
注意:
    1. 路径中的文件要有相同的数据格式
    2. 文件必须通过自动专业或者重命名进入到这个路径的
    3. 一旦进入,这些文件不能被改变,所以如果文件被连续附加,那么新的数据不能被读取的
针对简单的文本文件,有个简单的方法streamingContext.textFileStream(dataDirectory)
因为file stream不需要运行receiver所以不需要分配核心或者线程去处理
Python API不支持fileStream只是支持textFileStream

可以使用RDD的queue去创建DStream,使用streamingContext.queueStream(queueofRDDs)

Advanced Souces 高级数据源

As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.
因为这些高级的数据源的支持比较复杂,需要依赖单独的包,现在被转移出了核心的API,所以不能再shell中使用,也就不能在shell中测试这些数据源。如果非要的话,需要下载对应的maven jar包,和对应的依赖,然后添加到classpath

Custom Sources自定义数据源

现在python还不支持,但是要想从自定义的数据源创建DStream,就要自己实现用户定义的receiver,这可以接受自定义的数据,并且发送到spark中

Receiver Reliability 接收者的可靠性

按照可靠性可以把数据分为两种,有的数据源例如Kafka和Flume运行传送被回复的数据、
如果系统正确接受到这些要被确认的数据,可以保证不会因为某种失败而导致数据丢失。这导致两种类型的接收者。
    1. 可靠的接收者:当数据被接受并存储到spark之后,必须回复确认消息给可靠数据源。
    2. 不可靠的接收者:不用回复确认,针对没有确认机制的数据源,或者有确认机制但是不需要执行复杂确认机制的数据源。


Transformations on DStreams DStream的转换
map(fun) 这个函数将输入的DStream的每一个元素传递给func得到一个新的DStream
flatMap(func) 同上,只是每个输入可以map到多个输出项
filter(func) 选择func返回结果为true的DStream中的记录组成新的DStream
reparitition(numPartitions) 通过改变划分去改变DStream的并行水平
union(otherStream) 合并
count() 返回一个新的DStream,是原始的DStream中的每个RDD的元素的数目
reduce(func) 使用函数func聚合原始数据汇总的每个RDD得到一个新的单一元素RDD组成的DStream
countByValue() 调用类型K的DStream时候返回一个新的DStream有(K,long)对,其中long是k在每个RDD中出现的频率
reduceByKey(func,[numTasks]) 将(k,v)中的v按照k使用func进行聚合
join(otherStream,[numTasks]) (k,v)(k,w)得到(k,(v,w))
cogroup(otherStream,[numTasks]) (k,v)(k,w)得到(k,Seq[V],Seq[W])
tansform(func) 作用在每个RDD上得到新的RDD组成的DStream
updateStateByKey(func) 每个键都是通过将给定的函数作用在其值上得到的新的DStream

下面是对某些转换的详细的讨论
UpdateStateByKey Operation
允许当使用新的信息连续更新的时候,维护任意的状态
    1. 定义状态,这个状态可以是任意的数据类型
    2. 定义状态的更新函数
不管有没有数据,spark都会更新状态,如果更新函数返回为none那么键值对就会被消除
假设想要维持一个运行时数目,那么运行时数目就是一个状态,是个整数,下面是一个更新函数
  1. def updateFunction(newValues, runningCount):
  2. if runningCount is None:
  3. runningCount = 0
  4. return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
假设使用前面的paris DStream包含(word,1)对
  1. runningCounts = pairs.updateStateByKey(updateFunction)


转换操作:
tranform操作允许任意的RDD-to-RDD函数应用到DStream,下面是一个例子:
  1. spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
  2. # join data stream with spam information to do data cleaning
  3. cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))


窗口操作:
   允许我们应用transformation到一个滑动窗口的数据上
Spark Streaming官方文档学习--上
 上面的例子说明了每个窗口操作要声明下面的两个参数
    windows length:窗口的长度,上面的例子是3
    sliding interval:窗口被执行的时间间隔,例子中的书2
上面的两个参数都应该是元素DStream批间隔(上面的间隔是1)的整数倍

下面是一个窗口操作的例子,假设我们想生成过去的30秒的数据的wordcounts,每10秒钟一次
  1. # Reduce last 30 seconds of data, every 10 seconds
  2. windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
下面是一个常见的窗口操作的描述,所有的操作都传递两个参数,窗口的长度和时间间隔
window(长度,间隔) 原来的DStream按照新的指定窗口进行切分返回新的DStream
countByWindow(长度,间隔) 返回滑动窗口的元素个数
reduceByWindow(func, windowLength, slideInterval)
读原来的DStream数据进行聚合得到新的DStream
reduceByKeyAndWindow(func,长度,间隔,[numtasks]) (k,v)中的k被函数合并得到新的DStream
reduceByKeyAndWindow(func,invFunc,长度,间隔,[numtasks]) 比上面的更高效,对窗口内的数据增量聚合和逐步移去得到聚合后新的DStream
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 根据窗口计算每个元素的频次

Join Operations
下面是简单的流的join
  1. stream1 = ...
  2. stream2 = ...
  3. joinedStream = stream1.join(stream2)
You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin
下面是基于窗口的流的Join
  1. windowedStream1 = stream1.window(20)
  2. windowedStream2 = stream2.window(60)
  3. joinedStream = windowedStream1.join(windowedStream2)
sream-dataset的join
流和数据集的join操作是使用lambda表达式实现的
  1. dataset = ... # some RDD
  2. windowedStream = stream.window(20)
  3. joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))


DStream的输出操作

print()
前十个元素打印出来
saveAsTextFiles(prefix, [suffix])
将DStream中的内容以文本方式保存成文件,每次批处理间隔内产生的文件按照prefix-TIME_IN_MS[.suffix]命名
saveAsObjectFiles(prefix, [suffix])
将DStream中的内容按对象序列化并且以SequenceFile格式保存,每次批处理间隔文件按照上面的命名
saveAsHadoopFiles(prefix, [suffix])

将DStream中的内容按对象序列化并且以hadoop格式保存,每次批处理间隔文件按照上面的命名

foreachRDD(func)
对每个RDD应用这个函数,将RDD保存在外部文件中


Design Patterns for using foreachRDD
foreachRDD的设计模式
dstream.foreachRDD非常强大,但是容易出错
将数据写到外部系统需要创建一个连接对象,使用这个对象例如Tcp Connection发送数据到远程的系统
开发者可能会错误的连接到Spark Driver,然后试图在worker中使用将数据保存到RDD中
例如:
  1. def sendRecord(rdd):
  2. connection = createNewConnection() # executed at the driver
  3. rdd.foreach(lambda record: connection.send(record))
  4. connection.close()
  5. dstream.foreachRDD(sendRecord)
这是错误的,因为这要求连接对象序列化并且从driver发送到worker
这样的连接对象很少能跨机器转让
正确的做法是在worker中创建连接对象
  1. def sendRecord(record):
  2. connection = createNewConnection()
  3. connection.send(record)
  4. connection.close()
  5. dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
通常的,创建一个对象需要时间和资源的管理费用,因此,为每个记录创建和摧毁连接对象可能会带来不必要的管理费用,这可能会显著降低系统的吞吐量,一个更好的解决方案是使用rdd.foreachPartition去创造唯一连接对象,并且用这个对象发送所有的RDD。
  1. def sendPartition(iter):
  2. connection = createNewConnection()
  3. for record in iter:
  4. connection.send(record)
  5. connection.close()
  6. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
最终的优化是,跨RDD或者批次,重用连接对象
程序员可以维护一个连接对象的静态的池。
  1. def sendPartition(iter):
  2. # ConnectionPool is a static, lazily initialized pool of connections
  3. connection = ConnectionPool.getConnection()
  4. for record in iter:
  5. connection.send(record)
  6. # return to the pool for future reuse
  7. ConnectionPool.returnConnection(connection)
  8. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))