RabbitMQ入门-竞争消费者模式

时间:2022-12-07 18:47:31

上一篇讲了个 哈喽World,现在来看看如果存在多个消费者的情况。

 

生产者:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 竞争消费者模式
*/
public class CompetingSend {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory
= new ConnectionFactory(); // 连接工厂
factory.setHost("localhost");
Connection connection
= factory.newConnection(); // 获取连接
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,
false, false, false, null); // 声明队列,只有他不存在的时候创建
String msg = "Hello World!";
// 发送多条消息
for (int i = 0; i < 5; i++){
channel.basicPublish(
"", QUEUE_NAME, null, (msg + "-" + i).getBytes());
System.out.println(
"Sending:" + msg);
}

channel.close();
connection.close();

}
}

 

消费者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 一个生产者,多个消费者
*/
public class CompetingReceiveA {

private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory
= new ConnectionFactory(); // 连接工厂
factory.setHost("localhost");
Connection connection
= factory.newConnection(); // 获取连接
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,
false, false, false, null); // 声明队列,只有他不存在的时候创建

Consumer consumer
= new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String recv
= new String(body, "UTF-8");
System.out.println(
"Receive:" + recv);
try {
doWork(recv);
}
catch (InterruptedException e) {
e.printStackTrace();
}
finally {
System.out.println(
"Done");
}
}
};

// true代表接收到消息后,给兔子发消息,让这条消息失效
channel.basicConsume(QUEUE_NAME, true, consumer);
}

// 模拟每条消息处理时间不一样
private static void doWork(String msg) throws InterruptedException {
char c = msg.charAt(msg.length() - 1);
for (int i = 0; i < Integer.parseInt(c+""); i++)
Thread.sleep(
1000);
}

}

 

先启动两个消费者,再启动生产者,查看控制台:

消费者A

RabbitMQ入门-竞争消费者模式

消费者B

RabbitMQ入门-竞争消费者模式

生产者(这里不必有疑问,这里打印的是修改之前的消息)

RabbitMQ入门-竞争消费者模式

 

 要说明的是什么观点呢?

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。一般来说,每个消费者得到的消息是一样多。但是,并不是说每个消费者的任务重量是平均的。很有可能出现A总在处理耗时任务,B一直吃西瓜的情况。

因为兔子不知道每个消息的耗时,他就会傻傻的派遣任务。

 不过,官方也有解决办法。

为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1这告诉RabbitMQ不要向消费者发送多于一条消息。换句话说,在它处理并确认了前一个消息之前,不要向工作人员发送新消息。

如果当前消费者正在忙碌(没有确认消息),它会将其分派给空闲下一个消费者。

 

int prefetchCount = 1;
channel.basicQos(prefetchCount);