一. 背景
总前提:队列无论是在生产者声明还是在消费者声明,只有声明了,才能在RabbitMQ的管理界面看到该队列
生产者直接发送消息到队列,消费者直接消费队列中的消息,而不用指定exchange并绑定。这种需求下,分三种情况:① 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列;② 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息;③ 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。下面分别进行说明:
1. 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列,但是此时,声明队列的一方要先运行,否则消费者连不上队列,要报错
① 生产者代码
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer {
private final static String QUEUE_NAME = "QUEUE1"; public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!"; // 发行消息到队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'"); channel.close();
connection.close();
}
}
2. 消费者
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Reqv { private final static String QUEUE_NAME = "QUEUE1"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672); Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 消费者不声明队列,直接从队列中消费
channel.basicConsume(QUEUE_NAME, true, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" 【[x] Received 】:" + message);
}
}
}
2. 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息,生产者可先运行)(不报错),但是发的消息无效(被丢弃),只有声明队列的一方运行后,在管理界面才能看到该队列
① 生产者
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer {
private final static String QUEUE_NAME = "QUEUE2"; public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); String message = "Hello World!"; // 发行消息到队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'"); channel.close();
connection.close();
}
}
② 消费者
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Reqv { private final static String QUEUE_NAME = "QUEUE2"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672); Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 消费者不声明队列,直接从队列中消费
channel.basicConsume(QUEUE_NAME, true, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" 【[x] Received 】:" + message);
}
}
}
3. 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。
① 生产者
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer {
private final static String QUEUE_NAME = "QUEUE2"; public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!"; // 发行消息到队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'"); channel.close();
connection.close();
}
}
② 消费者
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Reqv { private final static String QUEUE_NAME = "QUEUE2"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672); Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); // 消费者不声明队列,直接从队列中消费
channel.basicConsume(QUEUE_NAME, true, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" 【[x] Received 】:" + message);
}
}
}