ConsumerRecord类详解

时间:2024-10-19 12:11:12
  • 从kafka接收的键/值对。这还包括一个主题名称和一个从中接收记录的分区号,一个指向Kafka分区中的记录的偏移量以及一个由相应ProducerRecord标记的时间戳。
  • 构造方法

ConsumerRecord( topic, int partition, long offset, K key, V value) 创建一条记录,以从指定的主题和分区接收(为与Kafka 0.9兼容,在消息格式支持时间戳和公开序列化的元数据之前提供了与记录的兼容性)。
ConsumerRecord( topic, int partition, long offset, long timestamp, timestampType, long checksum, int serializedKeySize, int serializedValueSize, K key, V value)
ConsumerRecord( topic, int partition, long offset, long timestamp, timestampType, checksum, int serializedKeySize, int serializedValueSize, K key, V value, Headers headers)
topic - 接收到该记录的主题
partition - 他是从中收到该记录的主题分区
offset - 该记录在相应的Kafka分区中的偏移量
key - 记录的键(如果存在)(允许为空)
value - 记录内容
timestamp - 记录的时间戳。
timestampType - 时间戳类型
checksum - 完整记录的校验和(CRC32)
serializedKeySize - 序列化密钥的长度
serializedValueSize - 序列化值的长度
headers - 记录的标题。

  • 方法概览Method Summary

long checksum() @Deprecated. 记录的校验和(CRC32)。从Kafka 0.11.0开始。由于可能在代理上进行消息格式转换,因此代理返回的校验和可能与生产者计算出的校验和不匹配。因此,依靠此校验和来获得端到端的交付保证是不安全的。此外,消息格式v2不包含记录级校验和(出于性能考虑,记录校验和已替换为批校验和)。为了保持兼容性,将返回从记录时间戳,序列化密钥大小和序列化值大小计算出的部分校验和,但是对于端到端可靠性,不应依赖于此。
public topic()
public int partition()
public Headers headers()
public K key()
public V value()
public long offset() 该记录在相应的Kafka分区中的位置。
public long timestamp()
public timestampType()
public int serializedKeySize()
public int serializedValueSize()
public toString()

ConsumerRecords<K,V>详解
  • 一个容器,其中包含特定主题的每个分区的ConsumerRecord列表。对于(long)操作返回的每个主题分区,都有一个ConsumerRecord列表。
  • 构造方法

ConsumerRecords(<TopicPartition,<ConsumerRecord<K,V>>> records)

  • 方法详解

public <ConsumerRecord<K,V>> records(TopicPartition partition) 仅获取给定分区的记录
public <ConsumerRecord<K,V>> records( topic) 仅获取给定主题的记录
public partitions() 获取具有该记录集中包含记录的分区。
public <ConsumerRecord<K,V>> iterator()
public int count() 所有主题的记录数
public boolean isEmpty()
public static <K,V> ConsumerRecords<K,V> empty()

OffsetAndMetadata
  • Kafka偏移提交API允许用户在提交偏移时提供其他元数据(以字符串形式)。例如,这可以用于存储有关哪个节点进行了提交,何时进行了提交等信息。
  • 构造方法
  • public OffsetAndMetadata(long offset)

构造一个新的OffsetAndMetadata对象,以通过KafkaConsumer进行提交。与提交关联的元数据将为空。
offset 要提交的偏移量

  • 方法

public long offset()
public metadata()

OffsetAndTimestamp
  • 偏移量和时间戳的容器类。
  • 构造方法
  • public OffsetAndTimestamp(long offset, long timestamp)
  • 方法

public long timestamp()
public long offset()

RangeAssignor
  • 参考:/u013256816/article/details/81123600 好好理解一下 n/m and n%m
  • public class RangeAssignor
    extends
  • 范围分配器基于每个主题工作。对于每个主题,我们以数字顺序排列可用分区,并以字典顺序排列使用者。然后,我们将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它没有均匀划分,那么前几个消费者将有一个额外的划分。
  • 例如,假设有两个使用者C0和C1,两个主题t0和t1,并且每个主题都有3个分区,从而得出分区t0p0,t0p1,t0p2,t1p0,t1p1和t1p2。分配为:

C0:[t0p0,t0p1,t1p0,t1p1]
C1:[t0p2,t1p2]

  • 嵌套类摘要
  • 从接口继承的嵌套类/接口
  • ,
  • 只有一个空的构造方法
  • 方法
  • public name()
  • public <,> assign(<,> partitionsPerTopic, <,> subscriptions)
