使用spark streaming 处理kafka数据,有时候程序出现异常,或者需要修改程序再次运行,就可能会造成这样的情况:
- kafka中的数据读取出来了,zookeeper中已经保存了读取的offset,但是数据处理出了异常,那修改程序后再次运行就不会再处理这部分数据了。
- 原有的程序需要修改后再运行,kill掉之后再运行,这时可能kafka的offset还没有提交到zookeeper,修改程序后再次运行会有部分数据重复处理。
由于上面这些问题,所以希望可以自己人工地管理kafka中数据读取的状态。
原来是使用下面的方式创建kafka流:
kvs = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:2182/kafka", group_id, {topic:1},{'auto.offset.reset':'smallest'})
这种方式是使用zookeeper来管理kafka中topic中每个partition读取的offset。
上网搜了一些资料,发现可以使用createDirectStream来自定义设置读取的offset:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
ssc = StreamingContext(sc, 30)
topic = "test"
partition = 0
start = 0
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'xxx.xxx.xxx.xxx:9092'}, fromOffsets=fromOffset)
其中,fromOffset保存topic partition对应的offset的信息,例子中test这个topic只有一个partition,设置从offset=0的位置开始读。
在数据处理完之后,希望可以保存数据读取位置的状态:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
print rdd.count()
for o in offsetRanges:
print "__________________________________"
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
print "__________________________________"
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
例子中的storeOffsetRanges函数将数据读取的偏移信息保存在了offsetRanges中,在printOffsetRanges中打印出了rdd的topic、partition、读取数据最小的offset,最大的offset。
这样我们也可以将这些信息保存到mysql、mongodb等,在以后需要重启spark streaming任务的时候,就可以从数据库中保存的上次的offset开始读取数据。
github:https://github.com/Stratio/spark-kafka