[disruptor详解]01-disruptor原理

时间:2023-01-15 16:54:36


disruptor原理

  • 使用循环队列,且要求队列大小为2的N次方,以满足位运算快速计算索引的要求(比取模速度快)
  • 使用原子变量记录生产者和消费者的个数,并且使用​​cache line​​进行隔离,避免多线程情况下由于两个变量处于同一​​cache line​​的伪共享问题
  • 无锁设计。通过原子变量,每个生产者和消费者都需要先申请数组中可以操作的元素索引,申请到后才能进行读取或者写入
  • 引入状态记录数组来记录数组元素的状态,判断该索引位置是否可以进行读取或者写入
  • 使用互斥锁和条件变量,通过设置支持进程间通信
  • 使用共享内存支持进程间的数据交互,实现​​zero copy​


disruptor 环形队列

所谓的环形队列,实际上就是一个申请在堆上的数组,假设其大小为​​cap​​。对该数组的访问将由​​2​​个索引进行——​​cursor​​(读取者目前所在索引位置)、​​next​​(写入者目前所在索引位置),前者表示的是读取者索引,后者表示写者索引。不管是​​cursor​​还是​​next​​,我们在获取索引时都需要让他们和​​cap​​进行取模操作,​​disruptor​​为了加快操作速度,强制要求​​cap​​为​​2​​的​​N​​次方,然后使用以下算法来代替取模操作,以下为相关算法:

// 将index和cap进行取模操作,前提是cap必须是2的N次方
int64_t translated_index = (index & (cap - 1));

// 快速判断cap是否为2的N次方
bool is_power2 = cap && !( (cap-1) & cap) ;


在环形队列的应用上有​​2​​种循环方式:

  1. 当索引小于​​cap​​的时候累加,当索引等于​​cap​​的时候将索引置零。这种情况可以直接使用索引对队列进行访问。
  2. 索引一直累加,当需要访问队列的时候,需要将索引对​​cap​​取模

​disruptor​​​选择的是第二种,该方法的好处是写索引永远是大于等于读索引的。这样生产者写入数据时只需要判断写入索引减去读取索引是否大于等于​​cap​​​,如果是则生产者需要等待消费者。​​disruptor​​中获取写索引的实现就使用了这一特性。


disruptor 消费者辅助数组

​disruptor​​引入了一个大小为​​MAX_CONSUMER​​的消费者辅助数组存放消费者索引,用满足多消费者使用场景。假设该辅助数组名为​​array_of_consumer_indexes​​,其使用逻辑如下:

  • ​array_of_consumer_indexes​​所有元素初始化为​​-1​​,此时表示该​​id​​没有消费者注册
  • 通过​​id​​注册消费者,当该​​id​​已注册时,直接返回​​array_of_consumer_indexes[id]+1​​;当该​​id​​未注册时,返回当前最大的消费者读取索引(实际就是上面提到的​​curor​​)
  • 当生产者写入数据时,先获取​​array_of_consumer_indexes​​中最小的消费者索引,然后判断要写入的索引位置与最小消费者索引差值是否超过了数组个数,如果超过了则需要进行等待,否则可以直接写入
  • 当身份为​​id​​​的消费者消费的索引到达了​​index​​​时,需要将​​array_of_consumer_indexes[id]​​​更新为​​index​


等待策略

​disruptor​​目前有​​3​​种等待策略,分别如下:

  • YieldingWaitStrategy:无锁,累加100次,每次都进行资源是否可用的判断,如果可用则返回所需资源;如果不可用则一直累加到100次之后,让出线程调度时间片,当下次系统进行调度的时候再判断资源是否可用,如果不可用则继续​​yield​​。该策略的逻辑就是​​spin​​—>​​yield​​.
  • SleepingWaitStrategy:无锁,该策略先快速累加​​100​​次,此过程和​​YieldingWaitStrategy​​的累加一样;再进行第二次​​100​​次累加,在此过程中如果条件不满足则会进行​​yield​​操作;之后则每次循环都休眠​​1ns​​。该策略逻辑为​​spin​​—>​​yield​​—>​​sleep​
  • BlockingWaitStrategy:该策略为加锁阻塞策略,没有什么好介绍的。唯一要提一下的是,它获取时间使用的是​​gettimeofday​​​,该函数获取的时间会跟随系统时间改变,因此使用​​clock_gettime​​​更加合适,因为​​clock_gettime​​不仅精度更高,而且可以通过指定参数获取系统启动后运行的时间,且该时间不能被更改!!


共享内存管理

​disruptor​​的共享内存使用的是下列一系列函数:

  • ​shmget​​:创建或者获取共享内存​​id​
  • ​shmat​​:将共享内存​​id​​所代表的内存地址映射到当前进程
  • ​shmdt​​:取消共享内存​​id​​所代表的内存地址在当前进程的映射
  • ​shmctl​​:删除共享内存

以上函数没有太多要介绍的,具体使用可以参考​​[IPC基础]01-Linux共享内存API简介​​​和​​[IPC基础]02-共享内存使用示例​​。


在共享内存上进行数据收发

共享内存上的环形队列管理器

​disruptor​​在创建共享内存时,会将一些管理信息以及用于同步用的互斥锁和条件变量也创建在共享内存上。关于该部分的内容,将在代码解读部分进行说明。

共享内存上的环形队列

​disruptor​​通过​​SharedMemRingBuffer​​创建锁、条件变量以及所需要的数据空间,以达到在进程间进行数据同步以及高效的进行数据交互的目的。​​SharedMemRingBuffer​​所管理的数据类型固定为​​OneBufferData​​,我们可以通过修改轻松的达到管理任何类型数据的目的,可以参考​​[IPC基础]03-通过共享内存和互斥锁、条件变量实现进程同步​​。

首先​​SharedMemRingBuffer​​创建一个共享内存,其大小为​​sizeof(_RingBufferStatusOnSharedMem_) + sizeof(OneBufferData)*size​​,然后将共享内存​​attach​​到所需要的进程,通过内存偏移量计算出用户数据的其实地址,就可以在该空间上进行数据的读写了。有一点要稍微介绍一下的,​​disruptor​​将申请的​​N​​个元素通过内存地址映射到了一个包含​​N​​个元素指针的环形队列上,通过该操作,可以方便的对共享内存进行访问管理。