Spark-Streaming及其工作原理

时间:2022-09-22 20:58:40
1.Spark-Streaming及其工作原理 Spark Streaming是Spark Core API的一种扩展,它可以用于进行大规模、高吞吐量、容错的实时数据流的处理。它支持从很多种数据源中读取数据,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或者是TCP Socket。并且能够使用类似高阶函数的复杂算法来进行数据处理,比如map、reduce、join和window。处理后的数据可以被保存到文件系统、数据库、Dashboard等存储中。
Spark Streaming基本工作原理:Spark Streaming内部的基本工作原理如下:接收实时输入数据流,然后将数据拆分成多个batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的。
2.DStream介绍 Spark Streaming提供了一种高级的抽象,叫做DStream,英文全称为Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume和Kinesis;也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。 DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每个RDD都包含了一个时间段内的数据。 对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。底层的RDD的transformation操作,其实,还是由Spark Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API。
3.Spark Streaming与Storm差别 3.1 Spark Streaming与Storm的对比 Storm:实时计算模型(纯实时,来一条数据,处理一条数据);实时计算延迟度(毫秒级);吞吐量(低);事务机制(支持完善);健壮性 / 容错性(ZooKeeper,Acker,非常强);动态调整并行度(支持) Spark Streaming:实时计算模型(准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理);实时计算延迟度(秒级);吞吐量(高);事务机制(支持,但不够完善);健壮性 / 容错性(Checkpoint,WAL,一般);动态调整并行度(不支持)
3.2 Spark Streaming与Storm的优劣分析 事实上,Spark Streaming绝对谈不上比Storm优秀。这两个框架在实时计算领域中,都很优秀,只是擅长的细分场景并不相同。 Spark Streaming仅仅在吞吐量上比Storm要优秀,而吞吐量这一点,也是历来挺Spark Streaming,贬Storm的人着重强调的。但是问题是,是不是在所有的实时计算场景下,都那么注重吞吐量?不尽然。因此,通过吞吐量说Spark Streaming强于Storm,不靠谱。 事实上,Storm在实时延迟度上,比Spark Streaming就好多了,前者是纯实时,后者是准实时。而且,Storm的事务机制、健壮性 / 容错性、动态调整并行度等特性,都要比Spark Streaming更加优秀。 Spark Streaming,有一点是Storm绝对比不上的,就是:它位于Spark生态技术栈中,因此Spark Streaming可以和Spark Core、Spark SQL无缝整合,也就意味着,我们可以对实时处理出来的中间数据,立即在程序中无缝进行延迟批处理、交互式查询等操作。这个特点大大增强了Spark Streaming的优势和功能。
3.3 Spark Streaming与Storm的应用场景 对于Storm来说: (1)、建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析 (2)、此外,如果对于实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,一条也不能多,一条也不能少,也可以考虑使用Storm (3)、如果还需要针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况),也可以考虑用Storm (4)、如果一个大数据应用系统,它就是纯粹的实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等,那么用Storm是比较好的选择
对于Spark Streaming来说: (1)、如果对上述适用于Storm的三点,一条都不满足的实时场景,即,不要求纯实时,不要求强大可靠的事务机制,不要求动态调整并行度,那么可以考虑使用Spark Streaming (2)、考虑使用Spark Streaming最主要的一个因素,应该是针对整个项目进行宏观的考虑,即,如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么就应该首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。
4.wordcount实时统计案例
object WordCountDemo {
def main(args: Array[String]): Unit = {
LoggerLevels.setStreamingLogLevels()
//local[2]这里必须是2个或2个以上的线程,一个负责接收数据,一个负责将接收的数据下发到worker上执行
val config = new SparkConf().setAppName("WordCountDemo").setMaster("local[2]")

//这种方式创建streamingContxt也行
// val ssc = new StreamingContext(config, Seconds(2))
//这种方式创建streamingContxt也行
val sc = new SparkContext(config)
//Seconds两秒产生一个RDD
val ssc = new StreamingContext(sc, Seconds(2))
//使用updateStateByKey必须设置checkpoint,以防数据丢失后可以从这个目录里面找到
ssc.checkpoint("hdfs://hadoop01:8020/checkPoint")
//创建一个输入的Dstream,你的在hadoop01,节点上启动一个8008 端口(命令:nc -l 8008 ),如果没有nc这个命令
//可以使用这个命令安装 : yum install -y nc 用root用户来安装,前提你虚拟机已经联网,或则可以去网上下载nc的包自己安装
//安装完查看nc帮助 nc -help ,如果有说明已经安装上了
// 一个输入数据流Dstream,绑定一个Receiver,一个Receiver会占用一个core
//占用的这个core不会被释放,一直要等待程序的结束,这个被占用的core才会被释放,所以在做任务提交时,分配的executor-cores一定要
//大于你在程序中创建的dsteam的个数,也就是recevicer的个数。
val socketDS = ssc.socketTextStream("hadoop01", 8008)

//这个是每次只能计算获取那一批的数据,不能把之前获取的数据也进行累加
// val wordCountDS = socketDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//将获取到数据进行累加
val wordCountDS = socketDS.flatMap(_.split(" ")).map((_, 1))
.updateStateByKey((iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//(String, Seq[Int], Option[Int])
// String 代表的是key,
// Seq[Int],代表以前出现的次数Seq[1,2,4],
// Option[Int]代表是本次出现的次数,本次可能出现,也可能没有出现
//x._1是key,x._2.sum是上一次出现的次数,x._3.getOrElse(0)是这一次出现的次数
iter.map(x => (x._1, x._2.sum + x._3.getOrElse(0)))
}, new HashPartitioner(sc.defaultParallelism), true)
wordCountDS.print()

//调用StreamingContext的start()方法,来开始实时处理数据。
ssc.start()
//调用StreamingContext的awaitTermination()方法,来等待应用程序的终止
ssc.awaitTermination()
}
}

