spark streaming 自定义kafka读取topic的offset(python)

时间:2022-06-19 00:27:07

使用spark streaming 处理kafka数据,有时候程序出现异常,或者需要修改程序再次运行,就可能会造成这样的情况:

  1. kafka中的数据读取出来了,zookeeper中已经保存了读取的offset,但是数据处理出了异常,那修改程序后再次运行就不会再处理这部分数据了。
  2. 原有的程序需要修改后再运行,kill掉之后再运行,这时可能kafka的offset还没有提交到zookeeper,修改程序后再次运行会有部分数据重复处理。

由于上面这些问题,所以希望可以自己人工地管理kafka中数据读取的状态。

原来是使用下面的方式创建kafka流:

kvs = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:2182/kafka", group_id, {topic:1},{'auto.offset.reset':'smallest'})


这种方式是使用zookeeper来管理kafka中topic中每个partition读取的offset。

上网搜了一些资料,发现可以使用createDirectStream来自定义设置读取的offset:


from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition


spark_conf = SparkConf()

sc = SparkContext(conf=spark_conf)
ssc = StreamingContext(sc, 30)

topic = "test"
partition = 0
start = 0

topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'xxx.xxx.xxx.xxx:9092'}, fromOffsets=fromOffset)

其中,fromOffset保存topic partition对应的offset的信息,例子中test这个topic只有一个partition,设置从offset=0的位置开始读。


在数据处理完之后,希望可以保存数据读取位置的状态:

offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd

def printOffsetRanges(rdd):
print rdd.count()
for o in offsetRanges:
print "__________________________________"
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
print "__________________________________"


directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)

例子中的storeOffsetRanges函数将数据读取的偏移信息保存在了offsetRanges中,在printOffsetRanges中打印出了rdd的topic、partition、读取数据最小的offset,最大的offset。

这样我们也可以将这些信息保存到mysql、mongodb等,在以后需要重启spark streaming任务的时候,就可以从数据库中保存的上次的offset开始读取数据。


api:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils

github:https://github.com/Stratio/spark-kafka

*: http://*.com/questions/33268689/how-to-create-inputdstream-with-offsets-in-pyspark-using-kafkautils-createdirec