ActiveMQ 解压缩后的目录结构如下:
各个目录说明如下:
- bin:ActiveMQ的启动脚本
- conf:ActiveMQ的所有配置文件
- data:日志文件及持久性消息数据
- docs:ActiveMQ官方文档
- examples:ActiveMQ官方提供的demo
- lib:ActiveMQ运行所需的library
- webapps:ActiveMQ的Web控制台
- webapps-demo:ActiveMQ webapps相关的demo
- activemq-all-5.13.2.jar:ActiveMQ CLI jar包,用于用户系统调用
其中,examples是ActiveMQ官方提供的samples,初学者可以通过研究这些samples熟悉ActiveMQ的使用。
本文将通过ActiveMQ 实现一个简单的Publish-Subscribe 例子。
1、Producer
package com.ricky.codelab.activemq.amqp;
import java.io.IOException;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnectionFactory;
import com.ricky.codelab.activemq.util.PropertyUtils;
/**
 * Demo producer: connects to a broker over AMQP, sends a fixed number of
 * numbered text messages to a topic or queue, then sends a final
 * {@code "SHUTDOWN"} text message.
 *
 * <p>Settings are read from {@code activemq.properties}; every key has a
 * built-in default, listed in {@link #publish()}.
 */
public class ProducerDemo {

    /** Number of numbered messages sent before the final "SHUTDOWN" message. */
    private final int messages = 10000;

    /**
     * Runs the producer once and prints the stack trace of any JMS or I/O failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            new ProducerDemo().publish();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@code messages} text messages with bodies {@code "#:1"}, {@code "#:2"}, ...
     * (each carrying its index in the int property {@code "id"}), followed by a
     * {@code "SHUTDOWN"} message, then closes the connection.
     *
     * <p>Properties read, with defaults:
     * <ul>
     *   <li>{@code activemq.type} - {@code "topic:event"}; format is {@code <kind>:<name>},
     *       where kind {@code topic} selects a topic and anything else selects a queue</li>
     *   <li>{@code activemq.username} - {@code "admin"}</li>
     *   <li>{@code activemq.password} - {@code "password"}</li>
     *   <li>{@code activemq.server} - {@code "localhost:5672"}</li>
     * </ul>
     *
     * @throws JMSException if connecting, creating JMS objects or sending fails
     * @throws IOException if the properties cannot be loaded
     * @throws IllegalArgumentException if {@code activemq.type} is not {@code <kind>:<name>}
     */
    public void publish() throws JMSException, IOException {
        Properties props = PropertyUtils.load("activemq.properties");
        String type = props.getProperty("activemq.type", "topic:event");
        String user = props.getProperty("activemq.username", "admin");
        String password = props.getProperty("activemq.password", "password");
        String server = props.getProperty("activemq.server", "localhost:5672");

        String connectionURI = "amqp://" + server;
        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
        Connection connection = factory.createConnection(user, password);
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = createDestination(session, type);

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 1; i <= messages; i++) {
                TextMessage msg = session.createTextMessage("#:" + i);
                msg.setIntProperty("id", i);
                producer.send(msg);
                // Progress report every 1000 messages.
                if ((i % 1000) == 0) {
                    System.out.println(String.format("Sent %d messages", i));
                }
            }

            // Marker message sent after all numbered messages.
            producer.send(session.createTextMessage("SHUTDOWN"));

            // NOTE(review): presumably gives in-flight sends time to reach the
            // broker before the connection is closed - confirm.
            try {
                Thread.sleep(1000 * 3);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        } finally {
            // Always release the connection, including when a send fails part-way.
            connection.close();
        }
    }

    /**
     * Resolves a {@code <kind>:<name>} specification to a JMS destination.
     *
     * @param session session used to create the destination
     * @param type destination specification, e.g. {@code "topic:event"}
     * @return a topic when kind is {@code topic}, otherwise a queue
     * @throws JMSException if the session cannot create the destination
     * @throws IllegalArgumentException if {@code type} has no {@code :<name>} part
     */
    private static Destination createDestination(Session session, String type) throws JMSException {
        String[] parts = type.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "activemq.type must look like 'topic:<name>' or 'queue:<name>' but was: " + type);
        }
        if ("topic".equals(parts[0])) {
            return session.createTopic(parts[1]);
        }
        return session.createQueue(parts[1]);
    }
}
2、Consumer
package com.ricky.codelab.activemq.amqp;
import java.io.IOException;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnectionFactory;
import com.ricky.codelab.activemq.util.PropertyUtils;
/**
 * Demo consumer: connects to a broker over AMQP, receives text messages from a
 * topic or queue, checks that their {@code "id"} properties arrive in sequence,
 * and exits the JVM when a {@code "SHUTDOWN"} text message is received.
 *
 * <p>Settings are read from {@code activemq.properties}; every key has a
 * built-in default, listed in {@link #subscribe()}.
 */