RoundRobinAssignor
  • 参考:/u013256816/article/details/81123625,理解分配策略
  • 继承自AbstractPartitionAssignor
  • 循环分配器对所有可用分区和所有可用使用者进行布局。然后,它继续进行从分区到使用者的循环分配。
  • 如果所有使用者实例的订阅都相同,则分区将均匀分布。(即,分区所有权计数在所有使用者中的差值都在一个正整数之内。)
  • 例如,假设有两个使用者C0和C1,两个主题t0和t1,并且每个主题都有3个分区,从而得出分区t0p0,t0p1,t0p2,t1p0,t1p1和t1p2。分配为:C0:[t0p0,t0p2,t1p1] C1:[t0p1,t1p0,t1p2]
  • 当订阅者在各个消费者实例之间不同时,分配过程仍将以循环方式考虑每个消费者实例,但如果未订阅该主题,则跳过该实例。与订阅相同的情况不同,这可能导致分配不平衡。
  • 例如,我们有三个使用者C0,C1,C2和三个主题t0,t1,t2,分别具有1、2和3个分区。因此,分区是t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。C0订阅了t0;C1订阅了t0,t1;C2订阅了t0,t1,t2。Tha分配为:C0:[t0p0] C1:[t1p0] C2:[t1p1,t2p0,t2p1,t2p2]
  • 只有一个空的构造方法
  • public <,> assign(<,> partitionsPerTopic, <,> subscriptions)
  • public allPartitionsSorted(<,> partitionsPerTopic,
    <,> subscriptions)
  • public name()
StickyAssignor
  • 参考:/u013256816/article/details/81123625
  • 继承自PartitionAssignor
  • 粘性分配器有两个作用。首先,它保证分配尽可能平衡,这意味着:
  • 分配给使用者的主题分区数量最多相差1个;要么
  • 每个使用者的主题分区比其他使用者少2个以上的使用者无法将任何这些主题分区转移给它。
  • 其次,当发生重新分配时,它会保留尽可能多的现有分配。当主题分区从一个使用者移动到另一个使用者时,这有助于节省一些开销处理。
  • 重新开始就可以通过将分区尽可能均匀地分配给消费者来工作。即使这听起来与循环分配器的工作方式相似,但下面的第二个示例表明事实并非如此。在重新分配期间,它将以这样的方式执行重新分配:

1.主题分区仍然尽可能均匀地分布,
2.并且主题分区尽可能地与其先前分配的使用者在一起。
当然,上述第一个目标优先于第二个目标。

  • 例如:1.假设有三个使用者C0,C1,C2,四个主题t0,t1,t2,t3,每个主题都有2个分区,从而得出分区t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。每个消费者都订阅了这三个主题。粘性和循环分配器的分配为:
    在这里插入图片描述
    • 现在,假设C1被删除,并且即将发生重新分配。循环分配器(robin assignor)将产生:
      在这里插入图片描述
    • 而粘性分配器将导致:
      在这里插入图片描述
  • 保留所有先前的分配(与循环分配器不同)。
    例2:有三个使用者C0,C1,C2和三个主题t0,t1,t2,分别具有1、2和3个分区。因此,分区是t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。C0订阅了t0;C1订阅了t0,t1;C2订阅了t0,t1,t2。循环分配器将提出以下分配:
    在这里插入图片描述
  • 它不如粘性分配器建议的分配平衡:
    在这里插入图片描述
  • 现在,如果删除消费者C0,则这两个分配器将产生以下分配。Round Robin (保留3个分区分配):
    在这里插入图片描述
  • 粘性(保留5个分区分配):
    在这里插入图片描述
  • 对ConsumerRebalanceListener的影响
  • 粘性分配策略可以为那些在onPartitionsRevoked()回调侦听器中具有一些分区清除代码的使用者提供一些优化。清理代码被放置在该回调侦听器中,因为使用范围或循环分配器时,使用者在重新平衡后没有任何保留或希望保留其分配的分区的假设或希望。侦听器代码如下所示:
class TheOldRebalanceListener implements ConsumerRebalanceListener {

   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     for (TopicPartition partition: partitions) {
       commitOffsets(partition);
       cleanupState(partition);
     }
   }

   void onPartitionsAssigned(Collection<TopicPartition> partitions) {
     for (TopicPartition partition: partitions) {
       initializeState(partition);
       initializeOffset(partition);
     }
   }
 }
  • 如上所述,粘性分配器的一个优点是,通常,它减少了在重新分配过程中实际从一个使用者转移到另一个使用者的分区的数量。因此,它允许消费者更有效地进行清理。当然,他们仍然可以在onPartitionsRevoked()侦听器中执行分区清理,但是它们可以提高效率,并在重新平衡之前和之后记下它们的分区,并在重新平衡之后仅对丢失的分区进行清理。(通常不是很多)。下面的代码片段阐明了这一点:
class TheNewRebalanceListener implements ConsumerRebalanceListener {
   Collection<TopicPartition> lastAssignment = ();

   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     for (TopicPartition partition: partitions)
       commitOffsets(partition);
   }

   void onPartitionsAssigned(Collection<TopicPartition> assignment) {
     for (TopicPartition partition: difference(lastAssignment, assignment))
       cleanupState(partition);

     for (TopicPartition partition: difference(assignment, lastAssignment))
       initializeState(partition);

     for (TopicPartition partition: assignment)
       initializeOffset(partition);

      = assignment;
   }
 }
  • 任何使用粘性分配的使用者都可以像这样利用此侦听器:(topics,new TheNewRebalanceListener());