Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程
- 2. Kafka基础
- 2.1 集群部署
- 2.2 集群启动
- 2.3 创建主题
- 2.4 生产消息
- 2.4.1 生产消息的基本步骤
- 2.4.2 生产消息的基本代码
- 2.4.3 发送消息
- 2.4.3.1 拦截器
- 2.4.3.1.1 增加拦截器类
- 2.4.3.1.2 配置拦截器
- 2.4.3.2 回调方法
- 2.4.3.3 异步发送
- 2.4.3.4 同步发送
- 2.4.4 消息分区
- 2.4.4.1 指定分区
- 2.4.4.2 未指定分区⚠️
- 2.4.4.3 分区器
- 2.4.4.3.1 增加分区器类
- 2.4.4.3.2 配置分区器
- 2.4.5 消息可靠性
- 2.4.5.1 ACK = 0
- 2.4.5.2 ACK = 1
- 2.4.5.3 ACK = -1(ALL)(默认)
- 2.4.6 消息去重 & 有序
- 2.4.6.1数据重试
- 2.4.6.2数据乱序
- 2.4.6.3 数据幂等性
- 2.4.6.4数据事务
- 2.4.6.4.1 普通数据发送流程
- 2.4.6.4.2 事务数据发送流程
- 2.4.6.4.3 事务提交流程
- 2.4.6.4.4 事务操作代码
- 2.4.6.5 数据传输语义
本文档参看的视频是:
- [尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战](https://www.bilibili.com/video/BV1Gp421m7UN/? p=6&spm_id_from=pageDriver&vd_source=9beb0a2f0cec6f01c2433a881b54152c)
- 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
- 小朋友也可以懂的Kafka入门教程,还不快来学
本文档参看的文档是:
- 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!
在这之前大家可以看我以下几篇文章,循序渐进:
❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2. Kafka基础
2.1 集群部署
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.2 集群启动
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.3 创建主题
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
2.4 生产消息
Topic主题已经创建好了,接下来我们就可以向该主题生产消息了,这里我们采用Java代码通过
Kafka Producer API
的方式生产数据。
2.4.1 生产消息的基本步骤
(一)创建Map类型的配置对象,根据场景增加相应的配置属性。
(二)创建待发送数据
在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:
下面的代码,我们很熟悉吧~ 包括topic、key、value
但是! 其实对于表述一份完整的Record 是不完整的!那完整的是什么样的呢~~
like this~
相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而Key可以用于数据的分区定位。
(三)创建生产者对象,发送生产的数据:
根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。
(1) 数据生产者(KafkaProducer):生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者。这里我们简单介绍一下内部的数据转换处理:
- 如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
- 因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。
- 计算数据所发送的分区位置。
- 将数据追加到数据收集器中。
(2) 数据收集器(RecordAccumulator):用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
- 默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
- 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
- 如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送
(3) 数据发送器(Sender):线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中
-
因为数据真正发送的地方是
Broker节点
,不是分区
。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。 -
将组合后的
<节点,List<批次>>
的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。 -
Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。
2.4.2 生产消息的基本代码
// TODO 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// TODO 创建Kafka生产者对象,建立Kafka连接
// 构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 准备数据,定义泛型
// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test", "key1", "value1"
);
// TODO 生产(发送)数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();
两个线程
在生产者客户端中,主要通过两个线程协调运行:主线程和Sender线程
主线程
主线程负责创建消息,然后将消息传递给拦截器、序列化器和分区器,最后将消息缓存到消息累加器
Sender线程
sender线程负责从消息累加器中获取消息,然后进行组装成<Node, List< ProducerBatch>>
, -> <Node,Request>
->Map<Nodeld, Deque<Request>>
这样的形式发送到对应的broker中
ProducerBatch
ProducerBatch包含多个ProducerRecord,目的是为了消息更加紧凑,提高吞吐量
BufferPool
因为kafka内部是通过ByteBuffer创建和释放内存,ByteBuffer的创建 是很消耗资源的,为了解决这个问题,Kafka封装了BufferPool实现ByteBuffer重复使用,当然并不是所有大小的ByteBuffer都可以缓存 到BufferPoll中的,可以通过配置batch.size,如果小于等于这个大小则可以被缓存到pool中,默认16kb.ProducerBatch 也和这个参数关联。
2.4.3 发送消息
2.4.3.1 拦截器
生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以大家可以根据实际的情况自行选择使用。
但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。
接下来,我们来演示一下拦截器的操作
2.4.3.1.1 增加拦截器类
(1) 实现生产者拦截器接口ProducerInterceptor
package com.atguigu.test;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* TODO 自定义数据拦截器
* 1. 实现Kafka提供的生产者接口ProducerInterceptor
* 2. 定义数据泛型 <K, V>
* 3. 重写方法
* onSend
* onAcknowledgement
* close
* configure
*/
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
(2) 实现接口中的方法,根据业务功能重写具体的方法
方法名 | 作用 |
---|---|
onSend | 数据发送前,会执行此方法,进行数据发送前的预处理 |
onAcknowledgement | 数据发送后,获取应答时,会执行此方法 |
close | 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作 |
configure | 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。 |
2.4.3.1.2 配置拦截器
package com.atguigu.test;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
public class ProducerInterceptorTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());
KafkaProducer<String, String> producer = null;
try {
producer = new KafkaProducer<>(configMap);
for ( int i = 0; i < 1; i++ ) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
final Future<RecordMetadata> send = producer.send(record);
}
} catch ( Exception e ) {
e.printStackTrace();
} finally {
if ( producer != null ) {
producer.close();
}
}
}
}
2.4.3.2 回调方法
Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或Lambda表达式都可以。
package com.atguigu.kafka.test;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerASynTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 1; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
});
}
producer.close();
}
}
2.4.3.3 异步发送
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
package com.atguigu.kafka.test;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerASynTest {
public static void main(String[] args) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 10; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
});
// TODO 发送当前数据
System.out.println("发送数据");
}
producer.close();
}
}
2.4.3.4 同步发送
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。
代码实现上,采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现。
package com.atguigu.kafka.test;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerASynTest {
public static void main(String[] args) throws Exception {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 循环生产数据
for ( int i = 0; i < 10; i++ ) {
// TODO 创建数据
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);
// TODO 发送数据
producer.send(record, new Callback() {
// TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// TODO 当数据发送成功后,会回调此方法
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}