SparkStreaming采用直连方式(Direct Approach)获取Kafka数据的研究心得

时间:2021-07-21 00:54:57

这里不多废话介绍一些基础的,只说说我最近研究直连方式的心得


使用SparkStreaming的直连方式来进行流式处理,并且这个程序要可靠性,并且具有一致性(原子性),那么我心中就产生了如下的疑问:

1、如何保证直连方式中Kafka的offset的精准度

2、如何保证StreamingContext信息的不丢失:即重启后,集群配置信息、计算处理过程中信息的不丢失

3、如何保证Driver挂掉后信息的不丢失:即Driver挂掉(单节点)重启后,集群能继续从挂掉前的状态继续执行

4、如何保证中间过程数据的不丢失:中间需要缓冲数据之类的,经典的就是流处理过程中有状态的操作中数据的保存

5、如果要修改一下程序,如何停止SparkStreaming程序


于是我翻遍了各个论坛和Spark官网终于找到了心中的答案:

1、offset的精准度:如果要求不是特别高的话,程序通过checkpoint会自动跟踪offset,官网中有一句话验证了这个观点:
“we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. ”

但是,如果出现程序在处理过程中被强制杀掉等等情况,那么程序自动记录则不能完全保证数据执行的exactly one

这时,我们可以通过获取到每批数据的offset,然后手动保存到Zookeeper中,然后当出现任何情况导致重启时,用Zookeeper中的offset作为Kafka的消费起点,这样就能保证exactly one,以此来保证整个处理过程中的事务性;

通过查看Zookeeper中的offset还能对程序执行情况进行监控,一举两得!

官网获取offset的示例代码:

 // Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

2、StreamingContext信息或Driver挂掉后信息的不丢失:可以通过checkpoint得到保证,即启动程序时通过对checkpoint getOrElse的方式获取ssc

官网简单示例代码:

val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(ip, port, outputPath, checkpointDirectory))

点击查看完整代码


3、中间过程数据的不丢失:对于这点需要说明的是DStream是可以进行checkpoint的


4、停止Spark程序:可以用注册钩子的形式来停止程序
详情请看:https://www.iteblog.com/archives/1890.html


研究到这里,产生了个疑问,就是 Spark Streaming在其检查点内是如何跟踪偏移量的?