1.Disruptor的背景
disruptor是LAMX架构的一种设计,而LAMX是一种新型的零售金融交易平台。disruptor主要用于大规模低延迟的高并发业务场景,其核心disruptor是一个基于事件源驱动机制的业务逻辑处理器,整个业务逻辑处理器完全运行在内存中,disruptor在无锁的网络情况下,实现了Queue的并发。
2.Disruptor的适用场景
disruptor适用于大规模低延迟的并发场景。可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)、或者是基于内存的事件流处理机制的场景。
3.Disruptor的设计思想与原理
disruptor的主要设计思想是无锁的高并发,在设计上采用内存屏障的机制和CAS操作实现此思想。主流的并发程序
都离不开锁对资源的管控,或者尽量避开锁的使用。
其主要的实现原理总结有如下三点,当然还有很多地方设计得很巧妙,需要细细阅读源码和官方文档。虽然这个 过程对我来说很尴尬,但痛并快乐者,有朝闻道、夕可死也的感觉。
1.采用消费者-生产者模型进行读写的分离。
2.用循环缓存(实际是一个循环队列)实现了数据的暂存和读写速度的匹配。
3.用内存屏障加序列号的方式实现了无锁的并发机制。
4.Disruptor的编程实践
-disruptor的主要编程部件
1.Disruptor:用于控制整个消费者-生产者模型的处理器
2.RingBuffer:用于存放数据
3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。
4.EventFactory:事件工厂类。
5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。
6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。
7.用于运行disruptor的线程或者线程池。
-disruptor编程主要的编程流程
1.定义事件
2.定义事件工厂
3.定义事件处理类
4.定义事件处理的线程或者线程池
5.指定等待策略
6.通过disruptor处理器组装生产者和消费者
7.发布事件
8.关闭disruptor业务逻辑处理器
具体处理流程参看:Disruptor体验与对比
-disruptor实现无锁高并发,主要采用的消费者-生产者模型。所以编程的实践场景如下
1.一个生产者—一个消费者的场景
2.一个生产者—多个消费者的场景
3.多个生产者—一个消费者的场景
4.多个生产者—多个消费者的场景
5.生产者-消费流(这个是我自己取的名字,因为disruptor可以实现菱形的事件处理模型,这种结构有点像storm里面的实时计算数据处理模型。但这个是面向线程的,storm是面向进程的,有点像,但不是一个层次上的东西,不过两者可以结合起来使用。)
更为详细的代码参看:并发框架Disruptor的几个Demo
具体的代码实现如下:
package com.lejr.moneybox.task.web;
import com.google.gson.Gson;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.UUID;
import java.util.concurrent.*;
/** * Created by lucien on 2016/12/9. */
public class DisruptorTest {
//1.定义事件
public class TradeOrder{
private String orderId;
private double amount;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
}
//2.定义事件工厂
public class TradeOrderEventFactory implements EventFactory<TradeOrder> {
@Override
public TradeOrder newInstance() {
return new TradeOrder();
}
}
//3.定义事件处理类---订单入库操作
public class TradeDBHandler implements EventHandler<TradeOrder>,WorkHandler<TradeOrder>{
@Override
public void onEvent(TradeOrder tradeOrder, long l, boolean b) throws Exception {
this.onEvent(tradeOrder);
}
@Override
public void onEvent(TradeOrder tradeOrder) throws Exception {
tradeOrder.setOrderId(UUID.randomUUID().toString());
System.out.println("订单号为:"+tradeOrder.getOrderId()+"金额:"+tradeOrder.getAmount());
}
}
//3.定义事件处理类---订单消费操作
public class TradeOrderVarConsumer implements EventHandler<TradeOrder>{
@Override
public void onEvent(TradeOrder event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("进行订单消费:"+event.getOrderId());
}
}
//3.定义事件处理类---订单金额转换
public class TradeOrderTrasfer implements EventTranslator<TradeOrder>{
@Override
public void translateTo(TradeOrder event, long sequence) {
event.setAmount(Math.random()*99);
System.out.println("设置订单金额:"+event.getOrderId());
}
}
//3.定义事件处理类---发送订单mq
public class TradeOrderJMSSender implements EventHandler<TradeOrder>{
@Override
public void onEvent(TradeOrder event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("发送订单JMS:"+event.getOrderId()+",金额:"+event.getAmount());
}
}
//7.发布事件
public class TradeOrderProductor implements Runnable{
CountDownLatch cdl;
private final int count= 100000;
Disruptor disruptor;
public TradeOrderProductor(CountDownLatch cdl,Disruptor disruptor){
this.disruptor =disruptor;
this.cdl = cdl;
}
@Override
public void run() {
TradeOrderTrasfer tof;
try {
for (int i = 0; i < count; i++) {
tof = new TradeOrderTrasfer();
disruptor.publishEvent(tof);
}
}finally {
cdl.countDown();
}
}
}
//定义RingBuffer的大小
private static final int BUFFER_SIZE = 1024;
//4.定义事件处理的线程或者线程池
ExecutorService excutorService = Executors.newFixedThreadPool(5);
//批处理模式
public void BatchEventProcessor() throws ExecutionException, InterruptedException {
final RingBuffer<TradeOrder> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeOrder>() {
@Override
public TradeOrder newInstance() {
return new TradeOrder();
}
},BUFFER_SIZE,/**5.指定等待策略**/new YieldingWaitStrategy());
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
BatchEventProcessor<TradeOrder> batchEventProcessor = new BatchEventProcessor<TradeOrder>(ringBuffer,sequenceBarrier,new TradeDBHandler());
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
excutorService.submit(batchEventProcessor);
Future<?> future = excutorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
long seq;
Gson son = new Gson();
for(int i=0;i<10000;i++){
System.out.println("product:"+i);
seq = ringBuffer.next();
System.out.println("seq:"+seq);
ringBuffer.get(seq).setAmount(Math.random()*9_9);
System.out.println("tradeOrder:"+son.toJson(ringBuffer.get(seq)));
ringBuffer.publish(seq);
}
return null;
}
});
future.get();
Thread.sleep(10000);
batchEventProcessor.halt();
excutorService.shutdown();
}
//workerPool模式
public void workerPool() throws InterruptedException {
final RingBuffer<TradeOrder> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeOrder>() {
@Override
public TradeOrder newInstance() {
return new TradeOrder();
}
},BUFFER_SIZE,new YieldingWaitStrategy());
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
ExecutorService executorService = Executors.newFixedThreadPool(5);
WorkHandler<TradeOrder> workHandler = new TradeDBHandler();
WorkerPool<TradeOrder> workerPool = new WorkerPool<TradeOrder>(ringBuffer,sequenceBarrier, new IgnoreExceptionHandler(),workHandler);
workerPool.start(executorService);
long seq;
for(int i =0;i<8;i++){
seq = ringBuffer.next();
ringBuffer.get(seq).setAmount(Math.random()*99);
ringBuffer.publish(seq);
}
Thread.sleep(1000);
workerPool.halt();
executorService.shutdown();
}
/** * 6.通过disruptor处理器组装生产者和消费者 * 菱形结构的处理过程流 * * /--------->TradeOrderVarConsumer--\ * TradeOrderProductor--->RingBuffer--> ----->TradeOrderJMSSender * \--------->TradeDBHandler--------/ * * @throws InterruptedException */
public void processerFlow() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Disruptor<TradeOrder> disruptor = new Disruptor<TradeOrder>(new EventFactory<TradeOrder>(){
@Override
public TradeOrder newInstance() {
return new TradeOrder();
}
},BUFFER_SIZE,executorService, ProducerType.SINGLE,/**5.指定等待策略**/new BusySpinWaitStrategy());
EventHandlerGroup<TradeOrder> eventHandlerGroup = disruptor.handleEventsWith(new TradeOrderVarConsumer(),new TradeDBHandler());
eventHandlerGroup.then(new TradeOrderJMSSender());
disruptor.start();//启动disruptor
CountDownLatch cdl = new CountDownLatch(1);
executorService.submit(new TradeOrderProductor(cdl,disruptor));
cdl.await();//用于让任务完全消费掉
//8.关闭disruptor业务逻辑处理器
disruptor.shutdown();
executorService.shutdown();
}
public static void main(String args[]) throws ExecutionException, InterruptedException {
DisruptorTest disruptor = new DisruptorTest();
// disruptor.BatchEventProcessor();
// disruptor.workerPool();
disruptor.processerFlow();
}
}
5.Disruptor的各个版本的差异
disruptor主要版本差异在2.0,disruptor2.0以前是sequenceBarrier与RingBuffer是独立的,而在2.0以后,Barrier已经融合在RingBuffer中。相应的消费者和生产者的类名也有变化。
具体区别参考:Disruptor2.0的更新摘要
6.Disruptor的DLS编程
用于简化disruptor编程的工具。
主要结构使用如下:
Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY, RING_BUFFER_SIZE, executor);
dw.consumeWith(handler1, handler2).then(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();
DSL如何使用请参考:DSL的使用
关于更多的disruptor细节可以参看:并发编程网