Kafka 中的 offset(偏移量) 是指消费者在 Kafka 分区内读取消息的位置,表示消费者已经消费到哪个消息。在 Kafka 中,offset 控制策略对于确保消息的正确消费、处理顺序、容错能力和高可用性至关重要。Kafka 提供了多种方式来管理和控制消费者的 offset,以下是 Kafka 中常见的 offset 控制策略。
1. 自动提交(Auto Commit)
自动提交 是 Kafka 消费者最简单的 offset 管理方式。在这种模式下,消费者会定期将当前消费的 offset 自动提交到 Kafka 集群。在nodejs环境使用kafkajs时,自动提交无需做任何配置。
优点:
- 简单易用,适用于一些不要求严格精确一次消费的场景。
缺点:
- 可能丢失消息:如果消费者在自动提交偏移量之前崩溃,可能会重复消费一些消息。
- 无法精确控制:偏移量提交的时机和粒度不容易控制,可能不适合一些需要精确消费顺序的场景。
2. 手动提交(Manual Commit)
手动提交偏移量允许消费者精确控制何时提交偏移量。消费者可以在消息成功消费后显式地提交偏移量。在nodejs环境使用kafkajs实现方式。
await consumer.run({
// 不进行自动提交
autoCommit: false,
partitionsConsumedConcurrently: 0,
eachMessage: async ({ topic, partition, message }) => {
const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];
console.log(partition, { id, name, age, sex, desc });
// 手动提交控制偏移量
await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
},
});
优点:
- 精确控制:消费者可以在确认消息已正确处理后再提交偏移量,确保消息不被丢失或重复消费。
- 避免不必要的重复消费:只有当消费者完成整个消费处理(例如事务处理或确认某些条件成立)后,才会提交偏移量。
缺点:
- 需要额外的开发工作:消费者需要显式地处理提交偏移量的逻辑,为了防读取的数据偏移量offset丢失也可将每个topic中读取到的partition分区上的数据偏移量offset保存在数据库中,手动提交控制offset增加了开发复杂性。
3.Offset 重置(Seek)
Offset 重置允许手动调整消费者在分区中的消费位置,常用于数据回溯或修复异常。直接指定目标 Offset,从该位置开始消费。
// 跳转到分区 0 的 Offset 100
await consumer.seek({
topic: 'topic1',
partition: 0,
offset: '100'
});
示例
如果要增加消费者组中partition分区数量需要使用命令扩展,执行后分区变为4个,0,1,2,3。
kafka-topics.sh --alter --topic topic1 --partitions 4 --bootstrap-server localhost:9092
生产者向topic1发送消息,存储在partition分区0,发给消费者组group1和group2。(topic1创建好后默认只有一个分区)。
producer.ts
import { CompressionTypes, Kafka } from 'kafkajs';
async function run() {
const kafka = new Kafka({
clientId: 'test1',
brokers: ['localhost:9092'],
connectionTimeout: 1000, // 1 秒连接超时
});
const producer = kafka.producer();
await producer.connect();
for (let i = 60; i <= 70; ++i) {
let name = Math.random().toString().slice(2, -1);
let age = Math.ceil(Math.random() * 40);
let sex = Math.random() * 100 > 50 ? 1 : 0;
let person = {
id: i,
name,
age,
sex,
desc: `${i},我是${name},年龄${age},性别${sex}`,
};
await producer.send({
topic: 'topic1',
messages: [
{
// value: Buffer.from(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`)
// 默认会转成buffer
value: `${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
// key:'key1',
// partition: 0,
},
],
// None = 0, 不处理
// GZIP = 1, 压缩率较高,但可能会带来一些性能开销
// Snappy = 2, 压缩和解压速度很快,压缩率不如gzip
// LZ4 = 3, 压缩和解压速度很快,压缩率不如gzip
// ZSTD = 4, 压缩比和压缩速度之间提供了较好的平衡
compression: CompressionTypes.GZIP,
});
console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
}
await producer.disconnect();
producer.on('producer.connect', (evt) => {
});
producer.on('producer.disconnect', (evt) => {
});
producer.on('producer.network.request', (evt) => {
});
producer.on('producer.network.request_queue_size', () => {
});
producer.on('producer.network.request_timeout', (evt) => {
});
}
run();
consumer.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test1',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group1',
rackId: 'test1.group1.consumer1',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
});
await consumer.connect();
await consumer.subscribe({
topic: 'topic1',
fromBeginning: true, // 从头开始消费
});
let count = BigInt(0);
await consumer.run({
autoCommit: false, // 禁用自动提交
partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数
eachMessage: async ({ topic, partition, message }) => {
const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];
console.log(partition, { id, name, age, sex, desc });
await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
},
});
consumer.on('consumer.network.request', (evt) => {
});
consumer.on('consumer.connect', (evt) => {
});
consumer.on('consumer.disconnect', (evt) => {
});
// consumer.seek({ topic: 'topic1', partition: 0, offset: '0' });
consumer2.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test2',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group2',
rackId: 'test2.group2.consumer2',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
});
await consumer.connect();
await consumer.subscribe({
topic: 'topic1',
fromBeginning: true, // 从头开始消费
});
let count = BigInt(0);
await consumer.run({
autoCommit: false, // 禁用自动提交
partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数
eachMessage: async ({ topic, partition, message }) => {
const [id, name, age, sex, desc] = message.value?.toString()?.split?.('@_@') || [];
console.log(partition, { id, name, age, sex, desc });
await consumer.commitOffsets([{ topic, partition, offset: String(count++) }]);
},
});
consumer.on('consumer.network.request', (evt) => {
});
consumer.on('consumer.connect', (evt) => {
});
consumer.on('consumer.disconnect', (evt) => {
});
// consumer.seek({ topic: 'topic1', partition: 0, offset: '2' });