背景:
spark streaming 读取kafka消息的offset,有三种方式:
设置为Smallest,会导致spark进程每次重启后都从最开始读取消息。导致大量消息重复消费
设置为largest(默认),会导致spark进程重启后,从当前最新的消息开始读取。导致历史的消息丢失。---现在项目遇到的问题
即默认方式,无论是at most once和at least once,都很极端。
需要解决的问题是:
当spark进程重启后,从上次消费的offset位置,继续消费。“尽可能的实现exactly once语义”
方案:
使用kafka low-level api 手工保存offset。尝试以下两种方案:
1、 保存到zookeeper上
2、 保存到kafka内部的特殊topic:__consumer_offsets中
方式1,经过测试,读写zookeeper效率很差。这个经过我在OCRM中测试,效率下降非常明显。Consumer的效率下降约70%左右,基本无法使用。
Kafka官方也已经声明:不建议使用zookeeper保存。
使用方式2,采用low-level api保存至kafka内部topic:__consumer_offsets
编写程序如下:
object KafkaOffsetManage {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("TestOffset")
if (args.length == 0) {
conf.setMaster("local[*]")
}
val ssc = new StreamingContext(conf, Seconds(5))
val topic = "TEST"
val groupId = "TEST_GROUP"
// Kafka configurations
val topics = topic.split("\\,").toSet
val brokers = "1.2.3.4:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"serializer.class" -> "kafka.serializer.StringEncoder",
"group.id" -> groupId,
"offsets.storage" -> "kafka"
)
//create simple consumer
val simpleConsumer = new SimpleConsumer("1.2.3.4", 9092, 1000000, 64 * 1024, "test-client")
val topicList = List(topic)
//1、查询topic元数据
val topicReq = new TopicMetadataRequest(topicList, 0)
// low level api interface
val res = simpleConsumer.send(topicReq)
//TopicMetadataRequest topic broker partition
val topicMetaOption = res.topicsMetadata.headOption
val topicAndPartition: Seq[TopicAndPartition] = topicMetaOption match {
case Some(tm) => tm.partitionsMetadata.map(pm => TopicAndPartition("hhf2", pm.partitionId))
case None => Seq[TopicAndPartition]()
}
//2、获取每个TopicAndPartition的offset信息
val fetchRequest = OffsetFetchRequest(groupId, topicAndPartition)
val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
//3、和earliestTime的offset作比较,防止Offset OutOfRange异常
val fromOffsets: Map[TopicAndPartition, Long] = fetchResponse.requestInfo.map(v => {
val tp = v._1
var offset = v._2.offset
val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))) // -2,1
val curOffsets = simpleConsumer.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
println(s"curOffsets:${curOffsets}")
if (curOffsets.length > 0 && offset < curOffsets.head) {
offset = curOffsets.head
}
if (offset == -1) {
offset = 0
}
(tp -> offset)
})
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
//4、从fromOffset位置处读取流
val kafkaStream = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
kafkaStream.foreachRDD { rdd => {
rdd.foreach(line => println("*********" + line))
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
val topicAndPartition = TopicAndPartition(topic, o.partition)
val commitRequest = OffsetCommitRequest(groupId, immutable.Map(topicAndPartition -> OffsetAndMetadata(o.fromOffset)))
//5、保存当前RDD的fromOffset至kafka
val commitResponse = simpleConsumer.commitOffsets(commitRequest)
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
验证:
#读取__consumer_offsets中的offset,--partition 取值为Math.abs(s"${group.id}".hashCode())%50
运行kafka命令行
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 28 --broker-list 22.11.143.1:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
停止spark进程,producer进程保持继续写数据,等待一段时间后,再次启动spark进程,发现可以从上次的offset位置继续开始消费。不会丢失消息。
问题:
最后一个RDD可能存在重复消费(因为保存的是每个RDD的fromOffset,如0->100,保存0。进程重启后,从0开始消费,因此最差情况是rdd’s size个消息被重复消费)。
即仍然是at least once语义,但是比spark的smallest(每次都从0开始读)要强很多
如果要实现exactly once的语义,需要将保存offset和消息处理放在一个事务中进行管理。
反观storm,由于是逐笔消费,所以对offset的处理应该简单很多,storm默认就已经实现了exactly once的语义。而spark是batch 模式,所以在对offset的处理上要复杂很多。
参考资料:
《streaming-kafka-0-8-integration》
《streaming-kafka-0-10-integration》
《kafka_2.10-0.10.2.0-site-docs\site-docs\implementation.html》
《kafka_2.10-0.10.2.0\site-docs\kafka_2.10-0.10.2.0-site-docs\kafka_2.10-0.10.2.0-site-docs\site-docs\configuration.html》