apache spark流- kafka -阅读旧消息。

时间:2021-03-10 20:50:50

I am trying to read older messages from Kafka with spark streaming. However, I am only able to retrieve messages as they are sent in real time (i.e., if I populate new messages, while my spark program is running - then I get those messages).

我试着从卡夫卡上读一些旧的信息,用火花流。但是,我只能够在实时发送消息的情况下检索消息。如果我填充新消息,而我的spark程序正在运行,那么我将得到这些消息。

I am changing my groupID and consumerID to make sure zookeeper isn't just not giving messages it knows my program has seen before.

我正在改变我的groupID和consumerID,以确保zookeeper不会只是不给它知道我的程序以前见过的消息。

Assuming spark is seeing the offset in zookeeper as -1, shouldn't it read all the old messages in the queue? Am I just misunderstanding the way a kafka queue can be used? I'm very new to spark and kafka, so I can't rule out that I'm just misunderstanding something.

假设spark将zookeeper的偏移量视为-1,它不应该读取队列中的所有旧消息吗?我只是误解了卡夫卡队列的使用方式吗?我对spark和kafka非常陌生,所以我不能排除我只是误解了一些东西。

package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats 

  def parser(json: String): String = {
    return json
}

def main(args : Array[String]) {
  val zkQuorum = "test-spark02:9092"

  val group = "myGroup99"
  val topic = Map("testtopic" -> 1)
  val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
  val ssc = new StreamingContext(sparkContext, Seconds(3))
  val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
  var gp = json_stream.map(_._2).map(parser)

  gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
  ssc.start()
}

When running this, I will see the following message. So I am confident that it's not just not seeing the messages because the offset is set.

在运行这个过程时,我将看到以下消息。因此,我相信,这不仅仅是没有看到消息,因为偏移已经设置好了。

14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker i d:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] )

在ArrayBuffer([testtopic,0], initOffset -1到broker id:1,主机:test-spark02)中添加了fetcher。vpc,port:9092], [[testtopic,1], initOffset -1 to broker i d:1,host:test-spark02。vpc,port:9092], [[testtopic,2], initOffset -1到broker id:1,主机:test-spark02。vpc,port:9092], [[testtopic,3], initOffset -1到broker id:1,主机:test-spark02。vpc,port:9092], [[testtopic,4], initOffset -1到broker id:1,主机:test-spark02。vpc,端口:9092)

Then, if I populate 1000 new messages -- I can see those 1000 messages saved in my temp directory. But I don't know how to read the existing messages, which should number in the (at this point) tens of thousands.

然后,如果我填充1000个新消息——我可以看到在临时目录中保存的1000条消息。但我不知道如何读取已有的消息,这些信息应该在(此时)数以万计。

1 个解决方案

#1


8  

Use the alternative factory method on KafkaUtils that lets you provide a configuration to the Kafka consumer:

使用KafkaUtils上的替代工厂方法,您可以为Kafka客户提供配置:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)]

Then build a map with your kafka configuration and add the parameter 'kafka.auto.offset.reset' set to 'smallest':

然后使用kafka配置构建一个映射,并添加参数“kafka.auto.offset”。重置”设置为“最小”:

val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      "kafka.auto.offset.reset" -> "smallest"
)

Provide that config to the factory method above. "kafka.auto.offset.reset" -> "smallest" tells the consumer to starts from the smallest offset in your topic.

向上面的工厂方法提供配置。“kafka.auto.offset。重置“->”最小“告诉消费者从你的话题中最小的偏移量开始。

#1


8  

Use the alternative factory method on KafkaUtils that lets you provide a configuration to the Kafka consumer:

使用KafkaUtils上的替代工厂方法,您可以为Kafka客户提供配置:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)]

Then build a map with your kafka configuration and add the parameter 'kafka.auto.offset.reset' set to 'smallest':

然后使用kafka配置构建一个映射,并添加参数“kafka.auto.offset”。重置”设置为“最小”:

val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      "kafka.auto.offset.reset" -> "smallest"
)

Provide that config to the factory method above. "kafka.auto.offset.reset" -> "smallest" tells the consumer to starts from the smallest offset in your topic.

向上面的工厂方法提供配置。“kafka.auto.offset。重置“->”最小“告诉消费者从你的话题中最小的偏移量开始。