I have created a windows service that consumes messages from a RabbitMq. The consumption is done in a async manner, where messages are pushed to the service.
我创建了一个使用RabbitMq消息的Windows服务。消耗以异步方式完成,消息被推送到服务。
It scales well horizontally with an approx. linear performance gain, even when just creating more service instances on the same server. Since we have a defined goal on throughput, i find it a bit more simple, deployment wise to control scaling by simply creating n number of tasks within the windows service, rather than deploying n number of service instances.
它大约水平伸缩。线性性能增益,即使只是在同一台服务器上创建更多服务实例。由于我们对吞吐量有一个明确的目标,我发现通过简单地在Windows服务中创建n个任务来控制扩展,而不是部署n个服务实例,这样做更简单一些。
Rewriting the windows service to simply start n tasks that each run the "MQ consumption" code, thus in essence doing what multiple instance of the windows service does, does not yield the same performance, effectively there is not performance gain at all, it actually seems that there is a larger overhead on code execution!.
重写windows服务只需启动每个运行“MQ消耗”代码的n个任务,因此本质上是执行windows服务的多个实例所做的,不会产生相同的性能,实际上根本没有性能提升,实际上似乎代码执行有更大的开销!
I do know that there are scheduling issues, but the bottom line question is. would it be possible to use tasks and get the "approximately" same performance as if one started multiple instances of the application? and if so, how would an implementation look like, because my current setup scales poorly.
我知道有调度问题,但最重要的问题是。是否有可能使用任务并获得“大致”相同的性能,就像一个人启动应用程序的多个实例一样?如果是这样,实现将如何,因为我当前的设置很差。
Some code snippets:
一些代码片段:
This is the basic task creation, where each task has an instance of the taskwrapper, that basically is an abstraction of the whole business logic of consuming and processing messages:
这是基本任务创建,其中每个任务都有一个taskwrapper实例,它基本上是消费和处理消息的整个业务逻辑的抽象:
public void Start()
{
for (var i = 0; i < _levelOfConcurrency; i++)
{
var task = Task.Factory.StartNew(() => new TaskWrapper().TaskRun(_cancelationTokenSource.Token,_sleeptime),
TaskCreationOptions.LongRunning);
_tasks[i] = task;
Console.WriteLine("Task created {0}", i);
}
}
public void Stop()
{
_cancelationTokenSource.Cancel();
Task.WaitAll(_tasks);
}
This is what each task basically has in they runloop in the taskwrapper class:
这是每个任务在taskwrapper类中的runloop中基本上具有的:
public void TaskRun(CancellationToken cancellationToken,TimeSpan sleeptime)
{
_semaphoreDataController.Start(); // Process messages async, fully selfcontained/threaded
//Keep alive and occasionally check if cancelation is requested
while (!cancellationToken.IsCancellationRequested)
{
Thread.Sleep(sleeptime);
}
Dispose();
}
I hope someone can enlighten me :-D
我希望有人可以启发我:-D
3 个解决方案
#1
0
To the core of your question:
问题的核心:
It hardly depends on your programming style and the program it self.
它几乎不取决于你的编程风格和它自己的程序。
It is possible to gain the same performance with one multi-threaded process as with multiple instances. But as I said this depends on your service. I would recommend you to read this online version of the book: Parallel programming with .net. It opened my eyes about PLinq, task and threads.
使用一个多线程进程可以获得与多个实例相同的性能。但正如我所说,这取决于你的服务。我建议你阅读本书的在线版本:使用.net进行并行编程。它开启了我对PLinq,任务和线程的关注。
But in the end, its all running on your CPU and interprocess communication is more complex than the inter-thread one.
但最终,它在CPU上运行并且进程间通信比线程间通信更复杂。
#2
0
So further investigation has determined the primary limiting factor in the code.
因此,进一步的调查确定了代码中的主要限制因素。
The way i use communicate with RabbitMQ is via Burrow.NET, and it will create a dedicated channel for the application to communicate with the queue.
我使用RabbitMQ进行通信的方式是通过Burrow.NET,它将为应用程序创建一个专用通道与队列进行通信。
This means that even thou n number of parallel tasks are running they can only consume from the queue from one channel, thus limiting the speed of the service.
这意味着即使许多并行任务正在运行,它们也只能从一个通道的队列中消耗,从而限制了服务的速度。
Having multiple instances of the service, provides n number of channels to the RabbitMQ, thus linear performance gain.
拥有多个服务实例,为RabbitMQ提供n个通道,从而实现线性性能提升。
#3
0
The DedicatedPublishingChannel as its name is used for publishing messages only so I don't think It made the speed problem of consuming messages. Beside, eventhough there is only 1 channel to subscribe from the queue, the speed of messages being delivered to the application is always very fast compare to the time to process those messages. I've made an application to migrate millions of msgs (Tweet json) from the queue to MongoDB and it's always a bottle neck of writting data to database.
DedicatedPublishingChannel作为其名称仅用于发布消息,因此我认为它不会消耗消息的速度问题。此外,尽管从队列中只有一个通道可以订阅,但与处理这些消息的时间相比,传递给应用程序的消息速度总是非常快。我已经创建了一个应用程序,将数百万个msgs(Tweet json)从队列迁移到MongoDB,它始终是将数据写入数据库的瓶颈。
Burrow.NET handles the task management to process messages in parallel for you:
Burrow.NET处理任务管理以便为您并行处理消息:
ITunnel tunnel;
// Create tunnel
tunnel.Subscribe(new SubscriptionOption<Message>
{
BatchSize = 10, // Number of threads you want to process those messages
MessageHandler = message =>
{
// Process your message here
},
QueuePrefetchSize = 10, // Should be double the message processing speed
SubscriptionName = "Anything"
});
// Message will be acked automatically after finish
In my opinion, I would just use as limit IO resource as possible. I got a problem that my application created so many channels and ate quote alot server's memory, It's even worse if the app creates many unneccessary connections, that's why Burrow.NET will help you make as less connections/channels as possible.
在我看来,我会尽可能使用限制IO资源。我遇到了一个问题,即我的应用程序创建了很多通道并且引用了很多服务器的内存,如果应用程序创建了许多不必要的连接,那就更糟了,这就是为什么Burrow.NET会帮助你尽可能少地建立连接/通道。
If you think that your messages can be processed fast enough, you can set the prefetch size with a high number and tune it with the BatchSize (aka thread count). For example, if my app could process messages from a queue at 5msgs/sec, I would keep the QueuePrefetchSize = 10.
如果您认为可以足够快地处理消息,则可以使用较大的数字设置预取大小,并使用BatchSize(也称为线程计数)进行调整。例如,如果我的应用程序可以以5毫秒/秒的速度处理来自队列的消息,那么我将保持QueuePrefetchSize = 10。
If you use the above method with auto ack, you won't have to worry about not ack the messaage because it will block the new messages to come. For instance, you have QueuePrefetchSize = 10 to have 10 messages being processed in parallel by 10 threads. You decided to use tunnel.SubscribeAsync with asynchronously ack the message but for some reasons (an unhandled exception :D may be) 4 of messages were not acked, it would be equivalent to having a QueuePrefetchSize of 6. Therefore, only 6 messages can be processed the same time and properly the speed will decrease. So please try the approach above and see how it goes.
如果你将上述方法与auto ack一起使用,你就不必担心不会消息,因为它会阻止新消息的到来。例如,您有QueuePrefetchSize = 10,有10个线程并行处理10条消息。您决定使用tunnel.SubscribeAsync同步异步消息,但由于某些原因(未处理的异常:D可能)4消息未被激活,这相当于QueuePrefetchSize为6.因此,只有6条消息可以是同时处理,速度会降低。所以请尝试上面的方法,看看它是如何进行的。
#1
0
To the core of your question:
问题的核心:
It hardly depends on your programming style and the program it self.
它几乎不取决于你的编程风格和它自己的程序。
It is possible to gain the same performance with one multi-threaded process as with multiple instances. But as I said this depends on your service. I would recommend you to read this online version of the book: Parallel programming with .net. It opened my eyes about PLinq, task and threads.
使用一个多线程进程可以获得与多个实例相同的性能。但正如我所说,这取决于你的服务。我建议你阅读本书的在线版本:使用.net进行并行编程。它开启了我对PLinq,任务和线程的关注。
But in the end, its all running on your CPU and interprocess communication is more complex than the inter-thread one.
但最终,它在CPU上运行并且进程间通信比线程间通信更复杂。
#2
0
So further investigation has determined the primary limiting factor in the code.
因此,进一步的调查确定了代码中的主要限制因素。
The way i use communicate with RabbitMQ is via Burrow.NET, and it will create a dedicated channel for the application to communicate with the queue.
我使用RabbitMQ进行通信的方式是通过Burrow.NET,它将为应用程序创建一个专用通道与队列进行通信。
This means that even thou n number of parallel tasks are running they can only consume from the queue from one channel, thus limiting the speed of the service.
这意味着即使许多并行任务正在运行,它们也只能从一个通道的队列中消耗,从而限制了服务的速度。
Having multiple instances of the service, provides n number of channels to the RabbitMQ, thus linear performance gain.
拥有多个服务实例,为RabbitMQ提供n个通道,从而实现线性性能提升。
#3
0
The DedicatedPublishingChannel as its name is used for publishing messages only so I don't think It made the speed problem of consuming messages. Beside, eventhough there is only 1 channel to subscribe from the queue, the speed of messages being delivered to the application is always very fast compare to the time to process those messages. I've made an application to migrate millions of msgs (Tweet json) from the queue to MongoDB and it's always a bottle neck of writting data to database.
DedicatedPublishingChannel作为其名称仅用于发布消息,因此我认为它不会消耗消息的速度问题。此外,尽管从队列中只有一个通道可以订阅,但与处理这些消息的时间相比,传递给应用程序的消息速度总是非常快。我已经创建了一个应用程序,将数百万个msgs(Tweet json)从队列迁移到MongoDB,它始终是将数据写入数据库的瓶颈。
Burrow.NET handles the task management to process messages in parallel for you:
Burrow.NET处理任务管理以便为您并行处理消息:
ITunnel tunnel;
// Create tunnel
tunnel.Subscribe(new SubscriptionOption<Message>
{
BatchSize = 10, // Number of threads you want to process those messages
MessageHandler = message =>
{
// Process your message here
},
QueuePrefetchSize = 10, // Should be double the message processing speed
SubscriptionName = "Anything"
});
// Message will be acked automatically after finish
In my opinion, I would just use as limit IO resource as possible. I got a problem that my application created so many channels and ate quote alot server's memory, It's even worse if the app creates many unneccessary connections, that's why Burrow.NET will help you make as less connections/channels as possible.
在我看来,我会尽可能使用限制IO资源。我遇到了一个问题,即我的应用程序创建了很多通道并且引用了很多服务器的内存,如果应用程序创建了许多不必要的连接,那就更糟了,这就是为什么Burrow.NET会帮助你尽可能少地建立连接/通道。
If you think that your messages can be processed fast enough, you can set the prefetch size with a high number and tune it with the BatchSize (aka thread count). For example, if my app could process messages from a queue at 5msgs/sec, I would keep the QueuePrefetchSize = 10.
如果您认为可以足够快地处理消息,则可以使用较大的数字设置预取大小,并使用BatchSize(也称为线程计数)进行调整。例如,如果我的应用程序可以以5毫秒/秒的速度处理来自队列的消息,那么我将保持QueuePrefetchSize = 10。
If you use the above method with auto ack, you won't have to worry about not ack the messaage because it will block the new messages to come. For instance, you have QueuePrefetchSize = 10 to have 10 messages being processed in parallel by 10 threads. You decided to use tunnel.SubscribeAsync with asynchronously ack the message but for some reasons (an unhandled exception :D may be) 4 of messages were not acked, it would be equivalent to having a QueuePrefetchSize of 6. Therefore, only 6 messages can be processed the same time and properly the speed will decrease. So please try the approach above and see how it goes.
如果你将上述方法与auto ack一起使用,你就不必担心不会消息,因为它会阻止新消息的到来。例如,您有QueuePrefetchSize = 10,有10个线程并行处理10条消息。您决定使用tunnel.SubscribeAsync同步异步消息,但由于某些原因(未处理的异常:D可能)4消息未被激活,这相当于QueuePrefetchSize为6.因此,只有6条消息可以是同时处理,速度会降低。所以请尝试上面的方法,看看它是如何进行的。