spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统

时间:2021-08-30 08:06:14

前言:随着系统的业务功能不断增强,传统的单机、单任务,单线程的运行模式已经逐渐的被淘汰,取而代之的是分布式,多任务,多线程,当然,现在开源的这方面的框架也非常的多,大概的思想也都类似,下面就结合我这一年多的工作心得,分享一个简单易实现的分布式,多任务,多线程的异步任务处理系统的基本实现。

1.系统部署图

spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统

该系统主要由3部分构成,任务生产者集群,消息中间件集群,任务消费者集群,下面来分别说下这3部分的作用:

任务生产者集群:顾名思义,主要用来产生消息任务,并将这些消息任务发送到消息中间件集群中,任务生产者集群可能使用的是不同的开发语言,开发框架以及不同的开发平台。

消息中间件集群:消息中间件集群主要的作用是使生产者和消费者解耦,屏蔽各个异构系统之间的区别,以及保证消息的传送,超时重试和消息任务的负载均衡。并确保同一消息只被一个消费者消费。为了实现系统的高可用性,此处使用了集群模式,实现master-slave模式。

任务消费者集群:这个是我们的业务核心,通过消费消息中间件传送过来的消息,从而实现我们的业务功能需求。为了保证任务被及时的处理,我们会用到spring的线程池,来实现任务的异步调度。

2.系统设计

(1)为了保证系统的高可用性,消息中间件集群我们采用的是主备结构,配置文件如下:

[html] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. <bean id=“targetConnectionFactory” class=“org.apache.activemq.ActiveMQConnectionFactory”>  
  2.         <property name=“brokerURL” value=“failover:(tcp://localhost:61616,tcp://localhost:61617)” />  
  3.     </bean>  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61617)" />
</bean>
为了减少消息中间件由于频繁连接导致的性能消耗,会使用连接池,配置文件如下:

[html] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. <!– 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化   
  2.         这样可以大大减少我们的资源消耗, –>  
  3.     <bean id=“pooledConnectionFactory” class=“org.apache.activemq.pool.PooledConnectionFactory”>  
  4.         <property name=“connectionFactory” ref=“targetConnectionFactory” />  
  5.         <property name=“maxConnections” value=“20” />  
  6.     </bean>  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
<!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化 
这样可以大大减少我们的资源消耗, -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="20" />
</bean>
(2),为了及时的监听消息,我们使用到了JMS中的MessageListener,当然,为了有更好的扩展性和灵活性,我们可以使用SessionAwareMessageListener以及MessageListenerAdapter来实现消息驱动POJO,示例代码如下:

[java] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. public class ConsumerSessionAwareMessageListener implements  
  2.         SessionAwareMessageListener<TextMessage> {  
  3.       
  4.     private Destination destination;  
  5.   
  6.     @Override  
  7.     public void onMessage(TextMessage message, Session session)  
  8.             throws JMSException {  
  9.         try {  
  10.             String receiveMessage = ((TextMessage) message).getText();  
  11.            // 创建消息生产者,用来发送回复消息到回复队列里面  
  12.             MessageProducer producer = session.createProducer(destination);  
  13.             producer.send(session.createTextMessage(”消费者回复消息!”));  
  14.             System.out.println(”消费者收到的消息为:”+receiveMessage);  
  15.         } catch (JMSException e) {  
  16.             e.printStackTrace();  
  17.         }  
  18.     }  
  19. }  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
