RabbitMQ封装实战

时间:2022-04-17 02:19:08

先说下背景:上周开始给项目添加曾经没有过的消息中间件。虽然说,一路到头非常容易,直接google,万事不愁可是生活远不仅是眼前的“苟且”。首先是想使用其他项目使用过的一套对mq封装的框架,融合进来。虽然折腾了上周六周日两天,总算吧老框架融进项目中了,可是周一来公司和大数据哥们儿一联调发现,收不到数据!所以没办法,当场使用原生那一套撸了个版本出来可是,可是,可是,俗话说得好:生命在于折腾!在上周末融合老框架的时候,我把源码读了遍,发现了很多很好的封装思想,Ok,这周末总算闲了下来,我就运用这个思想,封装一个轻量级的呗,说干就干!

主要思想

说到封装,我想,应该主要是要尽可能减小用户使用的复杂度,尽量少的进行配置,书写,甚至能尽量少的引入第三发或是原生类库。所以在这种想法之下,这套框架的精髓主要在以下几点:

  • 使用注解,减少用户配置
  • 将不同的生产者消费者的初始化方式统一
  • 初次注册生产者或者消费者的时候,进行队列的自动注册
  • 再统一的初始化方式中,使用动态代理的方式,代理到具体的生产者或是消费者的发送接收方法

在这种模式下,我们不用过多的配置,直接建立一个接口,接口上面使用注解声明队列的名称,然后使用同一的Bean进行初始化,就齐活了!

统一初始化Bean的实现

不说啥,直接上代码:


public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> { private Logger logger = LoggerFactory.getLogger(getClass()); private Class<?> serviceInterface; @Autowired
private ConnectionFactory rabbitConnectionFactory; @Value("${mq.queue.durable}")
private String durable; @Value("${mq.queue.exclusive}")
private String exclusive; @Value("${mq.queue.autoDelete}")
private String autoDelete; @SuppressWarnings("unchecked") /**
这个方法很特殊,继承自FactoryBean,就是说管理权归属IoC容器。每次注册一个队列的时候,并且注入到具体的service中使用的时候,就会调用这个getObject方法。所以,对于使用本类初始化的bean,其类型并非本类,而是本类的属性serviceInterface类型,因为最终getObject的结果是返回了一个动态代理,代理到了serviceInterface。
**/
@Override
public T getObject() throws Exception { //初始化
if (getQueueName() != null) {
logger.info("指定的目标列队名[{}],覆盖接口定义。", getQueueName());
} else {
RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);
if (name == null)
throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "没有指定@RPCQueueName");
setQueueName(name.value());
}
//创建队列
declareQueue();
logger.info("建立MQ客户端代理接口[{}],目标队列[{}]。", serviceInterface.getCanonicalName(), getQueueName()); return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//动态代理到目标接口
} private void declareQueue() {
Connection connection = rabbitConnectionFactory.createConnection();
Channel channel = connection.createChannel(true);
try {
channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive)
, Boolean.valueOf(autoDelete), null);
logger.info("注册队列成功!");
} catch (IOException e) {
logger.warn("队列注册失败", e);
}
}
...... } public class RabbitMQProducerInterceptor implements InvocationHandler { private Logger logger = LoggerFactory.getLogger(getClass()); private String queueName; @Autowired
private AmqpTemplate amqpTemplate; @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object sendObj;
Class<?>[] parameterTypes = method.getParameterTypes();
String methodName = method.getName();
boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);
if (isSendOneJson) {
sendObj = args[0];
logger.info("发送单一json字符串消息:{}", (String) sendObj);
} else {
sendObj = new RemoteInvocation(methodName, parameterTypes, args);
logger.info("发送封装消息体:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));
} logger.info("发送异步消息到[{}],方法名为[{}]", queueName, method.getName());
//异步方式使用,同时要告知服务端不要发送响应
amqpTemplate.convertAndSend(queueName, sendObj);
return null; } ......
}

下面是核心的配置文件


<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <rabbit:connection-factory id="rabbitConnectionFactory"
host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"
username="${mq.username}" password="${mq.password}" /> <!-- 供自动创建队列 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" /> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/> <!-- 创建生产者 -->
<bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean">
<property name="serviceInterface" value="com.example.demo.ISendMsg" />
</bean> </beans>

说明:每次要使用mq,直接导入这个基本配置,和基础jar包即可。对于配置文件中的生产者声明,已经直接简化到三行,这一部分可以单独创建一个类似于producer-config.xml专门的配置文件。

附属类

这里主要就是涉及一个注解类:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RPCQueueName { String value();
}

说明:主要用于队列名称的声明。可以拓展的再建立其他的注解类,并在RabbitMQProducerFactoryBean中进行具体的逻辑实现。对于未来功能添加,起到了非常好的解耦效果。

具体的接口:

@RPCQueueName("test.demo.ISendMsg")
public interface ISendMsg { void sendMsg(String msg);
}

说明:这样,就声明了个队列名叫test.demo.ISendMsg的生产者,每次讲IsendMsg注入到要发送消息的Service里面,直接调用sendMsg即可向注解声明的队列发送消息了。

恩,开源

写了个springboot的小demo:

github地址

接下来我会更新消费者的封装,今天先放一放,出去动动。。哈哈

