ActiveMQ实战(二)-代码示例

时间:2021-05-30 20:34:24

ActiveMQ 解压缩后的目录结构如下:

ActiveMQ实战(二)-代码示例


各个目录说明如下:

  • bin:ActiveMQ的启动脚本
  • conf:ActiveMQ的所有配置文件
  • data:日志文件及持久性消息数据
  • docs:ActiveMQ官方文档
  • examples:ActiveMQ官方提供的demo
  • lib:ActiveMQ运行所需的library
  • webapps:ActiveMQ的Web控制台
  • webapps-demo:ActiveMQ webapps相关的demo
  • activemq-all-5.13.2.jar:ActiveMQ CLI jar包,用于用户系统调用

其中,examples是ActiveMQ官方提供的samples,初学者可以通过研究这些samples熟悉ActiveMQ的使用。

本文将通过ActiveMQ 实现一个简单的Publish-Subscribe 例子。

1、Producer

package com.ricky.codelab.activemq.amqp;

import java.io.IOException;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnectionFactory;
import com.ricky.codelab.activemq.util.PropertyUtils;

/**
 * Demo publisher: sends a batch of numbered text messages to an ActiveMQ
 * destination over AMQP, followed by one final {@code "SHUTDOWN"} message.
 *
 * <p>Settings are read from {@code activemq.properties}; every key falls back
 * to a built-in default when it is absent.
 */
public class ProducerDemo {

    /** Number of numbered messages sent before the final "SHUTDOWN" message. */
    private final int messages = 10000;

    /**
     * Runs the demo once and prints the stack trace of any JMS or I/O failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        try {
            new ProducerDemo().publish();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the broker, sends {@code messages} text messages whose body is
     * {@code "#:<i>"} and whose int property {@code id} is {@code i}, then sends
     * {@code "SHUTDOWN"} and closes the connection.
     *
     * <p>Properties read (with defaults): {@code activemq.type}
     * ({@code topic:event}), {@code activemq.username} ({@code admin}),
     * {@code activemq.password} ({@code password}), {@code activemq.server}
     * ({@code localhost:5672}).
     *
     * @throws JMSException if connecting, creating JMS objects, or sending fails
     * @throws IOException if {@code PropertyUtils.load} fails
     * @throws IllegalArgumentException if {@code activemq.type} is not of the
     *         form {@code <kind>:<name>}
     */
    public void publish() throws JMSException, IOException {

        Properties props = PropertyUtils.load("activemq.properties");

        String type = props.getProperty("activemq.type", "topic:event");

        String user = props.getProperty("activemq.username", "admin");
        String password = props.getProperty("activemq.password", "password");
        String server = props.getProperty("activemq.server", "localhost:5672");

        String connectionURI = "amqp://" + server;

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user, password);
        try {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = createDestination(session, type);

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 1; i <= messages; i++) {
                TextMessage msg = session.createTextMessage("#:" + i);
                msg.setIntProperty("id", i);
                producer.send(msg);
                if ((i % 1000) == 0) {
                    System.out.println(String.format("Sent %d messages", i));
                }
            }

            // Sentinel that ConsumerDemo-style listeners use to stop.
            producer.send(session.createTextMessage("SHUTDOWN"));

            // NOTE(review): presumably a grace period so in-flight messages reach
            // the broker before the connection is closed - confirm it is needed.
            try {
                Thread.sleep(1000 * 3);
            } catch (InterruptedException e) {
                // Restore the flag so callers can see the interruption.
                Thread.currentThread().interrupt();
            }
        } finally {
            // Always release the connection, even when a send fails midway.
            connection.close();
        }
    }

    /**
     * Creates the destination described by a {@code <kind>:<name>} string.
     * A kind of {@code topic} yields a topic; any other kind yields a queue.
     */
    private static Destination createDestination(Session session, String type) throws JMSException {
        String[] parts = type.split(":");
        if (parts.length < 2 || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "activemq.type must look like \"topic:<name>\" or \"queue:<name>\" but was \"" + type + "\"");
        }
        if (parts[0].equals("topic")) {
            return session.createTopic(parts[1]);
        }
        return session.createQueue(parts[1]);
    }
}

2、Consumer

package com.ricky.codelab.activemq.amqp;

