一文上手Kafka【中】-一、发送消息细节

时间:2024-10-01 14:01:55

在发送消息的特别注意: 在版本 3.0 中,以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移,2.9 版本添加了一个方法 usingCompletableFuture(),该方法为 CompletableFuture 返回类型提供了相同的方法;此方法不再可用。

1.1 ProducerConfig

在spring kafka项目当中,提供了Kafka 生产者的相关配置.在类ProducerConfig当中,其值分别定义在不同的常量类当中. 结合上篇当中发送消息的时候控制台输出的日志,具体字段含义如下所示:

通用配置:

  • acks = 1:生产者要求领导者在确认消息写入之前收到的最少同步副本数量。设置为 1
    表示领导者成功写入消息后即确认,不等待副本同步完成。
  • auto.include.jmx.reporter = true:自动包含用于Java Management Extensions(JMX)的报告器,以便通过 JMX 监控生产者的指标。
  • batch.size =16384:当多个消息发送到同一分区时,生产者将尝试在单个请求中发送消息的批量大小(以字节为单位)。
  • bootstrap.servers =[ip:9092]:Kafka 集群的服务器地址列表,用于建立初始连接。
  • buffer.memory =33554432:生产者可以用来缓冲等待发送到服务器的记录的总内存大小(以字节为单位)。
  • client.id =rj-spring-kafka-demo-producer-1:生产者的客户端 ID,用于在 Kafka 服务器端标识此生产者。
  • compression.type = none:消息的压缩类型,可以是 none、gzip、snappy 等。
  • connections.max.idle.ms = 540000:在关闭不活动的连接之前,连接可以保持空闲的最长时间(以毫秒为单位)。
  • delivery.timeout.ms = 120000:消息发送的超时时间,包括所有可能的重试。
  • enable.idempotence =false:是否启用生产者的幂等性,确保在重试时不会产生重复的消息。 >
  • enable.metrics.push =true:是否启用推送生产者的指标数据到外部系统。
  • interceptor.classes =[]:生产者拦截器的类列表,用于在发送消息之前或之后执行自定义逻辑。
  • key.serializer = class.org.apache.kafka.common.serialization.StringSerializer:用于序列化消息键的类。
  • linger.ms = 0:生产者在发送批次之前等待的额外时间(以毫秒为单位),以允许更多消息积累在批次中。
  • max.block.ms =60000:生产者在发送消息或获取元数据等操作中阻塞的最长时间。
  • max.in.flight.requests.per.connection = 5:在单个连接上允许的最大未确认请求数量。
  • max.request.size = 1048576:生产者请求的最大大小(以字节为单位)。
  • metadata.max.age.ms = 300000:元数据(如主题分区信息)的过期时间(以毫秒为单位),之后将强制刷新元数据。
  • metadata.max.idle.ms = 300000:元数据在没有任何更新的情况下保持有效的最长时间。
  • metric.reporters = []:自定义的指标报告器类列表。 metrics.num.samples =2:用于计算指标的样本数量。
  • metrics.recording.level = INFO:指标记录的级别,例如 INFO、DEBUG 等。
  • metrics.sample.window.ms = 30000:用于计算指标的时间窗口大小(以毫秒为单位)。
  • partitioner.adaptive.partitioning.enable =true:是否启用自适应分区功能,根据负载动态调整分区分配。
  • partitioner.availability.timeout.ms =0:分区器在确定分区不可用时等待的时间(以毫秒为单位)。 >
  • partitioner.class =null:自定义分区器的类,如果未设置则使用默认分区器。
  • partitioner.ignore.keys = false:是否忽略消息的键,不基于键进行分区。
  • receive.buffer.bytes = 32768:套接字接收缓冲区的大小(以字节为单位)。
  • reconnect.backoff.max.ms =1000:重新连接的最大退避时间(以毫秒为单位)。
  • reconnect.backoff.ms =50:重新连接的初始退避时间(以毫秒为单位)。
  • request.timeout.ms =30000:生产者请求的超时时间,包括发送请求和接收响应的时间。
  • retries = 3:生产者在发送消息失败时的重试次数。
  • retry.backoff.max.ms = 1000:重试之间的最大退避时间(以毫秒为单位)。
  • retry.backoff.ms =100:重试之间的初始退避时间(以毫秒为单位)。

