Flink 中的kafka何时commit?

时间:2022-05-10 16:49:47

https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html

@Override
publicfinalvoidnotifyCheckpointComplete(longcheckpointId)throwsException{
if(!running){
LOG.debug("notifyCheckpointComplete()calledonclosedsource");
return;
}

finalAbstractFetcher<?,?>fetcher=this.kafkaFetcher;
if(fetcher==null){
LOG.debug("notifyCheckpointComplete()calledonuninitializedsource");
return;
}

if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
//onlyonecommitoperationmustbeinprogress
if(LOG.isDebugEnabled()){
LOG.debug("CommittingoffsetstoKafka/ZooKeeperforcheckpoint"+checkpointId);
}

try{
finalintposInMap=pendingOffsetsToCommit.indexOf(checkpointId);
if(posInMap==-1){
LOG.warn("Receivedconfirmationforunknowncheckpointid{}",checkpointId);
return;
}

@SuppressWarnings("unchecked")
Map<KafkaTopicPartition,Long>offsets=
(Map<KafkaTopicPartition,Long>)pendingOffsetsToCommit.remove(posInMap);

//removeoldercheckpointsinmap
for(inti=0;i<posInMap;i++){
pendingOffsetsToCommit.remove(0);
}

if(offsets==null||offsets.size()==0){
LOG.debug("Checkpointstatewasempty.");
return;
}

fetcher.commitInternalOffsetsToKafka(offsets,offsetCommitCallback);
}catch(Exceptione){
if(running){
throwe;
}
//elseignoreexceptionifwearenolongerrunning
}
}
}

/**
*Theoffsetcommitmoderepresentsthebehaviourofhowoffsetsareexternallycommitted
*backtoKafkabrokers/Zookeeper.
*
*<p>Theexactvalueofthisisdeterminedatruntimeintheconsumersubtasks.
*/
@Internal
publicenumOffsetCommitMode{

/**Completelydisableoffsetcommitting.*/
DISABLED,

/**CommitoffsetsbacktoKafkaonlywhencheckpointsarecompleted.*/
ON_CHECKPOINTS,

/**CommitoffsetsperiodicallybacktoKafka,usingtheautocommitfunctionalityofinternalKafkaclients.*/
KAFKA_PERIODIC;
}

/**
*CommitsthegivenpartitionoffsetstotheKafkabrokers(ortoZooKeeperfor
*olderKafkaversions).Thismethodisonlyevercalledwhentheoffsetcommitmodeof
*theconsumeris{@linkOffsetCommitMode#ON_CHECKPOINTS}.
*
*<p>Thegivenoffsetsaretheinternalcheckpointedoffsets,representing
*thelastprocessedrecordofeachpartition.Version-specificimplementationsofthismethod
*needtoholdthecontractthatthegivenoffsetsmustbeincrementedby1before
*committingthem,sothatcommittedoffsetstoKafkarepresent"thenextrecordtoprocess".
*
*@paramoffsetsTheoffsetstocommittoKafka(implementationsmustincrementoffsetsby1beforecommitting).
*@paramcommitCallbackThecallbackthattheusershouldtriggerwhenacommitrequestcompletesorfails.
*@throwsExceptionThismethodforwardsexceptions.
*/
publicfinalvoidcommitInternalOffsetsToKafka(
Map<KafkaTopicPartition,Long>offsets,
@NonnullKafkaCommitCallbackcommitCallback)throwsException{
//Ignoresentinels.Theymightappearhereifsnapshothasstartedbeforeactualoffsetsvalues
//replacedsentinels
doCommitInternalOffsetsToKafka(filterOutSentinels(offsets),commitCallback);
}

/**
* Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
* of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
* A request is considered completed when it is successfully acknowledged
* according to the <code>acks</code> configuration you have specified or else it results in an error.
* <p>
* Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
* however no guarantee is made about the completion of records sent after the flush call begins.
* <p>
* This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
* gives a convenient way to ensure all previously sent messages have actually completed.
* <p>
* This example shows how to consume from one Kafka topic and produce to another Kafka topic:
* <pre>
* {@code
* for(ConsumerRecord<String, String> record: consumer.poll(100))
* producer.send(new ProducerRecord("my-topic", record.key(), record.value());
* producer.flush();
* consumer.commit();
* }
* </pre>
*
* Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
* we need to set <code>retries=&lt;large_number&gt;</code> in our config.
* </p>
* <p>
* Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
* flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void flush() {
log.trace("Flushing accumulated records in producer.");
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
}
}