高性能内存队列Disruptor

时间:2023-02-03 18:04:18

1 背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。


内存队列 使用场景一般在系统内部,提高在高并发的情况下系统的性能,一般作用于线程间的消息传递

分布式消息队列 使用场景一般在系统和系统间的消息传递,吞吐量高,也适用于消息流数据处理的中间件

2 JAVA内存队列

介绍Disruptor之前,先介绍一下常用线程安全的内置队列。Java的内置队列下表所示:

队列

有界性

数据结构

ArrayBlockingQueue

bounded

加锁

arraylist

LinkedBlockingQueue

optionally-bounded

加锁

linkedlist

ConcurrentLinkedQueue

unbounded

无锁

linkedlist

LinkedTransferQueue

unbounded

无锁

linkedlist

PriorityBlockingQueue

unbounded

加锁

heap

DelayQueue

unbounded

加锁

heap

队列的底层一般分成三种:数组、链表和堆

堆一般情况下是为了实现带有优先级特性的队列,暂不考虑

  • 基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全
  • 基于链表的线程安全队列分成
  • LinkedBlockingQueue 通过锁的方式来实现线程安全
  • ConcurrentLinkedQueue 上面表格中的LinkedTransferQueue都是通过原子变量compare and swap这种不加锁的方式来实现

通过不加锁的方式实现的队列都是*的(无法保证队列的长度在确定的范围内) 而加锁的方式,可以实现有界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue

在实际使用过程中,ArrayBlockingQueue会因为加锁和伪共享等出现严重的性能问题

3 Disruptor原理

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

Ring Buffer 如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

高性能内存队列Disruptor

Sequence  Disruptor 通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。 (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

Sequencer  Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

Sequence Barrier 用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

高性能内存队列Disruptor

Wait Strategy 定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

Event 在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

EventHandler Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

Producer 即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

高性能内存队列Disruptor

4 代码样例

代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。

以下代码基于3.3.4版本的Disruptor包

/**
* @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;

public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 队列中的元素
class Element {

private int value;

public int get(){
return value;
}

public void set(int value){
this.value= value;
}

}

// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};

// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};

// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};

// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();

// 指定RingBuffer的大小
int bufferSize = 16;

// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

// 设置EventHandler
disruptor.handleEventsWith(handler);

// 启动disruptor的线程
disruptor.start();

RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

for (int l = 0; true; l++)
{
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}

5 应用场景

5.1 Log4j2异步日志打印

log4j2支持日志的异步打印,日志异步输出的好处在于,使用单独的进程来执行日志打印的功能,可以提高日志执行效率,减少日志功能对正常业务的影响。


异步日志在程序的classpath需要加载disruptor-3.0.0.jar或者更高的版本。


5.2 海量job处理

现在有8个库1024张表,大量的job需要处理,每时每刻任务都在海量增加


启动8台机器,每台机器扫描一个库的待执行job,共128个表需要扫描,这里可以启动128个线程去并发扫描,每查出来一次,立马通过disruptor发布出去,另外一端监听到发布的任务之后调用任务处理接口进行处理,就算有任务执行异常,也不会阻塞其它的任务,可以边发布边处理,最大程度提升任务处理能力。


一直积压的任务有旁路报警机制,每次执行失败的job执行次数+1,当大于指定阈值则报警。


一旦无法放入disruptor就会报警,表明队列已满,处理不过来了,得扩容下游处理任务的机器


disruptor的消费末端通过线程池严格控制消费能力,不会出现任务生产过快消费不过来的情况


如果有多种不同类型的任务要处理,可以初始化多个不同size的ringbuffer去处理,定义不同的evenHandler


局限性应该是它是个内存队列,处理不了分布式场景的