ActiveMQ做消息队列拦截功能
操作步骤
- 首先先到ActiveMQ官网下载最新的最稳定的版本 http://activemq.apache.org/activemq-5158-release.html 我下载的是Windows版本的
- 直接解压双击运行bin/win64/activemq.bat . 弹出黑窗口 (黑窗口不要关) 访问localhost:8161 如果出现页面 说明运行成功 登录密码为 admin /admin
- 测试向本地的ActiveMQ服务器发送消息
//1.创建连接工厂 默认接收消息的端口为61616
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象 发送的主题列
Topic topic = session.createTopic("test");
//6.创建消息生产者对象
MessageProducer producer = session.createProducer(topic);
//7.创建消息对象(文本消息)
TextMessage textMessage = session.createTextMessage("这是一条消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
- 查看监控页面 消息已经发送成功
- 默认的消息服务器是没有权限校验的 也就是说 谁都可以对消息队列服务器中的消息进行发送和监听 只要知道消息队列服务器的url
- 要想做权限校验 就需要对消息队列服务器做开发 来进行 权限的拦截 而ActiveMQ本身就支持本身做插件开发的
- 编写权限拦截器插件 请根据自己的需求编写校验
/**
* 因为不同的需求有着不同的校验方式
* @author GEP
* ActiveMQ消息拦截器
*/
public class ActivemqFilter extends BrokerFilter {
private static final Logger logger = LoggerFactory.getLogger(ActivemqFilter.class);
private final String PRODUCER_IDENTTIFICATION = "producer";
private final String CUSTOMER_IDENTTIFICATION="customer";
/**
* 消息生产者对应的标识
*/
private final String PRODUCER_NUMBER = "1";
/**
* 消息消费者对应的标识
*/
private final String CUSTOMER_NUMBER = "2";
/**
* 超级管理员对应的标识
*/
private final String SUPER_NUMBER = "0";
//用户 这里是封装成实体对象
private User user ;
//数据库连接 将用户的信息存到数据库方便进行管理
private JdbcTemplate jdbcTemplate;
public ActivemqFilter(Broker next,JdbcTemplate jdbcTemplate) {
super(next);
this.jdbcTemplate = jdbcTemplate;
}
/**
* 发送消息经过拦截器 这里是创建连接之后 对发送消息进行校验
* @param producerExchange
* @param messageSend
* @throws Exception
*/
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
//判断是否经过创建连接
if(this.getUser()==null){
throw new SecurityException("请先去登录");
}
//获得发送时传递过来的对列名称
String physicalName = messageSend.getDestination().getPhysicalName();
if(StringUtils.isBlank(physicalName)){
throw new SecurityException("请输入队列名称");
}
//校验用户是否有发送消息的权利
if(PRODUCER_NUMBER.equals(this.getUser().getActiveMQStatus())||SUPER_NUMBER.equals(this.getUser().getActiveMQStatus())){
//校验用户是否有对该队列的发送权利
if(!physicalName.equals(this.getUser().getActiveMQQueuesName())){
throw new SecurityException("你没有"+physicalName+"队列的发送权利");
}
//进行数据封装
String data = null;
//判断消息类型
//如果是文本类型的消息
if(messageSend instanceof ActiveMQTextMessage){
ActiveMQTextMessage message=(ActiveMQTextMessage)messageSend;
data = message.getText();
//如果是Map类型的消息
}else if(messageSend instanceof ActiveMQMapMessage){
ActiveMQMapMessage mapMessage = (ActiveMQMapMessage)messageSend;
data = mapMessage.getContentMap().toString();
}else{
throw new SecurityException("暂不支持该类型的消息");
}
//设置消息的发送结果
String status = null;
try {
super.send(producerExchange, messageSend);
//1表示发送成功
status="1";
}catch (Exception e){
//0表示发送失败
status="0";
}
//封装消息记录
...
//对数据库进行插入 将发送消息的记录 记录到数据库中
jdbcTemplate.update(...);
}else{
throw new SecurityException("您没有发送消息的权利");
}
}
/**
* 消费消息经过拦截器
* @param consumerExchange
* @param ack
* @throws Exception
*/
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
//判断是否经过创建连接
if(this.getUser()==null){
throw new SecurityException("请先去登录");
}
//判断队列名是否为空
String physicalName = ack.getDestination().getPhysicalName();
if(StringUtils.isBlank(physicalName)){
throw new SecurityException("请输入队列名称");
}
//判断用户是否有有接受消息的权利
if (CUSTOMER_NUMBER.equals(this.getUser().getActiveMQStatus()) || SUPER_NUMBER.equals(this.getUser().getActiveMQStatus())) {
//校验用户是否有对该队列的接受权利
if(!physicalName.equals(this.getUser().getActiveMQQueuesName())){
throw new SecurityException("你没有"+physicalName+"队列的接收权利");
}
//判断消息消费状态
String status = null;
try {
super.acknowledge(consumerExchange, ack);
status = "1";
} catch (Exception e) {
status = "0";
}
//封装消息记录
...
//对数据库进行插入 将发送消息的记录 记录到数据库中
jdbcTemplate.update(...);
} else {
throw new SecurityException("您没有接收消息的权利");
}
}
/**
* 创建链接的时候进行校验
* @param context
* @param info
* @throws Exception
*/
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
//获得连接是对方传的用户名和密码
String userName = info.getUserName().trim();
String password = info.getPassword().trim();
logger.info(userName+"请求连接");
//用户校验
auth(userName,password);
//创建连接
super.addConnection(context, info);
}
/**
* 用户校验 具体实现需要根据需求来进行编写 请不要直接复制粘贴
* @param userName
* @param password
*/
public void auth(String userName,String password){
//如果用户名密码为空
if(StringUtils.isBlank(userName)||StringUtils.isBlank(password)){
throw new SecurityException("用户名或密码不能为空");
}
//进行数据库查询
String sql = 查询自己数据库中的用户;
List<User> users = jdbcTemplate.query(sql, new UserMapper(), userName, password);
//如果没查到
if(isEmpty(users)){
throw new SecurityException("用户名或密码错误");
}
this.setUser(users.get(0));
//校验用户的连接权限 校验用户的时间段 校验用户的状态 这里需要自己来编写
...
}
public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}
/**
* 判断是否查到用户
*/
public boolean isEmpty(List list){
if(list==null){
return true;
}
return list.size()==0;
}
}
- 编写注册插件的注册类
/**
* @author GEP
* 自定义消息插件
*/
public class ActivemqPlugin implements BrokerPlugin {
//需要一个连接数据可的jdbcTemplate对象
private JdbcTemplate jdbcTemplate;
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public ActivemqPlugin(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public ActivemqPlugin() {
}
/**
* 注册插件
* @param broker
* @return
* @throws Exception
*/
public Broker installPlugin(Broker broker) throws Exception {
return new ActivemqFilter(broker,jdbcTemplate);
}
}
- 将编写好的代码打成jar包放到ActiveMQ的lib包下,还要放入数据库连接所需的jar包
- 编辑ActiveMQ conf/activemq.xml 文件
添加读取数据库连接配置db,properties
注册springjdbcTemplate
注册插件
11. 编写将db.properties文件 并将其放到conf 目录下
12.重启ActiveMQ 测试发送 (数据库中必须先有符合自己校验规则的用户对象)
先不带用户名密码的
直接报没有校验信息的错误 再试试错误的用户名密码试试
用户名密码错误
再试试正确的用户名密码 错误的队列名称
再试试正确的用户名正确的队列名
没有报错发送成功(有一条是没做权限之前发送的)
搞定