消息中间件ActiveMQ

时间:2023-01-25 18:42:35

消息中间件ActiveMQ

一、分布式系统架构

1.背景

随着互联网的快速发展和云服务概念的普及,使得我们在搭建系统的时候越来越倾向于分布式系统架构,那么分布式系统架构要涉及到哪些知识点以及我们如何来实现这个分布式系统架构。

2.Java项目的架构演变历程

单一的应用架构–>垂直应用架构–>分布式应用架构:

消息中间件ActiveMQ

①单一应用架构(传统的架构思维):
一个项目就是一个系统。

②垂直应用架构(一个系统拆分成多个子系统):
一定程度上解决了系统难以维护的问题,每个团队就负责自己的子系统,降低了维护成本,提升了开发效率。也解决了系统扩容的问题,随着业务的扩展,可以通过增加一个子系统来解决,不会影响其他子系统。同样也在一定程度上解决了系统访问进行分流的问题,根据子系统的不同性质和访问量进行合理的部署规划。但是垂直应用系统架构中各个子系统都存在着相同逻辑的业务代码(权限验证、日志记录、缓存、系统安全性、系统报表),不能共用,容易造成信息孤岛,重复造*。

③分布式架构(一个系统拆分成多个服务):
随着垂直应用架构的子系统越来越多,为了避免重复造*,我们将共用的核心业务抽取出来,作为单独的系统进行服务,各个子系统就可以重复调用,就演变成了分布式架构。架构一套大型的,成熟的分布式系统要涉及到许多中间件的应用和支撑系统,这些中间件和支撑统称为分布式系统的基础设施。

3.分布式系统的基础设施

①SpringBoot:微服务框架:简化配置,快速搭建开发框架
②Dubbo:分布式远程服务调用框架(RPC),与spring无缝集成
③ActiveMQ:支持JMS规范的分布式消息中间件。
④Zookeeper:分布式服务的配置和调度以及协调中心
⑤Redis:支持集群的内存数据库,可以快速的读取非结构化数据。
⑥Solr:垂直化搜索引擎,大型电商平台的站内搜索解决方案(基于lucene)
⑦Nginx:反向代理和负载均衡服务器
⑧Mysql:基于分布式系统的分库分表以及集群 mariaDB

二、消息中间件的定义和作用

消息中间件ActiveMQ

消息中间件是在分布式系统中完成消息发送和接收的基础软件。通过消息中间件,应用程序之间可以进行可靠的异步通讯,从而降低系统的耦合度,提高系统的可扩展性和可用性。

应用程序A:消息生产者
应用程序B:消息消费者

1.JMS的概念

JMS(Java Message Server):即java消息服务,他是一套基于java平台关于消息服务的一套接口,主要用于两个或多个应用程序之间或者在分布式系统中间发送消息和接收消息,进行异步通信。
JMS是一套接口,各个厂商可以通过实现这套接口来实现JMS服务:ActiveMQ、RabbitMQ、RocketMQ

2.ActiveMQ

官方网站:http://activemq.apache.org

要求:jdk环境

启动:

消息中间件ActiveMQ

访问:localhost:8161
默认用户名和密码:admin

三、实例

第一步:引入jar包:

消息中间件ActiveMQ

代码:

JmsProducer:消息生产者


package com.hcx.activemq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/** * 定义消息的生产者 * * @author HCX * */
public class JmsProducer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /** * 定义消息并发送,等待消息的接收者(消费者)消费此消息 * * @param args * @throws JMSException */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory;// 消息中间件的连接工厂
        Connection connection = null;// 连接
        Session session = null;// jms.Session;
        Destination destination;// 消息的目的地
        MessageProducer messageProducer = null;// 消息生产者

        // 实例化连接工厂,创建一个连接
        connectionFactory = new ActiveMQConnectionFactory(JmsProducer.USERNAME, JmsProducer.PASSWORD,
                JmsProducer.BROKENURL);
        // 通过连接工厂获取连接
        try {
            connection = connectionFactory.createConnection();

            // 启动连接
            connection.start();
            // 创建session,进行消息的发送
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            // 创建消息队列
            destination = session.createQueue("talkWithMon");

            // 创建一个消息生产者
            messageProducer = session.createProducer(destination);

            // 模拟发送消息
            for (int i = 0; i < 5; i++) {
                // 创建一条消息
                TextMessage textMessage = session.createTextMessage("给妈妈发送的消息:" + i);
                System.out.println("textMessage: " + textMessage);
                messageProducer.send(textMessage);
            }

            // 如果设置了事务,session就必须要提交
            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            // 关闭连接
            if (connection != null) {
                connection.close();
            }
        }
    }
}

JmsConsumer:消息消费者


package com.hcx.activemq.p2p;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/** * 定义消息的消费者 * * @author HCX * */
public class JmsConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;


    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory;// 消息中间件的连接工厂
        Connection connection = null;// 连接
        Session session = null;// jms.Session;
        Destination destination;// 消息的目的地
        MessageConsumer messageConsumer = null;// 消息消费者

        // 实例化连接工厂,创建一个连接
        connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME, JmsConsumer.PASSWORD,
                JmsConsumer.BROKENURL);
        // 通过连接工厂获取连接
        try {
            connection = connectionFactory.createConnection();

            // 启动连接
            connection.start();
            // 创建session,进行消息的接收
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            // 创建消息队列 注意:队列的名字要相同
            destination = session.createQueue("talkWithMon");

            // 创建一个消息消费者
            messageConsumer = session.createConsumer(destination);

            // 模拟接收消息
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
                if(null!=textMessage){
                    System.out.println("收到儿子发送的消息: "+textMessage);
                }else{
                    break;
                }
            }

        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

结果:

消息中间件ActiveMQ