【RocketMQ源码学习】- 4. Client 事务消息源码解析

时间:2023-03-10 06:28:05
【RocketMQ源码学习】- 4. Client 事务消息源码解析

介绍

> 基于4.5.2版本的源码
1. RocketMQ是从4.3.0版本开始支持事务消息的。
2. RocketMQ的消息队列能够保证生产端,执行数据和发送MQ消息事务一致性,而消费端的事务一致则有消费重试来补偿实现
3. 基于2PC思想来实现,增加一个补偿逻辑来处理二阶段超时或者失败的消息

名词解释

名词 解释
prepare消息 又名Half Message,半消息,标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息
RMQ_SYS_TRANS_HALF_TOPIC prepare消息在被投递到Mq服务器后,会存储于Topic为RMQ_SYS_TRANS_HALF_TOPIC的消费队列中
RMQ_SYS_TRANS_OP_HALF_TOPIC 在prepare消息被commit或者rollback处理后,会存储到Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,标识prepare消息已被处理

RocketMQ事务消息流程概要

【RocketMQ源码学习】- 4. Client 事务消息源码解析

1.事务消息发送及提交:
  1. 发送消息(half消息)。
  2. 服务端响应消息写入结果。
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

Producer发送事务消息示例Demo

下面是发送事务消息的Demo, 省略了一部分代码,不可运行,包含了 一、需要去设置事务监听,二、设置检查事务的线程

public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor()// 创建线程池代码,省略了一部分 producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 省略的code。。。 // 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null); producer.shutdown();
}
}

手动实现事务监听的类

  1. 需要去设置事务监听
  2. 设置检查事务的线程
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
System.out.println("执行本地事务" + msg.getTransactionId());
return LocalTransactionState.UNKNOW;
} @Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("检查本地事务:" + msg.getTransactionId());
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

发送消息的源码

  1. 发送消息时,RocketMQ会先以同步的方式发送Half消息
  2. 当发送成功后执行实现的executeLocalTransaction方法
  3. 把方法的事务状态以oneWay的发送告诉broker
  4. 针对事务为unknow的,broker会发起调用checkLocalTransaction来检查本地事务状态。
// DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 1. 获取事务监听,事务监听不能为空
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
// 2. 为msg添加属性{"TRAN_MSG": "true", "PGROUP": defaultMQProducer.getProducerGroup()}
// 给消息设置Half属性, 用于后续broker接收到消息判断是否是事务消息的prepare
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// 3.发送第一阶段消息,以同步的方式发送,并获取返回值
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
} LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {  // 第一阶段消息发送成功
try {
if (sendResult.getTransactionId() != null) {
              // 设置事务id属性
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
 // 设置事务ID transactionId=UNIQ_KEY
            if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// 用线程来执行本地事务
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// 发送消息成功后,执行本地操作
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {  // 若返回的事务状态为空,则设置本地事务状态为unknow
localTransactionState = LocalTransactionState.UNKNOW;
} if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT: // 刷盘超时
case FLUSH_SLAVE_TIMEOUT: // 从节点刷盘超时
case SLAVE_NOT_AVAILABLE: // 从节点不可用
// 设置rollBack
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
} try {
// 重点:设置结束事务
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
} // 组装TransactionSendResult, 并返回
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

注:发送同步消息的代码解析在另一篇文章 https://www.cnblogs.com/milicool/p/11836450.html

上面的代码做了三件事:

  1. 发送一阶段消息

  2. 执行本地事务,返回事务执行的状态,有三种,未知、回滚、提交

  3. 执行endTransaction方法,想事务执行的状态告诉broker,

结束事务endTransaction方法解析

  1. 发送的第一阶段消息,在broker接收端,会把第一阶段消息的topic修改为RMQ_SYS_TRANS_HALF_TOPIC,这个topic是对消费者不可见的
  2. endTranscation方法会把本地事务执行的状态告诉broker
  3. 如果是提交或者回滚则把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC, 如果是事务状态是unknow,会过一段时间执行检查事务的方法

查看endTransaction方法

// DefaultMQProducerImpl#endTransaction方法
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) {
// 获取broker地址
brokerAddr = this.mQClientFactory.findBrokerAddressInPublis
// 构建EndTransactionRequestHeader
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset()); // offset是prepare消息中offsetMsgId中获取的
requestHeader.setCommitOrRollback // 设置回滚/提交状态
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
// 调用无返回值的发消息队列的方法
mQClientFactory.getMQClientAPIImpl().endTransactionOneway
} // MQClientAPIImpl#endTransactionOneway方法
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 创建远程调用的命令 cmd=END_TRANSACTION 在broker会根据cmd执行事务消息逻辑
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); request.setRemark(remark);
/**
* 调用无返回值发送消息的方法, 内部做的操作:
* 1. 根据broker的iP地址, 获取连接的channel, 并把channel缓存到channelTables的map中 (在读写channelTables中会使用重入锁来控制并发)
* 2. 调用doBeforeRpcHooks方法做acl的鉴权
* 3. 调用this.invokeOnewayImpl(channel, request, timeoutMillis);发送请求
*/
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
} // invokeOneway内部是调用NettyRemotingAbstract#invokeOnewayImpl方法
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
// 对semaphoreOneway上锁
acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
// 上锁成功
// netty写请求
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {} // 写完释放锁,
// 报异常也释放锁
// 上锁失败
// timeoutMillis <= 0 请求太多异常
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
// timeoutMillis > 0
throw new RemotingTimeoutException(info);
}

