RabbitMQ是消息代理。从本质上说,它接受来自生产者的信息,并将它们传递给消费者。在两者之间,它可以根据你给它的路由,缓冲规则进行传递消息。
一、专业术语
1. 生产者:
在现实生活中就好比制造商品的工厂,他们是商品的生产者。生产者只意味着发送。发送消息的程序称之为一个生产者。我们用“P”表示:
2. 队列:
队列就像存放商品的仓库或者商店,是生产商品的工厂和购买商品的用户之间的中转站。队列就像是一个仓库或者流水线。在RabbitMQ中,信息流从你的应用程序出发,来到RabbitMQ的队列,所有信息可以只存储在一个队列中。队列可以存储很多的消息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。多个生产者可以将消息发送到同一个队列中,多个消费者也可以只从同一个队列接收数据。这就是队列的特性。队列用下面的图表示,图上面是队列的名字:
3. 消费者
消费者就好比是从商店购买或从仓库取走商品的人,消费的意思就是接收。消费者是一个程序,主要是等待接收消息。我们的用“C”表示
注意:
生产者,消费者和队列(RabbitMQ)不必部署在同一台机器上。实际在生产环境的大多数应用中,他们都是分开部署的。
二、“Hello World”
1. 说明
在本教程中,我们我们通过2个java程序,一个发送消息的生产者,和一个接收信息并打印的消费者。想要了解rabbitmq,必须了解一些最基础的内容,我们先从一些代码片段来了解产生信息和接收消息的流程和方法。在编写代码前,我们先来用户一张图表示要做的事,在下图中,“P”是我们的生产者,“C”是我们的消费者。在中间红色框是代表RabbitMQ中的一个消息队列。箭头指向表示信息流的方向。
2.maven依赖
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.5.7</version> 5 </dependency>
3. 消息生产者
1 package com.luchao.rabbitMQ.helloworld; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 /** 11 * 生产者 12 * Created by andpay on 17/5/31. 13 */ 14 public class Producer { 15 16 private static final String QUEUE_NAME = "hello"; 17 18 public static void main(String[] args) throws IOException, TimeoutException { 19 //创建连接工厂 20 ConnectionFactory connectionFactory = new ConnectionFactory(); 21 //设置RabbitMQ地址 22 connectionFactory.setHost("localhost"); 23 //创建一个新的连接 24 Connection connection = connectionFactory.newConnection(); 25 //创建一个频道 26 Channel channel = connection.createChannel(); 27 //声明一个队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响 28 // 均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。 29 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 30 String message = "Hello World!"; 31 //发送消息到队列中 32 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8")); 33 System.out.println("P [x] Sent '" + message + "'"); 34 //关闭频道、关闭连接 35 channel.close(); 36 connection.close(); 37 } 38 }
4. 消息消费者
1 package com.luchao.rabbitMQ.helloworld; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 /** 9 * 消费者 10 * Created by andpay on 17/5/31. 11 */ 12 public class Consumer { 13 14 private static final String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws IOException, TimeoutException { 17 //创建连接工厂 18 ConnectionFactory connectionFactory = new ConnectionFactory(); 19 //设置设置RabbitMQ地址 20 connectionFactory.setHost("localhost"); 21 //创建一个新的连接 22 Connection connection = connectionFactory.newConnection(); 23 //创建频道 24 Channel channel = connection.createChannel(); 25 //声明要关注的队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一 26 // 次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。 27 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 28 System.out.println("C [*] Waiting for messages. To exit press CTRL+C"); 29 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){ 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 String message = new String(body, "utf-8"); 33 System.out.println("C [x] Received '" + message + "'"); 34 } 35 }; 36 //自动回复队列应答 -- RabbitMQ中的消息确认机制 37 channel.basicConsume(QUEUE_NAME, true, consumer); 38 } 39 }
5. 运行测试
启动rabbitMQ,运行生产者程序。运行结果。
C [*] Waiting for messages. To exit press CTRL+C
运行消费者程序,运行结果。
C [*] Waiting for messages. To exit press CTRL+C C [x] Received 'Hello World!'
打开rabbitMQ的web控制台看到:
Connection
channels
exchanges
以amp开头的是rabbitMQ自己的exchange,我们用的是默认的exchange,在代码中没有指定exchange。
queues
我们指定的queue为hello