1 为什么要使用消息中间件
说到消息中间件,就不得不说它最具代表性的三个特点,那就是解耦、异步和削峰。
如果在系统中用户成功登录系统之后,此时会调用短信服务,通知用户登录成功,然后再调用积分系统,给用户的账号增加积分,接着再调用日志系统,把登录的行为保存日志中,如果在调用某一个系统的时候调用失败,那么可能造成整个业务操作回滚,这种情况就是多个系统之间耦合度太高导致的。像前面说的这种情况,无形之中会增加用户登录的耗时时间,因为用户并不想了解登录的时候系统到底调用了哪些服务,而只是单纯的想登录系统,所以,这个时候就需要加上消息中间件,当用户登录的时候,把登录的消息放到消息中间件里,然后消息中间件返回给客户端登录是否成功的结果,接着消息中间件会异步的把消息发送给需要调用的系统,这样就可以节省下来许多等待的时间。
2 JMS规范
定义:中文全称为Java消息服务,是一个Java平台中面向消息中间件的API,用于在两个系统之间或者是分布式系统中发送或接受消息,实现异步通信。
相关概念:
提供者:实现JMS规范的消息中间件服务器
客户端:发送或接受消息的应用程序
生产者/发布者:创建并发送消息的客户端
消费者/订阅者:接受并处理消息的客户端
消息:应用程序之间传递的数据内容
消息模式:在客户端之间传递消息的方式,JMS定义了主题(发布/订阅)和队列(点对点)两种模式
队列模式:① 客户端包括生产者和消费者 ② 队列中的消息只能被一个消费者所消费 ③ 消费者可以随时消费队列中的消息
主题模式: ①客户端包括发布者和订阅者 ② 主题中的消息可以被多个消费者消费 ③ 消费者不能消费订阅之前就发送到主题中的消息
编码接口:①ConnectionFactory:用于创建连接到消息中间件的连接工厂② Connection:代表应用程序和消息服务器之间的应用链路 ③ Destination:指消息发布和接收的地点,包括队列和主题④Session:表示一个单线程的上下文,用于发送和接收消息
3 队列模式
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.huyj.spring-jms</groupId> <artifactId>spring-jms</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <!--排除activemq-core中引入的spring-context--> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> </dependencies> </project>
//1.创建ConnectionFactory ConnectionFactory connetionFactory = new ActiveMQConnectionFactory(url); //2.创建connection Connection connection = connetionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); //6.创建一个生产者 MessageProducer messageProducer = session.createProducer(destination); //设置生产者的模式,有两种可选,默认情况下是支持持久化的 //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存 //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 //messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for(int i=0;i<100;i++){ //7.创建消息 TextMessage textMessage = session.createTextMessage("test"+i); //8.发布消息 messageProducer.send(textMessage); logger.info("队列消息发送:"+textMessage.getText()); } //9.关闭连接 connection.close();
//1.创建ConnectionFactory ConnectionFactory connetionFactory = new ActiveMQConnectionFactory(url); //2.创建connection Connection connection = connetionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建一个目标 Destination destination = session.createQueue(queueName); //6.创建一个生产者 MessageConsumer messageConsumer = session.createConsumer(destination); //7.创建一个监听器 // 消息的监听是一个异步的过程 messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try{ logger.info("接收消息:"+textMessage.getText()); }catch (JMSException e){ e.printStackTrace(); } } }); //8.关闭连接 connection.close();
4 主题模式
只需要修改创建目标处的代码:session.createQueue改为session.createTopic即可
5 Spring集成JMS连接ActiveMQ
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <!--开启注解的配置 针对类似@Resource这样的注解--> <context:annotation-config/> <!--ActiveMQ为我们提供的ConnectionFactory--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.112.141:61616"/> </bean> <!--spring jms为我们提供的连接池--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"></property> </bean> <!--一个队列目的地,点对点的--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue-spring"/> </bean> <!--一个主题目的地,发布订阅模式--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic-spring"/> </bean> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <import resource="common.xml"/> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> <bean class="com.huyj.jms.producer.impl.ProducerServiceImpl"/> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!--导入公共配置--> <import resource="common.xml"/> <!--配置消息监听器--> <bean id="consumerMessageListener" class="com.huyj.jms.consumer.ConsumerMessageListener"/> <!--配置消息监听容器--> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="consumerMessageListener"/> </bean> </beans>
public class ProducerService{ @Autowired private JmsTemplate jmsTemplate; //用于发送和接收消息的模板 @Resource(name = "queueDestination") private Destination destination; public void sendMessage(final String message) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); System.out.println("发送消息:"+message); } }
public class AppProducer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); ProducerService service = context.getBean(ProducerService.class); for(int i=0;i<100;i++){ service.sendMessage("test"+i); } context.close(); } }
public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
public class AppConsumer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); } }