如何编写消耗MSMQ的事务性多线程WCF服务

时间:2022-06-19 21:03:53

I have a WCF service that posts messages to a private, non-transactional MSMQ queue. I have another WCF service (multi-threaded) that processes the MSMQ messages and inserts them in the database.

我有一个WCF服务,它将消息发布到私有的非事务性MSMQ队列。我有另一个WCF服务(多线程),它处理MSMQ消息并将它们插入数据库。

My issue is with sequencing. I want the messages to be in certain order. For example MSG-A need to go to the database before MSG-B is inserted. So my current solution for that is very crude and expensive from database perspective.

我的问题是测序。我希望消息按特定顺序排列。例如,MSG-A需要在插入MSG-B之前转到数据库。因此,从数据库的角度来看,我目前的解决方案非常粗糙且昂贵。

I am reading the message, if its MSG-B and there is no MSG-A in the database, I throw it back on the message queue and I keep doing that till MSG-A is inserted in the database. But this is a very expensive operation as it involves table scan (SELECT stmt).

我正在阅读消息,如果它的MSG-B和数据库中没有MSG-A,我会把它扔回消息队列,我一直这样做,直到MSG-A插入数据库。但这是一个非常昂贵的操作,因为它涉及表扫描(SELECT stmt)。

The messages are always posted to the queue in sequence.

消息始终按顺序发布到队列中。

Short of making my WCF Queue Processing service Single threaded (By setting the service behavior attribute InstanceContextMode to Single), can someone suggest a better solution?

没有使我的WCF队列处理服务单线程(通过将服务行为属性InstanceContextMode设置为Single),有人可以提出更好的解决方案吗?

Thanks

Dan

2 个解决方案

#1


Instead of immediately pushing messages to the DB after taking them out of the queue, keep a list of pending messages in memory. When you get an A or B, check to see if the matching one is in the list. If so, submit them both (in the right order) to the database, and remove the matching one from the list. Otherwise, just add the new message to that list.

将消息从队列中取出后,不要立即将消息推送到数据库,而是在内存中保留待处理消息的列表。当您获得A或B时,请检查匹配的是否在列表中。如果是这样,请将它们(按正确的顺序)提交到数据库,然后从列表中删除匹配的一个。否则,只需将新消息添加到该列表即可。

If checking for a match is too expensive a task to serialize - I assume you are multithreading for a reason - the you could have another thread process the list. The existing multiple threads read, immediately submit most messages to the DB, but put the As and Bs aside in the (threadsafe) list. The background thread scavenges through that list finding matching As and Bs and when it finds them it submits them in the right order (and removes them from the list).

如果检查匹配是一个太昂贵的序列化任务 - 我假设你是多线程的原因 - 你可以让另一个线程处理列表。读取现有的多个线程,立即将大多数消息提交给DB,但将As和B放在(threadsafe)列表中。后台线程通过该列表清除匹配的As和Bs,当它找到它们时,它以正确的顺序提交它们(并从列表中删除它们)。

The bottom line is - since your removing items from the queue with multiple threads, you're going to have to serialize somewhere, in order to ensure ordering. The trick is to minimize the number of times and length of time you spend locked up in serial code.

底线是 - 由于你从多个线程的队列中删除项目,你将不得不在某个地方序列化,以确保订购。诀窍是尽量减少锁定在串行代码中的次数和时间长度。

There might also be something you could do at the database level, with triggers or something, to reorder the entries when it detects this situation. I'm afraid I don't know enough about DB programming to help there.

您可以在数据库级别使用触发器或其他东西在检测到这种情况时对条目进行重新排序。我担心我对DB编程的了解不足以帮助那里。

UPDATE: Assuming the messages contain some id that lets you associate a message 'A' with the correct associated message 'B', the following code will make sure A goes in the database before B. Note that it does not make sure they are adjacent records in the database - there could be other messages between A and B. Also, if for some reason you get an A or B without ever receiving the matching message of the other type, this code will leak memory since it hangs onto the unmatched message forever.

更新:假设消息包含一些id,允许您将消息“A”与正确的关联消息“B”相关联,以下代码将确保A在B之前进入数据库。请注意,它不能确保它们是相邻的数据库中的记录 - 在A和B之间可能还有其他消息。另外,如果由于某种原因你得到A或B而没有收到另一种类型的匹配消息,这段代码将泄漏内存,因为它挂起到不匹配的消息永远。

