Disruptor原理与编程实践

时间:2022-07-13 17:39:57

1.Disruptor的背景

  disruptor是LAMX架构的一种设计,而LAMX是一种新型的零售金融交易平台。disruptor主要用于大规模低延迟的高并发业务场景,其核心disruptor是一个基于事件源驱动机制的业务逻辑处理器,整个业务逻辑处理器完全运行在内存中,disruptor在无锁的网络情况下,实现了Queue的并发。

2.Disruptor的适用场景

  disruptor适用于大规模低延迟的并发场景。可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)、或者是基于内存的事件流处理机制的场景。

3.Disruptor的设计思想与原理

  disruptor的主要设计思想是无锁的高并发,在设计上采用内存屏障的机制和CAS操作实现此思想。主流的并发程序
都离不开锁对资源的管控,或者尽量避开锁的使用。
  其主要的实现原理总结有如下三点,当然还有很多地方设计得很巧妙,需要细细阅读源码和官方文档。虽然这个 过程对我来说很尴尬,但痛并快乐者,有朝闻道、夕可死也的感觉。
  1.采用消费者-生产者模型进行读写的分离。
  2.用循环缓存(实际是一个循环队列)实现了数据的暂存和读写速度的匹配。
  3.用内存屏障加序列号的方式实现了无锁的并发机制。
  Disruptor原理与编程实践

4.Disruptor的编程实践

 -disruptor的主要编程部件
   1.Disruptor:用于控制整个消费者-生产者模型的处理器
   2.RingBuffer:用于存放数据
   3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。
   4.EventFactory:事件工厂类。
   5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。
   6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。
   7.用于运行disruptor的线程或者线程池。

 -disruptor编程主要的编程流程
   1.定义事件
   2.定义事件工厂
   3.定义事件处理类
   4.定义事件处理的线程或者线程池
   5.指定等待策略
   6.通过disruptor处理器组装生产者和消费者
   7.发布事件
   8.关闭disruptor业务逻辑处理器
   具体处理流程参看:Disruptor体验与对比
 -disruptor实现无锁高并发,主要采用的消费者-生产者模型。所以编程的实践场景如下
   1.一个生产者—一个消费者的场景
   2.一个生产者—多个消费者的场景
   3.多个生产者—一个消费者的场景
   4.多个生产者—多个消费者的场景
   5.生产者-消费流(这个是我自己取的名字,因为disruptor可以实现菱形的事件处理模型,这种结构有点像storm里面的实时计算数据处理模型。但这个是面向线程的,storm是面向进程的,有点像,但不是一个层次上的东西,不过两者可以结合起来使用。)
   Disruptor原理与编程实践
   更为详细的代码参看:并发框架Disruptor的几个Demo
   具体的代码实现如下:

package com.lejr.moneybox.task.web;

import com.google.gson.Gson;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.UUID;
import java.util.concurrent.*;

/** * Created by lucien on 2016/12/9. */
public class DisruptorTest {
    //1.定义事件
    public class TradeOrder{
        private String orderId;
        private double amount;

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public double getAmount() {
            return amount;
        }

        public void setAmount(double amount) {
            this.amount = amount;
        }
    }
    //2.定义事件工厂
    public  class TradeOrderEventFactory implements EventFactory<TradeOrder> {
        @Override
        public TradeOrder newInstance() {
            return new TradeOrder();
        }
    }
    //3.定义事件处理类---订单入库操作
    public class TradeDBHandler implements EventHandler<TradeOrder>,WorkHandler<TradeOrder>{

        @Override
        public void onEvent(TradeOrder tradeOrder, long l, boolean b) throws Exception {
              this.onEvent(tradeOrder);
        }

        @Override
        public void onEvent(TradeOrder tradeOrder) throws Exception {
            tradeOrder.setOrderId(UUID.randomUUID().toString());
            System.out.println("订单号为:"+tradeOrder.getOrderId()+"金额:"+tradeOrder.getAmount());
        }
    }
    //3.定义事件处理类---订单消费操作
    public class TradeOrderVarConsumer implements EventHandler<TradeOrder>{
        @Override
        public void onEvent(TradeOrder event, long sequence, boolean endOfBatch) throws Exception {
             System.out.println("进行订单消费:"+event.getOrderId());
        }
    }
    //3.定义事件处理类---订单金额转换
    public class TradeOrderTrasfer implements EventTranslator<TradeOrder>{
        @Override
        public void translateTo(TradeOrder event, long sequence) {
            event.setAmount(Math.random()*99);
            System.out.println("设置订单金额:"+event.getOrderId());
        }
    }
    //3.定义事件处理类---发送订单mq
    public class TradeOrderJMSSender implements EventHandler<TradeOrder>{
        @Override
        public void onEvent(TradeOrder event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("发送订单JMS:"+event.getOrderId()+",金额:"+event.getAmount());
        }
    }
    //7.发布事件
    public class TradeOrderProductor implements Runnable{
        CountDownLatch cdl;
        private final int  count= 100000;
        Disruptor disruptor;
        public  TradeOrderProductor(CountDownLatch cdl,Disruptor disruptor){
            this.disruptor =disruptor;
            this.cdl = cdl;
        }
        @Override
        public void run() {
            TradeOrderTrasfer tof;
            try {
                for (int i = 0; i < count; i++) {
                    tof = new TradeOrderTrasfer();
                    disruptor.publishEvent(tof);
                }
            }finally {
                cdl.countDown();
            }
        }
    }
    //定义RingBuffer的大小
    private static final int BUFFER_SIZE = 1024;