SASL 相关配置(用于安全认证):

  • sasl.client.callback.handler.class = null:SASL 客户端回调处理程序的类。
  • sasl.jaas.config = null:Java Authentication and Authorization Service(JAAS)配置,用于 SASL 认证。
  • sasl.kerberos.kinit.cmd = /usr/bin/kinit:Kerberos 的 kinit 命令路径。
  • sasl.kerberos.min.time.before.relogin = 60000:Kerberos 重新登录之前的最小时间(以毫秒为单位)。
  • sasl.kerberos.service.name = null:Kerberos 服务名称。
  • sasl.kerberos.ticket.renew.jitter = 0.05:Kerberos 票证更新的抖动因子。
  • sasl.kerberos.ticket.renew.window.factor = 0.8:Kerberos 票证更新的窗口因子。
  • sasl.login.callback.handler.class = null:SASL 登录回调处理程序的类。
  • sasl.login.class = null:SASL 登录机制的类。
  • sasl.login.connect.timeout.ms = null:SASL 登录连接超时时间(以毫秒为单位)。
  • sasl.login.read.timeout.ms = null:SASL 登录读取超时时间(以毫秒为单位)。
  • sasl.login.refresh.buffer.seconds = 300:SASL 登录刷新缓冲区时间(以秒为单位)。
  • sasl.login.refresh.min.period.seconds = 60:SASL 登录刷新的最小周期(以秒为单位)。
  • sasl.login.refresh.window.factor = 0.8:SASL 登录刷新的窗口因子。
  • sasl.login.refresh.window.jitter = 0.05:SASL 登录刷新的抖动因子。
  • sasl.login.retry.backoff.max.ms = 10000:SASL 登录重试的最大退避时间(以毫秒为单位)。
  • sasl.login.retry.backoff.ms = 100:SASL 登录重试的初始退避时间(以毫秒为单位)。
  • sasl.mechanism = GSSAPI:SASL 认证机制,如 GSSAPI、PLAIN 等。
  • sasl.oauthbearer.clock.skew.seconds = 30:OAuth Bearer 令牌的时钟偏差时间(以秒为单位)。
  • sasl.oauthbearer.expected.audience = null:预期的 OAuth Bearer 令牌受众。
  • sasl.oauthbearer.expected.issuer = null:预期的 OAuth Bearer 令牌发行者。
  • sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000:OAuth Bearer JWKS 端点的刷新时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000:OAuth Bearer JWKS 端点重试的最大退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100:OAuth Bearer JWKS 端点重试的初始退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.url = null:OAuth Bearer JWKS 端点的 URL。
  • sasl.oauthbearer.scope.claim.name = scope:OAuth Bearer 令牌范围声明的名称。
  • sasl.oauthbearer.sub.claim.name = sub:OAuth Bearer 令牌主题声明的名称。
  • sasl.oauthbearer.token.endpoint.url = null:OAuth Bearer 令牌端点的 URL。

安全协议相关配置:

  • security.protocol = PLAINTEXT:生产者使用的安全协议,如 PLAINTEXT、SSL、SASL_PLAINTEXT 等。
  • security.providers = null:安全提供程序的类列表。

网络相关配置:

  • send.buffer.bytes = 131072:套接字发送缓冲区的大小(以字节为单位)。
  • socket.connection.setup.timeout.max.ms = 30000:套接字连接设置的最大超时时间(以毫秒为单位)。
  • socket.connection.setup.timeout.ms = 10000:套接字连接设置的初始超时时间(以毫秒为单位)。

SSL 相关配置(用于加密连接):

  • ssl.cipher.suites = null:SSL 加密套件列表。
  • ssl.enabled.protocols = [TLSv1.2, TLSv1.3]:启用的 SSL 协议版本列表。
  • ssl.endpoint.identification.algorithm = https:SSL 端点标识算法。
  • ssl.engine.factory.class = null:SSL 引擎工厂的类。
  • ssl.key.password = null:SSL 密钥密码。
  • ssl.keymanager.algorithm = SunX509:SSL 密钥管理器算法。
  • ssl.keystore.certificate.chain = null:SSL 密钥库证书链。
  • ssl.keystore.key = null:SSL 密钥库的密钥。
  • ssl.keystore.location = null:SSL 密钥库的位置。
  • ssl.keystore.password = null:SSL 密钥库的密码。
  • ssl.keystore.type = JKS:SSL 密钥库的类型。
  • ssl.protocol = TLSv1.3:SSL 协议版本。
  • ssl.provider = null:SSL 提供程序。
  • ssl.secure.random.implementation = null:SSL 安全随机数生成器的实现。
  • ssl.trustmanager.algorithm = PKIX:SSL 信任管理器算法。
  • ssl.truststore.certificates = null:SSL 信任库证书。
  • ssl.truststore.location = null:SSL 信任库的位置。
  • ssl.truststore.password = null:SSL 信任库的密码。
  • ssl.truststore.type = JKS:SSL 信任库的类型。

