org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:726)
a) Exception type: CommitFailedException
b) Message: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
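This problem is easy to solve if we understand the parameters involved or how consumption works. When I saw this exception I was actually pleased. I knew it would give me a reason to learn, at least roughly, how a Kafka consumer consumes messages. Good attitude, at least~~~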
The main questions are: what is the session timeout? What is the "maximum size of batches"? And what does timeout mean in poll(timeout)?
a) Consult the official documentation
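From the documentation, we now have a rough idea of what max.poll.interval.ms and the maximum size of batches mean:
max.poll.interval.ms: the maximum delay allowed between two consecutive calls to poll() when the consumer uses group management. If poll() is not called again within this time, the consumer is considered failed. The group then rebalances and reassigns its partitions to other members (default 300000 ms).
maximum size of batches: the maximum number of records a single call to poll() returns, set by max.poll.records (default 500).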
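The max.poll.interval.ms rule can be made concrete with a minimal Java sketch. It assumes a toy model, not Kafka's internal code. PollIntervalTracker and its methods are hypothetical names, and time is passed in explicitly as milliseconds so the example is deterministic. Each poll() records a timestamp. The member counts as failed once more than the configured interval has passed since the last poll.

```java
/**
 * Hypothetical toy model of the max.poll.interval.ms liveness rule.
 * Not Kafka's implementation; in the real client this is handled inside
 * the consumer's group-coordination code.
 */
final class PollIntervalTracker {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTracker(long maxPollIntervalMs, long startMs) {
        if (maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("maxPollIntervalMs must be positive");
        }
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = startMs;
    }

    /** Call whenever the application invokes poll(); resets the timer. */
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True once the gap since the last poll() exceeds the limit. */
    boolean expired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Same limit as the later experiments: max.poll.interval.ms = 1000
        PollIntervalTracker tracker = new PollIntervalTracker(1000, 0);
        tracker.onPoll(0);
        // Fast processing: the next poll happens 50 ms later, within the limit
        System.out.println(tracker.expired(50));   // false
        tracker.onPoll(50);
        // Slow processing: one record taking 2000 ms pushes the gap past 1000 ms
        System.out.println(tracker.expired(2050)); // true
    }
}
```

The key point the model captures is that only the time between poll() calls matters. Processing that happens between two polls counts against the limit.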
poll(timeout): the KafkaConsumer Javadoc describes the parameter as follows:
Parameters: timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
Test producer, which writes "hello-N" messages to the user_behavior topic:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class ProducerTest {
    @Test
    public void TestPro() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Key = loop index, value = "hello-" + index
        for (int i = 0; i < 30; i++)
            producer.send(new ProducerRecord<String, String>("user_behavior", Integer.toString(i), "hello-" + i));
        producer.close();
    }
}
Test consumer (manual commit, at most 5 records per poll, poll timeout 3000 ms):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5); // at most 5 records per poll()
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;
        while (true) {
            // Wait up to 3000 ms if no records are buffered
            ConsumerRecords<String, String> records = consumer.poll(3000);
            // KafkaHelper.timestmp2date: the author's own timestamp-formatting helper (not shown)
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
Output with no new messages in the topic (each poll() waits the full 3000 ms and returns nothing):
polls out: 1time: 2018-06-13 15:25:19
polls out: 2time: 2018-06-13 15:25:22
polls out: 3time: 2018-06-13 15:25:25
polls out: 4time: 2018-06-13 15:25:28
Output when the producer is run while the consumer is polling. Empty polls come every 3 s. The records produced at 15:27:52 are returned by polls 5 and 6. Poll 6 returns within the same second because records are already buffered:
polls out: 1time: 2018-06-13 15:27:40
polls out: 2time: 2018-06-13 15:27:43
polls out: 3time: 2018-06-13 15:27:46
polls out: 4time: 2018-06-13 15:27:49
polls out: 5time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 0, offset = 503, key = 1, value = hello-1
time = 2018-06-13 15:27:52, partition = 0, offset = 504, key = 5, value = hello-5
polls out: 6time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 1, offset = 157, key = 4, value = hello-4
time = 2018-06-13 15:27:52, partition = 2, offset = 129, key = 0, value = hello-0
time = 2018-06-13 15:27:52, partition = 2, offset = 130, key = 2, value = hello-2
time = 2018-06-13 15:27:52, partition = 2, offset = 131, key = 3, value = hello-3
polls out: 7time: 2018-06-13 15:27:55
polls out: 8time: 2018-06-13 15:27:58
polls out: 9time: 2018-06-13 15:28:01
polls out: 10time: 2018-06-13 15:28:04
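Combining the official description with these results, my understanding of the timeout parameter in poll(timeout) is as follows. If records are already in the buffer, poll() returns them immediately. Otherwise it blocks for up to timeout milliseconds, returning as soon as records arrive or returning an empty batch when the timeout expires.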
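That behaviour of poll(timeout) can be modelled with a plain java.util.concurrent.BlockingQueue standing in for the consumer's fetch buffer. In the model, buffered records come back at once, capped at max.poll.records; otherwise the call waits up to timeout ms and returns empty if nothing arrives. This is a hypothetical sketch: BufferedPoller is not a Kafka class. It assumes a single consuming thread and ignores fetching, offsets and rebalances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical toy model of poll(timeout) semantics; not Kafka code. */
final class BufferedPoller<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private final int maxPollRecords;

    BufferedPoller(int maxPollRecords) {
        if (maxPollRecords <= 0) throw new IllegalArgumentException("maxPollRecords must be positive");
        this.maxPollRecords = maxPollRecords;
    }

    /** Stands in for records arriving from the broker into the local buffer. */
    void deliver(T record) {
        buffer.add(record);
    }

    /**
     * Returns at most maxPollRecords items. If the buffer already holds data,
     * returns at once; otherwise waits up to timeoutMs for the first item.
     * A timeout of 0 never waits.
     */
    List<T> poll(long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) throw new IllegalArgumentException("timeout must not be negative");
        List<T> batch = new ArrayList<>();
        buffer.drainTo(batch, maxPollRecords);            // take what is buffered now
        if (batch.isEmpty() && timeoutMs > 0) {
            T first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS); // block until data or timeout
            if (first != null) {
                batch.add(first);
                buffer.drainTo(batch, maxPollRecords - 1); // fill the rest of the batch
            }
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedPoller<String> poller = new BufferedPoller<>(5);
        System.out.println(poller.poll(300));  // prints [] after waiting about 300 ms
        for (int i = 0; i < 6; i++) poller.deliver("hello-" + i);
        System.out.println(poller.poll(3000)); // [hello-0, hello-1, hello-2, hello-3, hello-4], no wait
        System.out.println(poller.poll(3000)); // [hello-5]
    }
}
```

The cap of 5 mirrors max.poll.records = 5 in the test consumer, which is why six buffered records come back as a batch of five followed by a batch of one.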
Next, lower max.poll.interval.ms to 1000 ms and run the same consumer again (the changed line is marked with a comment):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000"); // changed: at most 1000 ms between polls
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            // KafkaHelper.timestmp2date: the author's own timestamp-formatting helper (not shown)
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
Output: poll 2 returns 5 records (the max.poll.records cap) and poll 3 returns the remaining one. Processing is fast, so commitSync() succeeds and no exception is thrown:
polls out: 1time: 2018-06-13 15:53:07
polls out: 2time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 1, offset = 158, key = 4, value = hello-4
time = 2018-06-13 15:53:07, partition = 0, offset = 505, key = 1, value = hello-1
time = 2018-06-13 15:53:07, partition = 0, offset = 506, key = 5, value = hello-5
time = 2018-06-13 15:53:07, partition = 2, offset = 132, key = 0, value = hello-0
time = 2018-06-13 15:53:07, partition = 2, offset = 133, key = 2, value = hello-2
polls out: 3time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 2, offset = 134, key = 3, value = hello-3
polls out: 4time: 2018-06-13 15:53:10
polls out: 5time: 2018-06-13 15:53:13
The exception blames the poll loop for "spending too much time message processing", so I added a 2-second sleep after each record to simulate slow processing:
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            // KafkaHelper.timestmp2date: the author's own timestamp-formatting helper (not shown)
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()), record.partition(), record.offset(), record.key(), record.value());
                TimeUnit.SECONDS.sleep(2); // changed: simulate 2 s of processing per record
            }
            consumer.commitSync();
        }
    }
}
Output: poll 5 returns three records. At 2 s each, about 6 s pass before commitSync(), which then fails with the same CommitFailedException:
polls out: 1time: 2018-06-13 15:59:13
polls out: 2time: 2018-06-13 15:59:16
polls out: 3time: 2018-06-13 15:59:19
polls out: 4time: 2018-06-13 15:59:22
polls out: 5time: 2018-06-13 15:59:22
time = 2018-06-13 15:59:22, partition = 2, offset = 135, key = 0, value = hello-0
time = 2018-06-13 15:59:22, partition = 2, offset = 136, key = 2, value = hello-2
time = 2018-06-13 15:59:22, partition = 2, offset = 137, key = 3, value = hello-3
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)
    ...
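A back-of-the-envelope check explains this failure and shows which of the suggested fixes can work. It assumes processing time is simply records × per-record time and ignores the small overhead of poll() and printing. PollBudget is a hypothetical helper, not part of Kafka. With max.poll.interval.ms = 1000 and TimeUnit.SECONDS.sleep(2) per record, the numbers are computed below.

```java
/** Hypothetical helper for a rough poll-loop time budget; not Kafka code. */
final class PollBudget {
    private PollBudget() {}

    /** Approximate time spent between two poll() calls for a batch of the given size. */
    static long processingMs(int records, long perRecordMs) {
        return records * perRecordMs;
    }

    /** Largest batch that still fits within maxPollIntervalMs; 0 means not even one record fits. */
    static int maxRecordsThatFit(long maxPollIntervalMs, long perRecordMs) {
        if (perRecordMs <= 0) throw new IllegalArgumentException("perRecordMs must be positive");
        return (int) Math.min(Integer.MAX_VALUE, maxPollIntervalMs / perRecordMs);
    }

    public static void main(String[] args) {
        long interval = 1000;  // max.poll.interval.ms in the experiment
        long perRecord = 2000; // TimeUnit.SECONDS.sleep(2) per record
        System.out.println(processingMs(3, perRecord));             // 6000: the batch actually returned
        System.out.println(processingMs(5, perRecord));             // 10000: worst case with max.poll.records = 5
        System.out.println(maxRecordsThatFit(interval, perRecord)); // 0
    }
}
```

Because even a single 2 s record exceeds a 1000 ms interval, lowering max.poll.records alone cannot prevent the failure here. The interval has to be raised, or the processing made faster. At the default max.poll.interval.ms of 300000 ms, the same arithmetic allows up to 300000 / 2000 = 150 such records per poll, before leaving any headroom.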