消息中间件(四)

时间:2023-01-02 04:24:14

一,AMQP的经典实现 RabbitMQ

1.安装

a.下载

i.下载Erlang:
http://www.erlang.org/downloads/19.2

消息中间件(四)
ii.下载Windows版RabbitMq:
http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6.exe

b.配置

ERLANG_HOME    C:\Program Files\erl8.2
path下添加   %ERLANG_HOME%\bin

RABBITMQ_BASE  C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6      
path下添加  %RABBITMQ_BASE%\sbin;%RABBITMQ_BASE%\ebin

2.启动

a.cmd进入rabbitMQ的sbin目录,执行 rabbitmq-server.bat  启动rabbitMQ服务

 

消息中间件(四)

b.访问web控制台:http://localhost:15672/#/     用户名:guest  密码:guest

消息中间件(四)

e. 如果无法访问,说明没安装插件,在sbin目录下执行: 

rabbitmq-plugins.bat enable rabbitmq_management 



3.java客户端

a.导包

客户端Jar包和源码包下载地址:
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0.jar
http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.0.0/amqp-client-5.0.0-sources.jar
还需要slf4j-api-1.6.1.jar

如果是Maven工程加入:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>

注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的*仓库查)的版本

b.生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class DirectProducer {


    private final static String EXCHANGE_NAME = "direct_logs";


    public static void main(String[] args) throws IOException, TimeoutException {


        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");




        Connection connection = factory.newConnection();//连接
		Channel channel = connection.createChannel();//信道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//交换器
        String[]serverities = {"error","info","warning"};//路由键名称
		//将信道设置为发送方确认
        channel.confirmSelect();
		
		//异步确认
		//deliveryTag代表了(channel)唯一的投递
        //multiple:false
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }


            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
        });
		
		//1、mandatory参数为true,投递消息时无法找到一个合适的队列 消息返回给生产者 false 丢弃消息(缺省)
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("msg:"+msg);
            }
        });
		
		//发送消息
        for(int i=0;i<3;i++){
            String server = serverities[i];
            String message = "Hello world_"+(i+1);
			//通过信道,发送消息到EXCHANGE_NAME交换器上,绑定到server路由键(null参数为持久化参数)
            channel.basicPublish(EXCHANGE_NAME,server,true,message.getBytes());
			
            System.out.println("Sent "+server+":"+message);
			//同步确认模式
			//if (channel.waitForConfirms()){
              //  System.out.println(ROUTE_KEY+":"+msg);
            //}


        }


        channel.close();
        connection.close();
    }


}

c.消费者

import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class ConsumerAll {
	private static final String EXCHANGE_NAME = "direct_logs";


	public static void main(String[] argv) throws IOException,
			InterruptedException, TimeoutException {


		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection connection = factory.newConnection();//连接
		Channel channel = connection.createChannel();//信道
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//交换器
		//声明随机队列
		//String queueName = channel.queueDeclare().getQueue();
		//声明自定义队列
		String queueName = "consumer_confirm";
		channel.queueDeclare(queueName,false,false,
				false,null);//队列名称,是否持久化,是否私有化,是否自动删除,队列参数。
		String[]serverities = {"error","info","warning"};
		for(String server:serverities){
			//队列和交换器的绑定
			//将队列绑定到EXCHANGE_NAME交换器上的server路由器上。
			channel.queueBind(queueName,EXCHANGE_NAME,server);
		}
		System.out.println("Waiting message.......");
		//监听器
		Consumer consumerA = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
									   AMQP.BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body,"UTF-8");
				System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
				//消费者自行确认(消费者确认机制)
				this.getChannel().basicAck(envelope.getDeliveryTag(),false);//投递标识符,是否批量恢复
			}
		};
		//消费者消费消息
		//消费queueName上的消息
		//true表示自动确认(消费者确认机制)
		//consumerA 消费者消费消息后的回调函数
		channel.basicConsume(queueName,false,consumerA);//false不需要自动确认。
	}
}

4.与spring集成

a.配置

i,使用Maven构建一个标准的Spring+SpringMVC的工程
ii,  在pom.xml中增加

 <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.0.0.RELEASE</version>
