在使用 producer client 发送消息之前,需要先创建 KafkaProducer 对象:

// Create the producer from the user-supplied configuration `props`.
// NOTE(review): presumably this public constructor ends up in the private
// KafkaProducer(ProducerConfig, Serializer, Serializer) constructor shown below — confirm.
Producer<String, String> producer = new KafkaProducer<>(props);


(1)KafkaProducer 是线程安全的,多个线程可以共享同一个实例。下面是它的构造函数:







// Private constructor that wires up every producer component from `config`, in this order:
// client id, metrics, partitioner, key/value serializers, interceptors, Metadata, request-size /
// buffer-memory / compression settings, max-block time, request timeout, RecordAccumulator,
// bootstrap Metadata, NetworkClient, Sender, and the I/O KafkaThread.
// If anything throws, close(...) is called to release whatever was already built (KAFKA-2121) and
// the cause is rethrown wrapped in a KafkaException("Failed to construct kafka producer").
// NOTE(review): this excerpt was damaged when it was copied: receivers, method names and the
// `this.xxx` targets of many assignments (e.g. " = config;") are missing, as are several closing
// braces and the terminators of the two block comments, so it does not compile as shown.
// Compare against the upstream KafkaProducer source before relying on any single line.
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        ("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = ();
         = config;
         = new SystemTime();
        // Use the configured client id, or generate "producer-N" when none is set.
        clientId = (ProducerConfig.CLIENT_ID_CONFIG);
        if (() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        ("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples((ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow((ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), )
        List<MetricsReporter> reporters = (ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
        (new JmxReporter(JMX_PREFIX));
         = new Metrics(metricConfig, reporters, time);
         = (ProducerConfig.PARTITIONER_CLASS_CONFIG, );
        long retryBackoffMs = (ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        // Use the caller-supplied serializers when given; otherwise instantiate the configured classes.
        // NOTE(review): the follow-up call taking true (key) / false (value) is presumably
        // configure(configs, isKey) — confirm against upstream.
        if (keySerializer == null) {
             = (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ((), true);
        } else {
             = keySerializer;
        if (valueSerializer == null) {
             = (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ((), false);
        } else {
             = valueSerializer;

        // load interceptors and make sure they get clientId
        // The interceptor wrapper is null when no interceptor is configured (the condition is
        // presumably interceptorList.isEmpty() — confirm).
        (ProducerConfig.CLIENT_ID_CONFIG, clientId);
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
         = () ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
         = new Metadata(retryBackoffMs, (ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
         = (ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         = (ProducerConfig.BUFFER_MEMORY_CONFIG);
        // Compression type: no compression by default; gzip, snappy and lz4 can be used.
         = ((ProducerConfig.COMPRESSION_TYPE_CONFIG));
        // Resolve the max-block time. If BLOCK_ON_BUFFER_FULL_CONFIG is set and true, block forever
        // (Long.MAX_VALUE). Otherwise the deprecated METADATA_FETCH_TIMEOUT_CONFIG wins when set, and
        // MAX_BLOCK_MS_CONFIG is the fallback. Each deprecated setting triggers a deprecation message.
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
        if ((ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            (ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = (ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                 = Long.MAX_VALUE;
            } else if ((ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                 = (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                 = (ProducerConfig.MAX_BLOCK_MS_CONFIG);
        } else if ((ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
             = (ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
             = (ProducerConfig.MAX_BLOCK_MS_CONFIG);

        // Resolve the request timeout: the deprecated TIMEOUT_CONFIG wins when set, otherwise
        // REQUEST_TIMEOUT_MS_CONFIG is used.
        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
        if ((ProducerConfig.TIMEOUT_CONFIG)) {
            (ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
             = (ProducerConfig.TIMEOUT_CONFIG);
        } else {
             = (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // BATCH_SIZE_CONFIG: the size of each batch; one batch holds many records. Default 16384 bytes = 16KB.
        // One request goes to one broker; a request carries several batches, each for one partition on that broker.
        // Batches that are too small cause frequent requests, i.e. more network round trips and lower throughput.
        // If it is set to 0, nothing is batched: every record is sent on its own.
        // Batches that are too large keep many buffered batches in memory, wasting memory.
        // LINGER_MS_CONFIG: default 0. When the linger time elapses, the batch is sent even if it has not
        // reached the batch size.
         = new RecordAccumulator((ProducerConfig.BATCH_SIZE_CONFIG),
        // Seed Metadata with the bootstrap broker addresses only; nothing is fetched from the brokers here.
        // NOTE(review): the second line below presumably reads
        // metadata.update(Cluster.bootstrap(addresses), time.milliseconds()) — confirm.
        List<InetSocketAddress> addresses = ((ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        ((addresses), ());
        ChannelBuilder channelBuilder = (());
        // The NetworkClient is built around a new Selector (the network I/O layer for broker connections).
        NetworkClient client = new NetworkClient(
                new Selector((ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), , time, "producer", channelBuilder),
                , time);
        // NOTE(review): the "== 1" expression is presumably the Sender's guaranteeMessageOrder argument,
        // i.e. ordering is guaranteed only with one in-flight request per connection — confirm.
         = new Sender(client,
                (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                (short) parseAcks((ProducerConfig.ACKS_CONFIG)),
                new SystemTime(),
        // I/O thread named "kafka-producer-network-thread", plus " | <clientId>" when a client id is set;
        // the third argument `true` is the daemon flag. Its Runnable (presumably the Sender) was stripped.
        String ioThreadName = "kafka-producer-network-thread" + (() > 0 ? " | " + clientId : "");
         = new KafkaThread(ioThreadName, , true);

         = ("errors");

        (JMX_PREFIX, clientId);
        ("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, , true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);








(2)构造 NetworkClient。




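metadata 在 new 出来之后调用了 `metadata.update(Cluster.bootstrap(addresses), time.milliseconds())`,但这一步实际上并没有从 broker 拉取元数据信息,只是用 bootstrap 地址构造了一个占位的 Cluster。Metadata 类中最重要的就是 Cluster 属性,Cluster 中的主要变量: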

// Main fields of Cluster (excerpt; the enclosing class declaration is not shown here).
// isBootstrapConfigured: presumably true only for the placeholder built by Cluster.bootstrap(...) — confirm.
// The maps are lookup tables over PartitionInfo keyed by TopicPartition, by topic name, and by an
// Integer (presumably the broker node id); availablePartitionsByTopic presumably keeps only
// partitions that currently have a leader — TODO confirm against the Cluster constructor.
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;


// Immutable metadata for one topic partition: topic name, partition number, leader node, replica
// nodes and in-sync replica nodes.
// NOTE(review): excerpt only; the rest of the class, including its closing brace, is not shown.
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;



// Builds a placeholder Cluster from the bootstrap broker addresses alone. Each address becomes a
// Node with a synthetic negative id (-1, -2, ...), presumably so it cannot clash with a real
// broker id. The Cluster has no partitions, and both topic sets are empty.
// No metadata is fetched from any broker here.
// NOTE(review): the node-adding line lost its receiver and host/port getters in extraction; it is
// presumably nodes.add(new Node(nodeId--, address.getHostString(), address.getPort())). The leading
// `null, true` Cluster arguments are presumably the cluster id and the bootstrap flag — confirm.
// The closing brace is also missing from this excerpt.
public static Cluster bootstrap(List<InetSocketAddress> addresses) {
    List<Node> nodes = new ArrayList<>();
    int nodeId = -1;
    for (InetSocketAddress address : addresses)
        (new Node(nodeId--, (), ()));
    return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());




// Replaces the cached cluster view with `cluster`, observed at time `now`. The method is
// synchronized, so the whole update holds this object's monitor.
// Visible steps:
//   1. reject a null cluster ("cluster should not be null");
//   2. clear a flag, set two timestamps to `now` and increment a counter (targets stripped in
//      extraction; presumably needUpdate, lastRefreshMs, lastSuccessfulRefreshMs, version — confirm);
//   3. when topicExpiryEnabled: topics marked TOPIC_EXPIRY_NEEDS_UPDATE get expiry now + TOPIC_EXPIRY_MS,
//      and topics whose expiry is <= now are removed from the refresh set and logged;
//   4. notify every Listener;
//   5. store either getClusterForCurrentTopics(cluster) or `cluster` itself, depending on a flag
//      whose name was stripped;
//   6. unless the new cluster is the bootstrap placeholder (presumably !cluster.isBootstrapConfigured()),
//      log the cluster id when it differs from the previous one.
// NOTE(review): receivers, field names and closing braces are missing; it does not compile as shown.
public synchronized void update(Cluster cluster, long now) {
    (cluster, "cluster should not be null");

     = false;
     = now;
     = now;
     += 1;

    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        for (Iterator<<String, Long>> it = ().iterator(); (); ) {
            <String, Long> entry = ();
            long expireMs = ();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                (now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                ("Removing unused topic {} from the metadata list, expiryMs {} now {}", (), expireMs, now);
    for (Listener listener: listeners)

    String previousClusterId = ().clusterId();
    if () {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
         = false;
         = getClusterForCurrentTopics(cluster);
    } else {
         = cluster;

    // The bootstrap cluster is guaranteed not to have any useful information
    if (!()) {
        String clusterId = ().clusterId();
        if (clusterId == null ? previousClusterId != null : !(previousClusterId))
            ("Cluster ID: {}", ().clusterId());
    ("Updated cluster metadata version {} to {}", , );



// Private NetworkClient constructor (the comment inside refers to a public constructor that delegates here).
// When metadataUpdater is null, a DefaultMetadataUpdater is built around `metadata`, which must then
// be non-null or an IllegalArgumentException is thrown. Otherwise the given updater is used as-is.
// The remaining arguments are stored in fields, along with new InFlightRequests (bounded by
// maxInFlightRequestsPerConnection), new ClusterConnectionStates (using reconnectBackoffMs), a new
// Random, and one field initialised to 0.
// NOTE(review): the `this.xxx` targets of the assignments, a closing brace and the block-comment
// terminator were lost in extraction.
private NetworkClient(MetadataUpdater metadataUpdater,
                      Metadata metadata,
                      Selectable selector,
                      String clientId,
                      int maxInFlightRequestsPerConnection,
                      long reconnectBackoffMs,
                      int socketSendBuffer,
                      int socketReceiveBuffer,
                      int requestTimeoutMs,
                      Time time) {

    /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
     * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
     * super constructor is invoked.
    // First time through (the producer's path), execution comes here: metadataUpdater is passed in as null.
    if (metadataUpdater == null) {
        if (metadata == null)
            throw new IllegalArgumentException("`metadata` must not be null");
         = new DefaultMetadataUpdater(metadata);
    } else {
         = metadataUpdater;
     = selector;
     = clientId;
     = new InFlightRequests(maxInFlightRequestsPerConnection);
     = new ClusterConnectionStates(reconnectBackoffMs);
     = socketSendBuffer;
     = socketReceiveBuffer;
     = 0;
     = new Random();
     = requestTimeoutMs;
     = time;




// Creates a thread named `name` that runs `runnable`. It installs an uncaught-exception handler
// that reports "Uncaught exception in <name>: " together with the exception.
// NOTE(review): `daemon` is not used in the visible lines, and the handler's interface name and
// logger receiver were stripped. Upstream presumably calls setDaemon(daemon) and log.error(...) here
// — confirm. The closing braces are also missing from this excerpt.
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    setUncaughtExceptionHandler(new () {
        public void uncaughtException(Thread t, Throwable e) {
            ("Uncaught exception in " + name + ": ", e);


// Captures everything the Sender's I/O loop needs: the client, metadata, accumulator, ordering
// flag, max request size, acks, retries, time source, client id and request timeout.
// It also creates SenderMetrics from `metrics` and sets one flag to true. That flag is presumably
// `running`, checked by run() — confirm; the field names were stripped in extraction, as was the
// closing brace.
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              String clientId,
              int requestTimeout) {
     = client;
     = accumulator;
     = metadata;
     = guaranteeMessageOrder;
     = maxRequestSize;
     = true;
     = acks;
     = retries;
     = time;
     = clientId;
     = new SenderMetrics(metrics);
     = requestTimeout;


// Body of the producer I/O thread. It loops while `running` is true. An Exception thrown by an
// iteration is caught and reported ("Uncaught error in kafka producer I/O thread: ") and the loop
// continues.
// NOTE(review): the try block is empty here. The per-iteration call (presumably run(time.milliseconds()))
// and the logger receivers were lost in extraction, as was everything after the catch.
public void run() {
    ("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
        } catch (Exception e) {
            ("Uncaught error in kafka producer I/O thread: ", e);


// One iteration of the Sender loop at time `now`:
//   1. take the current Cluster from metadata and find which nodes have data ready to send;
//   2. if some topics have unknown leaders, re-add them to metadata and request a metadata update;
//   3. drop nodes the client is not ready to send to, tracking the smallest not-ready delay;
//   4. drain ready batches per node, muting their partitions when guaranteeMessageOrder is set;
//   5. expire timed-out batches and record them in the sensors;
//   6. build produce requests, hand each one to the client, then poll with the timeout chosen below.
// NOTE(review): receivers and method names on most lines, and several closing braces, were lost in
// extraction. This summary relies on the surviving comments and names.
void run(long now) {
    // First time here: the cluster object is taken straight from metadata. At this point it is still
    // empty; no metadata has been fetched from the brokers yet.
    Cluster cluster = ();
    // get the list of partitions with data ready to send
     result = (cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!()) {// skipped the first time through
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : )

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = ();
    long notReadyTimeout = Long.MAX_VALUE;
    while (()) {
        Node node = ();
        if (!(node, now)) {
            notReadyTimeout = (notReadyTimeout, (node, now));

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = (cluster,
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : ()) {
            for (RecordBatch batch : batchList)

    List<RecordBatch> expiredBatches = (, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        ((), );

    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = (, notReadyTimeout);
    if (() > 0) {
        ("Nodes with data ready to send: {}", );
        ("Created {} produce requests: {}", (), requests);
        pollTimeout = 0;
    for (ClientRequest request : requests)
        (request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // NOTE(review): presumably client.poll(pollTimeout, now) — the receiver was stripped in extraction.
    (pollTimeout, now);

后面的流程比较复杂,总体的调用流程如下(最终在收到元数据响应后,通过 Metadata.update() 更新 Cluster):

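run(long now)
	->client.poll(pollTimeout, now);
		->metadataUpdater.maybeUpdate(now);
			->maybeUpdate(now, node);
		->handleCompletedReceives(responses, updatedNow);
			->metadataUpdater.maybeHandleCompletedReceive(req, now, body)
				->handleResponse(req.request().header(), body, now);
					->this.metadata.update(cluster, now)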