public class ConsumerDemo {

    /**
     * Runs the consumer and prints the stack trace of any JMS or I/O failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            new ConsumerDemo().subscribe();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives messages until a text message with body {@code "SHUTDOWN"} arrives,
     * then prints how many messages were received and the elapsed time since the
     * first one, closes the connection and terminates the JVM with status 0.
     *
     * <p>Properties read, with defaults:
     * <ul>
     *   <li>{@code activemq.type} - {@code "topic:event"}; format is {@code <kind>:<name>},
     *       where kind {@code topic} selects a topic and anything else selects a queue</li>
     *   <li>{@code activemq.username} - {@code "admin"}</li>
     *   <li>{@code activemq.password} - {@code "password"}</li>
     *   <li>{@code activemq.server} - {@code "localhost:5672"}</li>
     * </ul>
     *
     * @throws IOException if the properties cannot be loaded
     * @throws JMSException if connecting, creating JMS objects or receiving fails
     * @throws IllegalArgumentException if {@code activemq.type} is not {@code <kind>:<name>}
     */
    public void subscribe() throws IOException, JMSException {
        Properties props = PropertyUtils.load("activemq.properties");
        String type = props.getProperty("activemq.type", "topic:event");
        String user = props.getProperty("activemq.username", "admin");
        String password = props.getProperty("activemq.password", "password");
        String server = props.getProperty("activemq.server", "localhost:5672");

        String connectionURI = "amqp://" + server;
        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
        Connection connection = factory.createConnection(user, password);
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(createDestination(session, type));

            long start = System.currentTimeMillis();
            // 'count' is the id expected on the NEXT message, so the number of
            // messages received so far is always count - 1.
            long count = 1;
            System.out.println("Waiting for messages...");

            while (true) {
                Message msg = consumer.receive();
                if (msg == null) {
                    // receive() returns null once the consumer has been closed;
                    // stop instead of dereferencing null below.
                    System.out.println("Consumer closed before SHUTDOWN was received.");
                    break;
                }
                if (!(msg instanceof TextMessage)) {
                    System.out.println("Unexpected message type: " + msg.getClass());
                    continue;
                }

                String body = ((TextMessage) msg).getText();
                if ("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(
                            String.format("Received %d in %.2f seconds", count - 1, diff / 1000.0));
                    // System.exit() does not run the finally block, so close explicitly.
                    connection.close();
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    // Normal termination: report success to the calling process.
                    System.exit(0);
                } else {
                    try {
                        int id = msg.getIntProperty("id");
                        if (count != id) {
                            System.out.println("mismatch: " + count + "!=" + id);
                        }
                    } catch (NumberFormatException ignored) {
                        // No usable numeric "id" property on this message; skip the
                        // sequence check but still count the message.
                    }
                    if (count == 1) {
                        // Restart the clock at the first message so idle waiting
                        // time is excluded from the reported duration.
                        start = System.currentTimeMillis();
                    } else if (count % 1000 == 0) {
                        System.out.println(String.format("Received %d messages.", count));
                    }
                    count++;
                }
            }
        } finally {
            // Release the connection on every non-exit path (errors, closed consumer).
            connection.close();
        }
    }

    /**
     * Resolves a {@code <kind>:<name>} specification to a JMS destination.
     *
     * @param session session used to create the destination
     * @param type destination specification, e.g. {@code "topic:event"}
     * @return a topic when kind is {@code topic}, otherwise a queue
     * @throws JMSException if the session cannot create the destination
     * @throws IllegalArgumentException if {@code type} has no {@code :<name>} part
     */
    private static Destination createDestination(Session session, String type) throws JMSException {
        String[] parts = type.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "activemq.type must look like 'topic:<name>' or 'queue:<name>' but was: " + type);
        }
        if ("topic".equals(parts[0])) {
            return session.createTopic(parts[1]);
        }
        return session.createQueue(parts[1]);
    }
}
点此下载完整代码。
参考资料:
https://activemq.apache.org/initial-configuration.html
http://activemq.apache.org/examples.html