简单模式
我们以最普通的方式去理解,并没有整合Springboot的那种
这是最简单的模式,一个生产者,一个消费者,一个队列
测试
1、 导包,没整合,不需要编写配置
2、需要生产者消费者
- 导包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
- Producer
public class Producer {
public static void main(String[] args) {
//ip port
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//创建连接工程
connectionFactory.setHost("47.120.50.213");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//创建连接connection
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("producer");
//通过连接获取通道Channel
channel = connection.createChannel();
//通过创建交换机,声明队列,绑定关系,路由key,发送接收消息
String queueName = "queue";
/**
* 队列的名称
* 是否要持久化
* 排他性,是否独占独立
* 是否自动删除,在最后一个消费者消费完后
* 携带附属参数
*/
channel.queueDeclare(queueName,false,false,false,null);
String message = "hello world";
//发送消息到消息队列
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
//关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
- Consumer
public class Consumer {
public static void main(String[] args) {
//ip port
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//创建连接工程
connectionFactory.setHost("47.120.50.213");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//创建连接connection
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("producer");
//通过连接获取通道Channel
channel = connection.createChannel();
//第一个是消息队列的名字
channel.basicConsume("queue", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery message) throws IOException {
System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8"));
}
},new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
}
);
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
//关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
总结
代码流程
-
上述消息没有设置为持久化
-
没持久化,消息创建了依旧存在,除非服务器重启,就会删除
-
持久化,服务器重启后都不会删除
-
发送消息
-
channel.queueDeclare(queueName,false,false,false,null); String message = "hello world"; //发送消息到消息队列 channel.basicPublish("",queueName,null,message.getBytes());
-
-
接收消息
-
channel.basicConsume("queue", true, new DeliverCallback() { @Override public void handle(String s, Delivery message) throws IOException { System.out.println("收到的消息的是"+new String(message.getBody(),"UTF-8")); } },new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("接收消息失败"); } } );
-
问题
1、连接超时
这里可能是NO access ,点击admin修改
命令方式给用户分配权限
rabbitmqctl set_permissions -p / admin '*' '.*' '.*' 给用户分配权限
发现并没有解决问题
- 访问的端口时5672,因为15672是给web访问的所以需要访问5672
- 需要开通安全组与端口号,即5672,15672都需要开启
#开启端口
[root@iZf8zhsqf64x47n1tpdy6oZ rabbitmq]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
#重启防火墙
firewall-cmd --reload
#需要开启远程安全组
思考
为什么基于channel而不是连接??????
一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销
,如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理