Kafka3.0源码学习-一、生产者源码

时间:2024-01-26 16:18:18

1、初始化

生产者 main 线程初始化,点击 main()方法中的 KafkaProducer()

// Internal KafkaProducer constructor (excerpt; "......" marks code elided by the article).
// Reads the producer configuration and wires up metrics, the partitioner, serializers,
// interceptors, the record accumulator, metadata and the sender I/O thread, which it starts.
KafkaProducer(ProducerConfig config,
          Serializer<K> keySerializer,
          Serializer<V> valueSerializer,
          ProducerMetadata metadata,
          KafkaClient kafkaClient,
          ProducerInterceptors<K, V> interceptors,
          Time time) {
try {
    this.producerConfig = config;
    this.time = time;

    // Read the transactional id
    String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

    // Read the client id
    this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

    ......
    // Register a JMX reporter so the producer's metrics can be monitored
    JmxReporter jmxReporter = new JmxReporter();
    jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
    reporters.add(jmxReporter);
    MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
            config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
    this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
    // Instantiate the configured partitioner
    this.partitioner = config.getConfiguredInstance(
            ProducerConfig.PARTITIONER_CLASS_CONFIG,
            Partitioner.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
    long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
    // Key serializer: when none was passed in, instantiate and configure the one named in the
    // config (the value serializer is presumably handled the same way in the elided code)
    if (keySerializer == null) {
        this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                 Serializer.class);
        this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
    }
    ......
    // Interceptors (more than one may be configured)
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
    ......
    // max.request.size: upper bound on a request/record size, default 1 MB
    this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
    // buffer.memory: total buffer size, default 32 MB
    this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
    // compression.type, default none
    this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
    int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

    this.apiVersions = new ApiVersions();
    this.transactionManager = configureTransactionState(config, logContext);
    // Build the record accumulator (the producer-side buffer):
    //   - batch.size, default 16 KB
    //   - compression type, default none
    //   - linger.ms, default 0
    //   - a BufferPool (memory pool) sized by buffer.memory, default 32 MB
    this.accumulator = new RecordAccumulator(logContext,
            config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
            this.compressionType,
            lingerMs(config),
            retryBackoffMs,
            deliveryTimeoutMs,
            metrics,
            PRODUCER_METRIC_GROUP_NAME,
            time,
            apiVersions,
            transactionManager,
            new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

    // Parse and validate the configured bootstrap server addresses
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
            config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
            config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
    // Use the supplied metadata, or create a new one and bootstrap it with the addresses
    if (metadata != null) {
        this.metadata = metadata;
    } else {
        this.metadata = new ProducerMetadata(retryBackoffMs,
                config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                logContext,
                clusterResourceListeners,
                Time.SYSTEM);
        this.metadata.bootstrap(addresses);
    }
    this.errors = this.metrics.sensor("errors");

    this.sender = newSender(logContext, kafkaClient, this.metadata);
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    // Wrap the sender in a KafkaThread so it runs in the background
    // (the `true` argument is presumably the daemon flag — confirm against KafkaThread)
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    // Start the sender thread
    this.ioThread.start();
    config.logUnused();
    AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
    log.debug("Kafka producer started");
}
......
}

生产者 sender 线程初始化,KafkaProducer.java中点击 newSender()方法,查看发送线程初始化

/**
 * Builds the {@code Sender} that the producer's I/O thread runs.
 *
 * @param logContext  log context handed to the network client and the sender
 * @param kafkaClient client to reuse; when {@code null} a new {@code NetworkClient} is created
 * @param metadata    producer metadata shared with the client and the sender
 * @return a sender wired to the accumulator, the client and the transaction manager
 */
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // max.in.flight.requests.per.connection, default 5
    final int inflightLimit = configureInflightRequests(producerConfig);
    // request.timeout.ms, default 30 s
    final int requestTimeout = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    final ChannelBuilder channels = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
    final ProducerMetrics registry = new ProducerMetrics(this.metrics);
    final Sensor throttleSensor = Sender.throttleTimeSensor(registry.senderMetrics);

    final KafkaClient client;
    if (kafkaClient != null) {
        // A client was supplied by the caller: use it as-is.
        client = kafkaClient;
    } else {
        final Selector selector = new Selector(
                producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channels, logContext);
        // Network client settings, in argument order:
        //   reconnect.backoff.ms / reconnect.backoff.max.ms - initial and maximum reconnect backoff
        //   send.buffer.bytes    - socket send buffer, default 128 KB
        //   receive.buffer.bytes - socket receive buffer, default 32 KB
        client = new NetworkClient(
                selector,
                metadata,
                clientId,
                inflightLimit,
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeout,
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                throttleSensor,
                logContext);
    }

    // acks: 0 = no acknowledgement required; 1 = the leader acknowledges after receiving;
    // -1 = acknowledged once the leader and all in-sync replicas have received the record
    final short acks = configureAcks(producerConfig, log);
    // The sender is told whether exactly one in-flight request is allowed.
    final boolean singleInflight = inflightLimit == 1;

    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            singleInflight,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
            registry.senderMetrics,
            time,
            requestTimeout,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}

