kafka源码分析之producer

时间:2022-10-09 16:47:08

Producerclient

示例代码

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("client.id""DemoProducer");
props.put("key.serializer""org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer""org.apache.kafka.common.serialization.ByteArraySerializer");
producer new KafkaProducer<IntegerString>(props);
this.topic = topic;
this.isAsync = isAsync;

String messageStr = "Message_";
long startTime = System.currentTimeMillis();
if (isAsync) {

异步处理,这个过程需要定义一个回调函数来监听发送的消息的响应结果

 // Send asynchronously
  
producer.send(new ProducerRecord<byte[]byte[]>(topic,
      messageNo.getBytes()/*key*/,
      messageNo.getBytes()/*value*/),

      /*异步处理,回调函数*/

      new DemoCallBack(startTimemessageNomessageStr));
else 

同步处理,发送完成后,等待发送的响应结果。

// Send synchronously
  
try {
    producer.send(new ProducerRecord<IntegerString>(topic,
        messageNo.getBytes()/*key*/,
        messageNo.getBytes()/*value*/)).get();
    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
  catch (InterruptedException e) {
    e.printStackTrace();
  catch (ExecutionException e) {
    e.printStackTrace();
  }
}

 

关于异步处理的回调函数定义:

这个回调函数实现需要实现org.apache.kafka.clients.producer.Callback接口。

class DemoCallBack implements Callback 

 

并实现接口中的函数:

public void onCompletion(RecordMetadata metadataException exception) {

这里的startTime是发送这条消息时,生成回调函数时传入的消息发送的开始时间,

计算出来了这次发送这条消息共花的时间
  long elapsedTime = System.currentTimeMillis() - startTime;
  if (metadata != null) {

如果metadata信息不为空,表示消息添加成功,可以得到当前添加成功的消息的offset.
    System.out.println(
        "message(" key ", " message ") sent to partition(" 

           + metadata.partition() +
            "), " +
            "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
  else {

这种情况下,表示exception有值,也就是添加消息失败了,可以直接打印这个失败的消息的内容。
    exception.printStackTrace();
  }
}

 

Client端的生成与处理流程

生成KafkaProducer实例

1,首先看看KafkaProducer实例生成:

根据传入的properties配置信息,生成用于Producer的config实例。

this(new ProducerConfig(properties)nullnull);

 

2,解析必要的配置项:

2,1,配置项client.id,用于标记client端的一个编码值,默认值为producer-1。在同一个进程内,多个client端时,如果没有指定,默认根据1这个值向后增加。

2,2,配置项partitioner.class,配置用于producer写入数据时用于计算这条数据对应的partition的分配算子实例,这个实例必须是的Partitioner实现。实例初始化时会调用configure函数把配置文件传入进去,用于实例生成时使用,默认情况下分区算子是DefaultPartitioner。这个默认算子根据当前的key值进行murmur2 hash并与对应的topic的个数于模,如果key为null时,根据一个自增的integer的值与partition的个数取模.

2,3,配置项retry.backoff.ms,用于在向broker发送数据失败后的重试间隔时间,默认值为100ms

2,4,配置项metadata.max.age.ms,用于配置每个producer端缓存topic的metadata的过期时间,默认值为5分钟。配置上面的2,3,与2,4的配置,生成一个Metadata实例。

2,5,配置项max.request.size,用于配置每次producer请求的最大的字节数,默认值为1MB。

2,6,配置项buffer.memory,用于配置producer端等待向server发送的数据的缓冲区的大小,默认值为32MB。

2,7,配置项compression.type,默认值none,用于配置数据的压缩算法,默认为不压缩,可配置的值为none,gzip,snappy,lz4

2,8,配置项max.block.ms,用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒。

2,9,配置项request.timeout.ms,用于配置socket请求的最大超时时间,默认值为30秒。

 

3,生成record的累加器,这是一个用于对producer要发送的数据进行缓冲的实例:

this.accumulator new RecordAccumulator(

        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        metricTags);

 

3,1,RecordAccumulator实例需要的配置:

3,1,1配置项batch.size,用于批量提交的batch字节大小,默认值为16384。

3,1,2配置项linger.ms,这个配置与3,1,1配合使用,用于配置数据缓存的最大延迟时间,默认值0.

3,1,3依赖的其它配置项:2,6  2,7 2,3。

 

4,根据配置项bootstrap.servers,多个配置使用逗号分开,

生成用于socket请求的InetSocketAddress实例集合。

4,1并根据配置的broker的连接地址集合,生成Cluster的实例。把cluster实例更新到metadata的实例中。

 

5,生成NetworkClient实例,这个实例用于与各个broker进行socket通信,生成用于进行数据发送的Sender实例,并生成用于数据发送的KafkaThread线程并启动。

 

6,根据配置项key.serializer/value.serializer,生成key与value的序列化实例,这实例必须是Serializer的实现。

 

KafkaThread线程初始化

生成NetworkClient实例需要的配置项:

1,配置项connections.max.idle.ms,默认值为9分钟,用于设置连接最大的空闲时间,

2,配置项max.in.flight.requests.per.connection,默认值5,用于设置每个连接最大的请求个数

3,配置项reconnect.backoff.ms,默认值50ms,用于设置重新尝试连接的等待时间。

4,配置项send.buffer.bytes,默认值128kb,用于设置socket的发送缓冲区SO_SNDBUF的大小。

5,配置项receive.buffer.bytes,默认值32kb,用于设置socket的接收响应的缓冲区SO_RCVBUF的大小。

6,配置项request.timeout.ms,用于配置socket请求的最大超时时间,默认值为30秒。

NetworkClient client = new NetworkClient(
        new Selector(

            config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)

            this.metricstime"producer"metricTagschannelBuilder),
        this.metadata,
        clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMstime);

 

Sender是一个用于发送数据的线程。

需要的配置项:

1,配置项max.request.size,用于配置每次producer请求的最大的字节数,默认值为1MB。

2,配置项acks,默认值1,用于配置请求的ack的类型,-1,0,1三种。

3,配置项retries,默认值0,用于配置发送失败的重试次数。
this.sender new Sender(client,
        this.metadata,
        this.accumulator,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        (shortparseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
        config.getInt(ProducerConfig.RETRIES_CONFIG),
        this.metrics,
        new SystemTime(),
        clientId,
        this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" 

        + (clientId.length() > " | " clientId "");

 

这里用于启动用于对producer中的数据进行发送的线程Sender实例。
this.ioThread new KafkaThread(ioThreadNamethis.sendertrue);
this.ioThread.start();

 

通过producer发送数据

Producersend函数

public Future<RecordMetadata> send(ProducerRecord<KV> record) {
    return send(recordnull);
}

如果需要考虑数据发送成功的回调处理时,需要实现Callback。
public Future<RecordMetadata> send(ProducerRecord<KV> record

Callback callback) {
    try {

这里根据请求的记录的topic的名称,得到这个topic对应的metadata信息,这里通过Metadata实例来得到。函数返回值是读取topic的metadata信息的读取时间。

1,从metadata实例中的topics集合中检查这个topic是否存在,如果不存在,把这个topic添加到集合中,

2,从metadata对应的Cluster实例(这里存储有每个broker的连接信息)中的partitionsByTopic集合中根据topic得到topic对应的partition信息的集合,如果partitionsByTopic中已经存在有对应的partitions的记录,说明这个topic的metadata信息已经被加载出来,函数直接返回0。

3,如果当前的topic在metadata中没有对应的partitions的信息,根据max.block.ms配置的最大等待时间,通过每个broker的连接,随机取出一个broker的连接,如果broker的连接不存在时,会创建这个连接并向broker发起一个TopicMetadataRequest请求得到这个topic对应的metadata信息。
        // first make sure the metadata for the topic is available
        
long waitedOnMetadataMs = waitOnMetadata(record.topic()

             this.maxBlockTimeMs);

 

这里得到总的等待时间除去得到metadata信息用去的时间后还可以用于等待添加数据到发送队列处理的等待时间。
        long remainingWaitMs = Math.max(0this.maxBlockTimeMs 

               waitedOnMetadataMs);

 

对传入的key与value进行序列化操作,并得到序列化后的byte array的key与value.
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic()record.key());
        catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " 

                    + record.key().getClass().getName() +
                    " to class " producerConfig.getClass(

                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic()

                record.value());
        catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " 

                    record.value().getClass().getName() +
                    " to class " producerConfig.getClass(

                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }

 

得到这条记录对应的partition,并根据这个partition生成TopicPartition,

在得到对应的partition时,如果传入参数中包含有partition的id时,判断这个partition的值是否在指定的范围内,必须在指定的范围内,如果partition没有传入时,通过指定的partitioner的实例,根据record的kv信息,生成一个partition的id值。
        int partition = partition(recordserializedKeyserializedValue

                 metadata.fetch());

得到一条记录的长度,这个记录的长度为size(4),offset(8),crc(4),magic(1),attr(1),

            Keysize(4),key,valuesize(4),value
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey

                   serializedValue);
        ensureValidRecordSize(serializedSize);
        TopicPartition tp = new TopicPartition(record.topic()partition);


        log.trace("Sending record {} with callback {} to topic {} partition {}",

               recordcallbackrecord.topic()partition);

 

向client端的消息缓冲区内写入这条消息。
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp

              serializedKeyserializedValuecallbackremainingWaitMs);


        if (result.batchIsFull || result.newBatchCreated) {

如果当前的缓冲区的batch的大小已经满了,或者说这个缓冲区中重新生成了一个batch时,唤醒sender的线程,让sender的run函数继续执行,完成对数据的发送操作。
            log.trace("Waking up the sender since topic {} partition {} is either full 

                  or getting a new batch"record.topic()partition);


            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    
catch (ApiException e) {
        log.debug("Exception occurred during message send:"e);
        if (callback != null)
            callback.onCompletion(nulle);
        this.errors.record();
        return new FutureFailure(e);
    catch (InterruptedException e) {
        this.errors.record();
        throw new InterruptException(e);
    catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        throw e;
    catch (KafkaException e) {
        this.errors.record();
        throw e;
    }
}

 

Producer的缓冲区的append

在执行producer的send函数时,并不是直接就向socket发起网络请求,而是先把数据存储到发送的缓冲区中,这个缓冲区的实现是一个RecordAccumulator实例。

这个实例生成时,需要的配置项:

配置项batch.size,用于批量提交的batch字节大小,默认值为16384。

配置项linger.ms,这个配置与3,1,1配合使用,用于配置数据缓存的最大延迟时间,默认值0.

配置项buffer.memory,用于配置producer端等待向server发送的数据的缓冲区的大小,默认值为32MB。

配置项compression.type,默认值none,用于配置数据的压缩算法,默认为不压缩,可配置的值为none,gzip,snappy,lz4

配置项retry.backoff.ms,用于在向broker发送数据失败后的重试间隔时间,默认值为100ms

 

接下来看看用于添加数据到缓冲区的append函数
public RecordAppendResult append(TopicPartition tpbyte[] keybyte[] valueCallback callbacklong maxTimeToBlock) throws InterruptedException {


    
appendsInProgress.incrementAndGet();
    try {
        if (closed)
            throw new IllegalStateException(

                  "Cannot send after the producer is closed.");

 

首先从当前的batchs集合中得到对应这个partition的RecordBatch的双端队列。

如果batchs集合中还不存在partition对应的双端队列时,生成一个ArrayDeque的队列实例,并放入到batchs的集合中。这个函数返回batchs集合中对应这个partition的双端队列实例。
        // check if we have an in-progress batch
        
Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) {

得到这个队列中,最后一个RecordBatch的实例值,在一个ArrayDeque的双端队列中,实例初始化时默认生成16个元素的数组(2的倍数),如果是addLast是从0开始向后添加,如果是addFirst是从数组尾部向前添加。
            RecordBatch last = dq.peekLast();
            if (last != null) {

如果这个双端队列中,得到了一个RecordBatch的用于存储batch的实例时,表示这个队列中是存在待提交的batch的信息。向这个recordBatch中添加这个kvy-value进去。

第一次进行partition的消息添加时,这个流程不会被执行。

这里向队列中最后一个RecordBatch添加这个kv消息,这个流程被执行时,表示这个RecordBatch一定存在一个大于0的record的记录数,

向这个RecordBatch中添加消息的流程:

1,检查这个batch是否处于可写的状态,Sender线程未对此batch进行提交时,这个值为true,同时这个buffer中没有写入任何内容时,这个buffer的大小能够放下当前的kv,

2,检查这个batch中当前的内存位置加上当前要send进去的kv的大小是否超过了batchSize的大小,

3,如果流程执行到这里,表示这个kv能被添加到这个RecordBatch中,向RecordBatch中添加这条消息,并返回这个RecordBatch的FutureRecordMetadata的信息,

4,如果当前的RecordBatch没有足够的空间来存储这个kv时,这里返回的future是一个null值。
                FutureRecordMetadata future = last.tryAppend(keyvaluecallback

                       time.milliseconds());

 

如果说future的值不为空,表示这条消息成功添加到这个buffer中,检查这个队列中的RecordBatch的个数是否大于1或者当前的RecordBatch的大小是否已经达到了不能写的情况,如果满足这两种情况中的一种那么生成这个RecordAppendResult的第二个参数为true,否则为false,第三个参数,由于这个情况下是直接拿到的队列中的一个buffe进行的添加,并不是新创建的RecordBatch,这里的值为false.
                if (future != null)
                    return new RecordAppendResult(futuredq.size() > || 

                         last.records.isFull()false);
            }
        }

如果对应的partition的双端队列中是第一次添加消息到缓冲区,或者说当前双端队列中最后一个RecordBatch内存储的消息已经达到了batch的大小,需要重新生成一个RecordBatch,计算出这个batch存储的大小。
        // we don't have an in-progress record batch try to allocate a new batch
        
int size = Math.max(this.batchSizeRecords.LOG_OVERHEAD 

                Record.recordSize(keyvalue));


        log.trace("Allocating a new {} byte message buffer for topic {} partition {}"

                    sizetp.topic()tp.partition());

 

从内存池中分配指定大小的一个缓冲区出来,分配流程:

1,如果申请的size大小与每个batchSize的大小(可以理解为内存页)相同,同时内存池中刚好有缓存起来的已经分配好的buffer,直接取出这个双端队列中的第一个buffer(采用移出的方式).

2,如果当前内存池中可用的内存加上所有分配的内存页的大小(每个内存页是一个batchSize的大小)相加大于或者等于当前要申请的size大小,如果当前可用的内存小于申请的size时,释放掉内存池的双端队列中最后一个缓存起来的已经分配的buffer的容量(这个过程是一个迭代过程,直到释放的内存达到可以存储这个size的大小为结束,每次迭代移出最后一个buffer),把当前的可用内存减去分配的size的大小,并根据这个size生成一个ByteBuffer实例。返回这个ByteBuffer.

3,这种情况表示当前的缓冲区已经没有足够的大小用来分配buffer,通过while进行迭代,直到可用的内存达到size的大小,每次迭代当前的send线程就wait住,等待Sender线程对buffer进行提交释放后唤醒线程,

3,1,在线程被唤醒后,检查如果当前的while的迭代是第一次迭代,同时要分配的size刚好就是内存页的大小,同时内存池中空闲的内存页的buffer刚好还有多于的的,取出内存池中双端队列的第一个buffer.停止迭代,返回这个buffer.

3,2这种情况下,send函数的线程已经被唤醒,但是要分配的size是一个比batchSize(内存页)大的size时,如果当前的内存池中还有缓存的内存页可用,释放缓存的内存页buffer,直到释放到可分配size的空间(如果无法释放更多的空间时,while重新迭代,线程重新进入到wait的状态,等待提交后的唤醒),这种情况下返回的buffer会根据size大小重新申请一个ByteBuffer并返回,在能够分配可用大小的内存空间后,同时当前内存池中可用的内存大于0,或者说内存池还有被缓存起来的内存页buffer,唤醒下一个等待线程。
        ByteBuffer buffer = free.allocate(sizemaxTimeToBlock);
        synchronized (dq) {
            
if (closed)
                throw new IllegalStateException(

                        "Cannot send after the producer is closed.");

 

如果这个双端队列中,得到了一个RecordBatch的用于存储batch的实例时,表示这个队列中是存在待提交的batch的信息。向这个recordBatch中添加这个kvy-value进去。

第一次进行partition的消息添加时,这个流程不会被执行。

这里向队列中最后一个RecordBatch添加这个kv消息,这个流程被执行时,表示这个RecordBatch一定存在一个大于0的record的记录数,

向这个RecordBatch中添加消息的流程:

1,检查这个batch是否处于可写的状态,Sender线程未对此batch进行提交时,这个值为true,同时这个buffer中没有写入任何内容时,这个buffer的大小能够放下当前的kv,

2,检查这个batch中当前的内存位置加上当前要send进去的kv的大小是否超过了batchSize的大小,

3,如果流程执行到这里,表示这个kv能被添加到这个RecordBatch中,向RecordBatch中添加这条消息,并返回这个RecordBatch的FutureRecordMetadata的信息,

4,如果当前的RecordBatch没有足够的空间来存储这个kv时,这里返回的future是一个nul值。
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(keyvaluecallback

                        time.milliseconds());

 

下面的future不为空的情况,通常情况下应该不会被发生,如果发生了,把当前申请的buffer释放掉。并把这个buffer放到这个内存池的缓存队列中,用于下一次使用时,不需要重复申请内存。

如果说future的值不为空,表示这条消息成功添加到这个buffer中,检查这个队列中的RecordBatch的个数是否大于1或者当前的RecordBatch的大小是否已经达到了不能写的情况,如果满足这两种情况中的一种那么生成这个RecordAppendResult的第二个参数为true,否则为false,第三个参数,由于这个情况下是直接拿到的队列中的一个buffe进行的添加,并不是新创建的RecordBatch,这里的值为false.
                if (future != null) {
                    
free.deallocate(buffer);
                    return new RecordAppendResult(futuredq.size() > || 

                        last.records.isFull()false);
                }
            }

 

流程执行到这里,表示当前的partition中对应的双端队列的最后一个RecordBatch不能够存储这个kv的大小,或者说当前的队列中不存在RecordBatch,新生成一个Records用于存储要send的消息集合,并生成一个RecordBatch来归类这个records,把要send的kv添加到这个records中,

并生成一个FutureRecordMetadata实例返回,

这个实例中引用一个对应此RecordBatch中的ProduceRequestResult实例,这个实例用于在Sender线程中用于控制batch是否被提交成功的处理,第二个参数是表示当前的RecordBatch中已经存储的消息条数,在这里通过RecordBatch添加消息时(tryAppend函数),在这个流程的处理中一定会返回一个FutureRecordMetadata的实例,因为这是第一次添加,RecordBatch中的buffer被定义成刚好能够存储这个kv或者说这个buffer不光能够存储这一个kv.
            MemoryRecords records = MemoryRecords.emptyRecords(buffercompression

                  this.batchSize);
            RecordBatch batch = new RecordBatch(tprecordstime.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(keyvalue

                  callbacktime.milliseconds()));

最后,把这个新生成的batch添加到队列中。
            dq.addLast(batch);
            incomplete.add(batch);

生成要返回的信息,这个信息中包含有future用于控制提交是否成功,第二个参数1:如果说队列中已经有超过一个的batch时,表示第一个batch已经满了,或者说第一个的batch已经不能存储新send进来的kv,又新创建了一个batch,2:新生成的batch的records中已经存储了大于或等于batchSize的大小的数据,

==>如果说这个值是true时,表示这个参数上面的两种情况最少满足一种。

第三个参数由于这里是新创建的一个RecordBatch,因为为true.
            return new RecordAppendResult(futuredq.size() > || 

                  batch.records.isFull()true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

 

Sender线程处理数据的发送

 

线程的run函数:
/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

KafkaProducer实例生成时,KafkaThread线程启动后,会执行Sender实例中的run函数,
    // main loop, runs until close is called
    
while (running) {

如果producer没有执行shutdown操作前,run函数会一直在这个地方进行执行,不断的执行run函数传入当前的执行时的系统时间。
        try {
            run(time.milliseconds());
        catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: "e);
        }
    }

如果流程执行到这里,说明produce已经执行了shutdown操作,准备执行停止producer的操作。
    log.debug("Beginning shutdown of Kafka producer I/O thread,

              sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    
while (!forceClose && (this.accumulator.hasUnsent() || 

                this.client.inFlightRequestCount() > 0)) {

如果当前的accumulator的缓冲区还有数据没有被处理,同时networkClient中还有正在进行的请求,迭代执行run函数,直到数据被全部发送完成。
        try {
            run(time.milliseconds());
        catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: "e);
        }
    }
    if (forceClose) {
        
this.accumulator.abortIncompleteBatches();
    }

关闭网络连接。
    try {
        this.client.close();
    catch (Exception e) {
        log.error("Failed to close network client"e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

 

执行缓冲区数据的发送操作的函数:


/**
 * Run a single iteration of sending
 * 
 * 
@param now
 
*            The current POSIX time in milliseconds
 */
public void run(long now) {

得到当前cluster中所有的broker节点的连接信息。
    Cluster cluster = metadata.fetch();

 

这里从缓冲区中的所有的partition的batch中进行计算,取出已经准备好的需要进行发送的broker的节点集合,具体流程:

1,对缓冲区中batchs集合进行迭代,取出每个partition对应的双端队列(存储数据缓存的batch),

2,如果partition在cluster对应的partitionsByTopicPartition集合中存在,表示这个topic的metadata已经被加载过来,得到这个partition的leader,

3,如果partition的leader不存在,

       设置这个函数返回ReadyCheckResult类型的unknownLeadersExist值为true.

4,如果迭代的partition的leader存在,取出这个partition的队列中的第一个batch,如果这个batch存在,表示有缓存的数据,

4,1检查这个batch是否已经被提交过,重试次数大于0,

      同时上一次重试的时间已经大于了retry.backoff.ms(默认100ms)配置的等待时间,

     把这个partition的leader添加到返回的ReadyCheckResult实例中的readyNodes集合中。

         (readyNodes是一个set集合)

4,2如果这个partition对应的队列中已经缓存有超过一个以上的batch,

或者说有batch的缓存大小已经达到了batchSize的配置大小时,

把这个leader添加到readyNodes中。

4,3如果这个partition的队列中有batch已经达到了linger.ms(默认值0)配置的等待时间,

    把这个leader添加到readyNodes中。

5,这个返回的ReadyCheckResult实例中,属性nextReadyCheckDelayMs的值,表示要delay到的下一次时间,也就是下一次执行的wait时间,如果当前的所有的batch中没有超过等待时间时(retry.backoff.ms/linger.ms),也就是当前执行时间与等待时间的差值。
    // get the list of partitions with data ready to send
    
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster

               now);

 如果上面的执行返回的结果中unknownLeadersExist属性值为true,表示topic的metadata还没有被加载过来(这个情况一般不会发生),标记metadata需要被更新。
    
if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

对返回的结果集中readyNodes集合中准备好的节点进行迭代,这个while的迭代中主要执行如下的流程:

1,通过NetworkClient检查这个node是否已经被连接,同时metadata还没有达到需要更新的时间,同时连接队列中个数数小于max.in.flight.requests.per.connection配置的连接个数。那么这个node会被保留,

2,如果当前迭代的node的连接已经超时,或者metadata需要被更新,或者node对应的broker还没有被创建连接,移出这个node.
    // remove any nodes we aren't ready to send to
    
Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(nodenow)) {
            iter.remove();

如果connection是已经被关闭掉的连接,this.client.connectionDelay(nodenow)返回的timeout是reconnect.backoff.ms(50ms)配置的值。
            notReadyTimeout = Math.min(notReadyTimeout

               this.client.connectionDelay(nodenow));
        }
    }

 

流程执行到这里时,result中的readyNodes集合中包含的是已经与broker创建有连接的node的信息。

这里根据可以发起连接的broker的nodes集合,迭代每个node中的所有的partition的队列,

取出这个队列中的第一个recordBatch(如果这个batch已经发送失败过一次,同时还没到重试的时间间隔,跳过这个batch),关闭这个batch(表示这个batch不能在写入)同时把这个batch添加到要返回的map集合中,这个迭代的过程直到找完所有的node中对应的partition中队列的第一个元素,

或者达到max.request.size配置的最大的请求的消息字节数的大小为结束。
    // create produce requests
    
Map<IntegerList<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);

这里根据request.timeout.ms(默认30秒)配置的请求超时时间,得到缓冲区中所有请求超时的batch的集合(通过batch的最后一次写入消息的时间来判断是否达到了超时时间),如果发现batch已经起时,从缓冲区中移出这个batch,并回收这个batch对应的buffer.
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(

             this.requestTimeoutclusternow);
    // update sensors
    
for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic()

                expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);

 

根据每个broker对应的partition的batch的消息集合,生成对应的ProduceRequest请求,

这个请求每一个broker生成一个请求,这个请求中包含了这个broker中所有的partition的buffer的集合。
    List<ClientRequest> requests = createProduceRequests(batchesnow);

 

这里计算出下一次执行需要的等待间隔,根据retry.backoff.ms/linger.ms配置的时间,如果说这次需要进行提交数据到指定的broker的readyNodes的集合大于0,设置这个间隔时间为0.
    
long pollTimeout = Math.min(result.nextReadyCheckDelayMsnotReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}"result.readyNodes);
        log.trace("Created {} produce requests: {}"requests.size()requests);
        pollTimeout = 0;
    }

 

迭代每一个broker的Produce的请求,通过NetworkClient向每一个broker发送对应的请求。
    for (ClientRequest request : requests)
        client.send(requestnow);

检查是否需要更新metadata,如果需要,重新向broker发送metadata的请求,并更新metadata.

接收请求的响应信息,并调用对应的callback函数。

检查连接中是否有超过指定的时间connections.max.idle.ms(默认9分钟)没有活动的broker连接,如果有,关闭这个连接。
    
this.client.poll(pollTimeoutnow);
}