Flink 1.14.* kafkaSource source code

@Internal
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

    // the dedicated thread that consumes data from the Kafka broker
    final KafkaConsumerThread consumerThread;

    // fields used below (trimmed from this excerpt)
    private final KafkaDeserializationSchema<T> deserializer;
    private final Handover handover;
    private final KafkaCollector kafkaCollector;
    private volatile boolean running = true;

    // constructor
    public KafkaFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        super(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                consumerMetricGroup,
                useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();
        this.consumerThread = new KafkaConsumerThread(
                LOG,
                this.handover,
                kafkaProperties,
                this.unassignedPartitionsQueue,
                this.getFetcherName() + " for " + taskNameWithSubtasks,
                pollTimeout,
                useMetrics,
                consumerMetricGroup,
                subtaskMetricGroup);
        this.kafkaCollector = new KafkaCollector();
    }

    public void runFetchLoop() throws Exception {
        try {
            // start the dedicated consumer thread, which pulls batches
            // from the Kafka broker into the handover
            this.consumerThread.start();

            // take each batch out of the handover, deserialize it,
            // and emit the records downstream
            while (this.running) {
                ConsumerRecords<byte[], byte[]> records = this.handover.pollNext();
                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        this.subscribedPartitionStates()) {
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
                    this.partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            this.consumerThread.shutdown();
        }

        try {
            this.consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            // deserialization; `deserializer` is supplied by the application
            // developer, i.e. the `DeserializationSchema deserializationSchema`
            // from the beginning of the article
            this.deserializer.deserialize(record, this.kafkaCollector);
            this.emitRecordsWithTimestamps(
                    this.kafkaCollector.getRecords(),
                    partition,
                    record.offset(),
                    record.timestamp());
            if (this.kafkaCollector.isEndOfStreamSignalled()) {
                this.running = false;
                break;
            }
        }
    }
}

@Internal
public abstract class AbstractFetcher<T, KPH> {

    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            KafkaTopicPartitionState<T, KPH> partitionState,
            long offset,
            long kafkaEventTimestamp) {
        // the checkpoint lock makes record emission and offset update
        // atomic with respect to checkpoints
        synchronized (this.checkpointLock) {
            T record;
            while ((record = records.poll()) != null) {
                // extract the timestamp
                long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
                // emit to the downstream operator; the source's work ends here
                this.sourceContext.collectWithTimestamp(record, timestamp);
                // update the per-partition state (may advance the watermark)
                partitionState.onEvent(record, timestamp);
            }
            // record the offset into the partition state
            partitionState.setOffset(offset);
        }
    }
}
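
The hand-off between the two threads deserves a closer look: Handover behaves like a blocking queue of capacity one, so the consumer thread can run KafkaConsumer.poll() on its own while still being back-pressured by the fetch loop. Below is a minimal sketch of that rendezvous; the real Handover additionally propagates errors and supports wakeup/close, which are left out here.

import org.apache.kafka.clients.consumer.ConsumerRecords;

// Simplified sketch of the single-slot rendezvous implemented by Handover.
public final class HandoverSketch {
    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next; // the single slot

    // Called by KafkaConsumerThread after each KafkaConsumer.poll().
    // Blocks while the previous batch has not been taken yet, which
    // throttles the consumer thread to the speed of the fetch loop.
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // Called by runFetchLoop(); blocks until the consumer thread hands over a batch.
    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }
}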
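
For reference, this is what the user-defined deserializer on the other side of that deserialize() call can look like. A minimal sketch, assuming a job that just wants topic/partition/offset plus the value as a string; the class name RecordInfoDeserializationSchema is made up for illustration. The interface's default two-argument deserialize(record, out) calls this single-argument variant and collects the result, and returning true from isEndOfStream() is what makes the fetcher flip running to false.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

// Hypothetical schema: turns each Kafka record into "topic|partition|offset|value".
public class RecordInfoDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = record.value() == null
                ? ""
                : new String(record.value(), StandardCharsets.UTF_8);
        return record.topic() + "|" + record.partition() + "|" + record.offset() + "|" + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true here would signal end-of-stream and stop the fetch loop.
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}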
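
Finally, a sketch of wiring such a schema into a job through the legacy FlinkKafkaConsumer, which builds the KafkaFetcher shown above when the source starts; broker address, topic, and group id are placeholders.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaFetcherDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "demo-group");              // placeholder group id

        // the deserializer passed here is the one KafkaFetcher invokes per record
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                "demo-topic", new RecordInfoDeserializationSchema(), props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("kafka-fetcher-demo");
    }
}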