ActiveMQ 学习笔记

时间:2022-06-13 01:15:00
http://somebody-hjh.iteye.com/blog/726050
一、概述 

  Message,即消息。人与人之间通过消息传递信息。言语、眼神、肢体动作都可被视为消息体。当然还有我们经常用到的邮件、短信。计算机系统也由消息来主导运行。每一条指令的执行,每一个数据包的传递。软件系统间的合作也不例外,消息告诉各个系统应该怎样协作。事件处理机制,也是消息传送的过程。消息无处不在。 

  消息分为同步消息和异步消息。同步消息在接收到对方的返回前,需要挂起,直到返回或超时。异步消息只需要发送消息,不需要对方系统的立即反馈。 

  同步消息如java RPC调用,同步调用依赖于被调用方,如果被调用方失败或网络错误,那么程序就没办法继续执行下去,造成一个系统最薄弱的环节依赖于对方系统。而多个系统通过同步调用方式耦合在一起的时候,那么可靠性取决于最薄弱的一方系统。而异步调用能增强一个系统的健壮性。当然,不是任何情况都适合异步调用,还是那句话,能异步的地方,尽量异步。 

  异步消息,如同一个邮箱系统,我们把信件丢入邮桶,邮递员会更具上面的地址,送达到这封信要去的地方。邮箱和信件格式由邮局提供定义,比如邮箱需要有一个口子投递邮件,邮件需要有地址,邮政编码等等。而这些邮箱具体的加工和制作均交由各自的厂商来完成。
在java消息领域,我们也有一个称为消息中间件的东西,来提供这样一个服务。消息的发送、消费接口、消息体的格式等都由JMS来定义,而具体的实现由各个消息中间件厂商来实现。JMS是sun公司对于消息中间件的一个规范。对java领域里的消息起到举足轻重的作用。以前的消息交互,均各自实现一套格式,如同一个国家的人都用不同的方言跟另外来自不同省份的人交流一样。自从规范了普通话,我们的交流成本降低了。这也正如JMS规范在整个java消息领域的作用。 二、JMS简介 JMS1.1规范定义了一些概念和一组API,可以使得在我们利用消息系统的过程中,不依赖于各个厂商的具体实现,便能写出消息代码。由于不依赖具体厂商实现,这样的代码有很好的移植性。
JMS1.1定义了的部分概念:
1、JMS客户端:接收或发送消息的java系统
2、JMS消息体:系统间发送的消息体
3、JMS提供商:JMS规范实现厂商
4、JMS管理对象:预先配置好的用于JMS客户端的JMS对象。如ConnectionFactory跟JMS服务端的连接工厂,Destination 接收和发送消息的目标地址等
5、JMS Domain:JMS定义了两种域模型,一种是PTP(point-to-point)即点对点消息传输模型,一种是pub/sub(publish-subscribe)即发布订阅模型。
PTP通过一个先进先出的queue实现。很多人在这个PTP概念上有所误解,所谓点对点不是指生产者和消费者只有一个。PTP如下图所示:

ActiveMQ 学习笔记

我们可以看到,一个或多个生产者发送消息,消息m2先抵达了queue,然后m1也发出了,并一同存在于一个先进先出的queue里面。消费者也存在一个或多个,对queue里的消息进行消费。但消息被随机的一个消费者消费且仅消费一次。 

在pub/sub消息模型中,消息被广播给所有订阅者。如下图: 

ActiveMQ 学习笔记

ActiveMQ 学习笔记

ActiveMQ 学习笔记

点对点连接的实例代码:

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue; public class QueueSender { public static void main(String[] args) throws JMSException{
ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection=factory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=new ActiveMQQueue("queue.somebody");
MessageProducer producer=session.createProducer(queue);
for(int i=0;i<10;i++){
Message message=session.createTextMessage("Hello "+new Date());
producer.send(message);
}
session.close();
connection.close();
}
} import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue; public class QueueReceiver { public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection=factory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=new ActiveMQQueue("queue.somebody");
MessageConsumer receiver1=session.createConsumer(queue);
MessageConsumer receiver2=session.createConsumer(queue);
receiver1.setMessageListener(new QueueMessageListener("1"));
receiver2.setMessageListener(new QueueMessageListener("2"));
connection.start();
}
} import javax.jms.Message;
import javax.jms.MessageListener; public class QueueMessageListener implements MessageListener {
private String symbol;
public QueueMessageListener(String symbol){
this.symbol=symbol;
} @Override
public void onMessage(Message arg0) {
System.out.println(arg0.toString()+" from "+symbol+"!");
} }

订阅式的代码:

http://witcheryne.iteye.com/blog/836509