【9】JMicro微服务-发布订阅消息服务

时间:2022-12-05 12:30:37

如非授权,禁止用于商业用途,转载请注明出处
作者:mynewworldyyl

1. JMicro消息服务目前实现特性

a. JMicro只支持发布订阅消息服务,不支持队列式消息服务;

b. 不支持消息持久化,所以不能保证消息一定能被消费者消费;

c. 发布消息时如果没有消费者,消息直接丢弃;

d. 如果消息按配置的重发时间间隔重复发送多次(可配置)失败,消息直接丢弃;

e. 如果消息失败重发队列超过预先配置的数量,最先失败的消息会被直接丢弃,而新失败消息直接入失败队列排队等待重发;

f. 基于以上几点,如果要求绝对可靠的消息系统,而不是高性能的消息系统,不建议使用JMicro消息服务;

JMicro追求的是简单并且高性能,如果消费者服务是正常等待消息状态,那么消息肯定能高效从消息发送者直达消费者服务,如果消费者不在线,或出了问题无法消费消息,直接失败告诉使用者系统出了问题才应该是最佳的避免问题的方法。

一味地为了解决或支持次要的需求而增加系统的复杂度本身也会增加系统出问题的隐患,特别是复杂度达到一定程度后,很可能意味着软件的失败。

Do one thing and do it well! 微服务口号。

2. 启动消息服务

新打开一个新的CMD窗口,CD进入/jmicro.pubsub,运行

mvn clean install -Pbuild-main  (只需构建一次,以后可以重复运行)

运行pubsub服务

java -jar target/jmicro.pubsub-0.0.1-SNAPSHOT-jar-with-dependencies.jar

如果启动时最后出现如下信息,说明pubsub没有启动,需要将/PubSubManager/enableServer的值从false,改为true

【9】JMicro微服务-发布订阅消息服务

通过ZKUI修改方式如下图:

【9】JMicro微服务-发布订阅消息服务

将enableServer的值从false改为true,然后重启服务即可,因为这个值是启动时就检测,只使用一次,即时生效是指:修改值后,之后的使用会使用新的值。

3. 开发消费者

/jmicro.example.provider/src/main/java/org/jmicro/example/pubsub/impl/SimplePubsubImpl.java

 @Service(maxSpeed=-1,baseTimeUnit=Constants.TIME_SECONDS)
 @Component
 public class SimplePubsubImpl implements ISimplePubsub {

     private final static Logger logger = LoggerFactory.getLogger(SimplePubsubImpl.class);

     @Subscribe(topic="/jmicro/test/topic01")
     public void helloTopic(PSData data) {
         System.out.println("helloTopic: "+data.getTopic()+", data: "+ data.getData().toString());
     }

     @Subscribe(topic="/jmicro/test/topic01")
     public void testTopic(PSData data) {
         System.out.println("testTopic: "+data.getTopic()+", data: "+ data.getData().toString());
     }

     @Subscribe(topic=MonitorConstant.TEST_SERVICE_METHOD_TOPIC)
     public void statis(PSData data) {

         Map<Integer,Double> ps = (Map<Integer,Double>)data.getData();

         logger.info("总请求:{}, 总响应:{}, TO:{}, TOF:{}, QPS:{}"
                 ,ps.get(MonitorConstant.CLIENT_REQ_BEGIN)
                 ,ps.get(MonitorConstant.STATIS_TOTAL_RESP)
                 ,ps.get(MonitorConstant.CLIENT_REQ_TIMEOUT)
                 ,ps.get(MonitorConstant.CLIENT_REQ_TIMEOUT_FAIL)
                 ,ps.get(MonitorConstant.STATIS_QPS)
                 );

         //System.out.println("Topic: "+data.getTopic()+", data: "+ data.getData().toString());
     }

 }

消息消费者也是一个RPC服务,由Component和Service注解实现类,不同点是方法不再由SMethod注解,而是由Subcribe注解,并增加topic属性,值是消息的主题。

方法唯一参数是org.jmicro.api.pubsub.PSData,代码如下

public final class PSData implements Serializable{

    private static final long serialVersionUID = 389875668374730999L;

    private Map<String,Object> context = new HashMap<>();

    private byte flag = 0;

    private String topic;

    private Object data;

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Map<String, Object> getContext() {
        return context;
    }

    public void setContext(Map<String, Object> context) {
        this.context = context;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public void put(String key,Object v) {
        this.context.put(key, v);
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        return (T) this.context.get(key);
    }
}

data即是消息主体内容,可以是任意JMicro支持的参数类型,支持的参数类型后面会有专题小节详细说明,目前只需要知道:

a. Java 8种基本字据类型,字符串,java.util.Date,java.nio.ByteBuffer;

b. 由a组成的任意POJO类型;

c. 由a,b,c,d组成的任意数组类型,集合类型(LIST,SET);

d. Key为String类型,值为a,b,c,d组成的任意Map类型;

运行消费者

java -jar target/jmicro.example.provider-0.0.1-SNAPSHOT-jar-with-dependencies.jar

4. 消息生产者

public class TestPubSubServer extends JMicroBaseTestCase{

    @Test
    public void testPresurePublish() {

        final Random ran = new Random();

        PubSubManager psm = of.get(PubSubManager.class);

        AtomicInteger id = new AtomicInteger(0);

        Runnable r = ()->{
            while(true) {
                try {
                    //Thread.sleep(2000);
                    Thread.sleep(ran.nextInt(100));
                    psm.publish(new HashMap<String,Object>(), TOPIC,
                            "test pubsub server id: "+id.getAndIncrement());
                } catch (Throwable e) {
                    System.out.println(e.getMessage());;
                }
            }
        };

        new Thread(r).start();
        new Thread(r).start();
        /*new Thread(r).start();
        new Thread(r).start();
        new Thread(r).start();*/

        JMicro.waitForShutdown();
    }

