Spark streaming 跟踪kafka offset的问题研究

时间:2021-03-10 20:50:44

背景:

spark streaming 读取kafka消息的offset,有三种方式:

 Spark streaming 跟踪kafka offset的问题研究

设置为Smallest,会导致spark进程每次重启后都从最开始读取消息。导致大量消息重复消费

设置为largest(默认),会导致spark进程重启后,从当前最新的消息开始读取。导致历史的消息丢失。---现在项目遇到的问题

即默认方式,无论是at most onceat least once,都很极端。

需要解决的问题是:

spark进程重启后,从上次消费的offset位置,继续消费。“尽可能的实现exactly once语义

 

方案:

使用kafka low-level api 手工保存offset。尝试以下两种方案:

 

1、 保存到zookeeper

2、 保存到kafka内部的特殊topic__consumer_offsets

 

方式1,经过测试,读写zookeeper效率很差。这个经过我在OCRM中测试,效率下降非常明显。Consumer的效率下降约70%左右,基本无法使用。

Kafka官方也已经声明:不建议使用zookeeper保存。

使用方式2,采用low-level api保存至kafka内部topic__consumer_offsets

编写程序如下:

object KafkaOffsetManage {


    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("TestOffset")
        if (args.length == 0) {
            conf.setMaster("local[*]")
        }
        val ssc = new StreamingContext(conf, Seconds(5))
        val topic = "TEST"
        val groupId = "TEST_GROUP"

        // Kafka configurations
        val topics = topic.split("\\,").toSet

        val brokers = "1.2.3.4:9092"
        val kafkaParams = Map[String, String](
            "metadata.broker.list" -> brokers,
            "serializer.class" -> "kafka.serializer.StringEncoder",
            "group.id" -> groupId,
            "offsets.storage" -> "kafka"
        )


        //create simple consumer
        val simpleConsumer = new SimpleConsumer("1.2.3.4", 9092, 1000000, 64 * 1024, "test-client")
        val topicList = List(topic)

         //1、查询topic元数据
        val topicReq = new TopicMetadataRequest(topicList, 0)
        // low level api interface
        val res = simpleConsumer.send(topicReq)
        //TopicMetadataRequest topic broker partition
        val topicMetaOption = res.topicsMetadata.headOption

        val topicAndPartition: Seq[TopicAndPartition] = topicMetaOption match {
            case Some(tm) => tm.partitionsMetadata.map(pm => TopicAndPartition("hhf2", pm.partitionId))
            case None => Seq[TopicAndPartition]()
        }

        //2、获取每个TopicAndPartition的offset信息
        val fetchRequest = OffsetFetchRequest(groupId, topicAndPartition)
        val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)

        //3、和earliestTime的offset作比较,防止Offset OutOfRange异常
        val fromOffsets: Map[TopicAndPartition, Long] = fetchResponse.requestInfo.map(v => {
            val tp = v._1
            var offset = v._2.offset
            val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))) // -2,1

            val curOffsets = simpleConsumer.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
            println(s"curOffsets:${curOffsets}")

            if (curOffsets.length > 0 && offset < curOffsets.head) {
                offset = curOffsets.head
            }
            if (offset == -1) {
                offset = 0
            }
            (tp -> offset)
        })

        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

        //4、从fromOffset位置处读取流
        val kafkaStream = KafkaUtils
            .createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

        kafkaStream.foreachRDD { rdd => {
            rdd.foreach(line => println("*********" + line))

            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

            for (o <- offsetRanges) {
                val topicAndPartition = TopicAndPartition(topic, o.partition)
                val commitRequest = OffsetCommitRequest(groupId, immutable.Map(topicAndPartition -> OffsetAndMetadata(o.fromOffset)))
                //5、保存当前RDD的fromOffset至kafka
                val commitResponse = simpleConsumer.commitOffsets(commitRequest)
            }
        }
        }
        ssc.start()
        ssc.awaitTermination()
    }
}

验证:

#读取__consumer_offsets中的offset,--partition 取值为Math.abs(s"${group.id}".hashCode())%50

运行kafka命令行

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 28 --broker-list 22.11.143.1:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

停止spark进程,producer进程保持继续写数据,等待一段时间后,再次启动spark进程,发现可以从上次的offset位置继续开始消费。不会丢失消息。

问题:

最后一个RDD可能存在重复消费(因为保存的是每个RDDfromOffset,如0->100,保存0。进程重启后,从0开始消费,因此最差情况是rdd’s size个消息被重复消费)。

即仍然是at least once语义,但是比sparksmallest(每次都从0开始读)要强很多

如果要实现exactly once的语义,需要将保存offset和消息处理放在一个事务中进行管理。

反观storm,由于是逐笔消费,所以对offset的处理应该简单很多,storm默认就已经实现了exactly once的语义。而sparkbatch 模式,所以在对offset的处理上要复杂很多。


参考资料:

《streaming-kafka-0-8-integration》

《streaming-kafka-0-10-integration》

《kafka_2.10-0.10.2.0-site-docs\site-docs\implementation.html》

kafka_2.10-0.10.2.0\site-docs\kafka_2.10-0.10.2.0-site-docs\kafka_2.10-0.10.2.0-site-docs\site-docs\configuration.html