RabbitMQ 笔记-Exchanges

时间:2023-03-08 17:21:19
RabbitMQ 笔记-Exchanges

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

  Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

        
生产者:
     // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 发送消息 for (String severity : routingKeys) { String message = "Send the message level: " + severity; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } 消费者 // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity); }

  Fanout exchange: 会向响应的queue广播。

生产者
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 分发消息
        for (int i=0; i<5; i++) {
            String message = "Hello World!" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

消费者
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
    

  Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

生产者
            // 声明一个匹配模式的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 待发送的消息
            String routingKeys[] = new String[]{
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "quick.brown.fox",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };

            // 发送消息
            for (String severity : routingKeys) {
                String message = "From "+severity+" routingKey' s message!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");
            }

消费者
        // 声明一个匹配模式的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        // 路由关键字
        String routingKeys[] = new String[] {
            "*.orange.*"
        };

        // 绑定路由关键字

        for (String bindingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
        }

  匿名: 直接发送到queue。

生产者
        for (int i=0; i<5; i++) {
            String message = "hello world! " + i;
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

消费者
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

github练习代码:https://github.com/m2492565210/rabbitmq