    //4.定义事件处理的线程或者线程池
    ExecutorService excutorService = Executors.newFixedThreadPool(5);

    //批处理模式
    public void BatchEventProcessor() throws ExecutionException, InterruptedException {

        final RingBuffer<TradeOrder> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeOrder>() {
            @Override
            public TradeOrder newInstance() {
                return new TradeOrder();
            }
        },BUFFER_SIZE,/**5.指定等待策略**/new YieldingWaitStrategy());

        SequenceBarrier sequenceBarrier =  ringBuffer.newBarrier();
        BatchEventProcessor<TradeOrder> batchEventProcessor = new BatchEventProcessor<TradeOrder>(ringBuffer,sequenceBarrier,new TradeDBHandler());
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
        excutorService.submit(batchEventProcessor);

        Future<?> future = excutorService.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
               long seq;
                Gson son = new Gson();
                for(int i=0;i<10000;i++){
                    System.out.println("product:"+i);
                    seq = ringBuffer.next();
                    System.out.println("seq:"+seq);
                    ringBuffer.get(seq).setAmount(Math.random()*9_9);
                    System.out.println("tradeOrder:"+son.toJson(ringBuffer.get(seq)));
                    ringBuffer.publish(seq);
                }
                return null;
            }
        });
        future.get();
        Thread.sleep(10000);
        batchEventProcessor.halt();
        excutorService.shutdown();
    }

    //workerPool模式
    public void workerPool() throws InterruptedException {

        final RingBuffer<TradeOrder> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeOrder>() {
            @Override
            public TradeOrder newInstance() {
                return new TradeOrder();
            }
        },BUFFER_SIZE,new YieldingWaitStrategy());
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        WorkHandler<TradeOrder> workHandler = new TradeDBHandler();
        WorkerPool<TradeOrder> workerPool = new WorkerPool<TradeOrder>(ringBuffer,sequenceBarrier, new IgnoreExceptionHandler(),workHandler);
        workerPool.start(executorService);
        long seq;
        for(int i =0;i<8;i++){
            seq = ringBuffer.next();
            ringBuffer.get(seq).setAmount(Math.random()*99);
            ringBuffer.publish(seq);
        }
        Thread.sleep(1000);
        workerPool.halt();
        executorService.shutdown();
    }

    /** * 6.通过disruptor处理器组装生产者和消费者 * 菱形结构的处理过程流 * * /--------->TradeOrderVarConsumer--\ * TradeOrderProductor--->RingBuffer--> ----->TradeOrderJMSSender * \--------->TradeDBHandler--------/ * * @throws InterruptedException */
    public void processerFlow() throws InterruptedException {
         ExecutorService executorService = Executors.newFixedThreadPool(5);
         Disruptor<TradeOrder> disruptor = new Disruptor<TradeOrder>(new EventFactory<TradeOrder>(){
             @Override
             public TradeOrder newInstance() {
                 return new TradeOrder();
             }
         },BUFFER_SIZE,executorService, ProducerType.SINGLE,/**5.指定等待策略**/new BusySpinWaitStrategy());

        EventHandlerGroup<TradeOrder> eventHandlerGroup = disruptor.handleEventsWith(new TradeOrderVarConsumer(),new TradeDBHandler());
        eventHandlerGroup.then(new TradeOrderJMSSender());
        disruptor.start();//启动disruptor
        CountDownLatch cdl = new CountDownLatch(1);
        executorService.submit(new TradeOrderProductor(cdl,disruptor));
        cdl.await();//用于让任务完全消费掉
        //8.关闭disruptor业务逻辑处理器
        disruptor.shutdown();
        executorService.shutdown();
    }

    public static void main(String args[]) throws ExecutionException, InterruptedException {
        DisruptorTest disruptor = new DisruptorTest();
// disruptor.BatchEventProcessor();
// disruptor.workerPool();
        disruptor.processerFlow();
    }


}

5.Disruptor的各个版本的差异

  disruptor主要版本差异在2.0,disruptor2.0以前是sequenceBarrier与RingBuffer是独立的,而在2.0以后,Barrier已经融合在RingBuffer中。相应的消费者和生产者的类名也有变化。
具体区别参考:Disruptor2.0的更新摘要

6.Disruptor的DLS编程

  用于简化disruptor编程的工具。
  主要结构使用如下:

Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY, RING_BUFFER_SIZE, executor);
dw.consumeWith(handler1, handler2).then(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

  DSL如何使用请参考DSL的使用

  关于更多的disruptor细节可以参看并发编程网