本讲内容:
a. Spark Streaming中的空RDD处理
b. Spark Streaming程序的停止
注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解
上节回顾
上一讲中,我们要给大家解密park Streaming两个比较高级的特性,资源动态申请和动态控制消费速率原理
默认情况下,Spark是先分配好资源,然后在进行计算,也就是粗粒度的资源分配
粗粒度的好处:资源是提前给分配好的,所以计算任务的时候,直接使用这些资源
粗粒度的缺点:从Spark Streaming的角度讲,有高峰值和低峰值,高峰和低峰的时候,需要的资源是不一样的;如果按照高峰值的角度去分配,低峰值的时候,有大量的资源的浪费
资源的动态分配是由一个定时器,不断的扫描Executor的情况,例如:有段时间之内,Executor没收到任何任务,所以会把这个Executor移除掉
动态资源调整的时候,最好不要设置太多的Core,Core设置的太多,假如资源调整太过频繁的话,是比较麻烦的(Core 的个数一般设置为奇数:3、5、7)
Spark Streaming要进行处理资源的动态调整,就是Executor的动态调整
Spark Streaming是Batch Duration的方式执行的,这个Batch Duration里需要很多资源,下一个Batch Duration里不需要那么多资源,可能想调整资源的时候,还没来得及调整完资源,当前的这个Batch Duration的运行已过期的情况,这个时候的资源调整就是浪费的
Spark Streaming中已经有自动的动态控制速度流进来的速度的方式。配置项是 spark.streaming.backpressure.enabled(强烈建议设为true)
开讲
本讲我们要给大家解密Spark Streaming中空RDD处理及流处理程序的停止
由于Spark Streaming的每个BatchDuration都会不断的产生RDD,空RDD有很大概率的,如何进行处理将影响其运行的效率、资源的有效使用
Spark Streaming会不断的接收数据,在不清楚接收的数据处理到什么状态,如果你强制停止掉的话,会涉及到数据不完整操作或者一致性相关问题
那么我们在以上理解的基础上,走进源码,开启解密之旅
Spark Streaming中的空RDD处理
在Spark Streaming应用程序中,无论使用什么 DStream,底层实际上就是操作RDD
(我们从一个案例应用程序片段开始)
程序中有一个这样的问题:wordCounts.foreachRDD里面,开始时并没有判断rdd是否为空,就进行处理了
rdd为空时,也获取CPU core等计算资源,并进行里面的计算。这显然是不合适的
虽然Spark中定义了EmptyRDD,且让其Compute时抛出异常,但实际Spark应用程序并没有使用EmptyRDD
应该对每个rdd进行处理前,应该判断rdd是否为空;这样能最大化的节约资源与提高效率
isEmpty()源码如下
Spark Streaming程序的停止
默认情况下Spark Streaming是采用什么方式停止呢?
以上的停止方式会把这个Streams停止掉,但是不会等待所有的数据处理完成默认情况下SparkContext也会被停止掉
真正好的停止一个Spark Streaming应用程序,应该用另一个stop
stopGracefully参数默认是false,生产环境应该设置为 true,具体做法是配置文件中把spark.streaming.stopGeacefullyOnShutdown设置为true,这样能保证已运行的程序运行完再停止,以保证数据处理的完整
Spark Streaming程序是怎么做到的呢?StreamingContext.stopOnShutdown调用了上面的stop
在StreamingContext.start中,会加一个hook来调用stopOnShutdown
应用程序启动的时候会调用 StopOnShutdown ,会把回调传进去
在StreamingContext启动时,就用了钩子,定义了在StopOnShutdown 时必须调用有stopGracefully参数的stop方法
使用 StopGraceFully 所有接收的数据都会被处理完成,才会停止