    @Test
    public void testPublishStringMessage() {
        PubSubManager psm = of.get(PubSubManager.class);
        psm.publish(new HashMap<String,Object>(), TOPIC, "test pubsub server");
        JMicro.waitForShutdown();
    }
}

这是一个基于JUnit4的单元测试类,继承自org.jmicro.test.JMicroBaseTestCase,消息发布通过org.jmicro.api.pubsub.PubSubManager的三个publish重载方法实现

PubSubManager psm = of.get(PubSubManager.class); psm.publish(new HashMap<String,Object>(), TOPIC, "test pubsub server");
of是JMicroBaseTestCase成员变量,IObjectFactory类型,系统启动就可使用,类似Spring容器,目前知道点就行。

PubSubManager也是系统预实例化的消息发送接口,从of拿来即用。

Eclipse或IDEA分别运行上面3个单元测试方法,查看消费者终端输出,消费端输出代码如下图:

【9】JMicro微服务-发布订阅消息服务

testPublishStringMessage:发送一个简单字符串消息,消费端输出如下图:

【9】JMicro微服务-发布订阅消息服务

testPresurePublish开启多个线程重复发送消息,用于消息服务系统做压力测试 ,消费端输出如下图:

【9】JMicro微服务-发布订阅消息服务

 

【9】JMicro微服务-发布订阅消息服务的更多相关文章

  1. Kafka是分布式发布-订阅消息系统

    Kafka是分布式发布-订阅消息系统 https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apa ...

  2. 分布式发布订阅消息系统 Kafka 架构设计&lbrack;转&rsqb;

    分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开 ...

  3. Kafka&lpar;分布式发布-订阅消息系统&rpar;工作流程说明

    Kafka系统架构Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和 ...

  4. kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统

    一.kafka 简介 今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 ...

  5. (三)ActiveMQ之发布- 订阅消息模式实现

    一.概念 发布者/订阅者模型支持向一个特定的消息主题发布消息.0或多个订阅者可能对接收来自特定消息主题的消息感兴趣.在这种模型下,发布者和订阅者彼此不知道对方.这种模式好比是匿名公告板.这种模式被概括 ...

  6. JMS发布&sol;订阅消息传送例子

    前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要 ...

  7. 高吞吐量的分布式发布订阅消息系统Kafka--安装及测试

    一.Kafka概述 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因 ...

  8. 分布式发布订阅消息系统Kafka

    高吞吐量的分布式发布订阅消息系统Kafka--安装及测试   一.Kafka概述 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览, ...

  9. Kafka logo分布式发布订阅消息系统 Kafka

    分布式发布订阅消息系统 Kafka kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳 ...

随机推荐

  1. &lbrack;LeetCode&rsqb; Number of Connected Components in an Undirected Graph 无向图中的连通区域的个数

    Given n nodes labeled from 0 to n - 1 and a list of undirected edges (each edge is a pair of nodes), ...

  2. bzoj2653&colon; middle

    首先,对于每个询问,我们二分答案 然后对于序列中大于等于中位数的数,我们把它们置为1,小于中位数的数,置为-1 那么如果一个区间和大于等于0,那么就资磁,否则就不滋磁 这个区间和呢,我们可以用主席树维 ...

  3. iis 301重定向

    把www.a.com重定向到www.b.com 只需在www.a.com上面右键属性---主目录,重定向到url,下面填上www.b.com,再把资源永久重定向勾选上即可. 注意,如果你需要把域名后面 ...

  4. 专题:『Channel Bonding&sol;team』——EXPERIMANTAL!!!

    Linux内核支持的多网卡聚合方法——bond.team bond 优点:经过长时间的实践检验,具有较高的稳定性:kernel-2.4及以上内核均广泛支持 缺点:需要通过sysfs或发行版定制的网卡配 ...

  5. hugo-最好用的静态网站生成器

    hugo最好用的静态网站生成器 Hugo是由Go语言实现的静态网站生成器.简单.易用.高效.易扩展.快速部署. 快速开始 安装Hugo 1. 二进制安装(推荐:简单.快速) 到 Hugo Releas ...

  6. POJ1995&lpar;整数快速幂)

    http://poj.org/problem?id=1995 题意:求(A1^B1 + A2^B2 + .....Ah^Bh)%M 直接快速幂,以前对快速幂了解不深刻,今天重新学了一遍so easy ...

  7. VBA Excel WideCharToMultiByte Compile error on 64-bit System

    Compile Error: The code in this project must be updated for use on64-bit systems. Please review and ...

  8. macOS下通过docker在局域网成功访问mysql5&period;6数据库

    1.获取mysql镜像 docker pull mysql:5.6 注意:此处之所以获取mysql5.6是因为mysql5.7在centos7中启动可能会报错, 2.查看镜像列表 docker ima ...

  9. redis在PHP中的基本使用案例&lpar;觉得比较实用&rpar;

    源地址  http://www.t086.com/article/4901

  10. springboot项目的重定向和转发

    下面是idea软件创建的项目目录,这里总结了一下转发与重定向的问题,详解如下. 首先解释一下每个文件夹的作用,如果你是用的是idea创建的springboot项目,会在项目创建的一开始resource ...