rabbitmq消息消费者

时间:2021-01-08 14:39:08

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloudparent</artifactId>
        <groupId>com.cxy</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitMqConsumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
    </dependencies>

</project>

消费者代码:

package com.cxy.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/***
 * @ClassName: Consumer1
 * @Description:
 * @Auther: cxy
 * @Date: 2019/3/24:11:37
 * @version : V1.0
 */
public class Consumer1 {
    private  static  final  String Queue="helloworld";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory= new ConnectionFactory();
        //设置连接地址
        connectionFactory.setHost("192.168.230.134");
        //设置端口
        connectionFactory.setPort(5672);
        //设置密码用户名
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟机,每个虚拟机相当于一个小的mq
        connectionFactory.setVirtualHost("/");
        Connection connection =null;
        try {
            //建立连接
            connection = connectionFactory.newConnection();
            //建立通道,生产着和消费者都是在通道中完成
            Channel channel = connection.createChannel();
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
             参数1,声明队列
             参数2 是否持久化
             参数3 是否排他,是否独战连接,队列只允许该链接中访问,如果连接关闭,队列也就删除了
             参数4:是否自动删除,如果将此参数设置true,那么就变成零时队列
             参数5 :扩展参数,例如存活时间
           */
            channel.queueDeclare(Queue,true,false,false,null);
          /*
         String basicConsume(String queue, boolean autoAck, Consumer callback)
         参数一:队列名称
         参数二:自动回复
         参数三 消费者方法
          */
            DefaultConsumer defaultConsumer=new DefaultConsumer(channel) {
                //当接受到消息时候,此方法被调用
                /**
                * @Author cxy
                * @Description //TODO
                * @Date  2019/3/24
                * @Param [consumerTag, envelope, properties, body]
                * @return void
                 *
                 * consumerTag 用来标识.可以再监听队列时候设置
                 * envelope 信封,通过envelope可以通过这个获取到很多东西
                 * properties 额外的消息属性
                 * body:消息体
                **/
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //获取交换机
                    String exchange = envelope.getExchange();
                    //消息id,用来表示那个消息消费了
                    long deliveryTag = envelope.getDeliveryTag();
                    String message=new String(body,"utf-8");
                    System.out.println("receive");
                }
            };
           channel.basicConsume(Queue,true ,defaultConsumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

由于注释内容都写得很详细就没有单独写文字了.运行之后可以发掘管控台中消息没有了,

rabbitmq消息消费者

在正式开发中不会使用这种原生得代码去使用,会采用springboot去整合相关内容,至于以上代码为什么还要去监听队列,防止如果队列不存在,程序会存在异常,所以这样,在正式开发中

会采用手动会签得方式,如果,没有会签,就会进行消息重试.保证消息不会丢失