Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
生产者:
// 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 发送消息 for (String severity : routingKeys) { String message = "Send the message level: " + severity; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } 消费者 // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity); }
Fanout exchange: 会向响应的queue广播。
生产者 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发消息 for (int i=0; i<5; i++) { String message = "Hello World!" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } 消费者 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");
Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
生产者 // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 待发送的消息 String routingKeys[] = new String[]{ "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit" }; // 发送消息 for (String severity : routingKeys) { String message = "From "+severity+" routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'"); } 消费者 // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String routingKeys[] = new String[] { "*.orange.*" }; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey); }
匿名: 直接发送到queue。
生产者 for (int i=0; i<5; i++) { String message = "hello world! " + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } 消费者 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
github练习代码:https://github.com/m2492565210/rabbitmq