事务相关配置:

  • transaction.timeout.ms = 60000:事务的超时时间(以毫秒为单位)。
  • transactional.id = null:事务 ID,用于标识一个事务性生产者。

序列化相关配置:

  • value.serializer = class org.apache.kafka.common.serialization.StringSerializer:用于序列化消息值的类。

1.2 sendDefault

CompletableFuture<SendResult<K, V>> sendDefault(V data);

该api要求向模板提供的默认主题发送消息.要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它.

@Configuration
public class KafkaConfig {
    @Bean
    public Map<String, Object> producerConfig(){
        Map<String, Object> map = new HashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        map.put(ProducerConfig.RETRIES_CONFIG, "3");
        return map;
    }

    @Bean
    public ProducerFactory<Integer, Object> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<Integer,Object> kafkaTemplate(){
        KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        // 设置默认主题
        kafkaTemplate.setDefaultTopic("rj-default-topic");
        return kafkaTemplate;
    }
}

此时发送的时候,可以不必指定主题了,而直接将消息发送到我们自己定义的默认的主题当中了

    @GetMapping("/default")
    public String sendDefaultMsg(String msg) throws ExecutionException, InterruptedException {
        CompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.sendDefault(msg);
        SendResult<Integer, Object> sendResult = completableFuture.get();
        log.info("sendResult:{}", sendResult);
        return "向默认主题发送消息";
    }

从版本 2.5 开始,您现在可以覆盖工厂的 ProducerConfig 属性,以从同一工厂创建具有不同生产者配置的模板。

    @Bean
    public KafkaTemplate<Integer,Object> kafkaTemplate(){
        KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        // 设置默认主题
        kafkaTemplate.setDefaultTopic("rj-default-topic");
        return kafkaTemplate;
    }

    /**
     * 从同一个工厂创建具有不同生产者的配置的模块
     * @param producerFactory
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){
        return new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }
    
    // 如果要重用ProducerFactory,则必须修改一下ProducerFactory的初始的泛型,修改为如下的格式
      @Bean
    public ProducerFactory<?, ?> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

当然以上的ProducerFactory相关配置的属性,也可以在application.yml配置文件当中进行配置.

1.3 Message接口

在使用KafkaTemplate发送数据的时候,可以直接发送一个Message.方法定义如下所示:

	@Override
	public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
		ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
		if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
			byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
			if (correlationId != null) {
				producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
			}
		}
		return observeSend((ProducerRecord<K, V>) producerRecord);
	}

这里需要注意, Message是在org.springframework.messaging包当中定义的,定义接口如下所示:

// 带有 headers 和 body 的通用消息表示形式。
public interface Message<T> {

	/**
	 * Return the message payload.
	 * 消息体
	 */
	T getPayload();

	/**
	 * Return message headers for the message (never {@code null} but may be empty).
	 * 消息头,可以在发送的时候指定
	 */
	MessageHeaders getHeaders();

}

继承关系
这里我们使用实现类GenericMessage即可:

// 推荐使用这个构造方法,简单方便
public GenericMessage(T payload, Map<String, Object> headers) {
	this(payload, new MessageHeaders(headers));
}

public GenericMessage(T payload, MessageHeaders headers) {
	Assert.notNull(payload, "Payload must not be null");
	Assert.notNull(headers, "MessageHeaders must not be null");
	this.payload = payload;
	this.headers = headers;
}

测试代码:

 @GetMapping("/message")
    public String sendMessage(){
        Map<String, Object> map = new HashMap<>();
        // 向 Kafka 发送数据时包含主题的header
        map.put(KafkaHeaders.TOPIC, Constants.Kafka.TOPIC_NAME);
        map.put(KafkaHeaders.KEY, "rj");
        // 包含从中接收消息的主题的header。
        map.put(KafkaHeaders.RECEIVED_TOPIC, Constants.Kafka.TOPIC_NAME);
        // 创建MessageHeaders对象
        MessageHeaders messageHeaders = new MessageHeaders(map);
        // 构建Message对象
        Message<String> message = new GenericMessage<>("hello, message!", messageHeaders);
        // 将Message发送到指定的topic
        CompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.send(message);
        completableFuture.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("发送成功");
            } else {
                System.out.println("发送失败");
            }
        });

        return "发送成功!";
    }

