如何控制来自ActiveMQ在c#应用程序中的消息量?

时间:2022-04-13 09:50:23

I'm using ActiveMQ in a .Net program and I'm flooded with message-events.

我在。net程序中使用ActiveMQ,我被消息事件淹没了。

In short when I get a queue-event 'onMessage(IMessage receivedMsg)' I put the message into an internal queue out of which X threads do their thing.

简言之,当我得到一个队列事件的onMessage(IMessage receivedMsg)时,我将消息放入一个内部队列中,其中X线程执行它们的任务。

At first I had: 'AcknowledgementMode.AutoAcknowledge' when creating the session so I'm guessing that all the messages in the queue got sucked down and put into the memory queue (which is risky since with a crash, everything is lost).

一开始我用的是“确认模式”。在创建会话时自动确认“,因此我猜测队列中的所有消息都被吸收并放入内存队列(这是有风险的,因为在崩溃时,所有消息都丢失了)。

So then I used: 'AcknowledgementMode.ClientAcknowledge' when creating the session, and when a worker was ready with the message it calls the 'commit()' method on the message. However, still all the messages get sucked down from the queue.

于是我用了“确认模式”。当创建会话时,当工作人员准备好消息时,它会在消息上调用“commit()”方法。然而,所有的消息仍然从队列中被吸出。

How can I configure it that ONLY an X amount of messages are being processed or are in an internal queue, and that not everything is being 'downloaded' right away?

我如何配置它,只有X数量的消息正在处理或处于内部队列中,并且不是所有的消息都正在“下载”?

2 个解决方案

#1


3  

Are you on .NET 4.0? You could use a BlockingCollection . Set it to the maximum amount it may contain. As soon as a thread tries to put in an excess element, the Add operation will block until the collection falls below the threshold again.

你在。net 4.0上吗?您可以使用块集合。将它设置为它可能包含的最大数量。一旦线程尝试放入多余的元素,添加操作就会阻塞,直到集合再次降到阈值以下。

Maybe that would do it for throttling?

也许这能用于节流?

There is also an API for throttling in the Rx framework, but I do not know how it is implemented. If you implement your Queue source as Observable, this API would become available for you, but I don't know if this hits your needs.

在Rx框架中还有一个用于节流的API,但是我不知道如何实现它。如果您将队列源实现为可观察的,那么这个API将对您可用,但是我不知道这是否满足您的需要。

#2


2  

You can set the client prefetch to control how many messages the client will be sent. When the Session is in Auto Ack, the client will only ack a message once its been delivered to your app via the onMessage callback or through a synchronous receive. By default the client will prefetch 1000 messages from the broker, if the client goes down these messages would be redelivered to another client it this was a Queue, otherwise for a topic they are just discarded as a topic is a broadcast based channel. If you set the prefetch to one then you client would only be sent one message from the sever, then each time your onMessage callback completes a new message would be dispatched as the client would ack that message, that is if the session is in Auto Ack mode.

您可以设置客户端预取来控制客户端将发送多少消息。当会话处于自动Ack中时,客户端只有通过onMessage回调或同步接收将消息发送到应用程序时才会对消息进行Ack。默认情况下,客户端将从代理中预取1000条消息,如果客户端下线,这些消息将被重新发送到另一个客户端,这是一个队列,否则对于一个主题,它们将被丢弃,因为主题是一个基于广播的通道。如果您将预取设置为一个,那么您的客户端将只从服务器发送一条消息,那么每当onMessage回调完成时,将发送一条新消息,因为客户端将对该消息进行ack,即会话处于自动ack模式。

Refer to the NMS configuration page for all the options: http://activemq.apache.org/nms/configuring.html

所有选项请参考NMS配置页面:http://activemq.apache.org/nms/configuring.html

Regards

问候

Tim. FuseSource.com

蒂姆。FuseSource.com

#1


3  

Are you on .NET 4.0? You could use a BlockingCollection . Set it to the maximum amount it may contain. As soon as a thread tries to put in an excess element, the Add operation will block until the collection falls below the threshold again.

你在。net 4.0上吗?您可以使用块集合。将它设置为它可能包含的最大数量。一旦线程尝试放入多余的元素,添加操作就会阻塞,直到集合再次降到阈值以下。

Maybe that would do it for throttling?

也许这能用于节流?

There is also an API for throttling in the Rx framework, but I do not know how it is implemented. If you implement your Queue source as Observable, this API would become available for you, but I don't know if this hits your needs.

在Rx框架中还有一个用于节流的API,但是我不知道如何实现它。如果您将队列源实现为可观察的,那么这个API将对您可用,但是我不知道这是否满足您的需要。

#2


2  

You can set the client prefetch to control how many messages the client will be sent. When the Session is in Auto Ack, the client will only ack a message once its been delivered to your app via the onMessage callback or through a synchronous receive. By default the client will prefetch 1000 messages from the broker, if the client goes down these messages would be redelivered to another client it this was a Queue, otherwise for a topic they are just discarded as a topic is a broadcast based channel. If you set the prefetch to one then you client would only be sent one message from the sever, then each time your onMessage callback completes a new message would be dispatched as the client would ack that message, that is if the session is in Auto Ack mode.

您可以设置客户端预取来控制客户端将发送多少消息。当会话处于自动Ack中时,客户端只有通过onMessage回调或同步接收将消息发送到应用程序时才会对消息进行Ack。默认情况下,客户端将从代理中预取1000条消息,如果客户端下线,这些消息将被重新发送到另一个客户端,这是一个队列,否则对于一个主题,它们将被丢弃,因为主题是一个基于广播的通道。如果您将预取设置为一个,那么您的客户端将只从服务器发送一条消息,那么每当onMessage回调完成时,将发送一条新消息,因为客户端将对该消息进行ack,即会话处于自动ack模式。

Refer to the NMS configuration page for all the options: http://activemq.apache.org/nms/configuring.html

所有选项请参考NMS配置页面:http://activemq.apache.org/nms/configuring.html

Regards

问候

Tim. FuseSource.com

蒂姆。FuseSource.com