RabbitMQ初学之二:直接发送消息到队列

时间:2021-04-15 17:38:52

一. 背景

  总前提:队列无论是在生产者声明还是在消费者声明,只有声明了,才能在RabbitMQ的管理界面看到该队列

  生产者直接发送消息到队列,消费者直接消费队列中的消息,而不用指定exchange并绑定。这种需求下,分三种情况:① 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列;② 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息;③ 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。下面分别进行说明:

1. 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列,但是此时,声明队列的一方要先运行,否则消费者连不上队列,要报错

  ① 生产者代码

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE1";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         // 声明队列
19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
20         String message = "Hello World!";  
21         
22         // 发行消息到队列
23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
24         System.out.println(" [x] Sent '" + message + "'");  
25  
26         channel.close();  
27         connection.close();  
28     }  
29 }

  2. 消费者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE1";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22         
23         QueueingConsumer consumer = new QueueingConsumer(channel);  
24         
25         // 消费者不声明队列,直接从队列中消费
26         channel.basicConsume(QUEUE_NAME, true, consumer);  
27         while(true){  
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
29             String message = new String(delivery.getBody(),"UTF-8");  
30             System.out.println(" 【[x] Received 】:" + message);  
31         }  
32     }  
33 }

2.  生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息,生产者可先运行)(不报错),但是发的消息无效(被丢弃),只有声明队列的一方运行后,在管理界面才能看到该队列

  ① 生产者

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE2";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         String message = "Hello World!";  
19         
20         // 发行消息到队列
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
22         System.out.println(" [x] Sent '" + message + "'");  
23  
24         channel.close();  
25         connection.close();  
26     }  
27 }

 

  ② 消费者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE2";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         // 声明队列
22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
24         
25         QueueingConsumer consumer = new QueueingConsumer(channel);  
26         
27         // 消费者不声明队列,直接从队列中消费
28         channel.basicConsume(QUEUE_NAME, true, consumer);  
29         while(true){  
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
31             String message = new String(delivery.getBody(),"UTF-8");  
32             System.out.println(" 【[x] Received 】:" + message);  
33         }  
34     }  
35 }

 

3. 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。

  ① 生产者

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE2";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         // 声明队列
19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
20         String message = "Hello World!";  
21         
22         // 发行消息到队列
23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
24         System.out.println(" [x] Sent '" + message + "'");  
25  
26         channel.close();  
27         connection.close();  
28     }  
29 }

  ② 消费者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE2";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         // 声明队列
22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
24         
25         QueueingConsumer consumer = new QueueingConsumer(channel);  
26         
27         // 消费者不声明队列,直接从队列中消费
28         channel.basicConsume(QUEUE_NAME, true, consumer);  
29         while(true){  
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
31             String message = new String(delivery.getBody(),"UTF-8");  
32             System.out.println(" 【[x] Received 】:" + message);  
33         }  
34     }  
35 }