Flink 1.14.x 版本 Kafka Source 源码（FlinkKafkaConsumer 内部使用的 KafkaFetcher / AbstractFetcher，节选）
@Internal
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

    /** Thread that consumes from the Kafka brokers and passes fetched batches to the handover. */
    final KafkaConsumerThread consumerThread;

    /**
     * Creates the fetcher together with its Kafka consumer thread. The thread is only created
     * here; it is started by {@code runFetchLoop()}.
     *
     * <p>The shared fetcher state is initialised by the superclass constructor. This constructor
     * then wires up:
     * <ul>
     *   <li>the deserializer,
     *   <li>the handover between the consumer thread and the fetch loop,
     *   <li>the consumer thread itself,
     *   <li>the collector that receives deserialized records.
     * </ul>
     */
    public KafkaFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {
        super(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                consumerMetricGroup,
                useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();

        final String consumerThreadName = getFetcherName() + " for " + taskNameWithSubtasks;
        this.consumerThread =
                new KafkaConsumerThread(
                        LOG,
                        handover,
                        kafkaProperties,
                        unassignedPartitionsQueue,
                        consumerThreadName,
                        pollTimeout,
                        useMetrics,
                        consumerMetricGroup,
                        subtaskMetricGroup);
        this.kafkaCollector = new KafkaCollector();
    }

    /**
     * Starts the consumer thread, then repeatedly takes the next batch of records from the
     * handover. For each subscribed partition it passes that partition's slice of the batch to
     * {@link #partitionConsumerRecordsHandler}. This continues while {@code running} is true.
     *
     * <p>{@code running} is only re-checked between batches. If it is cleared while a batch is
     * being processed (e.g. on end-of-stream), the remaining partitions of that batch are still
     * handed to the handler.
     *
     * <p>The consumer thread is always told to shut down when the loop ends. It is joined only on
     * a normal exit; if the loop throws, the exception propagates without waiting for the thread.
     *
     * @throws Exception anything thrown by the handover or by record handling
     */
    public void runFetchLoop() throws Exception {
        try {
            // Fetching from Kafka runs on a dedicated thread that feeds the handover.
            consumerThread.start();

            while (running) {
                // NOTE(review): presumably blocks until the consumer thread hands over a batch.
                final ConsumerRecords<byte[], byte[]> batch = handover.pollNext();

                for (KafkaTopicPartitionState<T, TopicPartition> partitionState :
                        subscribedPartitionStates()) {
                    final TopicPartition kafkaPartition = partitionState.getKafkaPartitionHandle();
                    partitionConsumerRecordsHandler(batch.records(kafkaPartition), partitionState);
                }
            }
        } finally {
            // Signal the consumer thread that no more work is to be done.
            consumerThread.shutdown();
        }

        // Only reached on a clean exit: wait for the consumer thread to finish.
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Deserializes and emits the records fetched for one partition, in order.
     *
     * <p>After each Kafka record, whatever the deserializer put into the collector is emitted
     * together with that record's offset and Kafka timestamp. If the collector reports
     * end-of-stream, {@code running} is cleared and the rest of this partition's records are
     * skipped.
     *
     * @param partitionRecords records of {@code partition} from the current batch
     * @param partition state of the partition the records belong to
     * @throws Exception if deserialization or emission fails
     */
    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition)
            throws Exception {
        for (ConsumerRecord<byte[], byte[]> consumerRecord : partitionRecords) {
            // deserializer is the KafkaDeserializationSchema passed to the constructor; it
            // writes its output into kafkaCollector.
            deserializer.deserialize(consumerRecord, kafkaCollector);
            emitRecordsWithTimestamps(
                    kafkaCollector.getRecords(),
                    partition,
                    consumerRecord.offset(),
                    consumerRecord.timestamp());

            if (kafkaCollector.isEndOfStreamSignalled()) {
                running = false;
                return;
            }
        }
    }
}
@Internal
public abstract class AbstractFetcher<T, KPH> {

    /**
     * Emits the records produced from one Kafka record, then stores that record's offset in the
     * partition state.
     *
     * <p>All of this happens while holding {@code checkpointLock}. Any code that synchronizes on
     * the same lock therefore never observes the emitted records without the matching offset
     * update, or the other way round.
     *
     * <p>The offset is stored even when {@code records} is empty. A Kafka record that produced no
     * output still advances the stored offset.
     *
     * @param records records to emit; the queue is drained by this method
     * @param partitionState state of the partition the records came from
     * @param offset Kafka offset of the source record, stored after all records are emitted
     * @param kafkaEventTimestamp Kafka timestamp of the source record, passed to the
     *     partition's timestamp extraction
     */
    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            KafkaTopicPartitionState<T, KPH> partitionState,
            long offset,
            long kafkaEventTimestamp) {
        synchronized (checkpointLock) {
            // Must be typed T, not Object: extractTimestamp, collectWithTimestamp and onEvent
            // all take the element type, and an Object argument is not assignable to T.
            T record;
            while ((record = records.poll()) != null) {
                // Determine the record's timestamp; the Kafka timestamp is passed in as input.
                final long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
                // Hand the record downstream; this is where it leaves the source.
                sourceContext.collectWithTimestamp(record, timestamp);
                // Update per-partition state only after the record has been emitted.
                // NOTE(review): presumably drives the partition's watermark generation - confirm.
                partitionState.onEvent(record, timestamp);
            }
            // Store the consumed offset in the partition state (still under the lock).
            partitionState.setOffset(offset);
        }
    }
}