本讲内容:
a. updateStateByKey解密
b. mapWithState解密
注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。
上节回顾
上一讲中,我们从Spark Streaming源码解读Driver容错安全性:那么什么是Driver容错安全性呢?
a. 从数据层面:ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息
b. 从调度层面:DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关
c. 从运行角度: 作业生存层面,JobGenerator是Job调度层面
谈Driver容错性我们需要考虑Driver中有那些需要维持状态的运行
a. ReceivedBlockTracker跟踪了数据,因此需要容错。通过WAL方式容错
b. DStreamGraph表达了依赖关系,恢复状态的时候需要根据DStream恢复计算逻辑级别的依赖关系。通过checkpoint方式容错
c. JobGenerator表面是基于ReceiverBlockTracker中的数据,以及DStream构成的依赖关系不断的产生Job的过程。也可以这么理解这个过程中消费了那些数据,并且跟踪进行到了一个怎样的程度
具体分析如下图:
最后我们可以这样总结道:
a. ReceivedBlockTracker是通过WAL方式来进行数据容错的。
b. DStreamGraph和JobGenerator是通过checkpoint方式来进行数据容错的。
开讲
本讲我们讲Spark Streaming中一个非常重要的内容:State状态管理
a. 为了说明state状态管理,拿两个非常具体非常有价值的方法updateStateByKey和mapWithState这两个方法来说明sparkstreaming是如何实现对state状态管理的。Sparkstreaming是按照batchduration划分job的,但是有时我们想算过去一个小时或者过去一天的数据,在大于batchduration的时候对数据进行符合业务逻辑的操作,这时候不可避免的要进行状态维护。Sparkstreaming每个batchduration都会产生一个job,job里面都是RDD,所以现在面临的一个问题就是,他每个batchduration产生RDD,怎么对他的状态进行维护的问题(像updateStateByKey)。例如计算一天的商品的点击量,这时候就需要类似于updateStateByKey或者mapWithState这样的方法帮助完成核心的步骤
b. Spark 的状态管理其实有很多函数,比较典型的有类似的UpdateStateByKey、MapWithState方法来完成核心的步骤
updateStateByKey和mapWithState这两个方法在DStream中并不能找到。因为updateStateByKey和mapWithState这两个方法都是针对key-value类型的数据进行操作,也就是pair类型的,和前边讲RDD是一样的,RDD这个类本事并不会对key-value类型的数据进行操作,所以这时候就需要借助scala的语法隐式转换。隐式转换一般放在类的伴生对象中,将DStream转换成PairDStreamFunctions。这是从地狱中召唤出来的功能,使用后又回到地狱。运行机制就是找不到DStream的updateStateByKey和mapWithState,他们是PairDStreamFunctions的方法,就找隐式转换,隐式转换中发现toPairDStreamFunctions这个功能,就使用了implicit功能
继续跟踪PairDStreamFunctions类中有次方法定义
updateStateByKey
在PairDStreamFunctions中updateStateByKey具体实现如下
在已有的历史基础上,updateFunc对历史数据进行更新。该函数的返回值是DStream类型的。
现在解读一下updateStateByKey方法,updateStateByKey是在已有状态基础上采用updateFunc对历史数据进行更新,具体怎么更新就是updateFunc这样的函数进行操作的,返回一个DStream
这里采用的是defaultPartitioner,不论是基于状态的计算还是基于batchduration的计算都是基于RDD的,RDD这里面肯定需要Partitioner,默认采用的是HashPartitioner,HashPartitioner其实就对应着Hash的shuffle或者Hash的计算方式。Hash的特点就是效率高,不需要进行排序等
(spark sql如果操作hive数据仓库中表的时候,假设自己设制了一个并行度,对spark sql on hive是否会生效?也就是分多少partition会不会受自定义分片的控制?:不会,这是spark sql特殊的一个地方,不会的时候有时候就会导致一种情况就是并行度太低,因为RDD的操作是后面的RDD继承自前面的RDD的partition,前面并行度太低就会影响后面的计算,影响GC等等。这时候就需要repartition而不是使用collase的方式)
rememberPartitioner前面传的是true,所以Partitioner都会传下去。new StateDStream
继续跟踪StateDStream,继承了DStream,如果对状态不断的操作就会产生很多的StateDStream状态对象
这里进行persist是在磁盘上,super.persist(StorageLevel.MEMORY_ONLY_SER),因为计算的是过去很长时间的数据,数据可能会很多。核心的代码是val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner),cogroup就是按照key对value进行聚合,按照key对所有数据进行扫描然后聚合,这样做好处是对rdd的计算;
不好的地方就是性能,cogroup对所有数据进行扫描,随着时间流逝数据规模越来越大性能越低,cogroup rdd和另一个cogroup rdd数据进行扫描合并
所以,如果数据很多的时候不建议使用updateStateByKey
updateStateByKey函数实现如下:
mapWithState
下面来看mapWithState,返回一个MapWithStateDStream,使用一个函数不断作用于这个DStream中key-value的元素,基于key进行状态维护和更新。mapWithState这个方法参数是StateSpec,这里边封装了对数据操作的函数。对State可以理解为是一个内存数据表,这个表中记录了所有的历史状态,可以对表中不同key的数据进行操作。更新的时候根据key在state的基础上更新对应的value。就相当于对一个表进行增删改查
StateSpecImpl是一个case class,里面封装了对数据操作的function
走进 MapWithStateDStream源码
可以看到MapWithStateDStreamImpl这个类的计算交给了内部一个类InternalMapWithStateDStream。这个类是对数据进行更新
InternalMapWithStateDStream的compute方法创建了一个MapWithStateRDD,MapWithStateRDD中包含了mapWithState的数据,以及对数据怎么操作。这个RDD的每一个partition被一个MapWithStateRDDRecord代表的
MapWithStateRDDRecord有对数据的具体更新,这里边有两个关键的数据结构mappedData 和wrappedState 。newStateMap 是对以前RDD数据进行复制。每次更新数据的时候是对当前barchduration的数据进行遍历,而不是像updateStateByKey一样要遍历所有的数据
dataIterator.foreach dataIterator是当前barchduration的数据。每次遍历的时候会先记录以往数据中相同key的state,再根据指定的函数对遍历的数据进行操作,然后把操作后的状态保存。没有对历史数据进行从新遍历。这个效率会高很多。返回的还是MapWithStateRDDRecord ,从RDD的角度看
MapWithStateRDDRecord 并没有变,但是它内部变了。相当于RDD原先指向一条数据,这条数据不能修改。现在RDD也是指向一条数据,只是这个数据中又封装了数据。可以改变封装的数据,从RDD角度看,RDD指向的数据并没有变。所以RDD可以处理变化的数据。只是要自定义一个RDD指向的数据结构
MapWithState实现如下:
最后我们附上代码执行流程图:
(来源:http://blog.csdn.net/hanburgud/article/details/51545414,感谢作者)