import java.io.IOException;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnectionFactory;
import com.ricky.codelab.activemq.util.PropertyUtils;

/**
 * Demo subscriber: receives text messages from an ActiveMQ destination over
 * AMQP until a message with body {@code "SHUTDOWN"} arrives, then reports how
 * many messages were received and how long it took.
 *
 * <p>Settings are read from {@code activemq.properties}; every key falls back
 * to a built-in default when it is absent.
 */
public class ConsumerDemo {

    /**
     * Runs the demo once and prints the stack trace of any JMS or I/O failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        try {
            new ConsumerDemo().subscribe();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the broker and blocks receiving messages. Each text message is
     * expected to carry an int property {@code id} equal to its 1-based position
     * in the stream; a differing id is reported as a mismatch. Progress is printed
     * every 1000 messages.
     *
     * <p>This method terminates the JVM when the receive loop ends: exit status 0
     * after {@code "SHUTDOWN"} was received, 1 if the consumer was closed before
     * that. If an exception is thrown, the connection is closed and the exception
     * propagates without exiting.
     *
     * <p>Properties read (with defaults): {@code activemq.type}
     * ({@code topic:event}), {@code activemq.username} ({@code admin}),
     * {@code activemq.password} ({@code password}), {@code activemq.server}
     * ({@code localhost:5672}).
     *
     * @throws IOException if {@code PropertyUtils.load} fails
     * @throws JMSException if connecting, creating JMS objects, or receiving fails
     * @throws IllegalArgumentException if {@code activemq.type} is not of the
     *         form {@code <kind>:<name>}
     */
    public void subscribe() throws IOException, JMSException {

        Properties props = PropertyUtils.load("activemq.properties");

        String type = props.getProperty("activemq.type", "topic:event");

        String user = props.getProperty("activemq.username", "admin");
        String password = props.getProperty("activemq.password", "password");
        String server = props.getProperty("activemq.server", "localhost:5672");

        String connectionURI = "amqp://" + server;

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user, password);
        // Stays 1 unless the SHUTDOWN sentinel is actually received.
        int exitStatus = 1;
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = createDestination(session, type);

            MessageConsumer consumer = session.createConsumer(destination);
            long start = System.currentTimeMillis();
            // Id expected on the next message; messages received so far = count - 1.
            long count = 1;
            System.out.println("Waiting for messages...");
            while (true) {
                Message msg = consumer.receive();
                if (msg == null) {
                    // receive() yields null when the consumer is closed concurrently.
                    System.out.println("Consumer closed before SHUTDOWN was received.");
                    break;
                }
                if (!(msg instanceof TextMessage)) {
                    System.out.println("Unexpected message type: " + msg.getClass());
                    continue;
                }

                String body = ((TextMessage) msg).getText();
                if ("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(String.format("Received %d in %.2f seconds",
                            count - 1, (1.0 * diff / 1000.0)));
                    exitStatus = 0;
                    break;
                }

                try {
                    int id = msg.getIntProperty("id");
                    if (count != id) {
                        System.out.println("mismatch: " + count + "!=" + id);
                    }
                } catch (NumberFormatException ignored) {
                    // Message has no usable "id" property; skip the sequence check.
                }

                if (count == 1) {
                    // Time the run from the first message, not from subscription.
                    start = System.currentTimeMillis();
                } else if (count % 1000 == 0) {
                    System.out.println(String.format("Received %d messages.", count));
                }
                count++;
            }
        } finally {
            // Always release the connection, even when receiving fails.
            connection.close();
        }

        // NOTE(review): short pause kept from the original before exiting;
        // presumably lets the close settle - confirm it is needed.
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.exit(exitStatus);
    }

    /**
     * Creates the destination described by a {@code <kind>:<name>} string.
     * A kind of {@code topic} yields a topic; any other kind yields a queue.
     */
    private static Destination createDestination(Session session, String type) throws JMSException {
        String[] parts = type.split(":");
        if (parts.length < 2 || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "activemq.type must look like \"topic:<name>\" or \"queue:<name>\" but was \"" + type + "\"");
        }
        if (parts[0].equals("topic")) {
            return session.createTopic(parts[1]);
        }
        return session.createQueue(parts[1]);
    }
}

点此下载完整代码。


参考资料:
https://activemq.apache.org/initial-configuration.html
http://activemq.apache.org/examples.html