offset偏移量控制策略

时间:2025-02-21 13:32:16

Kafka 中的 offset(偏移量) 是指消费者在 Kafka 分区内读取消息的位置,表示消费者已经消费到哪个消息。在 Kafka 中,offset 控制策略对于确保消息的正确消费、处理顺序、容错能力和高可用性至关重要。Kafka 提供了多种方式来管理和控制消费者的 offset,以下是 Kafka 中常见的 offset 控制策略

1. 自动提交(Auto Commit) 

自动提交 是 Kafka 消费者最简单的 offset 管理方式。在这种模式下,消费者会定期将当前消费的 offset 自动提交到 Kafka 集群。在nodejs环境使用kafkajs时,自动提交无需做任何配置。

优点:

  • 简单易用,适用于一些不要求严格精确一次消费的场景。

缺点

  • 可能丢失消息:如果消费者在自动提交偏移量之前崩溃,可能会重复消费一些消息。
  • 无法精确控制:偏移量提交的时机和粒度不容易控制,可能不适合一些需要精确消费顺序的场景。

2. 手动提交(Manual Commit)

手动提交偏移量允许消费者精确控制何时提交偏移量。消费者可以在消息成功消费后显式地提交偏移量。在nodejs环境使用kafkajs实现方式。

await consumer.run({
  // 不进行自动提交
  autoCommit: false, 
  partitionsConsumedConcurrently: 0,
  eachMessage: async ({ topic, partition, message }) => {
    const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];

    console.log(partition, { id, name, age, sex, desc });

    // 手动提交控制偏移量
    await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
  },
});

优点: 

  • 精确控制:消费者可以在确认消息已正确处理后再提交偏移量,确保消息不被丢失或重复消费。
  • 避免不必要的重复消费:只有当消费者完成整个消费处理(例如事务处理或确认某些条件成立)后,才会提交偏移量。

缺点

  • 需要额外的开发工作:消费者需要显式地处理提交偏移量的逻辑,为了防读取的数据偏移量offset丢失也可将每个topic中读取到的partition分区上的数据偏移量offset保存在数据库中,手动提交控制offset增加了开发复杂性。

 3.Offset 重置(Seek) 

 Offset 重置允许手动调整消费者在分区中的消费位置,常用于数据回溯或修复异常。直接指定目标 Offset,从该位置开始消费。

// 跳转到分区 0 的 Offset 100
await consumer.seek({ 
  topic: 'topic1', 
  partition: 0, 
  offset: '100'
});

示例 

如果要增加消费者组中partition分区数量需要使用命令扩展,执行后分区变为4个,0,1,2,3。

kafka-topics.sh --alter --topic topic1 --partitions 4 --bootstrap-server localhost:9092

生产者向topic1发送消息,存储在partition分区0,发给消费者组group1和group2。(topic1创建好后默认只有一个分区)。

 producer.ts

import { CompressionTypes, Kafka } from 'kafkajs';


async function run() {
  const kafka = new Kafka({
    clientId: 'test1',
    brokers: ['localhost:9092'],
    connectionTimeout: 1000, // 1 秒连接超时
  });

  const producer = kafka.producer();

  await producer.connect();

  for (let i = 60; i <= 70; ++i) {
    let name = Math.random().toString().slice(2, -1);
    let age = Math.ceil(Math.random() * 40);
    let sex = Math.random() * 100 > 50 ? 1 : 0;
    let person = {
      id: i,
      name,
      age,
      sex,
      desc: `${i},我是${name},年龄${age},性别${sex}`,
    };

    await producer.send({
      topic: 'topic1',
      messages: [
        {
          // value: Buffer.from(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`) 
          // 默认会转成buffer
          value: `${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
          // key:'key1',
          // partition: 0,
        },
      ],
      // None = 0, 不处理
      // GZIP = 1, 压缩率较高,但可能会带来一些性能开销
      // Snappy = 2, 压缩和解压速度很快,压缩率不如gzip
      // LZ4 = 3, 压缩和解压速度很快,压缩率不如gzip
      // ZSTD = 4, 压缩比和压缩速度之间提供了较好的平衡
      compression: CompressionTypes.GZIP,
    });

    console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
  }

  await producer.disconnect();

  producer.on('producer.connect', (evt) => {

  });

  producer.on('producer.disconnect', (evt) => {

  });

  producer.on('producer.network.request', (evt) => {

  });

  producer.on('producer.network.request_queue_size', () => {

  });

  producer.on('producer.network.request_timeout', (evt) => {

  });
}

run();

consumer.ts

import { Kafka } from 'kafkajs'


const kafka = new Kafka({
  clientId: 'test1',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1 秒连接超时
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group1',
  rackId: 'test1.group1.consumer1',
  maxBytes: 3 * 1024 * 1024,  // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
  sessionTimeout: 60000,      // 消费者心跳超时时间(默认 30s)
  rebalanceTimeout: 60000,   // rebalance 最大等待时间(默认 60s)
  heartbeatInterval: 6000,    // 心跳间隔(默认 3s)
  maxWaitTimeInMs: 500,       // 每次拉取消息等待的时间(默认0.5s)
});

await consumer.connect();

await consumer.subscribe({
  topic: 'topic1',
  fromBeginning: true, // 从头开始消费
});

let count = BigInt(0);
await consumer.run({
  autoCommit: false,  // 禁用自动提交
  partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数
  eachMessage: async ({ topic, partition, message }) => {
    const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];
    console.log(partition, { id, name, age, sex, desc });
    await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
  },
});

consumer.on('consumer.network.request', (evt) => {

});

consumer.on('consumer.connect', (evt) => {

});

consumer.on('consumer.disconnect', (evt) => {

});

// consumer.seek({ topic: 'topic1', partition: 0, offset: '0' });

consumer2.ts

import { Kafka } from 'kafkajs'


const kafka = new Kafka({
  clientId: 'test2',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1 秒连接超时
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group2',
  rackId: 'test2.group2.consumer2',
  maxBytes: 3 * 1024 * 1024,  // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
  sessionTimeout: 60000,      // 消费者心跳超时时间(默认 30s)
  rebalanceTimeout: 60000,    // rebalance 最大等待时间(默认 60s)
  heartbeatInterval: 6000,    // 心跳间隔(默认 3s)
  maxWaitTimeInMs: 500,       // 每次拉取消息等待的时间(默认0.5s)
});

await consumer.connect();

await consumer.subscribe({
  topic: 'topic1',
  fromBeginning: true, // 从头开始消费
});

let count = BigInt(0);
await consumer.run({
  autoCommit: false,  // 禁用自动提交
  partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数
  eachMessage: async ({ topic, partition, message }) => {
    const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];
    console.log(partition, { id, name, age, sex, desc });
    await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
  },
});

consumer.on('consumer.network.request', (evt) => {

});

consumer.on('consumer.connect', (evt) => {

});

consumer.on('consumer.disconnect', (evt) => {

});

// consumer.seek({ topic: 'topic1', partition: 0, offset: '2' });