Introduction to Spark Streaming Basics

Date: 2022-10-15 20:46:41

I. Spark Streaming Overview

Spark Streaming is an extension of the core Spark API that supports scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Kafka, Flume, or TCP sockets, and processed with complex algorithms expressed through high-level functions such as map, reduce, window, and join. The processed data can then be pushed out to destinations such as databases, file systems, and live dashboards. The role of Spark Streaming is illustrated below:

[Figure: Spark Streaming data flow, from input sources through Spark Streaming to output destinations]

My own working definition of Spark Streaming: it takes data from various sources, processes it with Spark Streaming, and writes the results out to external systems such as file systems. Here is also a diagram of the Spark ecosystem:

[Figure: the Spark ecosystem]

As the figure shows, Spark Streaming runs on top of Spark Core and is part of the Spark ecosystem, so it does not need to be installed separately; it runs directly on an existing Spark installation. The Spark ecosystem can also be broken down into the following categories:

[Figure: components of the Spark ecosystem]

Within the Spark ecosystem, Spark Streaming can be used and debugged in combination with the other components.

II. Features of Spark Streaming

Key features:
1. Low latency
2. Efficient recovery from failures (fault tolerance)
3. Runs on hundreds or even thousands of nodes
4. Combines with the other Spark sub-frameworks, such as batch processing, machine learning, and graph processing

III. How Spark Streaming Works

Spark Streaming splits the input data into small batches, which are then processed by the Spark engine; the final results are likewise returned as a stream of batches. The flow is shown below:

[Figure: the input data stream is split into batches that are processed by the Spark engine]

How it works, coarse-grained view:
Spark Streaming receives a live data stream, cuts it into small chunks of data according to a configured time interval, and hands those chunks to the Spark engine for processing.

How it works, fine-grained view:

First, the execution flow diagram:

[Figure: fine-grained execution flow of a Spark Streaming application]

The Spark application runs on the driver, which holds both a SparkContext and a StreamingContext. The driver asks some executors to start receivers. When a receiver gets data, it splits the data into blocks and stores them in memory; if multiple replicas are configured, the blocks are also copied to other machines. The receiver then reports the block information back to the StreamingContext. At every batch interval, the StreamingContext tells the SparkContext to launch jobs, and the SparkContext distributes those jobs to the executors for execution.

IV. Core Components of Spark Streaming

1. StreamingContext

To initialize a Spark Streaming application, a StreamingContext object must be created, because it is the main entry point for all of Spark Streaming's functionality, much like index.php is the entry file of a PHP application. A StreamingContext can be created from a SparkConf object as follows:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

The corresponding constructors in the Spark Streaming source code look like this:

def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}

def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

Note that it is best not to hard-code setAppName and setMaster; instead, supply them through spark-submit when the application is launched. The batch interval should be chosen based on the latency requirements of your application and the resources available in the cluster.
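
For example, a minimal sketch that reuses an existing SparkContext via the second constructor above; leaving the app name and master unset here is an assumption that they will be supplied by spark-submit, and the 5-second batch interval is only for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// appName and master are intentionally not set here; they are expected
// to be passed via spark-submit (--name, --master).
val conf = new SparkConf()
val sc = new SparkContext(conf)

// Reuse the existing SparkContext; the batch interval is illustrative only.
val ssc = new StreamingContext(sc, Seconds(5))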

Once the StreamingContext has been defined, you need to do the following (a complete sketch is given after the list):

1) Define the input sources by creating input DStreams;

2) Apply computations, i.e. transformations and output operations, to the DStreams;

3) Start processing the data with StreamingContext.start();

4) Wait for the processing to be stopped (whether by an error or not) with streamingContext.awaitTermination();

5) Or stop it manually with streamingContext.stop().
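
Putting these steps together, here is a minimal word-count sketch; the listening host and port, the app name, and the 1-second batch interval are placeholder assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one thread for the receiver, at least one for processing.
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// 1) Define the input source: a socket text stream.
val lines = ssc.socketTextStream("localhost", 9999)

// 2) Apply transformations and an output operation.
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// 3) Start the computation and 4) wait for it to terminate.
ssc.start()
ssc.awaitTermination()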

Points to note:

1) Once the context has been started, no new streaming computations can be set up or added to it.

2) Once the context has been stopped, it cannot be restarted; start() cannot be called on it again, so the whole application has to be started over.

3) Only one StreamingContext can be active in a JVM at the same time.

4) stop() stops both the SparkContext and the StreamingContext; to stop only the StreamingContext, set the stopSparkContext parameter to false.

5) A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created (see the sketch below).
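
A small sketch of points 4) and 5), reusing the ssc from the earlier word-count sketch; sc here stands for its underlying SparkContext:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stop only the streaming side and keep the SparkContext alive.
val sc = ssc.sparkContext
ssc.stop(stopSparkContext = false)

// The same SparkContext can then back a new StreamingContext.
val ssc2 = new StreamingContext(sc, Seconds(2))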

2. DStream (Discretized Stream)

The DStream is the most basic abstraction provided by Spark Streaming, and all programming is done through DStreams. A DStream represents a continuous stream of data: either the input stream received from a source, or a stream obtained by transforming another DStream. Internally, a DStream is represented as a sequence of RDDs, Spark's abstraction of an immutable, distributed dataset. Each RDD in a DStream contains the data from a specific interval, as shown below:

[Figure: a DStream as a sequence of RDDs, one per time interval]

Any operation on a DStream is carried out on its underlying RDDs. For example, to turn a stream of lines into a stream of words, flatMap is applied to the DStream, which amounts to applying the same operation to each of its RDDs:

[Figure: flatMap on a DStream is applied to each underlying RDD]

In other words, operators on a DStream, such as map or flatMap, are translated under the hood into the same operation on every RDD in the DStream, because a DStream is made up of the RDDs of its successive batches.
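
A small sketch of this equivalence, reusing the lines DStream from the word-count sketch: flatMap on the DStream and the same per-RDD flatMap expressed through transform produce the same stream of words.

// flatMap on the DStream ...
val wordsFromDStream = lines.flatMap(_.split(" "))

// ... is equivalent to applying the same RDD operation to the RDD of
// every batch via transform, which exposes the underlying RDDs.
val wordsViaTransform = lines.transform(rdd => rdd.flatMap(_.split(" ")))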

3. Input DStreams and Receivers

An input DStream represents the stream of input data received from a source. In the word-count example, lines is an input DStream, because it comes from a configured listening port. Every input DStream is associated with a receiver that receives the data and stores it in Spark's memory, with the exception of file-system based sources, which do not need a receiver.

Spark Streaming provides two built-in categories of streaming sources:

1) Basic sources: sources that are available directly through the StreamingContext API, such as file systems and sockets.

2) Advanced sources: sources such as Kafka and Flume, which require linking against extra dependencies (see the sketch below).
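
As an illustration, a minimal sketch of reading from Kafka with the spark-streaming-kafka-0-10 integration; the broker address, topic name, and group id are placeholder assumptions, and ssc is the StreamingContext from the earlier sketches. Note that this direct approach does not go through a receiver.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder consumer configuration.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-demo",
  "auto.offset.reset" -> "latest"
)

val topics = Array("events")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Work with the record values as a DStream of strings.
val values = stream.map(record => record.value)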

When running Spark Streaming in local mode, do not use local[1] as the master URL: that allocates only a single thread, and each receiver occupies one thread. With local[n], n must be greater than the number of receivers.

Likewise, when the logic is run on a cluster, the number of cores allocated to the Spark Streaming application must be greater than the number of receivers; otherwise the application will only receive data and never process it.

4. Transformations and Output Operations

Transformations allow the data of an input DStream to be modified, producing a new DStream. Many built-in transformations are provided; the most common ones are listed below, followed by a short stateful-counting sketch:

Transformation Meaning
map(func) Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
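
As one example, a minimal sketch of updateStateByKey that keeps a running count per word, reusing the pairs DStream from the word-count sketch; the checkpoint directory is a placeholder, and checkpointing must be enabled for stateful operations:

// Checkpointing is required for stateful transformations (placeholder path).
ssc.checkpoint("/tmp/streaming-checkpoint")

// Merge each batch's new counts into the running total for the key.
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(newValues.sum + runningCount.getOrElse(0))

val runningCounts = pairs.updateStateByKey[Int](updateCount _)
runningCounts.print()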

Output operations, such as print(), saveAsTextFiles(), and foreachRDD(), push a DStream's data out to external systems such as databases and file systems. In addition, Spark Streaming offers window operations, which apply a transformation over a sliding window of data; the common ones are listed below, with a short sketch after the table:

Transformation Meaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
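
To close, a minimal windowed word-count sketch using reduceByKeyAndWindow, again reusing the pairs DStream; the 30-second window and 10-second slide are arbitrary illustration values:

import org.apache.spark.streaming.Seconds

// Count word occurrences over the last 30 seconds, recomputed every 10 seconds.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function
  Seconds(30),               // window length
  Seconds(10)                // slide interval
)
windowedCounts.print()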