WaitStrategy
在Disruptor中,有很多需要等待的情况。例如:使用了SequenceBarrier的消费者需要在某种条件下等待,比如A消费者和B消费者,假设A消费者必须消费B消费者消费完的。
这些等待,还有唤醒等待的方法,由如下的WaitStrategy实现:
我们先来看接口类:
public interface WaitStrategy {
/**
* @param sequence 需要等待available的sequence
* @param cursor 对应RingBuffer的Cursor
* @param dependentSequence 需要等待(依赖)的Sequence
* @param barrier 多消费者注册的SequenceBarrier
* @return 已经available的sequence
* @throws AlertException
* @throws InterruptedException
* @throws TimeoutException
*/
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
/**
* 唤醒所有等待的消费者
*/
void signalAllWhenBlocking();
}
我们在生产上主要用到三个实现:
BlockingWaitStrategy:
public final class BlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
//检查是否Alert,如果Alert,则抛出AlertException
//Alert在这里代表对应的消费者被halt停止了
barrier.checkAlert();
//在processorNotifyCondition上等待唤醒
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
//生产者生产消息后,会唤醒所有等待的消费者线程
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
}
所有的WaitStrategy都需要考虑两个情况:
1. 线程或者消费者被停止,或者被中断。由于消费者实现了可以被停止,所以,WaitStrategy也需要能检测到停止信号
2. 等待与唤醒:在不能消费时,等待,在有新消息时,唤醒检查是否满足可以消费的条件。
BlockingWaitStrategy是一种利用锁和等待机制的WaitStrategy,CPU消耗少,但是延迟比较高。
原理是,所有消费者首先查是否Alert,如果是,则抛AlertException从等待中返回。之后检查sequence是否已经被发布,就是当前cursorSequence是否大于想消费的sequence。若不满足,利用processorNotifyCondition.await()等待。
按照之前生产者解析所述,由于生产者在生产Event之后会调用signalAllWhenBlocking()唤醒等待,让所有消费者重新检查。所以,之前的waitFor方法中lock await中只检查cursorSequence.get() < sequence而不是dependentSequence.get() < sequence.
同时,我们之前提到,WaitStrategy也需要能检测到停止信号。而await()并不响应线程interrupt。所以,终止消费者时,也需要调用signalAllWhenBlocking来唤醒await。
SleepingWaitStrategy:
SleepingWaitStrategy是另一种较为平衡CPU消耗与延迟的WaitStrategy,在不同次数的重试后,采用不同的策略选择继续尝试或者让出CPU或者sleep。这种策略延迟不均匀。
public class SleepingWaitStrategy implements WaitStrategy {
//重试200次
private static final int DEFAULT_RETRIES = 200;
private final int retries;
public SleepingWaitStrategy() {
this(DEFAULT_RETRIES);
}
public SleepingWaitStrategy(int retries) {
this.retries = retries;
}
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
int counter = retries;
//直接检查dependentSequence.get() < sequence
while ((availableSequence = dependentSequence.get()) < sequence) {
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking() {
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException {
//检查是否需要终止
barrier.checkAlert();
//如果在200~100,重试
if (counter > 100) {
--counter;
}
//如果在100~0,调用Thread.yield()让出CPU
else if (counter > 0) {
--counter;
Thread.yield();
}
//<0的话,利用LockSupport.parkNanos(1L)来sleep最小时间
else {
LockSupport.parkNanos(1L);
}
return counter;
}
}
BusySpinWaitStrategy:
BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。通常用于消费线程数小于CPU数的场景。
public class BusySpinWaitStrategy implements WaitStrategy {
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
//一直while自旋检查
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking() {
}
}