</dependency>


iii.增加命名空间

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:mvc="http://www.springframework.org/schema/mvc"
	   xmlns:tx="http://www.springframework.org/schema/tx"
	   xmlns:jee="http://www.springframework.org/schema/jee"
	   xmlns:p="http://www.springframework.org/schema/p"
	   xmlns:aop="http://www.springframework.org/schema/aop"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:task="http://www.springframework.org/schema/task"
	   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
	   	http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/context
		http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/jee
		http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
		http://www.springframework.org/schema/mvc
		http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
		http://www.springframework.org/schema/tx
		http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
		http://www.springframework.org/schema/aop
		http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-4.0.xsd
		http://www.springframework.org/schema/rabbit
		http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">


iv.生产者配置

	<!-- rabbitMQ配置 -->
	<bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"></property>
    </bean>
    <!--Spring的rabbitmq admin-->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

	<!--生产者创建队列-->
    <rabbit:queue name="p_create_queue" durable="false"/>

	<!--fanout交换器-->
    <rabbit:fanout-exchange name="fanout-exchange"
        xmlns="http://www.springframework.org/schema/rabbit" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="p_create_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

	<!--topic交换器-->
    <rabbit:topic-exchange name="topic-exchange"
         xmlns="http://www.springframework.org/schema/rabbit" durable="false">
    </rabbit:topic-exchange>
	<!-- rabbitTemplate 消息模板类 -->

    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
    </bean>

v. 消费者配置


b.生产者代码

c.消费者代码


5.与springBoot集成

a.配置

i.  新建一个SpringBoot工程
ii. 增加Maven依赖

<dependency> 
  <groupId>org.springframework.boot</groupId>  
  <artifactId>spring-boot-starter-web</artifactId> 
</dependency>
<dependency> 
  <groupId>org.springframework.boot</groupId>  
  <artifactId>spring-boot-starter-amqp</artifactId> 
</dependency>


6.零碎知识

a.rabbitMQ的集群

i. 概述

    rabbitMq内建集群可以使得客户端在节点崩溃的情况下可以运行,利用线性扩展来扩充消息的吞吐量.rabbitMQ的集群并不能保证消息的万无一失,当一个节点崩溃了以后,节点所有队列上的消息都会丢失。默认不会将队列的消息在集群中复制。队列在集群中不会被复制,其他节点只会保存队列所处的节点和元数据,消息传递给所有拥有该队列的节点。

i.本机集群

ii.多机集群


b.rabbitMQ的镜像队列

i.概述

如果RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的message却不行,这是因为queue及其内容仅仅存储于单个节点之上,所以一个节点的失效表现为其对应的queue不可用。

引入RabbitMQ的镜像队列机制,将queue镜像到cluster中其他的节点之上。在该实现下,如果集群中的一个节点失效了,queue能自动地切换到镜像中的另一个节点以保证服务的可用性。在通常的用法中,针对每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。slave会准确地按照master执行命令的顺序进行命令执行,故slave与master上维护的状态应该是相同的。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。

RabbitMQ的镜像队列同时支持publisherconfirm和事务两种机制。在事务机制中,只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息。同样的,在publisher confirm机制中,向publisher进行当前message确认的前提是该message被全部镜像所接受了。

ii.使用

添加policy

Rabbitmqctl set_policy Name Pattern Definition
Name 策略的名字
Pattern 队列匹配模式(正则表达式)
Definition

镜像的定义(ha-mode,ha-params,ha-sycn-mod)

ha-mode

镜像队列的模式 all/exactly/nodes

ha-params 表示几个节点上复制/节点名称
ha-sycn-mode automaticmanual

eg:对队列名称以“queue_”队列进行镜像,只在两个节点上完成复制

Rabbitmqctlset_policy ha_queue_two “^queue_” ‘{“ha-mode”:”exactly”,”ha-params”:2,”ha-sycn-mode“:“atuomatic”}’

代码:



c.浅谈互联网时代的消息中间件


https://blog.csdn.net/dervish0927/article/details/80034986