public class ConsumerSessionAwareMessageListener implements
SessionAwareMessageListener<TextMessage> {

private Destination destination;

@Override
public void onMessage(TextMessage message, Session session)
throws JMSException {
try {
String receiveMessage = ((TextMessage) message).getText();
// 创建消息生产者,用来发送回复消息到回复队列里面
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("消费者回复消息!"));
System.out.println("消费者收到的消息为:"+receiveMessage);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
(3)异步任务调度

对于接收到的消息会采用异步的任务调度结合线程池来处理,示例代码如下:

[java] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. @SuppressWarnings({“unchecked”“rawtypes”})  
  2. public class ConsumerReceive implements MessageListener {  
  3.   
  4.     private CustomerServiceStrategyI strategy;  
  5.   
  6.     @Override  
  7.     @Async(“mqExecutor”)// 异步的任务处理  
  8.     public void onMessage(Message message) {  
  9.         System.out.println(”当前处理任务的线程为:” + Thread.currentThread().getName());  
  10.         if (message instanceof TextMessage) {  
  11.             strategy = new TextMessageStrategy();  
  12.             strategy.doService(message);  
  13.            message.acknowledge();// 客户端消息确认机制  
  14.         }  
  15.     }  
  16. }  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
@SuppressWarnings({"unchecked", "rawtypes"})
public class ConsumerReceive implements MessageListener {

private CustomerServiceStrategyI strategy;

@Override
@Async("mqExecutor")// 异步的任务处理
public void onMessage(Message message) {
System.out.println("当前处理任务的线程为:" + Thread.currentThread().getName());
if (message instanceof TextMessage) {
strategy = new TextMessageStrategy();
strategy.doService(message);
message.acknowledge();// 客户端消息确认机制
}
}
}
异步任务线程池的配置如下:
[html] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. <task:annotation-driven/>  
  2.     <task:executor id=“mqExecutor” pool-size=“5-10” queue-capacity=“20000” keep-alive=“2000” rejection-policy=“CALLER_RUNS”/>  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
<task:annotation-driven/>
<task:executor id="mqExecutor" pool-size="5-10" queue-capacity="20000" keep-alive="2000" rejection-policy="CALLER_RUNS"/>
注意:@Async(“mqExecutor”)这个注解表示该方法会通过异步的方式来执行,会直接跳过主程序

(4)消息确认机制

为了保证消息或者是请求被至少处理一次,可以引入消息的确认机制,JMS总共为我们提供了3种确认机制,分别如下:

Auto_acknowledge:JMS客户端会自动向服务器发送确认消息,如果服务器没有接收到这个确认消息,就会认为该消息未被传送,并可能会试图重新发送。

Client_acknowledge:Auto_acknowledge模式中,确认总是隐式的在onMessage处理器返回之后发生,而Client_acknowledge则是由客户端控制何时发送确认,这样的话,可以保证接收消息的客户端能够实现对“保证消息传送”更细粒度的控制。当然,这种方式需要客户端来显示的发送,例如调用message.acknowledge();方法

Dups_OK_acknowledge:如果在会话上指定这种模式的话,JMS提供者可以将一条消息向统一目的地发送两次以上,这与前面两种模式的“一次且仅仅一次”的语义就不同了,用于可以接收重复消息的程序。

(5)消息策略

由于消息生产者可能生产的消息各不一样,例如TextMessageMapMessage等,可以根据不同的消息使用不同的策略,示例代码如下:

[java] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. 策略接口:为了更好的兼容性,此处使用了泛型  
  2.   
  3. package com.chhliu.myself.activemq.start.async;  
  4.   
  5. public interface CustomerServiceStrategyI<P, V> {  
  6.     P doService(V message);  
  7. }  
  8.   
  9. 具体的策略类:  
  10. package com.chhliu.myself.activemq.start.async;  
  11.   
  12. import javax.jms.JMSException;  
  13. import javax.jms.TextMessage;  
  14.   
  15. public class TextMessageStrategy implements CustomerServiceStrategyI<User, TextMessage> {  
  16.   
  17.     @Override  
  18.     public User doService(TextMessage message) {  
  19.         try {  
  20.             String receiveMessage = message.getText();  
  21.             System.out.println(”消费者收到的消息为:”+receiveMessage);  
  22.             return null;  
  23.         } catch (JMSException e) {  
  24.         }  
  25.         return null;  
  26.     }  
  27. }  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
策略接口:为了更好的兼容性,此处使用了泛型

package com.chhliu.myself.activemq.start.async;

public interface CustomerServiceStrategyI<P, V> {
P doService(V message);
}

具体的策略类:
package com.chhliu.myself.activemq.start.async;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class TextMessageStrategy implements CustomerServiceStrategyI<User, TextMessage> {

@Override
public User doService(TextMessage message) {
try {
String receiveMessage = message.getText();
System.out.println("消费者收到的消息为:"+receiveMessage);
return null;
} catch (JMSException e) {
}
return null;
}
}
(6)负载均衡

由于消息消费者是以集群模式在运行,那么具体到每一条消息,该有哪台机器来消费了,这个就涉及到消息的负载均衡,在系统中,可以利用JMS提供者从消息源头上来实现,具体的负载均衡算法会因JMS提供商的不同而不同,但大概主流的几种算法如下:哈希散列算法,轮询调度算法,first-available均衡算法,使用的时候,需要查阅JMS提供商的文档来确定。

通过上面的这几步,就基本上实现了一个简单的分布式,多任务,多线程的异步任务处理系统,整体运行结果如下:

[java] view plain copy print?spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
  1. ================生产者创建了一条消息==============  
  2. ================生产者创建了一条消息==============  
  3. 当前处理任务的线程为:mqExecutor-2  
  4. 消费者收到的消息为:hello acticeMQ:my name is chhliu!fcb25e99-1181-48bb-963f-8d20a98829ab  
  5. 当前处理任务的线程为:mqExecutor-1  
  6. 消费者收到的消息为:hello acticeMQ:my name is chhliu!12578be1-56a4-4b41-a8d4-112031617525  
  7. ================生产者创建了一条消息==============  
  8. ================生产者创建了一条消息==============  
  9. 当前处理任务的线程为:mqExecutor-5  
  10. 消费者收到的消息为:hello acticeMQ:my name is chhliu!24d8f8c2-b172-4f7e-81e8-d87b73b7825b  
  11. 当前处理任务的线程为:mqExecutor-3  
  12. 消费者收到的消息为:hello acticeMQ:my name is chhliu!00dc0934-c521-42da-83fb-9f541f667ec9  
spring+ActiveMQ+JMS+线程池实现简单的分布式,多线程,多任务的异步任务处理系统
================生产者创建了一条消息==============
================生产者创建了一条消息==============
当前处理任务的线程为:mqExecutor-2
消费者收到的消息为:hello acticeMQ:my name is chhliu!fcb25e99-1181-48bb-963f-8d20a98829ab
当前处理任务的线程为:mqExecutor-1
消费者收到的消息为:hello acticeMQ:my name is chhliu!12578be1-56a4-4b41-a8d4-112031617525
================生产者创建了一条消息==============
================生产者创建了一条消息==============
当前处理任务的线程为:mqExecutor-5
消费者收到的消息为:hello acticeMQ:my name is chhliu!24d8f8c2-b172-4f7e-81e8-d87b73b7825b
当前处理任务的线程为:mqExecutor-3
消费者收到的消息为:hello acticeMQ:my name is chhliu!00dc0934-c521-42da-83fb-9f541f667ec9
其实上面的这个简单的系统,也可以当成简单的RPC来使用,如果要实现更细粒度的RPC的话,可以引进Netty等来实现,当然这些都是后话了。

转自:http://blog.csdn.net/liuchuanhong1/article/details/52326578