先看
AbstractFetcher
这个可以理解就是,consumer中具体去kafka读数据的线程,一个fetcher可以同时读多个partitions的数据来看看
/**
* Base class for all fetchers, which implement the connections to Kafka brokers and
* pull records from Kafka partitions.
*
* <p>This fetcher base class implements the logic around emitting records and tracking offsets,
* as well as around the optional timestamp assignment and watermark generation.
*
* @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
* the Flink data streams.
* @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
*/
public abstract class AbstractFetcher<T, KPH> { /** The source context to emit records and watermarks to */
private final SourceContext<T> sourceContext; //用于发送数据的context /** All partitions (and their state) that this fetcher is subscribed to */
private final KafkaTopicPartitionState<KPH>[] allPartitions; //用于记录每个topic partition的状态,比如offset // ------------------------------------------------------------------------ protected AbstractFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> assignedPartitions,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext) throws Exception
{
// create our partition state according to the timestamp/watermark mode
this.allPartitions = initializePartitions(
assignedPartitions,
timestampWatermarkMode,
watermarksPeriodic, watermarksPunctuated,
runtimeContext.getUserCodeClassLoader()); // if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; PeriodicWatermarkEmitter periodicEmitter =
new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
periodicEmitter.start(); //定期的发出waterMark
}
} // ------------------------------------------------------------------------
// Core fetcher work methods
// ------------------------------------------------------------------------ public abstract void runFetchLoop() throws Exception; //核心的函数,需要重载 // ------------------------------------------------------------------------
// Kafka version specifics
// ------------------------------------------------------------------------ /**
* Creates the Kafka version specific representation of the given
* topic partition.
*
* @param partition The Flink representation of the Kafka topic partition.
* @return The specific Kafka representation of the Kafka topic partition.
*/
public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);//生成KafkaPartitionHandle,这个其实是kafka中对partition的描述 /**
* Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
* older Kafka versions).
*
* @param offsets The offsets to commit to Kafka.
* @throws Exception This method forwards exceptions.
*/
public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception; //如果commit相应的kafka offset,比如写到zk // ------------------------------------------------------------------------
// snapshot and restore the state
// ------------------------------------------------------------------------ /**
* Takes a snapshot of the partition offsets.
*
* <p>Important: This method mus be called under the checkpoint lock.
*
* @return A map from partition to current offset.
*/
public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { //产生所有partitions的snapshot,主要是offset
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock); HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
if (partition.isOffsetDefined()) {
state.put(partition.getKafkaTopicPartition(), partition.getOffset());
}
}
return state;
} /**
* Restores the partition offsets.
*
* @param snapshotState The offsets for the partitions
*/
public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) { //从checkpoint中去恢复offset
for (KafkaTopicPartitionState<?> partition : allPartitions) {
Long offset = snapshotState.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setOffset(offset);
}
}
} // ------------------------------------------------------------------------
// emitting records
// ------------------------------------------------------------------------ /**
*
* <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
* That makes the fast path efficient, the extended paths are called as separate methods.
*
* @param record The record to emit
* @param partitionState The state of the Kafka partition from which the record was fetched
* @param offset The offset from which the record was fetched
*/
protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) { //真正的emit record
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
// fast path logic, in case there are no watermarks // emit the record, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
sourceContext.collect(record); //发出record
partitionState.setOffset(offset); //更新local offset
}
}
else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { //如果有需要定期的watermark
emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
}
else {
emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
}
} /**
* Record emission, if a timestamp will be attached from an assigner that is
* also a periodic watermark generator.
*/
private void emitRecordWithTimestampAndPeriodicWatermark(
T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
{ // extract timestamp - this accesses/modifies the per-partition state inside the
// watermark generator instance, so we need to lock the access on the
// partition state. concurrent access can happen from the periodic emitter
final long timestamp;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (withWatermarksState) {
timestamp = withWatermarksState.getTimestampForRecord(record); //调用waterMark.extractTimestamp来获取该record的event time
} // emit the record with timestamp, using the usual checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
sourceContext.collectWithTimestamp(record, timestamp); //这个emit接口,会在发送record的情况下,还加上event time
partitionState.setOffset(offset);
}
} // ------------------------------------------------------------------------ /**
* The periodic watermark emitter. In its given interval, it checks all partitions for
* the current event time watermark, and possibly emits the next watermark.
*/
private static class PeriodicWatermarkEmitter implements Triggerable { private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions; private final SourceContext<?> emitter; private final StreamingRuntimeContext triggerContext; private final long interval; private long lastWatermarkTimestamp; //------------------------------------------------- PeriodicWatermarkEmitter(
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
SourceContext<?> emitter,
StreamingRuntimeContext runtimeContext)
{
this.allPartitions = checkNotNull(allPartitions);
this.emitter = checkNotNull(emitter);
this.triggerContext = checkNotNull(runtimeContext);
this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
this.lastWatermarkTimestamp = Long.MIN_VALUE;
} //------------------------------------------------- public void start() {
triggerContext.registerTimer(System.currentTimeMillis() + interval, this); //注册timer
} @Override
public void trigger(long timestamp) throws Exception {
// sanity check
assert Thread.holdsLock(emitter.getCheckpointLock()); long minAcrossAll = Long.MAX_VALUE;
for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { // we access the current watermark for the periodic assigners under the state
// lock, to prevent concurrent modification to any internal variables
final long curr;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (state) {
curr = state.getCurrentWatermarkTimestamp(); //获取waterMark
} minAcrossAll = Math.min(minAcrossAll, curr);
} // emit next watermark, if there is one
if (minAcrossAll > lastWatermarkTimestamp) {
lastWatermarkTimestamp = minAcrossAll;
emitter.emitWatermark(new Watermark(minAcrossAll)); //emit waterMark
} // schedule the next watermark
triggerContext.registerTimer(System.currentTimeMillis() + interval, this); //再次注册timer
}
}
}
Kafka08Fetcher
基于kafka 0.8版本的fetcher,
public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>
核心的函数,是重写
runFetchLoop
@Override
public void runFetchLoop() throws Exception {
// the map from broker to the thread that is connected to that broker
final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>(); //cache每个partition node到每个SimpleConsumerThread的对应关系 // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig); //Zookeeper Handler,用于r/w数据到zookeeper
this.zookeeperOffsetHandler = zookeeperOffsetHandler; PeriodicOffsetCommitter periodicCommitter = null;
try {
// read offsets from ZooKeeper for partitions that did not restore offsets
{
List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
if (!partition.isOffsetDefined()) { //遍历每个partition,如果没有定义offset,即offset没有从checkpoint中恢复,加入partitionsWithNoOffset
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
}
}
//这步仅仅对于没有从ckeckpoint中读到offset的partitionsWithNoOffset
Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset); //从zk中读出,相应partition的offset
for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
if (offset != null) {
partition.setOffset(offset); //为partition设置从zk中读出的offset
}
}
} // start the periodic offset committer thread, if necessary
if (autoCommitInterval > 0) { //定期触发commit offsets,比如发送到zk,路径,topic_groupid + "/" + partition;
periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
subscribedPartitions(), errorHandler, autoCommitInterval);
periodicCommitter.setName("Periodic Kafka partition offset committer");
periodicCommitter.setDaemon(true);
periodicCommitter.start();
} // Main loop polling elements from the unassignedPartitions queue to the threads
while (running) { // wait for max 5 seconds trying to get partitions to assign
// if threads shut down, this poll returns earlier, because the threads inject the
// special marker into the queue
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
unassignedPartitionsQueue.getBatchBlocking(5000);
partitionsToAssign.remove(MARKER); //这边这个marker干嘛用的。。。,防止上面被block? if (!partitionsToAssign.isEmpty()) {
LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = //通过broker server找到partitions的leader,返回的结果,map(leader <-> partition list)
findLeaderForPartitions(partitionsToAssign, kafkaConfig); // assign the partitions to the leaders (maybe start the threads)
for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
partitionsWithLeaders.entrySet())
{
final Node leader = partitionsWithLeader.getKey(); //leader node
final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue(); //这个leader node可以读取的partition列表
SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader); //找到leader node对应的consumer thread if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
// start new thread
brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler); //如果没有相应的consumer thread,创建新的consumer thread
brokerToThread.put(leader, brokerThread); //
}
else {
// put elements into queue of thread
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue = //
brokerThread.getNewPartitionsQueue(); for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
if (!newPartitionsQueue.addIfOpen(fp)) { //
// we were unable to add the partition to the broker's queue
// the broker has closed in the meantime (the thread will shut down)
// create a new thread for connecting to this broker
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
seedPartitions.add(fp);
brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
brokerToThread.put(leader, brokerThread);
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
}
}
}
}
}
}
}
catch (InterruptedException e) {
//......
}
finally {
//......
}
}
其他一些接口实现,
// ------------------------------------------------------------------------
// Kafka 0.8 specific class instantiation
// ------------------------------------------------------------------------ @Override
public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
return new TopicAndPartition(partition.getTopic(), partition.getPartition()); //对于kafka0.8,KafkaPartitionHandle就是TopicAndPartition
} // ------------------------------------------------------------------------
// Offset handling
// ------------------------------------------------------------------------ @Override
public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
zkHandler.writeOffsets(offsets); //commit offsets是写到zookeeper的
}
} // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------ private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
Node leader,
ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
{
// each thread needs its own copy of the deserializer, because the deserializer is
// not necessarily thread safe
final KeyedDeserializationSchema<T> clonedDeserializer =
InstantiationUtil.clone(deserializer, userCodeClassLoader); // seed thread with list of fetch partitions (otherwise it would shut down immediately again
SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
clonedDeserializer, invalidOffsetBehavior); brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
taskName, leader.id(), leader.host(), leader.port()));
brokerThread.setDaemon(true);
brokerThread.start(); //创建和启动SimpleConsumerThread LOG.info("Starting thread {}", brokerThread.getName());
return brokerThread;
}
下面来看看SimpleConsumerThread
class SimpleConsumerThread<T> extends Thread
核心函数run,主要做的是,不停的读取数据的事情,
// these are the actual configuration values of Kafka + their original default values.
this.soTimeout = getInt(config, "socket.timeout.ms", 30000); //Kafka的一些配置
this.minBytes = getInt(config, "fetch.min.bytes", 1);
this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); // ------------------------------------------------------------------------
// main work loop
// ------------------------------------------------------------------------ @Override
public void run() {
try {
// create the Kafka consumer that we actually use for fetching
consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); //创建SimpleConsumer // make sure that all partitions have some offsets to start with
// those partitions that do not have an offset from a checkpoint need to get
// their start offset from ZooKeeper
getMissingOffsetsFromKafka(partitions); //为没有offset信息的partition,重置offset,从latest或earlist // Now, the actual work starts :-)
int offsetOutOfRangeCount = 0; //用于统计实际执行情况,非法offset,或重连的计数
int reconnects = 0;
while (running) { // ----------------------------------- partitions list maintenance ---------------------------- // check queue for new partitions to read from:
List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); //对于new partitions的处理,主要就是把它们加到partitions当中
if (newPartitions != null) {
// found some new partitions for this thread's broker // check if the new partitions need an offset lookup
getMissingOffsetsFromKafka(newPartitions); // 为新的partition重置offset // add the new partitions (and check they are not already in there)
for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
partitions.add(newPartition);
}
} if (partitions.size() == 0) { //如果partitions为空,即没有需要消费的partition
if (newPartitionsQueue.close()) { //如果此时newPartitionsQueue为closed,那么就不可能会有新的partitions加入,那么该thread就没有存在的意义,不需要继续run
// close succeeded. Closing thread
running = false; //关闭线程 LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.",
getName()); // add the wake-up marker into the queue to make the main thread
// immediately wake up and termination faster
unassignedPartitions.add(MARKER); break;
} else { //如果newPartitionsQueue没有被关闭,那就等待新的partitions,continue
// close failed: fetcher main thread concurrently added new partitions into the queue.
// go to top of loop again and get the new partitions
continue;
}
} // ----------------------------------- request / response with kafka ---------------------------- FetchRequestBuilder frb = new FetchRequestBuilder(); //创建FetchRequestBuilder
frb.clientId(clientId);
frb.maxWait(maxWait);
frb.minBytes(minBytes); for (KafkaTopicPartitionState<?> partition : partitions) {
frb.addFetch(
partition.getKafkaTopicPartition().getTopic(),
partition.getKafkaTopicPartition().getPartition(),
partition.getOffset() + 1, // request the next record
fetchSize);
} kafka.api.FetchRequest fetchRequest = frb.build(); //创建FetchRequest,一个request可以同时读多个partition,取决于partition和consumer数量的比例 FetchResponse fetchResponse;
try {
fetchResponse = consumer.fetch(fetchRequest); //从kafka读到数据,包含在FetchResponse中,其中包含从多个partition中取到的数据
}
catch (Throwable cce) {
//noinspection ConstantConditions
if (cce instanceof ClosedChannelException) { //链接kafka异常 // we don't know if the broker is overloaded or unavailable.
// retry a few times, then return ALL partitions for new leader lookup
if (++reconnects >= reconnectLimit) { //如果达到重连limit,说明确实无法连接到kafka
LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
unassignedPartitions.add(fp); //把负责的partitions加入unassignedPartitions,表明这些partition是没人处理的
}
this.partitions.clear(); //把partitons清空
continue; // jump to top of loop: will close thread or subscribe to new partitions //这步会走到上面的partitions.size() == 0的逻辑
}
try { //如果需要重试,先关闭consumer,然后重新创建consumer,然后continue重试
consumer.close();
} catch (Throwable t) {
LOG.warn("Error while closing consumer connection", t);
}
// delay & retry
Thread.sleep(100);
consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
continue; // retry
} else {
throw cce;
}
}
reconnects = 0; // ---------------------------------------- error handling ---------------------------- if (fetchResponse == null) {
throw new IOException("Fetch from Kafka failed (request returned null)");
} if (fetchResponse.hasError()) { //如果fetchResponse有错误
String exception = "";
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); // iterate over partitions to get individual error codes
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
boolean partitionsRemoved = false; while (partitionsIterator.hasNext()) {
final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); //取得对于该partition的error code if (code == ErrorMapping.OffsetOutOfRangeCode()) { //非法offset,那么需要重新初始化该partition的offset
// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
partitionsToGetOffsetsFor.add(fp);
}
else if (code == ErrorMapping.NotLeaderForPartitionCode() || //如果由于各种不可用,导致无法从该broker上读取到partition的数据
code == ErrorMapping.LeaderNotAvailableCode() ||
code == ErrorMapping.BrokerNotAvailableCode() ||
code == ErrorMapping.UnknownCode())
{
// the broker we are connected to is not the leader for the partition.
LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
LOG.debug("Error code = {}", code); unassignedPartitions.add(fp); //那么把该partition放回unassignedPartitions,等待重新分配 partitionsIterator.remove(); // unsubscribe the partition ourselves,从当前的partitions列表中把该partition删除
partitionsRemoved = true;
}
else if (code != ErrorMapping.NoError()) {
exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
}
}
if (partitionsToGetOffsetsFor.size() > 0) {
// safeguard against an infinite loop.
if (offsetOutOfRangeCount++ > 3) { //如果对于partitions,3次重置offset后,offset仍然有非法的,抛异常,防止无限循环
throw new RuntimeException("Found invalid offsets more than three times in partitions "
+ partitionsToGetOffsetsFor + " Exceptions: " + exception);
}
// get valid offsets for these partitions and try again.
LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); //重置这些partitions的offset, 根据配置会reset到earliest或latest LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
continue; // jump back to create a new fetch request. The offset has not been touched.
}
else if (partitionsRemoved) {
continue; // create new fetch request
}
else {
// partitions failed on an error
throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
}
} else {
// successful fetch, reset offsetOutOfRangeCount.
offsetOutOfRangeCount = 0;
} // ----------------------------------- process fetch response ---------------------------- int messagesInFetch = 0;
int deletedMessages = 0;
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); partitionsLoop:
while (partitionsIterator.hasNext()) {
final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next(); final ByteBufferMessageSet messageSet = fetchResponse.messageSet( //取出fetchResponse关于该partition的数据,封装成ByteBufferMessageSet
currentPartition.getTopic(), currentPartition.getPartition()); for (MessageAndOffset msg : messageSet) { //对于每天message
if (running) {
messagesInFetch++;
final ByteBuffer payload = msg.message().payload(); //读出message内容
final long offset = msg.offset(); //读出message offset if (offset <= currentPartition.getOffset()) { //旧数据,ignore
// we have seen this message already
LOG.info("Skipping message with offset " + msg.offset()
+ " because we have seen messages until (including) "
+ currentPartition.getOffset()
+ " from topic/partition " + currentPartition.getTopic() + '/'
+ currentPartition.getPartition() + " already");
continue;
} // If the message value is null, this represents a delete command for the message key.
// Log this and pass it on to the client who might want to also receive delete messages.
byte[] valueBytes;
if (payload == null) {
deletedMessages++;
valueBytes = null;
} else {
valueBytes = new byte[payload.remaining()];
payload.get(valueBytes); //将内容,读入valueBytes
} // put key into byte array
byte[] keyBytes = null;
int keySize = msg.message().keySize(); if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
ByteBuffer keyPayload = msg.message().key();
keyBytes = new byte[keySize]; //将key读入keyBytes
keyPayload.get(keyBytes);
} final T value = deserializer.deserialize(keyBytes, valueBytes, //将message反序列化成对象
currentPartition.getTopic(), currentPartition.getPartition(), offset); if (deserializer.isEndOfStream(value)) {
// remove partition from subscribed partitions.
partitionsIterator.remove();
continue partitionsLoop;
} owner.emitRecord(value, currentPartition, offset); //emit 数据
}
else {
// no longer running
return;
}
}
}
} // end of fetch loop }
}
最后,看看
FlinkKafkaConsumerBase
/**
* Base class of all Flink Kafka Consumer data sources.
* This implements the common behavior across all Kafka versions.
*
* <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
* {@link AbstractFetcher}.
*
* @param <T> The type of records produced by this data source
*/
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
ResultTypeQueryable<T>
这个是对所有版本kafka的抽象,
@Override
public void run(SourceContext<T> sourceContext) throws Exception { // figure out which partitions this subtask should process
final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions, //对应于topic partition数和consumer数,一个consumer应该分配哪些partitions,这里逻辑就是简单的取模
getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask()); // we need only do work, if we actually have partitions assigned
if (!thisSubtaskPartitions.isEmpty()) { // (1) create the fetcher that will communicate with the Kafka brokers
final AbstractFetcher<T, ?> fetcher = createFetcher( //创建Fetcher
sourceContext, thisSubtaskPartitions,
periodicWatermarkAssigner, punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext()); // (2) set the fetcher to the restored checkpoint offsets
if (restoreToOffset != null) { //这个如果从checkpoint中读出offset状态
fetcher.restoreOffsets(restoreToOffset); //恢复offset
} // publish the reference, for snapshot-, commit-, and cancel calls
// IMPORTANT: We can only do that now, because only now will calls to
// the fetchers 'snapshotCurrentState()' method return at least
// the restored offsets
this.kafkaFetcher = fetcher;
if (!running) {
return;
} // (3) run the fetcher' main work method
fetcher.runFetchLoop(); //开始run fetcher
}
} @Override
public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); //snapshot当前的offset // the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingCheckpoints.put(checkpointId, currentOffsets); //cache当前的checkpointid,等待该checkpoint完成 // truncate the map, to prevent infinite growth
while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { //删除过期的,或老的checkpoints
pendingCheckpoints.remove(0);
} return currentOffsets;
} @Override
public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
LOG.info("Setting restore state in the FlinkKafkaConsumer");
restoreToOffset = restoredOffsets;
} @Override
public void notifyCheckpointComplete(long checkpointId) throws Exception { //当这个checkpoint完成时,需要通知kafkaconsumer,这个时候才会真正的commit offset final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; try {
final int posInMap = pendingCheckpoints.indexOf(checkpointId); @SuppressWarnings("unchecked")
HashMap<KafkaTopicPartition, Long> checkpointOffsets =
(HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap); // remove older checkpoints in map
for (int i = 0; i < posInMap; i++) { //比该checkpoint老的未完成的checkpoint已经没有意义,删除
pendingCheckpoints.remove(0);
} fetcher.commitSpecificOffsetsToKafka(checkpointOffsets); //真正的commit offset,这个是通用接口,虽然对于kafka0.8,Fetcher里面本身也是会定期提交的,checkpoint一般秒级别比定期提交更频繁些
}
} /**
* Selects which of the given partitions should be handled by a specific consumer,
* given a certain number of consumers.
*
* @param allPartitions The partitions to select from
* @param numConsumers The number of consumers
* @param consumerIndex The index of the specific consumer
*
* @return The sublist of partitions to be handled by that consumer.
*/
protected static List<KafkaTopicPartition> assignPartitions(
List<KafkaTopicPartition> allPartitions,
int numConsumers, int consumerIndex)
{
final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
allPartitions.size() / numConsumers + 1); for (int i = 0; i < allPartitions.size(); i++) {
if (i % numConsumers == consumerIndex) {
thisSubtaskPartitions.add(allPartitions.get(i));
}
} return thisSubtaskPartitions;
}
针对kafka0.8的consumer
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { /**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x
*
* This constructor allows passing multiple topics and a key/value deserialization schema.
*
* @param topics
* The Kafka topics to read from.
* @param deserializer
* The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(deserializer); // validate the zookeeper properties
validateZooKeeperConfig(props); this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); //当offset非法的时候,选择从哪里重置,这里支持earlist或latest
this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); //offset commit的间隔,默认是1分钟 // Connect to a broker to get the partitions for all topics
List<KafkaTopicPartition> partitionInfos =
KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props)); //这里只是取出topic相关的partition的信息 setSubscribedPartitions(partitionInfos); //将这部分,即该consumer消费的partitions,加入到SubscribedPartitions,表明这些已经有consumer消费了
} /**
* Send request to Kafka to get partitions for topic.
*
* @param topics The name of the topics.
* @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
*/
public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { //这里的逻辑是如果从kafka取得topic的partititon信息,这里配置配的是broker list,而非zk,所以他每次会随机从所有的brokers中挑一个去读取partitions信息
String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
String[] seedBrokers = seedBrokersConfString.split(",");
List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); final String clientId = "flink-kafka-consumer-partition-lookup";
final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536); Random rnd = new Random();
retryLoop: for (int retry = 0; retry < numRetries; retry++) {
// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
// parallel source instances start. Still, we try all available brokers.
int index = rnd.nextInt(seedBrokers.length);
brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
String seedBroker = seedBrokers[index];
LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
if (++index == seedBrokers.length) {
index = 0;
} URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId); TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); // clear in case we have an incomplete list from previous tries
partitions.clear();
for (TopicMetadata item : metaData) {
if (item.errorCode() != ErrorMapping.NoError()) {
// warn and try more brokers
LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
"for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
continue brokersLoop;
}
if (!topics.contains(item.topic())) {
LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
continue brokersLoop;
}
for (PartitionMetadata part : item.partitionsMetadata()) {
Node leader = brokerToNode(part.leader());
KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
partitions.add(pInfo);
}
}
break retryLoop; // leave the loop through the brokers
} catch (Exception e) {
LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
"" + e.getClass() + ". Message: " + e.getMessage());
LOG.debug("Detailed trace", e);
// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// sleep shorter.
}
} finally {
if (consumer != null) {
consumer.close();
}
}
} // brokers loop
} // retries loop
return partitions;
} private static long getInvalidOffsetBehavior(Properties config) {
final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
if (val.equals("none")) {
throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+ "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
}
else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
return OffsetRequest.LatestTime();
} else {
return OffsetRequest.EarliestTime();
}
}