rabbit工作队列模式

时间:2024-12-31 00:03:20

工作队列比简单队列在消费者这边多了一个方法。

channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)

生产者:

 package com.kf.queueDemo.fairQueue;

 import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 公平模式发送消息
* @author kf
*
*/
public class FairProducer { //队列名称
private static String QUEUENAME = "SIMPLEQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = RabbitConnectionUtils.getConnection(); //创建通道
Channel channel = connection.createChannel(); //通道里放入队列
/**
* 第一个参数是 队列名称
* 第二个参数指 要不要持久化
*/
channel.queueDeclare(QUEUENAME, false, false, false, null); /* //消息体
String mes = "demo_message汉字"; //发送消息
*//**
* 参数为 exchange, routingKey, props, body
* exchange 交换机
* routingKey 路由键
*
* body 消息体
*//*
channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/ /**
* 集群环境下,多个消费者情况下。消费者默认采用均摊
*/
for(int i=1; i<11; i++){
String mes = "demo_message汉字"+i;
System.out.println("发送消息"+mes);
channel.basicPublish("", QUEUENAME, null, mes.getBytes());
} // System.out.println("发送消息"+mes); channel.close();
connection.close();
} }

消费者1:

 package com.kf.queueDemo.fairQueue;

 import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 公平模式消费者
* @author kf
*
*/
public class FairConsumer1 {
//队列名称
private static String QUEUENAME = "SIMPLEQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{
System.out.println("01开始接收消息");
Connection connection = RabbitConnectionUtils.getConnection(); //创建通道
final Channel channel = connection.createChannel(); //通道里放入队列
/**
* 第一个参数是 队列名称
* 第二个参数指 要不要持久化
*/
channel.queueDeclare(QUEUENAME, false, false, false, null); //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){
//监听队列
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {try {
Thread.sleep(500);
} catch (Exception e) {
}finally {
System.out.println("------------进入监听---------");
String s = new String(body, "utf-8");
System.out.println("获取到的消息是:"+s);
//手动应答。
/**
* 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}}
}; //设置应答模式
/**
* 参数: 对列名,是否自动签收,监听的类
*/
System.out.println("获取消息的方法之前");
channel.basicConsume(QUEUENAME, false, consumer);
System.out.println("获取消息的方法之后"); } }

消费者2:

package com.kf.queueDemo.fairQueue;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 公平模式消费者
* @author kf
*
*/
public class FairConsumer2 {
//队列名称
private static String QUEUENAME = "SIMPLEQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{
System.out.println("02开始接收消息");
Connection connection = RabbitConnectionUtils.getConnection(); //创建通道
final Channel channel = connection.createChannel(); //通道里放入队列
/**
* 第一个参数是 队列名称
* 第二个参数指 要不要持久化
*/
channel.queueDeclare(QUEUENAME, false, false, false, null); //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){
//监听队列
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (Exception e) {
}finally {
System.out.println("------------进入监听---------");
String s = new String(body, "utf-8");
System.out.println("获取到的消息是:"+s);
//手动应答。
/**
* 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式
*/
channel.basicAck(envelope.getDeliveryTag(), false);
} }
}; //设置应答模式
/**
* 参数: 对列名,是否自动签收,监听的类
*/
System.out.println("获取消息的方法之前");
channel.basicConsume(QUEUENAME, false, consumer);
System.out.println("获取消息的方法之后"); } }

相关文章