到这里,就完成了事务消息流程中1,2,3,4这个几个步骤,下面还有两个重要的流程:

  1. 针对一阶段发送的Half消息,broker要进行处理
  2. 针对endTransaction提交进来的事务状态,broker端要进行处理,如提交的是unknow状态,要进行检查本地事务的状态,如果是提交或者回滚则把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC

Broker端处理消息

接下来,我们查看下收到消息的broker是如何处理消息。从服务端接受到消息流程是这样的
【RocketMQ源码学习】- 4. Client 事务消息源码解析
重点查看SendMessageProcessor#sendMessage方法是如何存储消息的
 // SendMessageProcessor#sendMessage
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 生成command返回值类
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); // 设置requestId
response.setOpaque(request.getOpaque());
// 设置broker所在的区域ID,取自BrokerConfig#regionId
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
// 设置是否需要跟踪 traceOn
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request);
// startTimstamp可以判断broke是否可用
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
} response.setCode(-1);
// 检查 broker是否有写的权限 请求的topic是否可以发消息 、topicConfig、queueIdInt
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
} final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { // queueIdInt小于0,则重新设置queueIdInt
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
} MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 此时的topic还是真实的topic
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt); // 如果topic是%RETRY%重试消息,则需要重新设置重试次数,消费次数信息
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
} msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties()); // 属性的赋值
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress()); // channel客户端的地址
msgInner.setStoreHost(this.getStoreHost()); // 要存储的store的IP地址
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
// 发送Half消息时,在属性中设置了PROPERTY_TRANSACTION_PREPARED为true,这里根据这个属性判断是否是事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) { //
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 重点:事务消息进入这里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盘的方式存入store
// prepareMessage方法会调用TransactionalMessageServiceImpl类下transactionalMessageBridge.putHalfMessage方法
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
} // 处理返回值
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
} // TransactionalMessageBridge#putHalfMessage
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
// putMessage把topic消息写到commitlog
// parseHalfMessageInner方法如下
return store.putMessage(parseHalfMessageInner(messageInner));
} // 解析Half消息
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 把真实的topic和真实的queueId放在消息的属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 设置默认的事务状态为TRANSACTION_NOT_TYPE=>unknow
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 将消息的topic设置为RMQ_SYS_TRANS_HALF_TOPIC,这个是对消费者不可见的
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// 设置queueId=0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

Broker处理EndTransaction消息

接下来,我们看一下,Broker是如何处理client用oneway方式发过来的endTransaction消息
做的事:
用offset查询prepare消息,如果是提交,把prepare消息的改为真实的topic消息写到commitlog中,写成功则进行删除prepare消息
如果是回滚操作,

 // EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
