JMS基础篇(二)

时间:2021-04-03 16:43:50

简介

  异构集成是消息发挥作用的一个领域,大型公司内部可能会遇到很多的平台,Java,.net或者公司自己的平台等。

传送消息还应该支持异步机制,以提高系统整体的性能。异步传输一条消息意味着,发送者不必等到接收者接收或者处理消息,可以接着做后续的处理。

应用程序发送消息至另外一个应用程序,需要使用到消息中间件。消息中间件应提供容错,负载均衡,可伸缩的事务性等特性。

JMS与JDBC类似,是一种与厂商无关的API。应用程序开发者可以使用同样的API来访问不同的系统。

可以认为JMS是一种标准,各消息中间件(MOM)是JMS的具体实现。常见的MOM包括WebSphere MQ,Sonic MQ,ActiveMQ等。

JMS系统机构

消息传递系统可以分为集中式和分散式两种。

集中式的消息系统依赖于一个消息服务器,或者称为消息路由器或者代理(broker)来进行消息的接收及分发。

集中式的消息系统的结构最常见的是星形结构。

JMS基础篇(二)

图1 集中式消息系统结构

分散式消息系统基于的是IP组播,整个结构又可以为分为多个组播组。每个组播组使用一个IP地址,客户端可以加入到一个或多个组播组。消息的传递不依赖于消息服务器,由网络自身来完成处理的。

JMS基础篇(二)

图2 分散式消息系统结构

消息传送模型分为2种,点对点式和发布/订阅式。

JMS基础篇(二)

图3 消息传输模型示意图

生产消息的客户端成为生产者,消费消息的客户端成为消费者。一个JMS的客户端可以既是生产者又是消费者。

点对点模型基于的是拉取(pull)或轮询(polling),消费者从队列中去取消息。发布/订阅基于的是推送(push),消息被主动地从生产者推送至消费者。

一个简单的例子

使用JMS编写一个简单的聊天程序,代码如下:   

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.NamingException; import jms.JndiFactoryForJMS; public class Chat implements MessageListener {
private TopicPublisher publisher;
private TopicSubscriber subscriber;
private TopicSession pubSession = null, subSession = null;
private TopicConnection connection = null; public Chat() throws JMSException, InterruptedException, NamingException {
TopicConnectionFactory factory = null; Context ctx = null;
try {
JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();
ctx = factoryForJMS.getJndiContext(); // 获取连接工厂。
factory = (TopicConnectionFactory) ctx.lookup("con1");
} catch (NamingException e) {
e.printStackTrace();
} // 创建连接
connection = factory.createTopicConnection(); // 建立session
pubSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);
subSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);
// 指定消息队列 Topic topic = (Topic) ctx.lookup("MyTopic"); publisher = pubSession.createPublisher(topic);
subscriber = pubSession.createSubscriber(topic, null, true); subscriber.setMessageListener(this);
// 建立连接
connection.start();
} public static void main(String[] srgs) throws JMSException, InterruptedException, NamingException, IOException, CloneNotSupportedException {
Chat chat = new Chat();
BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
chat.close();
System.exit(-1);
} else {
chat.writeMessage(s);
}
} } @Override
public void onMessage(Message message) {
TextMessage mes = (TextMessage) message;
try {
System.out.println(mes.getText());
} catch (JMSException e) {
e.printStackTrace();
} } private void writeMessage(String text) throws JMSException {
TextMessage mes = pubSession.createTextMessage(text);
publisher.publish(mes);
} private void close() throws JMSException {
connection.close();
}
}

  

将程序启动多份,在任意一个程序的console窗口中输入信息,可以看到另外程序的console窗口中出现了所输入的内容,就可以说明另外的程序收到了消息并将消息并打印出了消息内容。

分析上面基于JMS的聊天程序:

上面的JMS聊天程序是基于JNDI的,未使用JNDI的服务器,使用的是,因此需要一个配置文件,需要和上面的类放在同一级目录下。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

   java.naming.provider.url=tcp://localhost:61616

   java.naming.security.principal=system

java.naming.security.credentials=manager

connectionFactoryNames=con1,con2

##queue.MyQueue=MyQueue

topic.MyTopic=MyTopic

topic.topic1=jms.topic1

JndiFactoryForJMS是一个初始化JNDI环境的工厂类,代码如下:

import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JndiFactoryForJMS {
protected Context context = null; public void initalize() throws NamingException
{
Properties props = new Properties();
try{
org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory();
props.load(this.getClass().getResourceAsStream("jndi.properties"));
context = new InitialContext(props);
}catch(Exception ex){
ex.printStackTrace();
} } public Context getJndiContext() throws NamingException {
if(context == null){
initalize();
}
return context;
} }

  

分析代码中与JNDI相关的部分:

JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();

ctx = factoryForJMS.getJndiContext();

// 获取连接工厂。

factory = (TopicConnectionFactory) ctx.lookup("con1");