工作队列
在上一个教程中,我们写了一个从一个已经命好名的队列中收发消息的程序。在这个教程中,我们将创建一个工作队列用来在多个工作者之间分发耗时(time-consuming)任务。
工作队列(又名:任务队列)背后的主要思想是避免立即做资源密集型的任务并且要等到它完成。相反,我们调度这个任务在以后完成。我们封装一个消息任务并把它发送给队列。一个工作进程在后台运行:取出任务并最终执行这个任务。如果跑了多个工作任务,那么消息被它们共享。
这些概念在web应用中是特别有用的,在很短的http请求完成一个复杂的任务。
准备
在前面的一个教程中我们发送了一个包含“Hello World!”的消息。现在我们打算发送一个字符串来代替一个复杂的任务。我们没有一个真正的任务,比如改变图片大小或者渲染一个pdf文件,我们假装我们很忙-通过使用Thread.sleep()
方法。我们将以.
的数量来表示任务的复杂度。每一个点表示需要“工作”1s,例如:一个假任务描述为:Hello...
表示需要花费3秒的时间。
我们将简单修改一下我们之前的例子Send.java
,官网的例子是用命令行,但是我们用IDE,所以不和官网的一样了。官网用命令行连发了5条消息,我们将用for循环来实现。新的程序我们命名为NewTask.java
,以下是所有代码:
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列例子
*/
public class NewTask {
private static final String QUEUE_NAME = "work.queue";
private static final String[] strings = {
"First message.",
"Second message..",
"Third message...",
"Fourth message....",
"Fifth message....."
};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String[] messages = strings;
for (String message: messages){
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Send '" + message + "'");
}
channel.close();
connection.close();
}
}
我们老的Recv.java
也是只需要稍微修改一下。只需要对.
进行一个处理,我们将新的程序命名为Worker.java
。以下是全部代码:
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列消费端
*/
public class Worker {
private static final String QUEUE_NAME = "work.queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String message) {
for (char ch : message.toCharArray()){
if (ch == '.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
轮循分发(Round-robin dispatching)
任务队列的一个好处就是能很简单的并行工作。如果我们有积压的工作,我们只需要添加更多的工作者,很容易扩展。
首先,让我们同时跑两个工作者实例。它们都将从队列中获取消息,结果怎样,我们拭目以待。
你需要开三个控制台,两个跑工作者程序。这两个就是我们的消费者 - C1
和 C2
。
第三个控制台我们发布新的任务。一旦你启动好了消费者们,你就可以发布消息了:即执行生产者程序NewTask.java
。以下是执行结果:
下面是当执行上面的程序之后,两个worker的输出:
worker-1
worker-2
默认的,RabbitMQ会有序的将一个个消息交付给下一个消费者。平均每个消费者将会得到相同数量的消息。这种分发消息的方式叫轮循(round-robin)。可以尝试3个或更多的工作者。
消息确认(message acknowledgment)
完成一个任务需要花费一些时间,你可以想象一个需要较长时间完成的任务在执行的中途中挂了会发生什么。我们当前的代码,一旦RabbitMQ将消息交付给客户,它将立即从内存中被移除。在这个例子中,如果你在执行过程中杀死一个工作者我们将丢失这个消息。我们也会丢失所有分发给指定的这个工作者但是还没有处理的消息。
但是我们不想丢失任何任务。如果一个工作者挂了,我们希望这个任务能交付给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ提供了消息确认机制,消费者向RabbitMQ中发送一个确认表示消息已经接收、处理并且RabbitMQ可以*的删除它了。
如果一个消费者挂了(通道关闭,连接关闭或者TCP连接丢失)没有发送确认,RabbitMQ将会理解成消息没有被完全处理并将消息发回队列。如果这个时候有其他消费者在线,它将被快速的交付给另一个消费者。通过这种方式能确保没有消息丢失,即使工作者偶尔挂掉。
这里没有消息超时,当消费者挂了RabbitMQ将会重新交付消息。如果一个消息的处理过程花费了很长很长的时间,这个是允许的。
消息确认默认是打开的,在上一个例子中我们可以通过设置标志autoAck=true
将其显示的关掉。这里我们将标志设置为false
并当我们完成任务后发送一个确认。以下是修改后的代码片段:
//只接受一个未确认的消息(见下文)
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动确认设置为false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
使用这些代码,我们能确保即使我们在消息处理的过程中杀死消费者,也不会有消息丢失,因为所有未确认的消息在工作线程挂掉后都会被重新交付到其他消费者。
消息持久化(Message Durability)
我们已经学习如何确保即使消费者挂了,消息也不会丢失。但是如果RabbitMQ服务挂了,我们还是会丢失我们的消息。
当RabbitMQ退出或者崩溃,它将忽略掉队列和消息,除非你告诉它不要忽略。两个步骤确保消息不会丢失:我们需要将队列和消息都标志为持久化的。
第一,我们需要保证RabbitMQ永远不会丢失我们的队列。为了能实现它,我们需要将其定义为持久化的。
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
如果QUEUE_NAME
这个队列名在之前就已经使用,并且没有设置为持久化,那么我们需要重新设置一个队列名,不然就会有冲突。RabbitMQ是不允许设置一个既是持久化又是非持久化的队列存在的。
上面的定义需要将消费者和生产者都改掉。
在这里即使RabbitMQ重启我们也能确保队列不会丢失。现在我们需要使我们的消息是持久化的——通过设置MessageProperties
(它实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
注意:消息持久化
设置消息持久化并不能完全保证消息不会丢失。即使告知RabbitMQ将消息持久化到磁盘,但是RabbitMQ还是会出现已经接受到消息但是却没有保存它的情况。并且,RabbitMQ不会为每个消息执行
fsync(2)
——它可能只是保存到内存中但是没有真的写到磁盘中。持久化并不是强持久化的,但是对于简单的任务队列已经足够使用了。如果你需要一个强持久化,你可以使用publisher confirms。
公平分发
你可能已经注意到分发还是不能完全的达到我们想要的效果。例如:有这么一种情形,对于两个工作者,基数任务很繁重,偶数任务很轻松,一个工作者就会不间断的工作而另一个将几乎不做什么任务。然而RabbitMQ并不知道这些,依然在这样分发消息。
发生这种情况的原因是RabbitMQ只是单纯的当消息发送到队列后将消息进行分发。它并不关心消费者未确认的消息数量。它只是盲目的将每N个消息发送给n个消费者。
我们可以使用basicQos
方法设置prefetchCount = 1
防止这样的失败。这个将告知RabbitMQ不要在同一时间将很多消息给消费者。换句话说,在前一个消息完成并确认之前不要再将新的消息分发给这个消费者。而是将消息分发到下一个不忙的消费者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:队列大小
如果所有的消息者都在忙,你的队列又使用完了。你需要关注这个,也许需要增加更多的消费者,或者其他的策略。
以下是所有的代码整合
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列例子
*/
public class NewTask {
private static final String QUEUE_NAME = "task.queue";
private static final String[] strings = {
"First message.",
"Second message..",
"Third message...",
"Fourth message....",
"Fifth message....."
};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
String[] messages = strings;
for (String message: messages){
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] Send '" + message + "'");
}
channel.close();
connection.close();
}
}
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列消费端
*/
public class Worker {
private static final String QUEUE_NAME = "task.queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//只接受一个未确认的消息(见下文)
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动确认设置为false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String message) {
for (char ch : message.toCharArray()){
if (ch == '.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
下个教程我们将学习怎么将相同的消息交付给多个消费者。