在第一篇我们写了两个程序通过一个命名的队列分别发送和接收消息。在这一篇,我们将创建一个工作队列在多个工作线程间分发耗时的工作任务。
工作队列的核心思想是避免立刻处理资源密集型任务导致必须等待其执行完成。相反的,我们安排这些任务在稍晚的时间完成。我们将一个任务封装为一个消息并把它发送到队列中。一个后台的工作线程将从队列中取出任务并最终执行。当你运行多个工作线程,这些任务将在这些工作线程间共享。
这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤其有用。
准备工作
在前一篇中,我们发送了一条内容为“Hello World!”的消息。现在,我们将要发送一些代表复杂任务的字符串。我们并没有诸如改变图片大小或者渲染PDF文件这样的真实的任务,所以假设任务会导致系统的繁忙--通过使用Threed.Sleep()函数。我们会采用许多的点(.)在字符串中来表达他的复杂性,每一个点将消耗一秒钟的工作时间。例如,假设有一个任务“Hello...”将消耗3秒钟。
我们会把上一个例子中的Send.cs文件中的代码稍微调整一下,使得对任意的消息都能通过命令行发送。这个程序将调度任务到我们的工作队列中,所以让我们将它命名为NewTask.cs:
1 var message = GetMessage(args);
2 var body = Encoding.UTF8.GetBytes(message);
3
4 var properties = channel.CreateBasicProperties();
5 properties.SetPersistent(true);
6
7 channel.BasicPublish(exchange: "",
8 routingKey: "task_queue",
9 basicProperties: properties,
10 body: body);
获取命令行消息的帮助方法:
1 private static string GetMessage(string[] args)
2 {
3 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
4 }
旧有的Receive.cs代码同样需要稍作修改:需要一个为消息中每一个点模拟一秒的时间消耗。它将会处理RabbitMQ发布的消息,执行任务,所以我们称之为Worker.cs。
1 var consumer = new EventingBasicConsumer(channel);
2 consumer.Received += (model, ea) =>
3 {
4 var body = ea.Body;
5 var message = Encoding.UTF8.GetString(body);
6 Console.WriteLine(" [x] Received {0}", message);
7
8 int dots = message.Split('.').Length - 1;
9 Thread.Sleep(dots * 1000);
10
11 Console.WriteLine(" [x] Done");
12
13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14 };
15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
模拟虚拟任务的执行时间:
1 int dots = message.Split('.').Length - 1;
2 Thread.Sleep(dots * 1000);
像第一篇中那样编译程序:
1 $ csc /r:"RabbitMQ.Client.dll" NewTask.cs
2 $ csc /r:"RabbitMQ.Client.dll" Worker.cs
轮转调度
使用工作队列的好处之一是能够很轻松的并行任务。如果我们要加强对积压工作的处理,只需要按照上面的方法添加更多的Worker,非常容易扩展。
首先,我们同时运行两个Worker。它们都会从队列中获取消息,但是究竟是怎样做到的呢?让我们看看。
你需要打开三个控制台。两个运行Worker,这两个控制台程序将充当消费者--C1与C2。
1 shell1$ Worker.exe
2 Worker
3 [*] Waiting for messages. To exit press CTRL+C
1 shell2$ Worker.exe
2 Worker
3 [*] Waiting for messages. To exit press CTRL+C
在第三个控制台程序中,我们将发布一些新的任务。一旦你已经运行了消费者,你就可以发布新消息了:
1 shell3$ NewTask.exe First message.
2 shell3$ NewTask.exe Second message..
3 shell3$ NewTask.exe Third message...
4 shell3$ NewTask.exe Fourth message....
5 shell3$ NewTask.exe Fifth message.....
让我们看看有什么发送到了Worker端:
1 shell1$ Worker.exe
2 [*] Waiting for messages. To exit press CTRL+C
3 [x] Received 'First message.'
4 [x] Received 'Third message...'
5 [x] Received 'Fifth message.....'
1 shell2$ Worker.exe
2 [*] Waiting for messages. To exit press CTRL+C
3 [x] Received 'Second message..'
4 [x] Received 'Fourth message....'
默认情况下,RabbitMQ会按顺序将消息逐个发送到消费者。平均情况下,每一个消费者将会获得相同数量的消息。这种分发消息的方式成为轮转调度。可以使用三个以上的Worker试一试。
消息确认
处理一个任务可能花费数秒钟。你可能会担心消费者开始一个较长的任务,但是在完成部分之后就出错了。在我们现在的代码中,一旦RabbitMQ分发了一条消息给消费者它就会马上在队列中删除这条消息。在这样的情况下,如果你中止某一个Worker,因为消息正在执行中,我们将丢失该消息。我们也将丢失所有分发到该Worker但是未被处理的消息。
但是我们不想丢失任何一个任务。如果一个Worker中止了,我们希望这个任务能被分发给其他Worker。
为了确保消息绝不丢失,RabbitMQ提供了消息确认机制。消费者回发一个确认给RabbitMQ,告知某个消息已经被接收、处理,然后RabbitMQ就可以随心所欲的删除它了。
如果一个消费者在没有回发确认就中止了,RabbitMQ会认为该消息没有被完全的处理,并会将该消息重新分发给其他的消费者。通过这种方式,你可以确定没有消息会丢失,即使有Worker会不可意料的中止。
没有消息会超时,RabbitMQ仅仅会在Worker的连接中止的时候重新分发消息。即使处理一个消息花费的时间很长很长也不会有什么关系。
消息确认在默认情况下是开启的。在前面的示例中我们通过将noAck参数设置为true显示的关闭了消息确认。现在是时候移除该标记了,使完成一个任务时发回一个恰当的确认。
1 var consumer = new EventingBasicConsumer(channel);
2 consumer.Received += (model, ea) =>
3 {
4 var body = ea.Body;
5 var message = Encoding.UTF8.GetString(body);
6 Console.WriteLine(" [x] Received {0}", message);
7
8 int dots = message.Split('.').Length - 1;
9 Thread.Sleep(dots * 1000);
10
11 Console.WriteLine(" [x] Done");
12
13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14 };
15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这段代码,我们可以确保及时在消费者正在执行时你用CTRL+C强制中断了程序,也不会丢失任何消息。在消费者中止后不久,所有为收到确认的消息都将被重新分发。
被遗忘的确认
缺少BasicAck是一个非常常见的错误。这是一个简单的错误,但是后果却相当严重。当客户端退出的时候,消息会被重新分发(看起来像是随机分发的),但是RabbitMQ会占用越来越多的内存因为它不能释放未确认的消息。
为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段:
1 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
2 Listing queues ...
3 hello 0 0
4 ...done.
消息持久化
我们已经学习了如何确保即使消费者中止,任务也不会丢失。但是如果RabbitMQ服务中止的时候,我们的任务还是会丢失。
当RabbitMQ退出或者崩溃,它会忘记存在的队列和队列中的消息,除非你告诉它不要这样。确保消息不丢失,有两件事情是必须的:我们必须同时把队列和消息标记为持久的(durable)。
首先,我们需要确保RabbitMQ永远不会丢失队列。为了做到这件事,我们需要将队列申明为持久的:
1 channel.QueueDeclare(queue: "hello",
2 durable: true,
3 exclusive: false,
4 autoDelete: false,
5 arguments: null);
尽管此命令本身是正确的,但是在当前设置下它不会起作用。因为我们已经定义过一个叫做hello的队列。RabbitMQ不允许使用不同的参数重定义一个已经存在的队列,任何尝试做这样的事情的程序都将返回一个错误。但是有一个变通的方法--让我们用不同的名称申明一个队列,例如task_queue:
1 channel.QueueDeclare(queue: "task_queue",
2 durable: true,
3 exclusive: false,
4 autoDelete: false,
5 arguments: null);
这个队列申明的改变需要被应用于生产者和消费者。
这个时候,我们可以确定即使是RabbitMQ重启了,task_queue也不会丢失。现在我们需要把我们的消息标记为持久的(persistent),通过把IBasicProperties.SetPersistent设置为true。
1 var properties = channel.CreateBasicProperties();
2 properties.SetPersistent(true);
消息持久注记
将消息标记为持久的并不能完全的保证消息不会丢失。尽管告知了RabbitMQ将消息保存在磁盘上,仍旧有很短的时间里RabbitMQ接收到一个消息并且还没有保存。所以RabbitMQ不会对每条消息做fsync--它可能仅仅被存放在Cache中而不是实际写入到磁盘里面。消息的持久化保证并不健壮,但是对于简单的任务队列已经足够。如果你需要一个更加健壮的保证,你可以使用发布者确认。
公平调度
你可能已经注意到调度依旧不能完全按照我们期望的方式工作。设想一个有两个Worker的应用场景,当所有奇数消息都很庞大而偶数消息很轻量的时候,一个Worker总是非常的繁忙而另一个几乎不做什么事情。嗯,RabbitMQ并不会知道这事儿,它依然会平均的分发消息。
出现这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它并不会去检查每个消费者所拥有的未确定消息的数量。它只是不假思索的将第N个消息调度到第N个消费者。
为了应对这种情况,我们可以使用basicQos方法并且把参数prefetchCount设置为1。这将告诉RabbitMQ不要同一时间调度给同一个消费者超过一条消息。或者,当一个消费者正在处理或确认前一个消息时不要将新消息调度给它。相反的,它会把这个消息调度给下一个不忙碌的消费者。
1 channel.BasicQos(0, 1, false);
队列大小注记
如果所有的消费者都很忙碌,你的队列可能被填满。你希望能盯着这个问题,并且添加更多的消费者,或者使用其他策略。
组合在一起
NewTask.cs类的最终代码如下:
1 using System;
2 using RabbitMQ.Client;
3 using System.Text;
4
5 class NewTask
6 {
7 public static void Main(string[] args)
8 {
9 var factory = new ConnectionFactory() { HostName = "localhost" };
10 using(var connection = factory.CreateConnection())
11 using(var channel = connection.CreateModel())
12 {
13 channel.QueueDeclare(queue: "task_queue",
14 durable: true,
15 exclusive: false,
16 autoDelete: false,
17 arguments: null);
18
19 var message = GetMessage(args);
20 var body = Encoding.UTF8.GetBytes(message);
21
22 var properties = channel.CreateBasicProperties();
23 properties.SetPersistent(true);
24
25 channel.BasicPublish(exchange: "",
26 routingKey: "task_queue",
27 basicProperties: properties,
28 body: body);
29 Console.WriteLine(" [x] Sent {0}", message);
30 }
31
32 Console.WriteLine(" Press [enter] to exit.");
33 Console.ReadLine();
34 }
35
36 private static string GetMessage(string[] args)
37 {
38 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
39 }
40 }
Worker.cs类的最终代码如下:
1 using System;
2 using RabbitMQ.Client;
3 using RabbitMQ.Client.Events;
4 using System.Text;
5 using System.Threading;
6
7 class Worker
8 {
9 public static void Main()
10 {
11 var factory = new ConnectionFactory() { HostName = "localhost" };
12 using(var connection = factory.CreateConnection())
13 using(var channel = connection.CreateModel())
14 {
15 channel.QueueDeclare(queue: "task_queue",
16 durable: true,
17 exclusive: false,
18 autoDelete: false,
19 arguments: null);
20
21 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
22
23 Console.WriteLine(" [*] Waiting for messages.");
24
25 var consumer = new EventingBasicConsumer(channel);
26 consumer.Received += (model, ea) =>
27 {
28 var body = ea.Body;
29 var message = Encoding.UTF8.GetString(body);
30 Console.WriteLine(" [x] Received {0}", message);
31
32 int dots = message.Split('.').Length - 1;
33 Thread.Sleep(dots * 1000);
34
35 Console.WriteLine(" [x] Done");
36
37 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
38 };
39 channel.BasicConsume(queue: "task_queue",
40 noAck: false,
41 consumer: consumer);
42
43 Console.WriteLine(" Press [enter] to exit.");
44 Console.ReadLine();
45 }
46 }
47 }
你可以使用消息确定和BasicQos设置一个工作队列。持久化选项使得消息RabbitMQ重启的时候也得以保全。
要了解关于IModel和IBasicProperties的更多信息,你可以浏览在线的RabbitMQ .NET客户端API引用。
现在我们可以前进道教程三并了解如何向多个消费者发送相同的消息。
原文链接:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html