.NET分布式事务处理总结【下】 - 包含MSMQ的分布式事务处理

时间:2021-05-21 06:18:15

.NET直接提供对MSMQ的访问支持,只需要添加System.Messaging程序集引用即可方便地操作MSMQ。MSMQ支持两种事务处理模式:内部事务处理以及基于MS-DTC的分布式事务处理。

MSMQ的内部事务处理

MSMQ的内部事务处理是指,仅采用MSMQ本身提供的事务处理机制完成事务处理。比如,假设有一系列的消息需要发布到MSMQ,那么,就可以启动一个内部事务,确保这些消息的发布过程是一个原子操作。要使用MSMQ的内部事务处理机制,在创建消息队列的时候,就需要勾选“事务性”选项,如下图所示:

.NET分布式事务处理总结【下】 - 包含MSMQ的分布式事务处理

首先,需要创建一个MessageQueueTransaction的对象,并使用Begin调用以启动MSMQ的内部事务处理。然后,在MessageQueue的Send方法中,使用Send(object, MessageQueueTransaction)的重载函数发送消息,将创建好的MessageQueueTransaction对象作为第二个参数传递给Send方法;在完成所有消息的发送之后,使用MessageQueueTransaction对象的Commit方法提交事务。如果在发送消息的过程中遇到问题,则使用MessageQueueTransaction对象的Abort调用回滚事务。请参见下面的示例代码:

隐藏行号 复制代码 MSMQ的内部事务处理
  1. using (MessageQueue messageQueue = new MessageQueue(@".\private$\TPCDemoQueue", 
  2.     false, false, QueueAccessMode.SendAndReceive))
  3. {
  4.     MessageQueueTransaction trans = new MessageQueueTransaction();
  5.     try
  6.     {
  7.         trans.Begin();
  8.         for (int i = 0; i < 5; i++)
  9.         {
  10.             messageQueue.Send(new Message(i), trans);
  11.         }
  12.         trans.Commit();
  13.     }
  14.     catch
  15.     {
  16.         trans.Abort();
  17.     }
  18.     messageQueue.Close();
  19. }

注意:如果你的消息队列在创建的时候没有设置“事务性”选项,那么,在完成消息队列的创建以后,你将无法修改该选项。更糟糕的是,在非事务性队列上执行上面的代码,则无法将消息发布到消息队列上,框架本身也不会提示任何错误信息,指示消息并未发布成功。

在分布式事务处理中使用MSMQ

在分布式事务处理的上下文中(比如,.NET 2.0+的TransactionScope中),上面所提到的MessageQueueTransaction将毫无用处,也就是说,MessageQueueTransaction与分布式事务处理毫无关系。你所要做的是,用正常的方式初始化一个MessageQueue的实例,然后,调用Send方法发布消息,在发布消息的时候,通过设置MessageQueueTransactionType的值来告诉MessageQueue,目前正处于一个分布式事务的上下文中。于是,你需要使用Send/Receive的重载方法:Send(object, MessageQueueTransactionType)以及Receive(MessageQueueTransactionType)。如下:

隐藏行号 复制代码 分布式事务中的MSMQ调用
  1. using (TransactionScope transaction = new TransactionScope())
  2. {
  3.     Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);
  4.     // do some work
  5.     transaction.Complete();
  6. }

注意:对于一些生命周期相对较长的事务处理,比如,假设你的用例是这样的:你首先需要从一个消息队列中获得消息,然后更新你的数据库记录,那么你的代码可能会是这样的:

隐藏行号 复制代码 分布式事务中的MSMQ调用
  1. using (TransactionScope transaction = new TransactionScope())
  2. {
  3.     using (MessageQueue someQueue = new MessageQueue("<queue connection>"))
  4.     {
  5.         Message msg = someQueue.Receive();
  6.         // do something else
  7.     }
  8.     transaction.Complete();
  9. }

这样做其实是不对的!因为Receive方法是一种同步调用,如果消息队列中根本没有任何内容,那么Receive调用就会被阻塞,直到消息队列中出现新的消息。这就意味着你的分布式事务一直都是处于开启的状态,而且可能由于等待时间过长而导致超时,最终导致一个MessageQueueException。

正确的做法是,在MessageQueue上使用BeginPeek调用(注意:不是BeginReceive方法,因为BeginReceive方法并不是用来处理事务性队列的),然后订阅PeekComplete事件,在事件处理过程中,再使用TransactionScope以及Receive等方法实现消息的获取。例如:

隐藏行号 复制代码 分布式事务中的MSMQ调用
  1. MessageQueue inputQueue = new MessageQueue("<queue connection>");
  2. inputQueue.PeekCompleted += (s, e) =>
  3.     {
  4.         using (TransactionScope transaction = new TransactionScope())
  5.         {
  6.             Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);
  7.             // do some work
  8.             transaction.Complete();
  9.         }
  10.         inputQueue.BeginPeek();
  11.     };
  12. inputQueue.BeginPeek();

最后再提醒一下,就是如果你所要做的事情仅限于与MSMQ打交道,那么只要使用MSMQ的内部事务处理机制就可以了,毕竟使用分布式事务处理会涉及到MS-DTC,从而造成过大的系统开销,影响性能。