I'm using the Spark directStream API to read data from Kafka. My code is as follows:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val sparkConf = new SparkConf()
  .setAppName("testdirectStreaming")
  // maxRatePerPartition is a Spark setting, so it belongs on SparkConf, not in kafkaParams
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, String](
  "auto.offset.reset" -> "smallest",
  "metadata.broker.list" -> "10.0.0.11:9092"
)

// I set fromOffset to 0 for all 3 partitions
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("mytopic", 0) -> 0L,
  TopicAndPartition("mytopic", 1) -> 0L,
  TopicAndPartition("mytopic", 2) -> 0L
)

val kafkaData = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
  ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => mmd)

// Capture the offset ranges of each batch on the driver before any other transformation
var offsetRanges = Array[OffsetRange]()
kafkaData.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  _.message()
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"---${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach { line =>
      println("===============value:" + line)
    }
  }
}

// Nothing is consumed until the streaming context is started
ssc.start()
ssc.awaitTermination()
I'm sure there is data in the Kafka cluster, but my code does not receive any of it. Thanks in advance.
1 solution
#1
3
I found the reason: the old messages in Kafka had already been deleted because their retention period had expired. So when I set fromOffset to 0, it caused an offset-out-of-range exception. That exception made Spark reset the offsets to the latest ones, so I did not receive any messages. The solution is to set an appropriate fromOffset, one that still exists in the partition, to avoid the exception.
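One way to pick an "appropriate fromOffset" is to raise each requested offset to the earliest offset the broker still retains. The sketch below shows only that clamping step and makes some assumptions: `TopicPartitionKey` is a hypothetical stand-in for `kafka.common.TopicAndPartition` so that the snippet runs without Kafka on the classpath, `OffsetClamp` and `clampToEarliest` are illustrative names, and the earliest offsets are assumed to have been looked up separately (for example with Kafka's `kafka.tools.GetOffsetShell` tool using `--time -2`).

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition (assumption, not the Kafka class)
final case class TopicPartitionKey(topic: String, partition: Int)

object OffsetClamp {
  /** Raises each requested offset to the earliest offset still retained.
    * Partitions with no known earliest offset keep the requested value.
    */
  def clampToEarliest(
      requested: Map[TopicPartitionKey, Long],
      earliest: Map[TopicPartitionKey, Long]
  ): Map[TopicPartitionKey, Long] =
    requested.map { case (tp, offset) =>
      tp -> math.max(offset, earliest.getOrElse(tp, offset))
    }
}
```

In the question's code, the same idea would be applied to the `fromOffsets` map before it is passed to `createDirectStream`, so that a partition whose old messages were deleted starts from its earliest retained offset instead of 0.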