Sender 对象被放到了一个线程中启动,所以需要点击 newSender()方法中的 Sender,并找到 sender 对象中的 run()方法

// Sender thread body (excerpt; "......" marks code elided by the article).
// The visible loop keeps calling runOnce() while the sender is not force-closed and there is
// still undrained data in the accumulator, in-flight requests, or pending transactional requests.
// NOTE(review): in upstream Kafka this condition looks like the shutdown-drain loop that follows
// the main `while (running)` loop — confirm against the elided code.
@Override
public void run() {
  ......
  while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
      try {
      // One iteration of the sender: try to pull data from the accumulator and send it
      // (per the article, right after startup there is nothing to pull yet)
          runOnce();
      } catch (Exception e) {
          log.error("Uncaught error in kafka producer I/O thread: ", e);
      }
  }
......
}

2、发送数据到缓冲区

2.1 发送总体流程

从send()方法进入

// Runs the record through every configured interceptor in order and returns the result
// (excerpt; "......" marks code elided by the article, including the end of the try and the loop).
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // Each interceptor may transform the record; its output feeds the next interceptor
            interceptRecord = interceptor.onSend(interceptRecord);
          ......
    return interceptRecord;
}


    //从拦截器处理中返回,点击 doSend()方法
// Core send path (excerpt; "......" marks code elided by the article): waits for topic
// metadata, serializes the record, picks a partition, validates the size, appends the record
// to the accumulator and wakes the sender when a batch is full or newly created.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            // Fetch metadata, blocking for at most maxBlockTimeMs
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        // Account for the time already spent waiting on metadata
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // Serialization
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        ......
        // Choose the target partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // Validate the estimated size (upper bound for the serialized record with the
        // configured compression type) against the configured limits
        ensureValidRecordSize(serializedSize);

        ......
        // Append the record to the accumulator; `result` carries the outcome of the append
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        ......
        // The batch is full, or a new batch was created
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // Wake up the sender thread
            this.sender.wakeup();
        }
        return result.future;
        ......
}

2.2 分区选择

/**
 * Resolves the partition a record is sent to.
 *
 * @return the partition set on the record when there is one, otherwise whatever the
 *         configured partitioner picks
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer requested = record.partition();
    if (requested != null) {
        // An explicitly requested partition always wins.
        return requested;
    }
    return partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

//点击 partition,跳转到 Partitioner 接口,选择默认的分区器 DefaultPartitioner
/**
 * Default partitioning: records with key bytes are spread by the murmur2 hash of those bytes,
 * records without key bytes are delegated to the sticky partition cache.
 *
 * @param numPartitions number of partitions the hash is reduced by
 * @return the chosen partition
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes != null) {
        // Keyed record: non-negative murmur2 hash of the key bytes, modulo the partition count.
        int positiveHash = Utils.toPositive(Utils.murmur2(keyBytes));
        return positiveHash % numPartitions;
    }
    // No key: let the sticky partition cache choose.
    return stickyPartitionCache.partition(topic, cluster);
}

2.3 发送消息大小校验

/**
 * Rejects a record whose serialized size exceeds either configured limit.
 *
 * @param size serialized size of the record in bytes
 * @throws RecordTooLargeException if {@code size} is larger than {@code maxRequestSize}
 *         (max.request.size, default 1 MB) or than {@code totalMemorySize}
 *         (buffer.memory, default 32 MB)
 */
private void ensureValidRecordSize(int size) {
    // First limit: the maximum request size.
    if (size > maxRequestSize) {
        String tooLargeForRequest = "The message is " + size
                + " bytes when serialized which is larger than " + maxRequestSize
                + ", which is the value of the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
                + " configuration.";
        throw new RecordTooLargeException(tooLargeForRequest);
    }
    // Second limit: the total buffer memory.
    if (size > totalMemorySize) {
        String tooLargeForBuffer = "The message is " + size
                + " bytes when serialized which is larger than the total memory buffer you have configured with the "
                + ProducerConfig.BUFFER_MEMORY_CONFIG
                + " configuration.";
        throw new RecordTooLargeException(tooLargeForBuffer);
    }
}

