spring与disruptor集成的简单示例[z]

时间:2023-01-26 13:51:35

[z]https://www.jb51.net/article/135475.htm

disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor

BaseQueueHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
 * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
 *
 * 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
 *
 * @author xielongwang
 * @create 2018-01-18 下午3:49
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
 
  /**
   * 记录所有的队列,系统退出时统一清理资源
   */
  private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor 对象
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * initQueue
   */
  private List<D> initQueue = new ArrayList<D>();
 
  /**
   * 队列大小
   *
   * @return 队列长度,必须是2的幂
   */
  protected abstract int getQueueSize();
 
  /**
   * 事件工厂
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();
 
  /**
   * 事件消费者
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();
 
  /**
   * 初始化
   */
  public void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();
 
    //初始化数据发布
    for (D data : initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
        @Override
        public void translateTo(E event, long sequence, D data) {
          event.setValue(data);
        }
      }, data);
    }
 
    //加入资源清理钩子
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }
 
  /**
   * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
   * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();
 
  /**
   * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
      @Override
      public void translateTo(E event, long sequence, D data) {
        event.setValue(data);
      }
    }, data);
  }
 
  /**
   * 关闭队列
   */
  public void shutdown() {
    disruptor.shutdown();
  }
}

EventFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * @author xielongwang
 * @create 2018-01-18 下午6:24
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
 
  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class MyHandlerException implements ExceptionHandler {
 
  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
 
  /*
   * (non-Javadoc) 运行过程中发生时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    ex.printStackTrace();
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  }
 
  /*
   * (non-Javadoc) 启动时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }
 
  /*
   * (non-Javadoc) 关闭时的异常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}

SeriesData.java (代表应用A发送给应用B的消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }
 
  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }
 
  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }
 
  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }
 
  @Override
  public String toString() {
    return "SeriesData{" +
        "deviceInfoStr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java

1
2
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

SeriesDataEventHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;
 
  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
    }
    //业务处理
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;
 
  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }
 
  @Override
  protected com.lmax.disruptor.EventFactory eventFactory() {
    return new EventFactory();
  }
 
  @Override
  protected WorkHandler[] getHandler() {
    int size = seriesDataEventHandler.size();
    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
    return paramEventHandlers;
  }
 
  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }
 
  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class ValueWrapper<T> {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }
 
  public T getValue() {
    return value;
  }
 
  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多实例几个消费者
public class DisruptorConfig {
 
  /**
   * smsParamEventHandler1
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler2
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler3
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }
 
 
  /**
   * smsParamEventHandler4
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }
 
  /**
   * smsParamEventHandler5
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//注入SeriesDataEventQueueHelper消息生产者
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
 
@RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
  long startTime1 = System.currentTimeMillis();
 
  if (StringUtils.isEmpty(deviceData)) {
    logger.info("receiver data is empty !");
    return new DataResponseVo<String>(400, "failed");
  }
  seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
  long startTime2 = System.currentTimeMillis();
  logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
  return new DataResponseVo<String>(200, "success");
}

应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控

spring与disruptor集成的简单示例[z]的更多相关文章

  1. Spring MVC 文件上传简单示例&lpar;form、ajax方式 &rpar;

    1.Form Upload SpringMVC 中,文件的上传是通过 MultipartResolver 实现的,所以要实现上传,只要注册相应的 MultipartResolver 即可. Multi ...

  2. SpringBoot &vert; 第十一章:Redis的集成和简单使用

    前言 上几节讲了利用Mybatis-Plus这个第三方的ORM框架进行数据库访问,在实际工作中,在存储一些非结构化或者缓存一些临时数据及热点数据时,一般上都会用上mongodb和redis进行这方面的 ...

  3. Quartz学习——Spring和Quartz集成详解(三)

    Spring是一个很优秀的框架,它无缝的集成了Quartz,简单方便的让企业级应用更好的使用Quartz进行任务的调度.下面就对Spring集成Quartz进行简单的介绍和示例讲解!和上一节 Quar ...

  4. Spring和Quartz集成

    本文转载自:http://blog.csdn.net/u010648555/article/details/54891264 Spring是一个很优秀的框架,它无缝的集成了Quartz,简单方便的让企 ...

  5. SpringBoot &vert; 第三十三章:Spring web Servcies集成和使用

    前言 最近有个单位内网系统需要对接统一门户,进行单点登录和待办事项对接功能.一般上*系统都会要求做统一登录功能,这个没啥问题,反正业务系统都是做单点登录的,改下shiro相关类就好了.看了接入方案, ...

  6. Spring和ActiveMQ集成实现队列消息以及PUB&sol;SUB模型

    前言:本文是基于Spring和ActiveMQ的一个示例文章,包括了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,只是做了比较简单的实现,无任何业务方面的东西,作为一个 ...

  7. Spring Boot中集成Spring Security 专题

    check to see if spring security is applied that the appropriate resources are permitted: @Configurat ...

  8. Java-Springboot-集成spring-security简单示例&lpar;Version-springboot-2-1-3-RELEASE

    使用Idea的Spring Initializr或者SpringBoot官网下载quickstart 添加依赖 1234 <dependency><groupId>org.sp ...

  9. HTML-003-模拟IDE代码展开收起功能简单示例

    当先我们在日常的编程开发工作中使用编程工具(例如 Eclipse.Sublime 等等)都有相应的代码折叠展开功能,如下图所示,极大的方便了我们的编码工作.

随机推荐

  1. 安装时遇到:正在尝试其它镜像。 http&colon;&sol;&sol;mirrors&period;btte&period;net&sol;centos&sol;7&period;2&period;1511&sol;extras&sol;x86&lowbar;64&sol;repodata&sol;repomd&period;xml&colon; &lbrack;Errno 14&rsqb; curl&num;6 - &quot&semi;Could not resolve host&colon; mirrors&period;btte&period;net&semi; 未知的错误&quot&semi;

    我出现这种错误是因为网络链接问题,因为我设置虚拟机网络链接为VmNET8,设置了nat模式,使得我本地机可以访问虚拟机的linux服务器.但是打开虚拟机的浏览器却不能上网了.所以现在我用xshell装 ...

  2. HDU 4923

    题目大意: 给出一串序列Ai{0,1},求一个序列Bi[0,1](Bi<Bi+1),使得sigama(Ai-Bi)^2最小 思路: 若B相同,则取A的平均数可使方差最小 若B有序,   若A== ...

  3. 使用GULP打包、压缩与打版本号

    这篇文章讲我整理的一种打包项目的方式,以下是我的依赖清单 "devDependencies": { "gulp": "^3.9.1", &q ...

  4. 《全栈营销之如何制作个人博客》之二:php环境安装及个人博客后台搭建 让你的博客跑起来

    上一节我们讲了个人博客用什么开发语言,用什么CMS系统,从这一节我们就开始真正的干货,这一节我们讨论一下PHP环境的安装,及个人博客后台的搭建,让你的博客在正常的PHP环境中运行起来,你就可以进行后台 ...

  5. vue环境下新建项目

    1.之前电脑上安装了node和npm,查看下版本信息. 2.现在安装vue-cli脚手架,可以全局安装: npm install --global  vue-cli 之前自己电脑没有安装过webpac ...

  6. iOS学习——iOS开发小知识点集合

    在iOS学习和开发过程中,经常会遇到一些很小的知识点和问题,一两句话就可以解释清楚了,这样的知识点写一篇随笔又没有必要,但是又想mark一下,以备不时之需,所以就有了本文.后面遇到一些小的知识点会不断 ...

  7. &lbrack;转帖&rsqb;system&lpar;&rpar;、exec&lpar;&rpar;、fork&lpar;&rpar;三个与进程有关的函数的比较

    system().exec().fork()三个与进程有关的函数的比较 https://www.cnblogs.com/qingergege/p/6601807.html 启动新进程(system函数 ...

  8. Python实现Plugin

    1. Plugin与Python 插件的历史最早可追溯至1970年代,它是一种程序组件,通过和应用程序的互动,为应用程序增加一些所需要的特定的功能[维基].插件允许第三方开发者对已有的程序功能进行扩展 ...

  9. vmware虚拟机的tomcat启动以后,主机无法访问

    处理: 关闭防火墙服务:/etc/init.d/iptables stop ..................... 在wmware中安装linux后安装好数据库,JDK及tomcat后启动服务,虚 ...

  10. ORA-16038 ORA-19809 ORA-00312

    问题表现: 连接数据库启动报错,ORA-03113, 查看详细的alert日志发现更多报错,如下 ORA-19809: 超出了恢复文件数的限制ORA-19804: 无法回收 209715200 字节磁 ...