activemq 是 基于 jms 协议的 消息队列
消息 流程:
jsm 的 消息流程鼻尖简单 生产者发送消息到目的地,消费者 监听这个目的地,然后收到消息。 相比 amqp 的 消息流程简单很多。
producer:生产者,产生消息。
consumer:消费者
destination:目的地 jms 定义了两种 目的地,一种是queue 一种是 topic
queue:点对点的 消息队列,一个消息被发送到queue 里面 ,只会被一个消费者消费,并且只会被消费一次。queue 里面消息是持久的。 消费者上线以后可以慢慢消费积压 的消息。
topic: 发布订阅机制。发布到 topic 的消息 ,会被 这个当前topic 上面的每一个 消费者消费。并且topic 的消息不持久,没有在线的消费者永远收不到这条消息了。 类似于 amqp 的 fanout 。
spring boot 集成 的 activemq 消息发送的代码:
@Autowired
private JmsMessagingTemplate jmsTemplate; /**
* 把消息发送给mq
*
* @author ZHANGYUKUN
* @param message
* @throws Exception
*/
public void send(Message message) {
if (StringUtils.isEmpty(message.getDestination())) {
throw new RuntimeException("没有目的地");
} Destination destination = new ActiveMQQueue(message.getDestination());
jmsTemplate.convertAndSend(destination, message );
}
消费者的实现:
@Component
public class UserMessageConsumers { @Autowired
MessageService messageService; @Autowired
UserServiceImpl userService; @JmsListener(destination = "user-service-insert")
public void consume(Message message) {
User user = JSONObject.parseObject( message.getContent() , User.class ); if( userService.queryByUUID( user.getUuid() ) == null ) {
userService.insert( user );
} messageService.ack(message.getUuid());
} }
Maven 依赖:
<!-- mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency> <!-- mq pool -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>