
Time: 2022-01-03 02:17:16

I have created a Windows service that consumes messages from RabbitMQ. Consumption is asynchronous: messages are pushed to the service.


It scales well horizontally, with an approximately linear performance gain, even when the additional service instances run on the same server. Since we have a defined throughput goal, I find it simpler, deployment-wise, to control scaling by creating n tasks within one Windows service rather than deploying n service instances.


I rewrote the Windows service to start n tasks that each run the "MQ consumption" code, which in essence does what multiple instances of the service do. This does not yield the same performance: there is effectively no performance gain at all, and the code actually seems to run with more overhead.


I know there are scheduling issues, but the bottom-line question is: is it possible to use tasks and get approximately the same performance as starting multiple instances of the application? If so, what would an implementation look like? My current setup scales poorly.


Some code snippets:


This is the basic task creation, where each task has its own instance of TaskWrapper, which is essentially an abstraction of the whole business logic of consuming and processing messages:


public void Start()
{
    for (var i = 0; i < _levelOfConcurrency; i++)
    {
        // Any further StartNew arguments are cut off in the original post.
        var task = Task.Factory.StartNew(() => new TaskWrapper().TaskRun(_cancelationTokenSource.Token, _sleeptime));
        _tasks[i] = task;
        Console.WriteLine("Task created {0}", i);
    }
}

public void Stop()
{
    // Body not shown in the original post.
}

This is essentially what each task has in its run loop in the TaskWrapper class:


public void TaskRun(CancellationToken cancellationToken, TimeSpan sleeptime)
{
    _semaphoreDataController.Start(); // Process messages async, fully self-contained/threaded

    // Keep alive and occasionally check whether cancellation is requested
    while (!cancellationToken.IsCancellationRequested)
    {
        // Loop body not shown in the original post.
    }
}

I hope someone can enlighten me :-D


3 Solutions



To the core of your question:


It depends heavily on your programming style and on the program itself.


It is possible to get the same performance from one multi-threaded process as from multiple instances, but as I said, this depends on your service. I would recommend reading the online version of the book Parallel programming with .net. It opened my eyes to PLINQ, tasks and threads.


But in the end, it all runs on your CPU, and inter-process communication is more complex than inter-thread communication.
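
As a rough illustration of the "one multi-threaded process" option, here is a minimal sketch of a host that starts n long-running workers and stops them through a shared cancellation token. The WorkerHost class and the runWorker delegate are hypothetical names made up for this example; the delegate stands in for whatever consumption code each worker would run, and nothing here is taken from the asker's service.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical host: starts N workers on dedicated threads and stops them cooperatively.
public sealed class WorkerHost
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task[] _tasks = Array.Empty<Task>();

    public void Start(int levelOfConcurrency, Action<int, CancellationToken> runWorker)
    {
        _tasks = new Task[levelOfConcurrency];
        for (var i = 0; i < levelOfConcurrency; i++)
        {
            var workerId = i; // copy, so each lambda captures its own value
            // LongRunning hints the scheduler to use a dedicated thread instead of a pool thread.
            // The token is handed to the worker only, so a worker that has not started yet
            // still runs (and exits promptly) rather than ending up in the Canceled state.
            _tasks[i] = Task.Factory.StartNew(
                () => runWorker(workerId, _cts.Token),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }
    }

    // Signals cancellation and waits; returns false if the workers did not finish in time.
    public bool Stop(TimeSpan timeout)
    {
        _cts.Cancel();
        return Task.WaitAll(_tasks, timeout);
    }
}
```

A worker would typically block on its own work and return once the token is cancelled, for example `host.Start(4, (id, token) => token.WaitHandle.WaitOne());`. Note that this only addresses scheduling; if all workers share one upstream resource, adding workers will not add throughput.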




Further investigation has identified the primary limiting factor in the code.


I communicate with RabbitMQ via Burrow.NET, which creates a dedicated channel for the application to communicate with the queue.


This means that even though n parallel tasks are running, they can only consume from the queue through one channel, which limits the speed of the service.


Running multiple instances of the service provides n channels to RabbitMQ, hence the linear performance gain.




The DedicatedPublishingChannel, as its name suggests, is used only for publishing messages, so I don't think it causes the slow consumption. Besides, even though there is only one channel subscribed to the queue, messages are always delivered to the application much faster than they can be processed. I've written an application that migrates millions of messages (tweet JSON) from the queue to MongoDB, and the bottleneck is always writing the data to the database.


Burrow.NET handles the task management needed to process messages in parallel for you:


ITunnel tunnel;
// Create tunnel
tunnel.Subscribe(new SubscriptionOption<Message>
{
    BatchSize = 10, // Number of threads used to process messages
    MessageHandler = message =>
    {
        // Process your message here
    },
    QueuePrefetchSize = 10, // Should be double the message processing speed
    SubscriptionName = "Anything"
});
// Message will be acked automatically after the handler finishes
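
To make the idea of a fixed number of handler threads concrete, here is a self-contained sketch of bounded parallel processing using a SemaphoreSlim. This is not Burrow.NET's implementation, only an illustration of the concept behind a setting like BatchSize; BoundedProcessor, ProcessAsync and maxParallel are names invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a handler over messages with at most maxParallel in flight.
public static class BoundedProcessor
{
    // Returns the highest number of handlers that were observed running at the same time.
    public static async Task<int> ProcessAsync<T>(
        IEnumerable<T> messages, int maxParallel, Func<T, Task> handler)
    {
        var gate = new SemaphoreSlim(maxParallel);
        var sync = new object();
        var running = new List<Task>();
        int inFlight = 0, peak = 0;

        foreach (var message in messages)
        {
            // Wait for a free slot before starting the next handler.
            await gate.WaitAsync();
            running.Add(Task.Run(async () =>
            {
                lock (sync) { inFlight++; if (inFlight > peak) peak = inFlight; }
                try { await handler(message); }
                finally
                {
                    lock (sync) { inFlight--; }
                    gate.Release(); // free the slot even if the handler throws
                }
            }));
        }

        await Task.WhenAll(running);
        return peak;
    }
}
```

Called as `await BoundedProcessor.ProcessAsync(messages, 10, HandleAsync)`, where HandleAsync is your own handler, the loop never has more than 10 handlers running, which is the same kind of cap the BatchSize comment above describes.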

In my opinion, you should use as few IO resources as possible. I once had a problem where my application created so many channels that it ate quite a lot of the server's memory. It is even worse if the app creates many unnecessary connections, which is why Burrow.NET helps you create as few connections/channels as possible.


If you think your messages can be processed fast enough, you can set the prefetch size to a high number and tune it together with BatchSize (i.e. the thread count). For example, if my app could process messages from a queue at 5 msgs/sec, I would keep QueuePrefetchSize = 10.


If you use the method above with auto ack, you won't have to worry about forgetting to ack a message, which would block new messages from arriving. For instance, suppose you set QueuePrefetchSize = 10 so that 10 messages are processed in parallel by 10 threads. You decide to use tunnel.SubscribeAsync and ack the messages asynchronously yourself, but for some reason (an unhandled exception, maybe :D) 4 of the messages are never acked. That is equivalent to having a QueuePrefetchSize of 6: only 6 messages can be processed at the same time, and the speed will drop accordingly. So please try the approach above and see how it goes.
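
The effect of unacked messages on the prefetch window can be modelled in a few lines. The PrefetchWindow class below is a hypothetical stand-in for the broker's bookkeeping, written only for illustration; it is not part of Burrow.NET or the RabbitMQ client, and the real broker does considerably more.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: the broker delivers only while unacked messages < prefetch size.
public sealed class PrefetchWindow
{
    private readonly int _prefetchSize;
    private readonly HashSet<ulong> _unacked = new HashSet<ulong>();
    private ulong _nextTag = 1;

    public PrefetchWindow(int prefetchSize) { _prefetchSize = prefetchSize; }

    // Slots currently available for new deliveries.
    public int FreeSlots => _prefetchSize - _unacked.Count;

    // Hands out a delivery tag, or returns false when the window is full.
    public bool TryDeliver(out ulong deliveryTag)
    {
        deliveryTag = 0;
        if (_unacked.Count >= _prefetchSize) return false;
        deliveryTag = _nextTag++;
        _unacked.Add(deliveryTag);
        return true;
    }

    // Acking a message frees its slot; a message that is never acked holds it forever.
    public void Ack(ulong deliveryTag) => _unacked.Remove(deliveryTag);
}
```

With a window of 10, delivering 10 messages and acking only 6 of them leaves FreeSlots at 6: the 4 lost acks permanently occupy 4 slots, which matches the "equivalent to QueuePrefetchSize of 6" reasoning above.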



