【disruptor】2、disruptor中生产者线程与消费者之间的协调

时间:2022-06-18 20:21:10

 

 

 

由于ringbuffer是一个环形的队列,那么生产者和消费者在遍历这个队列的时候,如何制衡呢?

 

1、生产快,消费慢,数据丢失?

生产者速度过快,导致一个对象还没消费完,就循环生产了一个新的对象要加入ringbuffer,导致消费不完整,造成数据丢失?

 我们注意到,在我们获取生产者下一个位置的时候,是通过ringbuffer的next方法,而这个next方式是调用了sequencer的next方法

 

【disruptor】2、disruptor中生产者线程与消费者之间的协调

 

 

【disruptor】2、disruptor中生产者线程与消费者之间的协调

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

这个对象,在我们创建disruptor对象的时候,创建的

 【disruptor】2、disruptor中生产者线程与消费者之间的协调

 

 

【disruptor】2、disruptor中生产者线程与消费者之间的协调

 

 【disruptor】2、disruptor中生产者线程与消费者之间的协调

 

所以这个ringbuffer就是disruptor中的sequencer对象,那么在进行获取next的时候,这里是如何获取下一个的呢?是否会对这个生产获取下一个序列进行相应的等待策略,避免产生相应的干扰!!!

 

这个各位看官还需多看看里面的代码以及封装(特别是封装,真是九转十八弯),多熟悉,我这绕着绕着很容易就绕晕了,刚开始也是云里雾里。

 

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

回归主线,继续看看next方法

 

long nextSequence = nextValue + n;      这个标识这次要生产的数据到什么位子
long wrapPoint = nextSequence - bufferSize;   是否超出范围,第一次是没有超出最开始的ringbuffer大小,第二次就是判断是否要对没有被消费的部分进行覆盖了
long cachedGatingSequence = this.cachedValue;   这个是消费者消费的情况,判断从哪里开始到当前位置是没有被消费的

 

 

【disruptor】2、disruptor中生产者线程与消费者之间的协调

 

 那么这个判断条件是什么意思呢?

 wrapPoint > cachedGatingSequence   这个条件是判断,当前超出的部分是否会覆盖到还未被消费的部分数据

 

cachedGatingSequence > nextValue   这个判断是,未被消费的位置开始,是否在下一个生产者位置的前面,

  如果还未被消费的标识比下一个要被生产的位置还小,那说明生产在消费的前面,消费者可以继续消费

  如果未被消费的标识比下一个要生产的位置还大,消费盘跑到生产前面了,会造成重复消费

 

 由于是环形的队列,当消费者和生产者同时启动的时候,而cachedValue  ,nextValue  是累加计数的,那么我们要保证生产者不能超过消费者一个ringbuffer的大小,不然会产生消费数据丢失

所以,nextValue  + 当前要生产的数据  - ringbuffersize 要保证比未被消费的地方小,不然超出到消费者前面一圈,那么部分还没有被消费就又被从新设置了

 

 

2、生产慢,消费快,数据重复消费?

生产者速度过慢,导致消费者消费一圈了,生产者还没有生产新的数据出来,会不会导致重复消费?

 

这里的关键就是两个变量

cachedValue 和 nextValue

cachedValue  进行累加统计被消费的个数,可以记录消费到哪里了的位置

nextValue  进行记录累加统计被生产的位置,可以记录生产到那个位置了

 

通过这两个变量保证消费的进度永远保持在生产的进度后面,也就是 cachedValue  < nextValue  的时候才可以继续生产和消费,违背这个规则就要对一方进行阻塞,或者自旋

 

 

 

总结:

 

【disruptor】2、disruptor中生产者线程与消费者之间的协调