(You could extract those two 'lock'ed blocks into a single subroutine, but I'm leaving it like this for clarity with respect to A and B.)

(你可以将这两个'锁定的块'提取到一个子程序中,但为了清楚起见,我将它留在A和B中。)

static private object dictionaryLock = new object();
static private Dictionary<int, MyMessage> receivedA = 
    new Dictionary<int, MyMessage>();
static private Dictionary<int, MyMessage> receivedB = 
    new Dictionary<int, MyMessage>();

public void MessageHandler(MyMessage message)
{
    MyMessage matchingMessage = null;
    if (IsA(message))
    {
        InsertIntoDB(message);
        lock (dictionaryLock)
        {
            if (receivedB.TryGetValue(message.id, out matchingMessage))
            {
                receivedB.Remove(message.id);
            }
            else
            {
                receivedA.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(matchingMessage);
        }
    }
    else if (IsB(message))
    {
        lock (dictionaryLock)
        {
            if (receivedA.TryGetValue(message.id, out matchingMessage))
            {
                receivedA.Remove(message.id);
            }
            else
            {
                receivedB.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(message);
        }
    }
    else
    {
        // not A or B, do whatever
    }
}

#2


If you're the only client of those queues, you could very easy add a timestamp as a message header (see IDesign sample) and save the Sent On field (kinda like an outlook message) in the database as well. You could process them in the order they were sent (basically you move the sorting logic at the time of consumption).

如果您是这些队列的唯一客户端,则可以非常轻松地将时间戳添加为消息头(请参阅IDesign示例),并将数据库中的Sent On字段(有点像Outlook消息)保存。您可以按发送顺序处理它们(基本上您在消费时移动排序逻辑)。

Hope this helps, Adrian

希望这会有所帮助,阿德里安

#1


Instead of immediately pushing messages to the DB after taking them out of the queue, keep a list of pending messages in memory. When you get an A or B, check to see if the matching one is in the list. If so, submit them both (in the right order) to the database, and remove the matching one from the list. Otherwise, just add the new message to that list.

将消息从队列中取出后,不要立即将消息推送到数据库,而是在内存中保留待处理消息的列表。当您获得A或B时,请检查匹配的是否在列表中。如果是这样,请将它们(按正确的顺序)提交到数据库,然后从列表中删除匹配的一个。否则,只需将新消息添加到该列表即可。

If checking for a match is too expensive a task to serialize - I assume you are multithreading for a reason - the you could have another thread process the list. The existing multiple threads read, immediately submit most messages to the DB, but put the As and Bs aside in the (threadsafe) list. The background thread scavenges through that list finding matching As and Bs and when it finds them it submits them in the right order (and removes them from the list).

如果检查匹配是一个太昂贵的序列化任务 - 我假设你是多线程的原因 - 你可以让另一个线程处理列表。读取现有的多个线程,立即将大多数消息提交给DB,但将As和B放在(threadsafe)列表中。后台线程通过该列表清除匹配的As和Bs,当它找到它们时,它以正确的顺序提交它们(并从列表中删除它们)。

The bottom line is - since your removing items from the queue with multiple threads, you're going to have to serialize somewhere, in order to ensure ordering. The trick is to minimize the number of times and length of time you spend locked up in serial code.

底线是 - 由于你从多个线程的队列中删除项目,你将不得不在某个地方序列化,以确保订购。诀窍是尽量减少锁定在串行代码中的次数和时间长度。

There might also be something you could do at the database level, with triggers or something, to reorder the entries when it detects this situation. I'm afraid I don't know enough about DB programming to help there.

您可以在数据库级别使用触发器或其他东西在检测到这种情况时对条目进行重新排序。我担心我对DB编程的了解不足以帮助那里。

UPDATE: Assuming the messages contain some id that lets you associate a message 'A' with the correct associated message 'B', the following code will make sure A goes in the database before B. Note that it does not make sure they are adjacent records in the database - there could be other messages between A and B. Also, if for some reason you get an A or B without ever receiving the matching message of the other type, this code will leak memory since it hangs onto the unmatched message forever.

更新:假设消息包含一些id,允许您将消息“A”与正确的关联消息“B”相关联,以下代码将确保A在B之前进入数据库。请注意,它不能确保它们是相邻的数据库中的记录 - 在A和B之间可能还有其他消息。另外,如果由于某种原因你得到A或B而没有收到另一种类型的匹配消息,这段代码将泄漏内存,因为它挂起到不匹配的消息永远。

(You could extract those two 'lock'ed blocks into a single subroutine, but I'm leaving it like this for clarity with respect to A and B.)

(你可以将这两个'锁定的块'提取到一个子程序中,但为了清楚起见,我将它留在A和B中。)

static private object dictionaryLock = new object();
static private Dictionary<int, MyMessage> receivedA = 
    new Dictionary<int, MyMessage>();
static private Dictionary<int, MyMessage> receivedB = 
    new Dictionary<int, MyMessage>();

public void MessageHandler(MyMessage message)
{
    MyMessage matchingMessage = null;
    if (IsA(message))
    {
        InsertIntoDB(message);
        lock (dictionaryLock)
        {
            if (receivedB.TryGetValue(message.id, out matchingMessage))
            {
                receivedB.Remove(message.id);
            }
            else
            {
                receivedA.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(matchingMessage);
        }
    }
    else if (IsB(message))
    {
        lock (dictionaryLock)
        {
            if (receivedA.TryGetValue(message.id, out matchingMessage))
            {
                receivedA.Remove(message.id);
            }
            else
            {
                receivedB.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(message);
        }
    }
    else
    {
        // not A or B, do whatever
    }
}

#2


If you're the only client of those queues, you could very easy add a timestamp as a message header (see IDesign sample) and save the Sent On field (kinda like an outlook message) in the database as well. You could process them in the order they were sent (basically you move the sorting logic at the time of consumption).

如果您是这些队列的唯一客户端,则可以非常轻松地将时间戳添加为消息头(请参阅IDesign示例),并将数据库中的Sent On字段(有点像Outlook消息)保存。您可以按发送顺序处理它们(基本上您在消费时移动排序逻辑)。

Hope this helps, Adrian

希望这会有所帮助,阿德里安