JavaEE(4) - JMS实现企业PTP消息处理

时间:2022-04-08 15:27:21

1. 在Weblogic服务器上配置PTP消息目的

配置持久化:

Services-->Persistence Stores-->New(Create FileStore, Create JDBCStore) (Name: crazyit)

配置JMS服务器:

Services-->Messaging-->JMS Servers-->New(Name: crazyitServer; Directory: D:\domains\base_domain\crazyit)

配置JMS模块:

Services-->Messaging-->JMS Modules-->New(Name: CrazyitModule)

为JMS模块配置子部署:

Services-->Messaging-->JMS Modules--><Module Name>-->Subdeployments-->New(Name: my-sub)

向JMS模块添加资源:

Services-->Messaging-->JMS Modules--><Module Name>-->Configuration-->New-->Queue(Name: MessageQueue)

2. 编写PTP消息的生产者

NetBeans创建java project(JmsPTP) (MessageSender.java)

需要wlclient.jar, webservices.jar, wljmsclient.jar

package lee;

import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException; public class MessageSender {
public void sendMessage() throws NamingException, JMSException
{
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory"; //获取JNDI服务所需的Context
Context ctx = getInitialContext(); ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
Destination dest = (Destination)ctx.lookup("MessageQueue");
Connection conn = connFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer sender = session.createProducer(dest);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000); TextMessage msg = session.createTextMessage(); msg.setText("Hello");
sender.send(msg); msg.setText("Welcome to JMS");
sender.send(msg); session.close();
conn.close();
} private Context getInitialContext() {
final String INIT_FACTORY = "weblogic.jndi.WLInitialContextFactory";
final String SERVER_URL = "t3://localhost:7001";
Context ctx = null;
try
{
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY, INIT_FACTORY);
props.put(Context.PROVIDER_URL , SERVER_URL); ctx = new InitialContext(props);
}
catch(NamingException e)
{
System.err.println("不能连接WebLogic Server在:" + SERVER_URL);
e.printStackTrace();
}
return ctx;
} public static void main(String[] args) throws Exception
{
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}

3. 编写PTP消息的同步接收者(SyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; public class SyncConsumer {
public void receiveMessage() throws JMSException, NamingException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory"; //获取JNDI服务所需的Context
Context ctx = getInitialContext(); ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
Destination dest = (Destination) ctx.lookup("MessageQueue"); Connection conn = connFactory.createConnection();
conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(dest); TextMessage msg = (TextMessage) receiver.receive();
System.out.println(msg);
System.out.println("同步接收到的消息:" + msg.getText()); session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
SyncConsumer consumer = new SyncConsumer();
consumer.receiveMessage();
}
}

4. 编写PTP消息的异步接收者(AsyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; //JMS异步消费者就是一个监听器,故实现MessageListener接口
public class AsyncConsumer implements MessageListener {
public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory"; Context ctx = getInitialContext(); ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
Destination dest = (Destination) ctx.lookup("MessageQueue");
Connection conn = connFactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = session.createConsumer(dest); receiver.setMessageListener(this);
Thread.sleep(20000); session.close();
conn.close();
} //实现消息监听器必须实现的方法。
public void onMessage(Message m) {
TextMessage msg = (TextMessage) m;
System.out.println(msg);
try {
System.out.println("异步接收的消息:" + msg.getText());
}
catch (JMSException ex) {
ex.printStackTrace();
}
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
AsyncConsumer consumer = new AsyncConsumer();
}
}