一,AMQP的经典实现 RabbitMQ
1.安装
a.下载
i.下载Erlang:
http://www.erlang.org/downloads/19.2
ii.下载Windows版RabbitMq:
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6.exe
b.配置
ERLANG_HOME C:\Program Files\erl8.2 path下添加 %ERLANG_HOME%\bin RABBITMQ_BASE C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6 path下添加 %RABBITMQ_BASE%\sbin;%RABBITMQ_BASE%\ebin
2.启动
a.cmd进入rabbitMQ的sbin目录,执行 rabbitmq-server.bat 启动rabbitMQ服务
b.访问web控制台:http://localhost:15672/#/ 用户名:guest 密码:guest
e. 如果无法访问,说明没安装插件,在sbin目录下执行:
rabbitmq-plugins.bat enable rabbitmq_management
3.java客户端
a.导包
客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
还需要slf4j-api-1.6.1.jar
如果是Maven工程加入:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的*仓库查)的版本
b.生产者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DirectProducer { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器 String[]serverities = {"error","info","warning"};//路由键名称 //将信道设置为发送方确认 channel.confirmSelect(); //异步确认 //deliveryTag代表了(channel)唯一的投递 //multiple:false channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag="+deliveryTag +"multiple:"+multiple); } }); //1、mandatory参数为true,投递消息时无法找到一个合适的队列 消息返回给生产者 false 丢弃消息(缺省) channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("msg:"+msg); } }); //发送消息 for(int i=0;i<3;i++){ String server = serverities[i]; String message = "Hello world_"+(i+1); //通过信道,发送消息到EXCHANGE_NAME交换器上,绑定到server路由键(null参数为持久化参数) channel.basicPublish(EXCHANGE_NAME,server,true,message.getBytes()); System.out.println("Sent "+server+":"+message); //同步确认模式 //if (channel.waitForConfirms()){ // System.out.println(ROUTE_KEY+":"+msg); //} } channel.close(); connection.close(); } }
c.消费者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerAll { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection();//连接 Channel channel = connection.createChannel();//信道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器 //声明随机队列 //String queueName = channel.queueDeclare().getQueue(); //声明自定义队列 String queueName = "consumer_confirm"; channel.queueDeclare(queueName,false,false, false,null);//队列名称,是否持久化,是否私有化,是否自动删除,队列参数。 String[]serverities = {"error","info","warning"}; for(String server:serverities){ //队列和交换器的绑定 //将队列绑定到EXCHANGE_NAME交换器上的server路由器上。 channel.queueBind(queueName,EXCHANGE_NAME,server); } System.out.println("Waiting message......."); //监听器 Consumer consumerA = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("Accept:"+envelope.getRoutingKey()+":"+message); //消费者自行确认(消费者确认机制) this.getChannel().basicAck(envelope.getDeliveryTag(),false);//投递标识符,是否批量恢复 } }; //消费者消费消息 //消费queueName上的消息 //true表示自动确认(消费者确认机制) //consumerA 消费者消费消息后的回调函数 channel.basicConsume(queueName,false,consumerA);//false不需要自动确认。 } }
4.与spring集成
a.配置
i,使用Maven构建一个标准的Spring+SpringMVC的工程
ii, 在pom.xml中增加
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.0.RELEASE</version> </dependency>
iii.增加命名空间
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">
iv.生产者配置
<!-- rabbitMQ配置 --> <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="127.0.0.1"/> <property name="username" value="guest"/> <property name="password" value="guest"/> <property name="channelCacheSize" value="8"/> <property name="port" value="5672"></property> </bean> <!--Spring的rabbitmq admin--> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!--生产者创建队列--> <rabbit:queue name="p_create_queue" durable="false"/> <!--fanout交换器--> <rabbit:fanout-exchange name="fanout-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="false"> <rabbit:bindings> <rabbit:binding queue="p_create_queue"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!--topic交换器--> <rabbit:topic-exchange name="topic-exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="false"> </rabbit:topic-exchange> <!-- rabbitTemplate 消息模板类 --> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="rabbitConnectionFactory"></constructor-arg> </bean>
v. 消费者配置
b.生产者代码
c.消费者代码
5.与springBoot集成
a.配置
i. 新建一个SpringBoot工程
ii. 增加Maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
6.零碎知识
a.rabbitMQ的集群
i. 概述
rabbitMq内建集群可以使得客户端在节点崩溃的情况下可以运行,利用线性扩展来扩充消息的吞吐量.rabbitMQ的集群并不能保证消息的万无一失,当一个节点崩溃了以后,节点所有队列上的消息都会丢失。默认不会将队列的消息在集群中复制。队列在集群中不会被复制,其他节点只会保存队列所处的节点和元数据,消息传递给所有拥有该队列的节点。
i.本机集群
ii.多机集群
b.rabbitMQ的镜像队列
i.概述
如果RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的message却不行,这是因为queue及其内容仅仅存储于单个节点之上,所以一个节点的失效表现为其对应的queue不可用。
引入RabbitMQ的镜像队列机制,将queue镜像到cluster中其他的节点之上。在该实现下,如果集群中的一个节点失效了,queue能自动地切换到镜像中的另一个节点以保证服务的可用性。在通常的用法中,针对每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。slave会准确地按照master执行命令的顺序进行命令执行,故slave与master上维护的状态应该是相同的。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。
RabbitMQ的镜像队列同时支持publisherconfirm和事务两种机制。在事务机制中,只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息。同样的,在publisher confirm机制中,向publisher进行当前message确认的前提是该message被全部镜像所接受了。
ii.使用
添加policy
Rabbitmqctl set_policy Name Pattern Definition
Name | 策略的名字 | ||||||
Pattern | 队列匹配模式(正则表达式) | ||||||
Definition |
镜像的定义(ha-mode,ha-params,ha-sycn-mod)
|
eg:对队列名称以“queue_”队列进行镜像,只在两个节点上完成复制
Rabbitmqctlset_policy ha_queue_two “^queue_” ‘{“ha-mode”:”exactly”,”ha-params”:2,”ha-sycn-mode“:“atuomatic”}’
代码:
c.浅谈互联网时代的消息中间件