Kafka消费者

时间:2023-03-30 08:54:14

KafkaConsumer 的概念

消费者 & 消费者群组

消费者读取消息。在其他基于发布与订阅的消息系统中,消费者可能被称为订阅者 或 读者。

消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。

偏移量是一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把偏移量添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。


消费者群组

消费者是消费者群组的一部分。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题的一部分分区的消息。消费者群组保证每个分区只能被一个消费者使用 。消费者与分区之间的映射通常被称为消费者对分区的所有权关系。

通过消费者群组的方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,消费者群组里的其他消费者可以接管失效消费者的工作。

往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。

Kafka消费者

分区再均衡

当一个消费者被关闭或发生崩溃时,这个消费者就离开群组,原本由它读取的分区将由消费者群组里的其他消费者来读取。

当一个新的消费者加入消费者群组时,这个新的消费者读取的是原本由其他消费者读取的消息。

在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。分区再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生分区再均衡。原因如下:

  • 在分区再均衡期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
  • 另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

分区再均衡的过程

消费者通过向被指派为群组协调器的 broker(不同的消费者群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。

  • 只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
  • 如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为消费者已经死亡,就会触发一次分区再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认消费者已经死亡了才会触发分区再均衡。在清理消费者时,消费者会通知群组协调器它自己将要离开消费者群组,群组协调器会立即触发一次分区再均衡,尽量降低处理停顿。


分配分区的过程:

  • 当消费者要加入消费者群组时,消费者会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。

  • 群主从群组协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者,Kafka 内置了两种分区分配策略。

  • 群主将分区分配完毕之后,群主把分区的分配情况列表发送给群组协调器,群组协调器再把这些信息发送给所有消费者。

每个消费者只能看到自己的分区分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次分区再均衡时重复发生。消费者群组的群主应该保证在分配分区时,尽可能少的改变原有的分区和消费者的映射关系。

订阅主题 & 轮询

应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。

应用程序调用 kafkaConsumer 的 subscribe() 方法订阅主题:

  • 我们可以在调用 subscribe() 方法时传入一个主题列表作为参数。
  • 我们也可以在调用 subscribe() 方法时传入一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题了。

轮询

消费者通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括消费者群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了分区再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

提交 & 偏移量

我们把更新分区当前位置的操作叫作提交。那么消费者是如何提交偏移量的呢?消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。

如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发分区再均衡,完成分区再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

  • 如果消费者提交的偏移量 小于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理
  • 如果消费者提交的偏移量 大于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

所以,处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API 提供了很多种方式来提交偏移量:自动提交偏移量、手动提交偏移量。

自动提交

如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交的时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。

与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否应该提交偏移量了,如果距离上次的提交时间已经超过了配置参数 auto.commit.interval.ms 指定的值,那么就会提交上一次轮询返回的偏移量。

在调用 close() 方法之前也会进行自动提交。


让消费者自动提交偏移量是最简单的方式。不过,在使用这种简便的方式之前,需要知道自动提交将会带来怎样的结果。

假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了分区再均衡,分区再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s 内消费者已经处理过的消息会再被重复处理。我们可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗口,不过这种情况是无法完全避免的。

手动提交

手动提交指的是,把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。应用程序可以使用 commitSync()、commitAsync() 方法手动提交偏移量

  • commitSync 同步提交偏移量:手动提交偏移量之后,同步等待 broker 响应。commitSync() 方法会提交由 poll() 方法返回的最新偏移量,只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败就抛出异常,我们也只能把异常记录到错误日志里。
  • commitAsync 异步提交偏移量:手动提交偏移量之后,不等待 broker 响应,而是在提交偏移量时指定一个回调方法,在 broker 作出响应时会执行这个回调方法。回调经常被用于记录提交错误或生成度量指标。在成功提交或碰到无怯恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会重试。

消费者也可以提交特定的偏移量:消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map,这样我们就可以提交特定的偏移量。需要使用期望处理的下一个消息的偏移量更新 map 里的偏移量。

异步提交:同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了分区再均衡,会增加重复消息的数量。这个时候我们可以使用异步提交,我们只管发送提交请求,无需等待 broker 的响应。

再均衡监听器

在【分区再均衡前后】、【消费者开始读取消息之前】、【消费者停止读取消息之后】我们可以通过消费者 API 执行一些应用程序代码,在调用 kafkaConsumer 的 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。

再均衡监听器 ConsumerRebalanceListener 有两个需要实现的方法。

  1. public void onPartitionsRevoked(Collection< TopicPartition > partitions):该方法会在【分区再均衡开始之前】和【消费者停止读取消息之后】被调用。我们可以在消费者失去分区所有权之前,通过 onPartitionsRevoked() 方法来提交偏移量。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取消息了。
  2. public void onPartitionsAssigned(Collection< TopicPartition > partitions):该方法会在【重新分配分区之后】和【消费者开始读取消息之前】被调用。我们可以在消费者获取分区所有权之后,通过 onPartitionsAssigned() 方法来指定读取消息的起始偏移量。保证消费者总是能够从正确的位置开始读取消息。

如何退出

如果消费者确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。

consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。

调用 consumer.wakeup() 可以退出 poll(),并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。

独立消费者

我们可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和分区再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。

独立消费者除了不会发生分区再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer.partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。

public void singleCustomer() {
    // 向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过这一步
    List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
    ArrayList<TopicPartition> partitions = new ArrayList<>();

    if (partitionInfos != null) {
        for (PartitionInfo partition : partitionInfos) {
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        // 为自己分配分区
        consumer.assign(partitions);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}

消费者的示例代码

再均衡监听器

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

    private KafkaConsumer consumer;

    public void MyConsumerRebalanceListener(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    public static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /**
     * 在消费者失去分区所有权之前,提交偏移量
     *
     * @param partitions
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }

    /**
     * 在消费者获取分区所有权之后,指定读取消息的起始偏移量
     *
     * @param partitions
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        long offset = 0;
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offset);
        }
    }
}

消费者读取消息

public void customer() {
    consumer.subscribe(Collections.singletonList("MyTopic"), new MyConsumerRebalanceListener(consumer));

    // 如果不需要手动指定消费者读取消息的起始偏移量,下面的代码不是必须的
    consumer.poll(0);
    long offset = 0;
    for (TopicPartition partition : consumer.assignment()) {
        consumer.seek(partition, offset);
    }

    try {
        while (true) {
            // 参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
            // 如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量、消息以及消息键。
                System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ", record.topic(), record.partition(), record.offset(), record.key(), record.value());

                // 将记录保存到数据存储系统里
                System.out.println(record);

                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
            }

            // 如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。
            // 使用 commitAsync() 方法只会执行一次提交,不会重试
            consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if (e != null) {
                        log.error("Commit failed for offsets {}", map, e);
                    }
                }
            });
        }
    } catch (WakeupException e) {
        // 我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            // 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。
            consumer.commitSync();
        } finally {
            // 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭
            // 并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }
    }
}

参考资料

《Kafka 权威指南》第 4 章:Kafka 消费者——从 Kafka 读取数据