// 创建默认response返回值,
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); // 判断是slave节点,直接返回 ,代码省略。。。
// 打印日志:根据是否是检查本地事务的日志,是否是提交、回滚、未知,打印不同的日志,代码省略。。。 OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 提交事务请求
// 根据commitLogOffset获取文件中的message,获取到了返回success
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 检查文件中的消息和请求的是否一致
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 生成要保存的消息 这里是commit操作,会把topic修改为真实的topic,queueId也修改为真实的
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 重要:把真实的topic消息存储到commitlog
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 重要:删除prepare消息, 其实并没有删除prepare消息,而是把消息改为RMQ_SYS_TRANS_OP_HALF_TOPIC,存入opQueueMap和store
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 回滚事务请求
// 查询到half消息则返回成功
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 检查请求request是否和获取到half消息一致
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 重要:删除prepare消息, 其实并没有删除prepare消息,而是把消息改为RMQ_SYS_TRANS_OP_HALF_TOPIC,存入opQueueMap和store
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
} // 返回
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
} // 查看一下TransactionalMessageServiceImpl#deletePrepareMessage方法,删除prepareMassage
1. deletePrepareMessage方法调用的是 transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)方法
2. putOpMessage方法调用的是addRemoveTagInTransactionOp(messageExt, messageQueue)方法
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
// buildOpTopic方法设置消息的topic为RMQ_SYS_TRANS_OP_HALF_TOPIC
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
// 把RMQ_SYS_TRANS_OP_HALF_TOPIC消息存入opQueueMap,同时写入store
writeOp(message, messageQueue);
return true;
}

本地事务状态回查

》基于数据库模式的事务状态回查

问题

  1. 说说为什么要回查
    整体的流程是:
    a. 发送half消息
    b. 发送失败,流程结束;发送成功,执行本地事务
    c. 把本地事务执行的状态,发送消息给broker
    所以在b中事务执行的状态可能为UNKNOW状态,或者c过程中,消息发送超时或者失败,broker不知道事务执行是否是成功还是失败,所以broker会启动事务补偿机制来检查本地事务的执行状态
  2. 最大回查次数?
    @ImportantField
    private int transactionCheckMax = 15;
  3. 第一次间隔多久回查?
    60s
    private long transactionCheckInterval = 60 * 1000;

本地事务回查的流程

 /**
* ClientRemotingProcessor#checkTransactionState
* 内部获取请求的messageExt等信息后,会调用 DefaultMQProducerImpl#checkTransactionState
*/
// DefaultMQProducerImpl#checkTransactionState
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
// 获取broker地址、消息体、请求体、生产的group,留后续发送使用 【代码省略】 // 调用线程
@Override
public void run() {
// 新版本TransactionCheckListener不用了
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// getCheckListener()来获取我们重写的检查本地事务的方法
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
// 初始化结果
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
// 老版本从这里调用执行检查本地事务的接口
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
// 新版本调用我们实现的checkLocalTransaction检查本地线程
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
} // 处理事务状态的方法, 代码详情如下:
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
} // 处理事务状态的方法
private void processTransactionState(
final LocalTransactionState localTransactionState, // 存储检查事务执行的状态值
final String producerGroup,
final Throwable exception) {
// 设置
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true); // 检查事务的标志位
// 获取uniqueKey,此时uniquekey=msgId
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE: // 提交状态
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE: // 回滚状态
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW: // 未知状态
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
} String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
} try {
// 再次执行endTransactionOneway, 上文已经对结束事务的状态做了说明
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
}; // 提交线程的任务
this.checkExecutor.submit(request);
}

结语

重要的

  1. 自定义的TransactionListenerImpl事务监听类
  2. 发一阶段half消息的方法sendMessageInTransaction
  3. broker接受到TRAN_MSG属性为true的消息会把他的topic修改为RMQ_SYS_TRANS_HALF_TOPIC
  4. Broke处理EndTransaction消息会把提交和回滚状态的消息,存放到RMQ_SYS_TRANS_OP_HALF_TOPIC的topic中

希望

希望写的这篇文字,能帮助到想看这部分知识的人,当然由于本人的水平有限,难免出现错误,欢迎评论

【RocketMQ源码学习】- 4. Client 事务消息源码解析

======  【多学一点,for Better】======