ActiveMQ(5.10.0) - hello world

时间:2023-02-12 08:21:50

Sending a JMS message

public class MyMessageProducer {
... // 创建连接工厂实例
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"); Connection conn = null;
try {
// 取得连接对象实例
conn = connFactory.createConnection();
// 启动连接
conn.start();
// 创建会话对象实例
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建消息目的地
Destination destination = session.createQueue("hello_queue");
// 创建消息生产者
MessageProducer msgProducer = session.createProducer(destination);
// 创建消息对象实例
Message textMsg = session.createTextMessage("This is a test message.");
// 发送消息
msgProducer.send(textMsg);
// 提交会话
session.commit();
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 关闭连接
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
} ...
}

Receiving a JMS message synchronously

public class MySynMessageConsumer {
... // 创建连接工厂实例
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"); Connection conn = null;
try {
// 取得连接对象实例
conn = connFactory.createConnection();
// 启动连接,当调用此方法后才能接收到消息
conn.start();
// 创建会话对象实例
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息目的地
Destination destination = session.createQueue("hello_queue");
// 创建消息消费者
MessageConsumer msgConsumer = session.createConsumer(destination); // 接收消息
TextMessage textMsg = (TextMessage) msgConsumer.receive(10 * 1000);
if (textMsg != null) {
System.out.println(textMsg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
} ...
}

Receiving a JMS message asynchronously

public class MyAsynMessageConsumer {
... // 创建连接工厂实例
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"); Connection conn = null;
try {
// 取得连接对象实例
conn = connFactory.createConnection();
// 启动连接,当调用此方法后才能接收到消息
conn.start();
// 创建会话对象实例
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息目的地
Destination destination = session.createQueue("hello_queue");
// 创建消息消费者
MessageConsumer msgConsumer = session.createConsumer(destination);
// 注册消息监听器
msgConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
TextMessage textMsg = (TextMessage) msg;
try {
System.out.println(textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(60 * 1000); } catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭连接
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
} ...
}

Others

  • 对于 Pub/Sub 模型,使用 session.createTopic 方法创建 Destination。
  • MessageConsumer 同步消费消息时,receive() 方法会阻塞线程直到接收到下一条消息;receive(long timeout) 方法在指定的时间内阻塞线程直到接收到下一条消息,如果超时,则返回 null 值;receiveNoWait() 方法立刻接收下一条消息,如果消息源中没有消息,则返回 null 值。