Kafka-RecordAccumulator分析-RecordBatch

时间:2024-01-22 20:57:19

了解了MemoryRecords的具体实现之后,来分析RecordBatch类的实现。

每个RecordBatch对象中封装了一个MemoryRecords对象,除此之外,还封装了很多控制信息和统计信息,下面简单介绍一下。

  • recordCount:记录了保存的Record的个数。
  • maxRecordSize:最大Record的字节数。
  • attempts:尝试发送当前RecordBatch的次数。
  • lastAttemptMs:最后一次尝试发送的时间戳。
  • records:指向用来存储数据的MemoryRecords对象。
  • topicParition:当前RecordBatch中缓存的消息都会发送给此TopicPartition。
  • produceFuture:ProduceRequestResult类型,标识RecordBatch状态的Future对象。
  • lastAppendTime:最后一次向RecordBatch追加消息的时间戳。
  • thunks:Thunk对象的集合,在后面会详细介绍。
  • offsetCounter:用来记录某消息在RecordBatch中的偏移量。
  • retry:是否正在重试。如果RecordBatch中的数据发送失败,则会重新尝试发送。

图中,以RecordBatch为中心,刻画了其相关类间的对应关系。

在这里插入图片描述
下面分析一下ProduceRequestResult这个类的功能。

ProduceRequestResult并未实现java.util.concurrent.Future接口,但是其通过包含一个count值为1的CountDownLatch对象,实现了类似于Future的功能(Future、CountDownLatch等工具的使用)。

当RecordBatch中全部的消息被正常响应、或超时、或关闭生产者时,会调用ProduceRequestResult.done方法,将produceFuture标记为完成并通过ProduceRequestResult.error字段区分“异常完成”还是“正常完成”,之后调用CountDownLatch对象的countDown方法。

此时,会唤醒阻塞在CountDownLatch对象的await方法的线程(这些线程通过ProduceRequestResult的await方法等待上述三个事件的发生)。

分区会为其中记录的消息分配一个offset并通过此offset维护消息顺序。

在ProduceRequestResult中还有一个需要注意的字段baseOffset,表示的是服务端为此RecordBatch中第一条消息分配的offset,这样每个消息可以根据此offset以及自身在此RecordBatch中的相对偏移量,计算出其在服务端分区中的偏移量了。

在介绍Tunk类之前,请回顾KafkaProducer.send方法的第二个参数,是一个Callback对象,它是针对单个消息的回调函数(每个消息都会有一个对应的Callback对象作为回调)。

RecordBatch.thunks字段可以理解为消息的回调对象队列,Thunk中的callback字段就指向对应消息的Callback对象,其另一个字段future是FutureRecordMetadata类型。

FutureRecordMetadata类有两个关键字段。

  • result:ProduceRequestResult类型,指向对应消息所在RecordBatch的produceFuture字段。
  • relativeOffset:long类型,记录了对应消息在RecordBatch中的偏移量。

FutureRecordMetadata实现了java.util.concurrent.Future接口,但其实现基本都是委托给了ProduceRequestResult对应的方法,由此可以看出,消息应该是按照RecordBatch进行发送和确认的。

当生产者已经收到某消息的响应时,FutureRecordMetadata.get方法就会返回RecordMetadata对象,其中包含消息在Partition中的offset等其他元数据,可供用户自定义Callback使用。

分析完RecordBatch依赖的组件,现在回来看看RecordBatch类的核心方法。tryAppend方法是最核心的方法,其功能是尝试将消息添加到当前的RecordBatch中缓存。

在这里插入图片描述
当RecordBatch成功收到正常响应、或超时、或关闭生产者时,都会调用RecordBatch的done()方法。

在done()方法中,会回调RecordBatch中全部消息的Callback回调,并调用其produceFuture字段的done()方法。RecordBatch.done()方法的调用关系如图所示。

在这里插入图片描述