RabbitMQ封装实战的更多相关文章

  1. Appium python自动化测试系列之滑动函数封装实战&lpar;八&rpar;

    8.1 什么是函数的封装 教科书上函数的封装太官方,我们这里暂且将函数的封装就是为了偷懒把一些有共性的功能或者一些经常用的功能以及模块放在一起,方便我们以后再其他地方调用.这个只是个人的理解所以大家懂 ...

  2. 【NetCore】RabbitMQ 封装

    RabbitMQ 封装 代码 https://gitee.com/wosperry/wosperry-rabbit-mqtest/tree/master 参考Abp事件总线的用法,对拷贝的Demo进行 ...

  3. NET下RabbitMQ实践&lbrack;实战篇&rsqb;

    之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布.今天就介绍一下我们产品中如何使用RabbitMQ的!          在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能 ...

  4. springboot集成rabbitmq(实战)

    RabbitMQ简介RabbitMQ使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现(AMQP的主要特征是面向消息.队列.路由.可靠性.安全).支持多种客户端,如:Python.Ru ...

  5. Ubuntu16&period;04安装rabbitmq(实战)

    安装Erlang 由于RabbitMQ需要基于Erlang/OTP,所以在安装RabbitMQ之前需要先安装Erlang/OTP.同样的,在Ubuntu标准的repositories中,Erlang/ ...

  6. 干货!基于SpringBoot的RabbitMQ多种模式队列实战

    目录 环境准备 安装RabbitMQ 依赖 连接配置 五种队列模式实现 1 点对点的队列 2 工作队列模式Work Queue 3 路由模式Routing 4 发布/订阅模式Publish/Subsc ...

  7. 深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议

    前言 消息队列在现今数据量超大,并发量超高的系统中是十分常用的.本文将会对现时最常用到的几款消息队列框架 ActiveMQ.RabbitMQ.Kafka 进行分析对比.详细介绍 RabbitMQ 在 ...

  8. CentOS 7 安装RabbitMQ 3&period;3

    1.安装erlang 语言环境 安装依赖文件 #yum install ncurses-devel 进入 http://www.erlang.org/download.html 选择源文件下载 wge ...

  9. CentOS7 安装RabbitMQ

    第一.下载erlang和rabbitmq-server的rpm: http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos. ...

随机推荐

  1. web工程常见部署方式总结

    作为一个web测试工程师,对测试所属的平台架构,项目部署情况应该是有所了解的,下面在此基础上总结下web项目在各种场景下常用的部署方式: 第一种方法: 开发常用部署方法,直接在myeclipse里部署 ...

  2. EF中限制字段显示长度

    在EF中有些添加的字段 文本显示超多文字,想截取显示又没有截取功能. 怎么办? 我们可以在EF中类的属性中设置 你想限制这个用户名只能有10个字符长度 public String UserName { ...

  3. 403&period; Frog Jump

    做完了终于可以吃饭了,万岁~ 假设从stone[i]无法跳到stone[i+1]: 可能是,他们之间的距离超过了stone[i]所能跳的最远距离,0 1 3 7, 从3怎么都调不到7: 也可能是,他们 ...

  4. C&num;枚举器接口IEnumerator的实现

    原文(http://blog.csdn.net/phpxin123/article/details/7897226) 在C#中,如果一个类要使用foreach结构来实现迭代,就必须实现IEnumera ...

  5. 解决将&sol;etc&sol;passwd文件中1000改为0后只能guest进入系统的问题 &vert;&vert;ubuntu下将普通用户权限升级为root用户权限的方法;

    其实我现在才知道linux系统对于用户权限管理比较严,在ubuntu下系统不允许root权限的用户进入图像界面系统.由于之前没弄过权限这个东西瞬间掉坑了了. 我是想修改一下root下的nginx.co ...

  6. Button的几种常用的xml背景,扁平化,下划线,边框包裹,以及按压效果

    Button的几种常用的xml背景,扁平化,下划线,边框包裹,以及按压效果 分享下我项目中用到的几种Button的效果,说实话,还真挺好看的 一.标准圆角 效果是这样的 他的实现很简单,我们只需要两个 ...

  7. iBatis第一章:基础知识概述 &amp&semi; MVC思想

    一.java是一门十分受开发人员欢迎的语言,在开发语言排行榜中名列前茅,人们对其看法不尽相同,就我自身感受而言,我觉得java语言的主要优势体现在如下几方面:1.java属于开源语言,开发人员可以找到 ...

  8. 广州&period;NET微软技术俱乐部微信群各位技术大牛的blog

    1. .net core和微服务领域的张善友张队长和马洪喜,证明了.net core和微服务已经在各方面都不比java差2. Xamarin的卢建晖,证明了.net在移动开发领域也是很牛的.3. wi ...

  9. 动态规划———最长公共子序列(LCS)

    最长公共子序列+sdutoj2080改编: http://acm.sdut.edu.cn/onlinejudge2/index.php/Home/Contest/contestproblem/cid/ ...

  10. lua系列之 lua-cjson模块安装报错问题解决

    lua-cjson下载 下载地址 报错信息 [root@LeoDevops lua-cjson]# make cc -c -O3 -Wall -pedantic -DNDEBUG -I/usr/loc ...