JavaEE(7) - JMS消息事务和异常

时间:2022-11-07 23:49:53

1. 使用事务性Session为消息增加事务(NetBeans创建java project: TxSession)

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; public class MessageSender { public void sendMessage() throws NamingException, JMSException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageQueue");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
conn.setExceptionListener(new ExceptionListener(){
public void onException(javax.jms.JMSException e){
}
});
//JMS连接创建JMS会话
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息生产者
MessageProducer sender = session.createProducer(dest);
//设置消息生产者生产出来的消息的传递模式、有效时间。
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000);
//通过JMS会话创建一个文本消息
TextMessage msg = session.createTextMessage();
//设置消息内容
msg.setText("Hello");
//发送消息
sender.send(msg);
int a = 4 / 0;
msg.setText("Welcome to JMS");
//再次发送消息
sender.send(msg);
//关闭资源
session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参加(4)
} public static void main(String[] args) throws Exception {
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}

SyncConsumer.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; public class SyncConsumer { public void receiveMessage() throws JMSException, NamingException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageQueue");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false/*不是事务性会话*/, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息消费者
MessageConsumer receiver = session.createConsumer(dest);
//同步接收消息,如果没有接收到消息,该方法会阻塞线程
TextMessage msg = (TextMessage) receiver.receive();
System.out.println(msg);
System.out.println("同步接收到的消息:" + msg.getText());
//关闭资源
session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象 private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
SyncConsumer consumer = new SyncConsumer();
consumer.receiveMessage();
}
}

2. 使用JTA全局事务为消息增加事务(NetBeans创建java project: JTA)

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;
import javax.transaction.*; public class MessageSender { public void sendMessage() throws Exception {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageQueue");
//通过JNDI查找获取JTA事务管理器
UserTransaction tx = (UserTransaction) ctx.lookup("javax.transaction.UserTransaction");
tx.begin();
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//JMS连接创建JMS会话
Session session = conn.createSession(false/*使用事务性会话*/, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息生产者
MessageProducer sender = session.createProducer(dest);
//设置消息生产者生产出来的消息的传递模式、有效时间。
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000);
//通过JMS会话创建一个文本消息
TextMessage msg = session.createTextMessage();
//设置消息内容
msg.setText("Hello");
//发送消息
sender.send(msg);
int a = 4 / 0;
msg.setText("Welcome to JMS");
//再次发送消息
sender.send(msg);
//此处还可执行JDBC操作、EJB等事务操作
//...
//提交事务
tx.commit();
//关闭资源
session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}

SyncConsumer.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; public class SyncConsumer { public void receiveMessage() throws JMSException, NamingException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageQueue");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false/*不是事务性会话*/, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息消费者
MessageConsumer receiver = session.createConsumer(dest);
//同步接收消息,如果没有接收到消息,该方法会阻塞线程
TextMessage msg = (TextMessage) receiver.receive();
System.out.println(msg);
System.out.println("同步接收到的消息:" + msg.getText());
//关闭资源
session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
SyncConsumer consumer = new SyncConsumer();
consumer.receiveMessage();
}
}

3. 监听JMS服务器上的异常

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties; public class MessageSender { public void sendMessage() throws NamingException, JMSException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageQueue");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
conn.setExceptionListener(new ExceptionListener(){
public void onException(javax.jms.JMSException e){
}
});
//JMS连接创建JMS会话
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息生产者
MessageProducer sender = session.createProducer(dest);
//设置消息生产者生产出来的消息的传递模式、有效时间。
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000);
//通过JMS会话创建一个文本消息
TextMessage msg = session.createTextMessage();
//设置消息内容
msg.setText("Hello");
//发送消息
sender.send(msg);
int a = 4 / 0;
msg.setText("Welcome to JMS");
//再次发送消息
sender.send(msg);
//关闭资源
session.close();
conn.close();
} //工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
} public static void main(String[] args) throws Exception {
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}

在单个JMS会话期间,如果需要使用事务性的操作,则应该使用事务性会话。但其他资源如数据库、EJB操作等则不能参与事务性的会话。如果需要让JMS、数据库或EJB操作等都参与到事务中,则应该考虑使用JTA全局事务。