RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中
/**
* Offset store interface
*/
public interface OffsetStore {
/**
* 加载位点信息
*/
void load() throws MQClientException;
/**
* 更新缓存位点信息
*/
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
/**
* 读取本地位点信息
*
* @return The fetched offset
*/
long readOffset(final MessageQueue mq, final ReadOffsetType type);
/**
* 持久化全部队列的位点信息
*/
void persistAll(final Set<MessageQueue> mqs);
/**
* 持久化某一个队列的位点信息
*/
void persist(final MessageQueue mq);
/**
* 删除某一个队列的位点信息
*/
void removeOffset(MessageQueue mq);
/**
* 复制一份缓存位点信息
* @return The cloned offset table of given topic
*/
Map<MessageQueue, Long> cloneOffsetTable(String topic);
/**
* 将本地消费位点持久化到Broker中
* @param mq
* @param offset
* @param isOneway
*/
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
}
客户端消费进度保存也叫消费进度持久化,开源RocketMQ 4.2.0支持定时持久化和不定时持久化两种方式
定时持久化位点实现方法是org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()
定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的消费进度。
不定时持久化也叫Pull-And-Commit,也就是在执行Pull方法的同时,把队列最新消费位点信息发给Broker,具体实现代码在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()方法中
该方法中有两处持久化位点信息
第一处,在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息,该处更新位点信息
第二处,在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker
代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker。在执行Pull请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在org.apache.rocketmq.broker.processor.PullMessageProcessor.processRequest()方法中
hasCommitOffsetFlag:Pull请求中的sysFlag参数,是决定Broker是否执行持久化消费位点的一个因素。
brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。
storeOffsetEnable:True表示Broker需要持久化消费位点,False则不用持久化位点。