当我们使用producer client发送消息之前,需要创建该对象:
// Build the producer client from the configuration object `props`.
Producer<String, String> producer = new KafkaProducer<>(props);
所以从这里开始,首先,该类的注释一定要看,里面包含大量的最佳实践,大概的内容有如下几点:
(1)kafka producer是线程安全的。
(2)不要使用多个实例,多个线程共享同一个producer的实例就可以了。
(3)producer包含了一个内存缓冲池,这个缓冲中包含了一些尚未发送的数据,producer负责将这些数据转成request发送出去;使用完毕后如果没有关闭producer,会导致这些资源泄漏,缓冲中尚未发送的数据也可能丢失。
(4)send方法是个异步操作,执行会将数据写入本地缓存,会立即返回,但是这不意味着数据已经发送到broker。
(5)buffer.memory用来控制producer可以使用的最大缓存,如果生产数据的速度比发送的速度快导致缓存满了,那么send会被阻塞,最长阻塞时间由max.block.ms控制,如果这么长时间过去之后还是无法写入缓存,则抛出timeout异常。
(6)还有acks,retries等参数的配置。
接着就是构造函数的执行:
/**
 * Private constructor that wires up the producer's internal components from {@code config}.
 * Following the numbered step comments below, it resolves the client id and metrics, creates the
 * partitioner and serializers, loads interceptors, creates the metadata holder, the record
 * accumulator, the NetworkClient and the Sender, and finally starts the I/O thread.
 *
 * NOTE(review): many statements in this excerpt have lost their receiver or left-hand side
 * (e.g. "= config;"), apparently during extraction -- confirm details against the real Kafka source.
 *
 * @param config          producer configuration
 * @param keySerializer   key serializer instance, or null to have one created from the configuration
 * @param valueSerializer value serializer instance, or null to have one created from the configuration
 * @throws KafkaException if any step fails; close(...) is called first to release what was already built
 */
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
("Starting the Kafka producer");
// Configuration values supplied by the user
Map<String, Object> userProvidedConfigs = ();
= config;
= new SystemTime();
// 1. Resolve the client id; if none is configured, fall back to an auto-generated "producer-{n}" id
//    taken from an incrementing sequence
clientId = (ProducerConfig.CLIENT_ID_CONFIG);
if (() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples((ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow((ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), )
.tags(metricTags);
List<MetricsReporter> reporters = (ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
);
(new JmxReporter(JMX_PREFIX));
= new Metrics(metricConfig, reporters, time);
// 2. Create the partitioner, which decides which partition of a topic a message is routed to.
//    PARTITIONER_CLASS_CONFIG selects the partitioner; the default one is used when none is specified
= (ProducerConfig.PARTITIONER_CLASS_CONFIG, );
// RETRY_BACKOFF_MS_CONFIG: interval the producer waits before re-sending a failed request,
// so that it does not retry many times within a short period
long retryBackoffMs = (ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// Key serializer: created from the configuration when none was passed in, otherwise the caller's instance is used
if (keySerializer == null) {
= (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
);
((), true);
} else {
(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
= keySerializer;
}
// Value serializer: created from the configuration when none was passed in, otherwise the caller's instance is used
if (valueSerializer == null) {
= (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
);
((), false);
} else {
(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
= valueSerializer;
}
// load interceptors and make sure they get clientId
(ProducerConfig.CLIENT_ID_CONFIG, clientId);
// 3. User-defined interceptors (rarely used, according to the original author)
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
);
= () ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
// 4. Metadata: represents the current state of the cluster, including each partition and its leader.
//    When metadata for a topic being written is not available locally, it is fetched from the brokers.
// Original author's note (config names lost): the fetch timeout defaults to 1 minute, and a refresh
// is forced every 5 minutes by default
= new Metadata(retryBackoffMs, (ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
// Maximum size of a single request (the original note is truncated at this point)
= (ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// Total buffer memory on the producer side. It must not be set as large as the JVM heap, because the JVM
// also needs memory for compression, in-flight data, etc. Default is 32 MB
= (ProducerConfig.BUFFER_MEMORY_CONFIG);
// Compression type; no compression by default, gzip, snappy and lz4 are available
= ((ProducerConfig.COMPRESSION_TYPE_CONFIG));
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
// Whether to block once the buffer is full. Default is false: an exception is thrown after a bounded wait,
// which defaults to one minute
if ((ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = (ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
= Long.MAX_VALUE;
} else if ((ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
= (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
= (ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if ((ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
= (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
= (ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
// REQUEST_TIMEOUT_MS_CONFIG: request timeout, default 30 s; the request is retried on timeout and
// an error is reported once the retries are exhausted
if ((ProducerConfig.TIMEOUT_CONFIG)) {
(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
= (ProducerConfig.TIMEOUT_CONFIG);
} else {
= (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
// 5. Core component that buffers outgoing data (the record accumulator).
// BATCH_SIZE_CONFIG: size of each batch; one batch holds multiple records. Default 16384 bytes = 16 KB.
// One request targets one broker and contains multiple batches, each batch belonging to one partition on that broker.
// Too small a batch means requests are sent frequently: more network round trips and lower throughput.
// A value of 0 disables batching, so each record is sent as it arrives.
// Too large a batch means many batches are held in memory, which wastes memory.
// LINGER_MS_CONFIG: default 0. A batch that has not reached the batch size is still sent once the linger time elapses.
= new RecordAccumulator((ProducerConfig.BATCH_SIZE_CONFIG),
,
,
(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
// BOOTSTRAP_SERVERS_CONFIG: list of broker addresses, provided by the user's program
List<InetSocketAddress> addresses = ((ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 6. No metadata is actually fetched from the brokers at this point
((addresses), ());
// Create the channel builder (the original note reads: create the KafkaChannel)
ChannelBuilder channelBuilder = (());
// Initialise the NetworkClient; all later network communication is built on this class.
// CONNECTIONS_MAX_IDLE_MS_CONFIG: connections idle for longer than this are reclaimed. Default 9 minutes.
// MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION: how many unanswered requests a connection may have before sending stops
// RECONNECT_BACKOFF_MS_CONFIG: interval between reconnect attempts
// SEND_BUFFER_CONFIG: TCP send buffer size. Default 128 KB
// RECEIVE_BUFFER_CONFIG: TCP receive buffer size. Default 32 KB
NetworkClient client = new NetworkClient(
new Selector((ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), , time, "producer", channelBuilder),
,
clientId,
(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
(ProducerConfig.SEND_BUFFER_CONFIG),
(ProducerConfig.RECEIVE_BUFFER_CONFIG),
, time);
// The sender is the runnable that actually sends the data.
// ACKS_CONFIG: acks. 0 means a write to the local buffer already counts as success, 1 means written to the
// leader, "all" means every replica in the ISR list has written it (as described by the original author).
= new Sender(client,
,
,
(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks((ProducerConfig.ACKS_CONFIG)),
(ProducerConfig.RETRIES_CONFIG),
,
new SystemTime(),
clientId,
);
// Start the KafkaThread that wraps the sender
String ioThreadName = "kafka-producer-network-thread" + (() > 0 ? " | " + clientId : "");
= new KafkaThread(ioThreadName, , true);
();
= ("errors");
();
(JMX_PREFIX, clientId);
("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, , true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
producer主要构造了如下重要的组件:
1.clientId的构造,这里如果使用kafka的限流功能,需要手动指定这个id
2.指定分区器,来指定数据发送到哪个分区。
3.指定kv序列化器。
4.设置retry,acks,超时时间,缓冲区大小,批大小等参数。
5.初始化元数据。
6.设置压缩。
7.构造NetworkClient 。
8.构造KafkaThread,该thread包装了Sender线程。
这里重点分析如下三块:
(1)初始化元数据及update方法。
(2)构造NetworkClient 。
(3)开启KafkaThread线程
(1)初始化元数据及update方法。
metadata在new出来后,调用metadata.update(Cluster.bootstrap(addresses), time.milliseconds());实际上并没有更新元数据信息。metadata类中最重要的就是Cluster属性,Cluster中主要变量:
// Main fields of Cluster, as listed by the article.
private final boolean isBootstrapConfigured;
// A Node wraps a single Kafka machine and carries information such as host, port and id
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics;
// All partitions of the cluster, keyed by TopicPartition
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
// All partitions of a topic, keyed by topic name
private final Map<String, List<PartitionInfo>> partitionsByTopic;
// Keyed by topic name; the original note reads "set of available topics", while the field name
// suggests the available partitions of each topic -- TODO confirm
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// Partitions hosted on each node, keyed by node id
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
// Mapping from node id to Node
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
这里面还有个重要的类:PartitionInfo,主要包含如下变量,比较重要的是ISR列表(inSyncReplicas)的概念,里面包含了leader本身以及与leader保持同步的follower的信息。
// Describes one partition of a topic: its leader, its replicas and its in-sync replicas.
// (Only the field list is shown in this excerpt.)
public class PartitionInfo {
private final String topic;
private final int partition;
// Leader node of the partition
private final Node leader;
// Replica nodes
private final Node[] replicas;
// ISR (in-sync replica) list
private final Node[] inSyncReplicas;
bootstrap方法:根据传入的broker地址列表创建一个Cluster,创建后返回:
/**
 * Creates a "bootstrap" Cluster from the given socket addresses.
 * One Node is constructed per address, with ids counting down from -1; the returned Cluster is
 * built with an empty partition list and empty topic sets.
 *
 * NOTE(review): the call that stores each new Node (and the host/port accessors) lost its
 * receiver in this excerpt.
 *
 * @param addresses the configured broker addresses
 * @return a Cluster built from {@code nodes} with no partition information
 */
public static Cluster bootstrap(List<InetSocketAddress> addresses) {
List<Node> nodes = new ArrayList<>();
// Ids start at -1 and decrease for each further address
int nodeId = -1;
for (InetSocketAddress address : addresses)
(new Node(nodeId--, (), ()));
return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}
这里根据用户代码的配置生成了Node,每个node代表了一台kafka服务端的机器,多个node对象及其他一些topic信息组成Cluster对象,但是此时一些topic信息是空的,只是做了初始化。
再来分析update方法:
/**
 * Updates the metadata with the given cluster view: handles topic expiry when enabled, notifies
 * the registered listeners, stores the cluster and finally wakes up threads waiting on this object.
 *
 * NOTE(review): field names on the left-hand side of several assignments were lost in this excerpt.
 *
 * @param cluster the new cluster view
 * @param now     timestamp used for the expiry checks and stored by the assignments below
 */
public synchronized void update(Cluster cluster, long now) {
(cluster, "cluster should not be null");
= false;
= now;
= now;
+= 1;
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<<String, Long>> it = ().iterator(); (); ) {
<String, Long> entry = ();
long expireMs = ();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
();
("Removing unused topic {} from the metadata list, expiryMs {} now {}", (), expireMs, now);
}
}
}
// Listeners: notify them whenever the metadata changes
for (Listener listener: listeners)
(cluster);
String previousClusterId = ().clusterId();
// On the first call needMetadataForAllTopics is false (it is set to false in the constructor, per the
// original author), so the else branch is taken and the cluster object initialised earlier is assigned
// directly to the metadata's cluster member
if () {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
= false;
= getClusterForCurrentTopics(cluster);
} else {
// Assign the metadata's cluster member
= cluster;
}
// The bootstrap cluster is guaranteed not to have any useful information
if (!()) {
String clusterId = ().clusterId();
if (clusterId == null ? previousClusterId != null : !(previousClusterId))
("Cluster ID: {}", ().clusterId());
(());
}
// Wake up any other threads that are waiting on this object
notifyAll();
("Updated cluster metadata version {} to {}", , );
}
这里最重要的就是将new出来的cluster对象直接赋值给metadata的cluster成员变量,所以此处的update方法并没有去broker集群通过网络拉取元数据,此时cluster对象里的很多与topic相关的内容是空的。
(2)构造NetworkClient 。
NetworkClient的初始化主要是一些赋值及生成一些对象,NetworkClient主要负责异步的io请求和响应,并且不是线程安全的。这里会初始化一个DefaultMetadataUpdater组件,用于更新元数据
/**
 * Private constructor that stores the supplied settings and creates the client's helper objects.
 * When no MetadataUpdater is supplied, a DefaultMetadataUpdater is created from {@code metadata}.
 *
 * NOTE(review): the field names on the left-hand side of the assignments were lost in this excerpt.
 *
 * @param metadataUpdater updater to use, or null to create a DefaultMetadataUpdater
 * @param metadata        metadata holder; must not be null when {@code metadataUpdater} is null
 * @param selector        selector used for network I/O
 * @param clientId        client id
 * @param maxInFlightRequestsPerConnection limit passed to InFlightRequests
 * @param reconnectBackoffMs value passed to ClusterConnectionStates
 * @param socketSendBuffer socket send buffer size
 * @param socketReceiveBuffer socket receive buffer size
 * @param requestTimeoutMs request timeout
 * @param time            time source
 * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata} are null
 */
private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
*/
// On the first construction this branch is taken: the metadataUpdater passed in is null
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
// 1. Initialise the DefaultMetadataUpdater
= new DefaultMetadataUpdater(metadata);
} else {
= metadataUpdater;
}
= selector;
= clientId;
// The collections inside this class record requests that have been sent but have not yet received a response
= new InFlightRequests(maxInFlightRequestsPerConnection);
// Records the connection state between this producer and each broker
= new ClusterConnectionStates(reconnectBackoffMs);
= socketSendBuffer;
= socketReceiveBuffer;
= 0;
= new Random();
= requestTimeoutMs;
= time;
}
(3)开启KafkaThread线程
这里KafkaThread只是Sender线程的包装,并且上面的NetworkClient作为Sender的成员变量:
所以这里的关系是:KafkaThread包含了Sender,Sender包含了NetworkClient,NetworkClient是跟broker做实际的网络通信组件。
/**
 * Creates a named thread around {@code runnable}, sets its daemon flag and installs an
 * uncaught-exception handler whose message includes the thread name.
 *
 * NOTE(review): the handler's type name and the logging receiver were lost in this excerpt.
 *
 * @param name     thread name, also used in the uncaught-exception message
 * @param runnable task executed by the thread
 * @param daemon   whether the thread is a daemon thread
 */
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
setDaemon(daemon);
setUncaughtExceptionHandler(new () {
public void uncaughtException(Thread t, Throwable e) {
("Uncaught exception in " + name + ": ", e);
}
});
}
从Sender线程的初始化可以看出Sender类的重要组件:网络通信组件,内存缓存,元数据
/**
 * Stores the collaborators and settings the sender works with: the network client, the record
 * accumulator, the metadata, plus ordering, size, acks, retry and timeout settings.
 *
 * NOTE(review): the field names on the left-hand side of the assignments were lost in this excerpt.
 *
 * @param client                network client
 * @param metadata              cluster metadata
 * @param accumulator           in-memory record buffer
 * @param guaranteeMessageOrder whether message order must be preserved
 * @param maxRequestSize        maximum request size
 * @param acks                  acknowledgement setting
 * @param retries               number of retries
 * @param metrics               metrics registry passed to SenderMetrics
 * @param time                  time source
 * @param clientId              client id
 * @param requestTimeout        request timeout
 */
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
Metrics metrics,
Time time,
String clientId,
int requestTimeout) {
// Network communication component
= client;
// In-memory buffer
= accumulator;
// Metadata
= metadata;
= guaranteeMessageOrder;
= maxRequestSize;
= true;
= acks;
= retries;
= time;
= clientId;
// Statistics / metrics component
= new SenderMetrics(metrics);
= requestTimeout;
}
当真正执行线程的时候执行的还是Sender类的run方法:
/**
 * Entry point of the I/O thread: keeps calling run(long) while {@code running} is true.
 * An Exception thrown by one iteration is caught and reported, and the loop continues.
 */
public void run() {
("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
// `running` is declared volatile (per the original author) -- the classic stop-flag usage
// Loop here, invoking run(...) over and over
while (running) {
try {
run(());
} catch (Exception e) {
("Uncaught error in kafka producer I/O thread: ", e);
}
}
// (the rest of the method is omitted by the article as not important)
}
执行run(time.milliseconds());这里执行逻辑比较多,但是第一次执行这个方法的时候,多数的数据结构都是空的,主要是执行最后一行代码:
/**
 * Runs a single iteration of sending: determines what is ready to send, drops nodes that are not
 * ready, drains batches, handles expired batches, creates the produce requests and finally polls.
 *
 * NOTE(review): most receivers and several arguments were lost in this excerpt.
 *
 * @param now the current time, passed through to the calls below
 */
void run(long now) {
// First pass: the cluster object is taken straight from the metadata; at this point it is empty, since
// no metadata has been fetched from the brokers yet
Cluster cluster = ();
// get the list of partitions with data ready to send
// First pass: most cluster attributes are empty, so nothing much happens here and the data structures
// inside result are empty as well
result = (cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!()) {// skipped on the first pass
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : )
(topic);
();
}
// remove any nodes we aren't ready to send to
// First pass: result is empty, so all of this is skipped
Iterator<Node> iter = ();
long notReadyTimeout = Long.MAX_VALUE;
while (()) {
Node node = ();
if (!(node, now)) {
();
notReadyTimeout = (notReadyTimeout, (node, now));
}
}
// create produce requests
// First pass: batches is empty
Map<Integer, List<RecordBatch>> batches = (cluster,
,
,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : ()) {
for (RecordBatch batch : batchList)
();
}
}
List<RecordBatch> expiredBatches = (, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
((), );
(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = (, notReadyTimeout);
if (() > 0) {
("Nodes with data ready to send: {}", );
("Created {} produce requests: {}", (), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
(request, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
(pollTimeout, now);
}
后面的流程比较复杂,总体的调用流程是:
run(long now)
->client.poll(pollTimeout, now);
->metadataUpdater.maybeUpdate(now);
->maybeUpdate(now, node);
->handleCompletedReceives(responses, updatedNow);
->metadataUpdater.maybeHandleCompletedReceive(req, now, body)
->handleResponse(req.request().header(), body, now);
->metadata.update(cluster, now)
再次来到update方法,与上一次不同的是,这次已经真正从broker端获取到了cluster信息,会将这些信息更新给元数据。