Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

时间:2020-12-25 20:48:25

本讲内容:

a. Spark Streaming中的空RDD处理
b. Spark Streaming程序的停止

注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解

上节回顾

上一讲中,我们要给大家解密park Streaming两个比较高级的特性,资源动态申请和动态控制消费速率原理

默认情况下,Spark是先分配好资源,然后在进行计算,也就是粗粒度的资源分配
粗粒度的好处:资源是提前给分配好的,所以计算任务的时候,直接使用这些资源
粗粒度的缺点:从Spark Streaming的角度讲,有高峰值和低峰值,高峰和低峰的时候,需要的资源是不一样的;如果按照高峰值的角度去分配,低峰值的时候,有大量的资源的浪费

资源的动态分配是由一个定时器,不断的扫描Executor的情况,例如:有段时间之内,Executor没收到任何任务,所以会把这个Executor移除掉

动态资源调整的时候,最好不要设置太多的Core,Core设置的太多,假如资源调整太过频繁的话,是比较麻烦的(Core 的个数一般设置为奇数:3、5、7)

Spark Streaming要进行处理资源的动态调整,就是Executor的动态调整
Spark Streaming是Batch Duration的方式执行的,这个Batch Duration里需要很多资源,下一个Batch Duration里不需要那么多资源,可能想调整资源的时候,还没来得及调整完资源,当前的这个Batch Duration的运行已过期的情况,这个时候的资源调整就是浪费的

Spark Streaming中已经有自动的动态控制速度流进来的速度的方式。配置项是 spark.streaming.backpressure.enabled(强烈建议设为true)

开讲

本讲我们要给大家解密Spark Streaming中空RDD处理及流处理程序的停止

由于Spark Streaming的每个BatchDuration都会不断的产生RDD,空RDD有很大概率的,如何进行处理将影响其运行的效率、资源的有效使用

Spark Streaming会不断的接收数据,在不清楚接收的数据处理到什么状态,如果你强制停止掉的话,会涉及到数据不完整操作或者一致性相关问题

那么我们在以上理解的基础上,走进源码,开启解密之旅

Spark Streaming中的空RDD处理

在Spark Streaming应用程序中,无论使用什么 DStream,底层实际上就是操作RDD

(我们从一个案例应用程序片段开始)
Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

程序中有一个这样的问题:wordCounts.foreachRDD里面,开始时并没有判断rdd是否为空,就进行处理了

rdd为空时,也获取CPU core等计算资源,并进行里面的计算。这显然是不合适的

虽然Spark中定义了EmptyRDD,且让其Compute时抛出异常,但实际Spark应用程序并没有使用EmptyRDD

应该对每个rdd进行处理前,应该判断rdd是否为空;这样能最大化的节约资源与提高效率

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

isEmpty()源码如下

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

Spark Streaming程序的停止

默认情况下Spark Streaming是采用什么方式停止呢?

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

以上的停止方式会把这个Streams停止掉,但是不会等待所有的数据处理完成默认情况下SparkContext也会被停止掉

真正好的停止一个Spark Streaming应用程序,应该用另一个stop

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

stopGracefully参数默认是false,生产环境应该设置为 true,具体做法是配置文件中把spark.streaming.stopGeacefullyOnShutdown设置为true,这样能保证已运行的程序运行完再停止,以保证数据处理的完整

Spark Streaming程序是怎么做到的呢?StreamingContext.stopOnShutdown调用了上面的stop

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

在StreamingContext.start中,会加一个hook来调用stopOnShutdown

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

应用程序启动的时候会调用 StopOnShutdown ,会把回调传进去

Spark 定制版:018~Spark Streaming中空RDD处理及流处理程序优雅的停止

在StreamingContext启动时,就用了钩子,定义了在StopOnShutdown 时必须调用有stopGracefully参数的stop方法

使用 StopGraceFully 所有接收的数据都会被处理完成,才会停止