Kafka: consume only new messages

Time: 2021-01-31 20:49:31

My Spark Streaming job is consuming data from Kafka:

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);

Whenever I restart my job, it starts consuming from the last stored offset (I am assuming this because it takes a lot of time to send the processed data, and if I change the consumer group it instantly works with new messages).

I am on Kafka 8.1.1, where auto.offset.reset defaults to largest, which means that whenever I restart, Kafka will send data from where I left off.

My use case requires me to ignore this data and process only newly arriving data. How can I achieve this? Any suggestions?

1 solution

#1


There are two ways you can achieve this:

  1. Create a unique consumer group each time on restart and it will consume from the latest offset (a minimal sketch of this is included at the end of this answer).

  2. Use the direct approach instead of the receiver-based one; here you have more control over how you consume, but you would have to update ZooKeeper manually to store your offsets (see the sketch after the code below). In the example below it will always start at the latest offset.

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka._

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    // Without explicit starting offsets, the direct stream begins at the latest offsets.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
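
If you do go with the direct approach and want to track what you have consumed, here is a minimal sketch of reading the per-partition offset ranges, building on the `messages` stream above (it uses the `HasOffsetRanges` trait from the same `org.apache.spark.streaming.kafka` package already imported; actually persisting the offsets to ZooKeeper is left as a comment, since the exact storage call depends on your setup):

    messages.foreachRDD { rdd =>
      // Each RDD produced by the direct stream carries the Kafka offset ranges it read.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        // Persist o.topic, o.partition and o.untilOffset to ZooKeeper (or any other store) here.
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }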
    

Documentation on the direct approach is here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html
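
For option 1, a minimal sketch of generating a unique consumer group on every restart with the receiver-based API (the `zkQuorum`, `topicMap`, and group-name values are placeholders, not taken from the question):

    import org.apache.spark.streaming.kafka._

    // Placeholder values; substitute your own ZooKeeper quorum and topic map.
    val zkQuorum = "zk1:2181,zk2:2181"
    val topicMap = Map("my-topic" -> 1)

    // Appending a timestamp makes the group unique on each restart, so no stored
    // offsets exist for it and consumption starts from the latest offset.
    val groupId = s"my-consumer-group-${System.currentTimeMillis()}"

    val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)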