5.StreamingContext详解 5.1 有两种创建StreamingContext的方式: 方法一: val conf = new SparkConf().setAppName(appName).setMaster(master); val ssc = new StreamingContext(conf, Seconds(1)); 方法二: StreamingContext,还可以使用已有的SparkContext来创建 val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)); 注释:appName,是用来在Spark UI上显示的应用名称。master,是一个Spark、Mesos或者Yarn集群的URL,或者是local[*]。
5.2 一个StreamingContext定义之后,必须做以下几件事情: (1)、通过创建输入DStream来创建输入数据源。 (2)、通过对DStream定义transformation和output算子操作,来定义实时计算逻辑。 (3)、调用StreamingContext的start()方法,来开始实时处理数据。 (4)、调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。 (5)、也可以通过调用StreamingContext的stop()方法,来停止应用程序。 5.3 需要注意的要点: (1)、只要一个StreamingContext启动之后,就不能再往其中添加任何计算逻辑了。比如执行start()方法之后,还给某个DStream执行一个算子。 (2)、一个StreamingContext停止之后,是肯定不能够重启的。调用stop()之后,不能再调用start() (3)、一个JVM同时只能有一个StreamingContext启动。在你的应用程序中,不能创建两个StreamingContext。 (4)、调用stop()方法时,会同时停止内部的SparkContext,如果不希望如此,还希望后面继续使用SparkContext创建其他类型的Context,比如SQLContext,那么就用stop(false)。 (5)、一个SparkContext可以创建多个StreamingContext,只要上一个先用stop(false)停止,再创建下一个即可。
6.输入DStream和Receiver详解 输入DStream代表了来自数据源的输入数据流。在之前的wordcount例子中,lines就是一个输入DStream(JavaReceiverInputDStream),代表了从netcat(nc)服务接收到的数据流。除了文件数据流之外,所有的输入DStream都会绑定一个Receiver对象,该对象是一个关键的组件,用来从数据源接收数据,并将其存储在Spark的内存中,以供后续处理。
Spark Streaming提供了两种内置的数据源支持 (1)、基础数据源:StreamingContext API中直接提供了对这些数据源的支持,比如文件、socket、Akka Actor等。 (2)、高级数据源:诸如Kafka、Flume、Kinesis、Twitter等数据源,通过第三方工具类提供支持。这些数据源的使用,需要引用其依赖。 (3)、自定义数据源:我们可以自己定义数据源,来决定如何接受和存储数据。
要注意的是,如果你想要在实时计算应用中并行接收多条数据流,可以创建多个输入DStream。这样就会创建多个Receiver,从而并行地接收多个数据流。但是要注意的是,一个Spark Streaming Application的Executor,是一个长时间运行的任务,因此,它会独占分配给Spark Streaming Application的cpu core。从而只要Spark Streaming运行起来以后,这个节点上的cpu core,就没法给其他应用使用了。
使用本地模式,运行程序时,绝对不能用local或者local[1],因为那样的话,只会给执行输入DStream的executor分配一个线程。而Spark Streaming底层的原理是,至少要有两条线程,一条线程用来分配给Receiver接收数据,一条线程用来处理接收到的数据。因此必须使用local[n],n>=2的模式。
如果不设置Master,也就是直接将Spark Streaming应用提交到集群上运行,那么首先,必须要求集群节点上,有>1个cpu core,其次,给Spark Streaming的每个executor分配的core,必须>1,这样,才能保证分配到executor上运行的输入DStream,两条线程并行,一条运行Receiver,接收数据;一条处理数据。否则的话,只会接收数据,不会处理数据。