2.4 内存池

// Appends a record to the accumulator for the given topic-partition
// (excerpt; "......" marks code elided by the article).
// Flow visible here: try the partition's deque first; if that fails and abortOnNewBatch is
// set, return so the caller can call append again; otherwise allocate a buffer from the pool,
// retry the append under the lock, and create a new batch if it still cannot be appended.
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    ......
    try {
        // check if we have an in-progress batch
        // Get or create the deque for this topic-partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try to append to an existing batch in this deque; a null result means the append
            // did not succeed (per the article, the usual case on the first attempt)
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // Allocate the larger of batchSize (default 16 KB) and the record's estimated size,
        // e.g. a 17 KB record gets a 17 KB buffer
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
        // Request the buffer from the memory pool (`free`), waiting at most maxTimeToBlock
        buffer = free.allocate(size, maxTimeToBlock);

        // Update the current time in case the buffer allocation blocked above.
        nowMs = time.milliseconds();
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // Wrap the allocated buffer in a records builder
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // Wrap the builder in a ProducerBatch and append the record to it
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));
            // Add the new batch to the tail of the deque
            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        // Give back a buffer that was allocated but not handed to a batch
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

3、sender 线程发送数据

/**
 * Runs a single iteration of the sender: handles transactional work first (when a
 * transaction manager is present), then sends producer data and polls the network client.
 */
void runOnce() {
    if (transactionManager != null) {
        try {
            transactionManager.maybeResolveSequences();

            // A transaction manager in a fatal state stops all further sending: abort the
            // batches when an error is available, poll for the retry backoff, and bail out.
            if (transactionManager.hasFatalError()) {
                RuntimeException fatal = transactionManager.lastError();
                if (fatal != null) {
                    maybeAbortBatches(fatal);
                }
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            // If a new producerId is needed, this enqueues an InitProducerId request that is
            // sent further down.
            transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

            boolean transactionalRequestHandled = maybeSendAndPollTransactionalRequest();
            if (transactionalRequestHandled) {
                return;
            }
        } catch (AuthenticationException authFailure) {
            // Already logged as an error elsewhere; propagated here only so clean-up can run.
            log.trace("Authentication exception while processing transactional request", authFailure);
            transactionManager.authenticationFailed(authFailure);
        }
    }

    final long now = time.milliseconds();
    // Send the ready data, then poll the client using the timeout the send step returned.
    client.poll(sendProducerData(now), now);
}


// Sends whatever the accumulator has ready and returns the timeout for the next poll
// (excerpt; "......" marks code elided by the article, including where pollTimeout is computed).
private long sendProducerData(long now) {
    // Fetch the current cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // Step 1: ask the accumulator (the buffer, 32 MB by default) which nodes have data ready
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // Data cannot be sent to a partition whose leader is unknown
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    ......

    // create produce requests
    // Drain the ready batches into a map keyed by an Integer (presumably the node id), so that
    // batches going to the same node can be packaged and sent together
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    ......

    // Send the produce requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}


// 是否准备发送
// Scans every partition deque and reports which leader nodes have data ready to send,
// how long to wait before the next readiness check, and which topics have partitions
// with data but no known leader.
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    // free.queued() > 0 — presumably callers are waiting for buffer memory, i.e. the pool is exhausted
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            // When producing to a large number of partitions, this path is hot and deques are often empty.
            // We check whether a batch exists first to avoid the more expensive checks whenever possible.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // Back off when the batch has already been attempted at least once and has
                    // waited less than retryBackoffMs (default 100 ms)
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // While backing off wait retryBackoffMs; otherwise wait lingerMs (default 0)
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // Full: the deque holds more than one batch, or the head batch itself is full
                    boolean full = deque.size() > 1 || batch.isFull();
                    // Expired: the batch has waited at least timeToWaitMs, so it must be sent too
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
                    // Sendable when any trigger holds: batch full, wait time (linger.ms) elapsed,
                    // pool exhausted, accumulator closed, flush in progress, or transaction completing
                    boolean sendable = full
                        || expired
                        || exhausted
                        || closed
                        || flushInProgress()
                        || transactionCompleting;
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}