上述Map集合的key直接使用定义好的即可: KafkaHeaders, 用的时候,需要啥就添加啥.
header定义
注意事项:

  • 使用的KafkaTemplate发送消息的时候,要注意泛型匹配的问题.这里步及到key、value的序列化与反序列化操作.
  • 如果重用了ProducerFactory则需要注意使用的泛型和发送消息的类型是否能匹配得上.

如上所示: 我们使用的的是private final KafkaTemplate<String, String> kafkaTemplate,我们通过配置文件,注入到容器的类型是:

  @Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){
    return new KafkaTemplate<>(producerFactory,
    // 【注意】: 这里重新设置了value的序列化,而对于key的序列化是在构建ProducerFactory的时候,传入的. 
    Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}

而注入到容器当中ProducerFactory对象,在构建的时候,分别设置了key、value的序列化规则.
序列化
以上的泛型必须得能匹配上,或者可以直接转换,否则会抛出异常.

1.4 ProducerListener

使用ProducerListener配置KafkaTemplate,以获取带有发送结果(成功或失败)的异步回调,而不是等待Future完成。下面的清单显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {
    // 发送成功
    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
    // 发送失败
    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);
}

默认情况下,模板配置了 LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何操作。

@Slf4j
public class CustomProducerListener implements ProducerListener<String, String> {

    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        String topic = producerRecord.topic();
        String key = producerRecord.key();
        String value = producerRecord.value();
        Long timestamp = producerRecord.timestamp();

        int partition = recordMetadata.partition();
        long offset = recordMetadata.offset();

        log.info("消息发送成功,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}", topic, key, value, timestamp, partition, offset);
    }

    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        String topic = producerRecord.topic();
        String key = producerRecord.key();
        String value = producerRecord.value();
        Long timestamp = producerRecord.timestamp();

        int partition = recordMetadata.partition();
        long offset = recordMetadata.offset();

        log.error("消息发送失败,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}, exception message: {}", topic, key, value, timestamp, partition, offset, exception.getMessage());
    }
}

将自定义ProducerListener的对象,配置到KafkaTemplate当中

/**
  * 从同一个工厂创建具有不同生产者的配置的模块
  * @param producerFactory
  * @return
  */
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){
    KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    // 配置ProducerListener
    stringStringKafkaTemplate.setProducerListener(new CustomProducerListener());
    // 配置默认主题
    stringStringKafkaTemplate.setDefaultTopic("rj-string-topic");
    return stringStringKafkaTemplate;
}

发送消息:

@GetMapping("/listener")
public String sendMsgProducerListener(String msg) {
    CompletableFuture<SendResult<String, String>> completableFuture = stringKafkaTemplate.sendDefault(msg);
    completableFuture.whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("发送成功");
        } else {
            System.out.println("发送失败");
        }
    });
    return "发送成功!";
}

控制台日志输出如下所示:

2024-09-24T19:20:07.061+08:00  INFO 16436 --- [rj-spring-kafka-demo] [demo-producer-1] c.r.k.listener.CustomProducerListener    : 消息发送成功,topic:rj-string-topic,key:null,value:listener,timestamp:null,partition:0,offset:0

1.5 发送结果监听CompletableFuture

发送消息的send 方法返回 CompletableFuture<SendResult>。您可以向侦听器注册回调,以异步接收发送的结果.

CompletableFuture<SendResult<Integer, String>> future = template.send("topic-name", "msg data");
future.whenComplete((result, ex) -> {
    ...
});

如果你希望阻塞发送线程等待结果,你可以调用 future 的 get() 方法;建议使用带有 timeout 的方法。如果你已经设置了 linger.ms,你可能希望在等待之前调用 flush(),或者为了方便起见,模板有一个带有 autoFlush 参数的构造函数,该参数会导致模板在每次发送时 flush()

  • 在使用 Kafka 生产者发送消息时,通常会得到一个表示发送任务的Future对象。如果调用future.get()方法,发送线程会被阻塞,直到发送结果返回。这意味着发送线程会暂停执行,等待消息成功发送到 Kafka 集群并获取结果。然而,直接使用get()方法可能会导致线程长时间阻塞,在实际应用中可能不太理想,所以建议使用带有超时参数的get(long timeout, TimeUnit unit)方法,这样可以在一定时间后如果还未获取到结果就不再等待,避免无限期阻塞。
  • linger.ms和flush
    • linger.ms属性
      • 当设置了linger.ms生产者属性时,生产者会在发送消息时等待一段时间,让更多的消息积累在一个批次中,以提高发送效率。如果在这段时间内积累了足够多的消息,生产者会将这些消息作为一个批次发送出去。
    • flush()方法
      • 如果希望立即发送部分批处理的消息而不是等待linger.ms指定的时间,可以调用flush()方法。这个方法会强制生产者立即发送当前缓冲区内的消息,而不管是否满足批次大小或等待时间的条件。
    • 带有autoFlush参数的构造函数
      • 为了方便起见,KafkaTemplate有一个带有autoFlush参数的构造函数。当设置autoFlushtrue时,每次发送消息后,模板会自动调用flush()方法,确保消息立即发送出去。这在需要立即确认消息发送的场景中非常有用,但可能会降低发送效率,因为每次发送都不会等待批次积累。

