前言
AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
RabbitMq与spring boot整合简单了解
引入springboot amqp包
<!-- amqp-->
<dependency>
<groupId></groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
username: admin
password: 123456
host: 192.168.56.128
port: 5672
listener:
simple:
concurrency: 5 #消费端最小并发数
max-concurrency: 10 #消费端最大并发数
prefetch: 5 #一次请求中预处理的消息数量
cache:
channel:
size: 50 # 缓存的channel数量
一、发布订阅
/**
* Create by Administrator on 2018/10/12
* Exchange三种模式配置
* @author admin
*/
@Configuration
public class RabbitMqExchangeConfig {
/**
* 广播交换器
* @return
*/
@Bean
public FanoutExchange fanout(){
return new FanoutExchange("");
}
private static class FanoutConfig {
//AnonymousQueue类型的队列,它的名字是由客户端生成的,而且是非持久的,独占的,自动删除的队列
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}
//队列和交换机绑定
//这种关系可以读作:这个队列对这个交换器里的消息感兴趣。
//虽然 Queue类型有多个实例,但spring会自动更加名字匹配,bean名字匹配参数名字
@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
return (autoDeleteQueue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
return (autoDeleteQueue2).to(fanout);
}
}
}
@Service
public class Tut3Sender {
private Logger logger = ();
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
int dots = 0;
int count = 0;
public void send(){
StringBuilder builder = new StringBuilder("Hello");
if(dots++ == 3){
dots = 1;
}
for(int i =0;i<dots;i++){
(".");
}
((++count));
String message = ();
//向交换机发送信息
((),"", message);
(" [x] Sent '" + message + "'");
}
}
@RabbitListener 监听队列情况,属性queues为队列名数组
@Component
public class Tut3Receiver {
private Logger logger = ();
@RabbitListener(queues="#{}")
public void receive1(String in ) throws InterruptedException{
receive(in,1);
}
@RabbitListener(queues="#{}")
public void receive2(String in ) throws InterruptedException{
receive(in,2);
}
public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
();
("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
();
("instance " + receiver + " [x] Done in " + () + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : ()) {
if (ch == '.') {
(1000);
}
}
}
}
写一个Controller进行接口测试
@Controller
public class RabbitMqController {
//发布
@Autowired
private Tut3Sender tut3Sender;
@RequestMapping("/sendFanout")
@ResponseBody
private String sendFanout(){
for(int i =0;i<10;i++) {
();
}
return "ok";
}
}
二、路由
@Configuration
public class RabbitMqExchangeConfig {
/**
* 广播交换器
* @return
*/
@Bean
public FanoutExchange fanout(){
return new FanoutExchange("");
}
/**
* @Bean 通过使用静态类封闭
*/
private static class DirectConfig{
@Bean
public Queue autoDeleteQueue3() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue4() {
return new AnonymousQueue();
}
// orange ->queue3
// -> black ->queue3,queue4
// green ->queue4
//
@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue3) {
return (autoDeleteQueue3).to(direct).with("orange");
}
@Bean
public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue3) {
return (autoDeleteQueue3).to(direct).with("black");
}
@Bean
public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue4) {
return (autoDeleteQueue4).to(direct).with("green");
}
@Bean
public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue4) {
return (autoDeleteQueue4).to(direct).with("black");
}
}
}
@Service
public class Tut4Sender {
private Logger logger = ();
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
private int index;
private int count;
private final String[] keys = {"orange", "black", "green"};
/**
* 三次分发给不同的 key
*/
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (++ == 3) {
= 0;
}
String key = keys[];
(key).append(' ');
((++));
String message = ();
((), key, message);
(" [x] Sent '" + message + "'");
}
}
@Component
public class Tut3Receiver {
private Logger logger = ();
@RabbitListener(queues="#{}")
public void receive3(String in ) throws InterruptedException{
receive(in,3);
}
@RabbitListener(queues="#{}")
public void receive4(String in ) throws InterruptedException{
receive(in,4);
}
public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
();
("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
();
("instance " + receiver + " [x] Done in " + () + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : ()) {
if (ch == '.') {
(1000);
}
}
}
}
三、主题
@Configuration
public class RabbitMqExchangeConfig {
/**
* 主题交换器
* @return
*/
@Bean
public TopicExchange topic() {
return new TopicExchange("");
}
private static class TopicConfig{
@Bean
public Queue autoDeleteQueue5() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue6() {
return new AnonymousQueue();
}
// *.orange.* ->queue5
// -> *.*.rabbit->queue5
// lazy.# ->queue6
//星号匹配一个单词,哈希号匹配多个单词
@Bean
public Binding binding3a(TopicExchange topic,Queue autoDeleteQueue5){
return (autoDeleteQueue5).to(topic).with("*.orange.*");
}
@Bean
public Binding binding3b(TopicExchange topic,Queue autoDeleteQueue5){
return (autoDeleteQueue5).to(topic).with("*.*.rabbit");
}
@Bean
public Binding binding4a(TopicExchange topic,Queue autoDeleteQueue6){
return (autoDeleteQueue6).to(topic).with("lazy.#");
}
}
}
@Service
public class Tut5Sender {
private Logger logger = ();
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
private int index;
private int count;
private final String[] keys = {"",
"", "",
"", "", ""};
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (++ == ) {
= 0;
}
String key = keys[];
(key).append(' ');
((++));
String message = ();
((), key, message);
(" [x] Sent '" + message + "'");
}
}
@Component
public class Tut3Receiver {
private Logger logger = ();
@RabbitListener(queues="#{}")
public void receive5(String in ) throws InterruptedException{
receive(in,5);
}
@RabbitListener(queues="#{}")
public void receive6(String in ) throws InterruptedException{
receive(in,6);
}
public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
();
("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
();
("instance " + receiver + " [x] Done in " + () + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : ()) {
if (ch == '.') {
(1000);
}
}
}
}
参考:rabbit tutorials
spring amqp reference