[z]https://www.jb51.net/article/135475.htm
disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor
BaseQueueHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
/** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
*
* 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
*
* @author xielongwang
* @create 2018-01-18 下午3:49
* @email xielong.wang@nvr-china.com
* @description
*/
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
/**
* 记录所有的队列,系统退出时统一清理资源
*/
private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
/**
* Disruptor 对象
*/
private Disruptor<E> disruptor;
/**
* RingBuffer
*/
private RingBuffer<E> ringBuffer;
/**
* initQueue
*/
private List<D> initQueue = new ArrayList<D>();
/**
* 队列大小
*
* @return 队列长度,必须是2的幂
*/
protected abstract int getQueueSize();
/**
* 事件工厂
*
* @return EventFactory
*/
protected abstract EventFactory<E> eventFactory();
/**
* 事件消费者
*
* @return WorkHandler[]
*/
protected abstract WorkHandler[] getHandler();
/**
* 初始化
*/
public void init() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( "DisruptorThreadPool" ).build();
disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
disruptor.setDefaultExceptionHandler( new MyHandlerException());
disruptor.handleEventsWithWorkerPool(getHandler());
ringBuffer = disruptor.start();
//初始化数据发布
for (D data : initQueue) {
ringBuffer.publishEvent( new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
//加入资源清理钩子
synchronized (queueHelperList) {
if (queueHelperList.isEmpty()) {
Runtime.getRuntime().addShutdownHook( new Thread() {
@Override
public void run() {
for (BaseQueueHelper baseQueueHelper : queueHelperList) {
baseQueueHelper.shutdown();
}
}
});
}
queueHelperList.add( this );
}
}
/**
* 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
* 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
*
* @return WaitStrategy
*/
protected abstract WaitStrategy getStrategy();
/**
* 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
*/
public synchronized void publishEvent(D data) {
if (ringBuffer == null ) {
initQueue.add(data);
return ;
}
ringBuffer.publishEvent( new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
/**
* 关闭队列
*/
public void shutdown() {
disruptor.shutdown();
}
} |
EventFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/** * @author xielongwang
* @create 2018-01-18 下午6:24
* @email xielong.wang@nvr-china.com
* @description
*/
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
@Override
public SeriesDataEvent newInstance() {
return new SeriesDataEvent();
}
} |
MyHandlerException.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class MyHandlerException implements ExceptionHandler {
private Logger logger = LoggerFactory.getLogger(MyHandlerException. class );
/*
* (non-Javadoc) 运行过程中发生时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
* , long, java.lang.Object)
*/
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
ex.printStackTrace();
logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
}
/*
* (non-Javadoc) 启动时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
* Throwable)
*/
@Override
public void handleOnStartException(Throwable ex) {
logger.error("start disruptor error ==[{}]!", ex.getMessage());
}
/*
* (non-Javadoc) 关闭时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
* .Throwable)
*/
@Override
public void handleOnShutdownException(Throwable ex) {
logger.error( "shutdown disruptor error ==[{}]!" , ex.getMessage());
}
} |
SeriesData.java (代表应用A发送给应用B的消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class SeriesData {
private String deviceInfoStr;
public SeriesData() {
}
public SeriesData(String deviceInfoStr) {
this .deviceInfoStr = deviceInfoStr;
}
public String getDeviceInfoStr() {
return deviceInfoStr;
}
public void setDeviceInfoStr(String deviceInfoStr) {
this .deviceInfoStr = deviceInfoStr;
}
@Override
public String toString() {
return "SeriesData{" +
"deviceInfoStr='" + deviceInfoStr + '\ '' +
'}' ;
}
} |
SeriesDataEvent.java
1
2
|
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
} |
SeriesDataEventHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler. class );
@Autowired
private DeviceInfoService deviceInfoService;
@Override
public void onEvent(SeriesDataEvent event) {
if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
logger.warn( "receiver series data is empty!" );
}
//业务处理
deviceInfoService.processData(event.getValue().getDeviceInfoStr());
}
} |
SeriesDataEventQueueHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
@Component public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
private static final int QUEUE_SIZE = 1024 ;
@Autowired
private List<SeriesDataEventHandler> seriesDataEventHandler;
@Override
protected int getQueueSize() {
return QUEUE_SIZE;
}
@Override
protected com.lmax.disruptor.EventFactory eventFactory() {
return new EventFactory();
}
@Override
protected WorkHandler[] getHandler() {
int size = seriesDataEventHandler.size();
SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray( new SeriesDataEventHandler[size]);
return paramEventHandlers;
}
@Override
protected WaitStrategy getStrategy() {
return new BlockingWaitStrategy();
//return new YieldingWaitStrategy();
}
@Override
public void afterPropertiesSet() throws Exception {
this .init();
}
} |
ValueWrapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public abstract class ValueWrapper<T> {
private T value;
public ValueWrapper() {}
public ValueWrapper(T value) {
this .value = value;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this .value = value;
}
} |
DisruptorConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Configuration @ComponentScan (value = { "com.portal.disruptor" })
//多实例几个消费者 public class DisruptorConfig {
/**
* smsParamEventHandler1
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler1() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler2
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler2() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler3
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler3() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler4
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler4() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler5
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler5() {
return new SeriesDataEventHandler();
}
} |
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//注入SeriesDataEventQueueHelper消息生产者 @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
@RequestMapping (value = "/data" , method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo<String> receiverDeviceData( @RequestBody String deviceData) {
long startTime1 = System.currentTimeMillis();
if (StringUtils.isEmpty(deviceData)) {
logger.info( "receiver data is empty !" );
return new DataResponseVo<String>( 400 , "failed" );
}
seriesDataEventQueueHelper.publishEvent( new SeriesData(deviceData));
long startTime2 = System.currentTimeMillis();
logger.info( "receiver data ==[{}] millisecond ==[{}]" , deviceData, startTime2 - startTime1);
return new DataResponseVo<String>( 200 , "success" );
} |
应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控
spring与disruptor集成的简单示例[z]的更多相关文章
-
Spring MVC 文件上传简单示例(form、ajax方式 )
1.Form Upload SpringMVC 中,文件的上传是通过 MultipartResolver 实现的,所以要实现上传,只要注册相应的 MultipartResolver 即可. Multi ...
-
SpringBoot | 第十一章:Redis的集成和简单使用
前言 上几节讲了利用Mybatis-Plus这个第三方的ORM框架进行数据库访问,在实际工作中,在存储一些非结构化或者缓存一些临时数据及热点数据时,一般上都会用上mongodb和redis进行这方面的 ...
-
Quartz学习——Spring和Quartz集成详解(三)
Spring是一个很优秀的框架,它无缝的集成了Quartz,简单方便的让企业级应用更好的使用Quartz进行任务的调度.下面就对Spring集成Quartz进行简单的介绍和示例讲解!和上一节 Quar ...
-
Spring和Quartz集成
本文转载自:http://blog.csdn.net/u010648555/article/details/54891264 Spring是一个很优秀的框架,它无缝的集成了Quartz,简单方便的让企 ...
-
SpringBoot | 第三十三章:Spring web Servcies集成和使用
前言 最近有个单位内网系统需要对接统一门户,进行单点登录和待办事项对接功能.一般上*系统都会要求做统一登录功能,这个没啥问题,反正业务系统都是做单点登录的,改下shiro相关类就好了.看了接入方案, ...
-
Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型
前言:本文是基于Spring和ActiveMQ的一个示例文章,包括了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,只是做了比较简单的实现,无任何业务方面的东西,作为一个 ...
-
Spring Boot中集成Spring Security 专题
check to see if spring security is applied that the appropriate resources are permitted: @Configurat ...
-
Java-Springboot-集成spring-security简单示例(Version-springboot-2-1-3-RELEASE
使用Idea的Spring Initializr或者SpringBoot官网下载quickstart 添加依赖 1234 <dependency><groupId>org.sp ...
-
HTML-003-模拟IDE代码展开收起功能简单示例
当先我们在日常的编程开发工作中使用编程工具(例如 Eclipse.Sublime 等等)都有相应的代码折叠展开功能,如下图所示,极大的方便了我们的编码工作.
随机推荐
-
安装时遇到:正在尝试其它镜像。 http://mirrors.btte.net/centos/7.2.1511/extras/x86_64/repodata/repomd.xml: [Errno 14] curl#6 - ";Could not resolve host: mirrors.btte.net; 未知的错误";
我出现这种错误是因为网络链接问题,因为我设置虚拟机网络链接为VmNET8,设置了nat模式,使得我本地机可以访问虚拟机的linux服务器.但是打开虚拟机的浏览器却不能上网了.所以现在我用xshell装 ...
-
HDU 4923
题目大意: 给出一串序列Ai{0,1},求一个序列Bi[0,1](Bi<Bi+1),使得sigama(Ai-Bi)^2最小 思路: 若B相同,则取A的平均数可使方差最小 若B有序, 若A== ...
-
使用GULP打包、压缩与打版本号
这篇文章讲我整理的一种打包项目的方式,以下是我的依赖清单 "devDependencies": { "gulp": "^3.9.1", &q ...
-
《全栈营销之如何制作个人博客》之二:php环境安装及个人博客后台搭建 让你的博客跑起来
上一节我们讲了个人博客用什么开发语言,用什么CMS系统,从这一节我们就开始真正的干货,这一节我们讨论一下PHP环境的安装,及个人博客后台的搭建,让你的博客在正常的PHP环境中运行起来,你就可以进行后台 ...
-
vue环境下新建项目
1.之前电脑上安装了node和npm,查看下版本信息. 2.现在安装vue-cli脚手架,可以全局安装: npm install --global vue-cli 之前自己电脑没有安装过webpac ...
-
iOS学习——iOS开发小知识点集合
在iOS学习和开发过程中,经常会遇到一些很小的知识点和问题,一两句话就可以解释清楚了,这样的知识点写一篇随笔又没有必要,但是又想mark一下,以备不时之需,所以就有了本文.后面遇到一些小的知识点会不断 ...
-
[转帖]system()、exec()、fork()三个与进程有关的函数的比较
system().exec().fork()三个与进程有关的函数的比较 https://www.cnblogs.com/qingergege/p/6601807.html 启动新进程(system函数 ...
-
Python实现Plugin
1. Plugin与Python 插件的历史最早可追溯至1970年代,它是一种程序组件,通过和应用程序的互动,为应用程序增加一些所需要的特定的功能[维基].插件允许第三方开发者对已有的程序功能进行扩展 ...
-
vmware虚拟机的tomcat启动以后,主机无法访问
处理: 关闭防火墙服务:/etc/init.d/iptables stop ..................... 在wmware中安装linux后安装好数据库,JDK及tomcat后启动服务,虚 ...
-
ORA-16038 ORA-19809 ORA-00312
问题表现: 连接数据库启动报错,ORA-03113, 查看详细的alert日志发现更多报错,如下 ORA-19809: 超出了恢复文件数的限制ORA-19804: 无法回收 209715200 字节磁 ...