在使用 Kafka 生产者时,需要根据实际需求合理选择是否阻塞发送线程等待结果,以及是否使用flush()方法或带有autoFlush参数的构造函数来控制消息的发送时机。如果设置了linger.ms属性,并且需要在特定情况下立即发送部分批处理的消息,可以考虑调用flush()方法或使用带有autoFlush参数的构造函数。

linger.ms属性可以在配置文件当中进行配置.

@Bean
public Map<String, Object> producerConfig(){
    // ...
    // 配置linger.ms
    map.put(ProducerConfig.LINGER_MS_CONFIG, "500");
    return map;
}

SendResult 有两个属性:ProducerRecordRecordMetadata

public class SendResult<K, V> {
	// ProducerRecord是生产者发送消息时使用的数据结构
	private final ProducerRecord<K, V> producerRecord;
	// 当生产者成功发送消息后,会返回一个RecordMetadata对象
	private final RecordMetadata recordMetadata;

	public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
		this.producerRecord = producerRecord;
		this.recordMetadata = recordMetadata;
	}

	public ProducerRecord<K, V> getProducerRecord() {
		return this.producerRecord;
	}

	public RecordMetadata getRecordMetadata() {
		return this.recordMetadata;
	}

	@Override
	public String toString() {
		return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
	}

}

1.5.1 ProducerRecord

  • 主题(Topic)

    • 确定消息的归属主题。Kafka 中的不同主题用于区分不同类型的消息流。例如,一个电商系统可能有 “订单主题”、“用户行为主题” 等。

    • 生产者通过指定主题将消息发送到 Kafka 集群中的相应主题

  • 分区(Partition)

    • 分区的目的是为了实现可扩展性和并行处理。Kafka 将一个主题的数据分布在多个分区上,不同的分区可以由不同的消费者或消费者组同时消费。

    • 可以手动指定消息发送到特定分区,通常根据消息的键或者特定的业务规则来决定分区。如果不指定,Kafka 会根据默认的分区策略来分配分区。

  • 键(Key)

    • 键在消息处理中有多种用途。一方面,它可以用于确定消息的分区。例如,基于键的哈希值来决定消息发送到哪个分区,这样可以确保具有相同键的消息被发送到同一个分区,方便后续的有序处理。

    • 键也可以在消费者端用于消息的分组和聚合。例如,在处理订单数据时,可以根据订单 ID 作为键,将同一订单的不同状态更新消息发送到同一个分区,方便消费者对同一订单的消息进行有序处理。

  • 值(Value)

    • 这是消息的实际内容,可以是任何可序列化的对象。例如,在一个日志系统中,值可以是一条日志记录;在电商系统中,值可以是一个订单对象或者用户行为事件。
  • 消息头部(Headers)

    • 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。

    • 例如,可以在头部添加消息的来源系统、消息的类型、处理优先级等信息

1.5.2 RecordMetadata

  • 主题(Topic)

    • 确认消息最终被发送到的主题,与ProducerRecord中的主题相对应。这可以用于验证消息是否被发送到了正确的主题。
  • 分区(Partition)

    • 指示消息存储在哪个分区。在消费者端,可以根据分区信息来确定从哪个分区读取消息。

    • 对于需要对特定分区进行监控或管理的场景,分区信息非常重要。

  • 偏移量(Offset)

    • 偏移量是消息在分区中的唯一标识,它代表了消息在分区中的位置顺序。每个分区中的消息都有一个连续的偏移量。

    • 消费者通过偏移量来确定已经消费到了哪个位置,以便在下次消费时从正确的位置继续读取消息。

    • 偏移量也可以用于数据恢复和重新处理消息的场景。例如,如果消费者出现故障,在恢复时可以根据存储的偏移量重新开始消费。

  • 时间戳(Timestamp)

    • 时间戳可以由生产者在发送消息时指定,也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。

    • 例如,可以根据时间戳来查询特定时间段内的消息,或者对消息进行时间序列分析