SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析

时间:2024-04-10 20:15:42

  

编写不易,转载请注明(http://shihlei.iteye.com/blog/2429846)!

一 概述

书接前篇,《Hystrix:断路器》 对断路器和Hystrix做了简单的介绍,旨在帮助读者做个简单入门。本文简单分下Hystrix的源码实现,帮助读者更好的了解Hystrix。

 

分析版本:

 

<dependency>  
    <groupId>com.netflix.hystrix</groupId>  
    <artifactId>hystrix-core</artifactId>  
    <version>1.5.12</version>  
</dependency>
 

 

如之前所述:Hystrix 底层基于 RxJava,RxJava 是响应式编程开发库,因此Hystrix的整个实现策略简单说即:把一个HystrixCommand封装成一个Observable(待观察者),针对自身要实现的核心功能,对Observable进行各种装饰,并在订阅各步装饰的Observable,以便在指定事件到达时,添加自己的业务。

 

阅读之前,建议对RxJava做个了解,Hystrix 实现基于RxJava,大量使用RxJava的相关操作,推荐之前的文章《响应式编程 RxJava》《RxJava2.x 操作Demo 》

 

二 Hystrix的执行流程

 

 SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析

 

流程详细说明参见《Hystrix:断路器》处理流程分析部分;

 

根据执行流程,将实现分为如下几块:

(1)主流程HystrixCommand 包装成 Observable 执行:AbstractCommand

(2)异步执行:HystrixContextScheduler

(3)超时中断:HystrixObservableTimeoutOperator

(4)断路器:HystrixCircuitBreaker

(5)滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()

(6)事件处理流:HystrixEventStream

(7)getFallback() 执行

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()

 

1)AbstractCommand<R> toObservable() 

流程中的(1)(2)步HystrixCommand的所有执行方法 execute( ) , queue() , observe() , toObservable() 最终都归结于 AbstractCommand<R> toObservable() 方法调用,直接看该方法。

 

public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        // 。。。。。。

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

	// 。。。。。。
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {

                // 。。。。。。

                // 返回一个冷Observable,有订阅才会执行,并且数据是最新的
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 把observable 放进缓存 。。。。。。
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
 

 

    分析(删除了无用的代码):

(1)重点方法 applyHystrixSemantics() ,包装HystrixCommand; 最终的返回值,通过 Observable 的 defer 操作做延迟绑定,保证观察者获取HystrixCommand执行的最新数据,包装后的Observable:hystrixObservable

(2)hystrixObservable 再进行缓存包装 ,也是通过defer操作完成,提供了requestCache能力,因为不是本篇的重点,不做分析。

 

RxJava defer操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 1) defer操作符 

 

 

    2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();

 

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       	  	
    	//。。。。。。

        if (circuitBreaker.attemptExecution()) { //判断是否可执行
	//。。。。。。        	
            if (executionSemaphore.tryAcquire()) { // 判断是否具有信号量资源
                try {
                    
                    //设置开始时间,用于监控执行超时。
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

                    // 继续包装HystrixCommand Observable,注册观察者,执行
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else { //信号量资源不足,拒绝执行,执行getFallBack()方法
                return handleSemaphoreRejectionViaFallback();
            }
        } else { // 不可执行,快速失效,执行getFallBack()方法
            return handleShortCircuitViaFallback();
        }
}
 

    分析(删除了无用的代码):

(1)circuitBreaker.attemptExecution() 断路器状态判断,是否可以执行;可执行则继续包装;不可执行,直接调用getFallBack();关于 HystrixCircuitBreaker 后面详细介绍

(2)executionSemaphore.tryAcquire() 判断是否有信号量资源,没有则直接调用getFallBack();

(3)如果条件都满足,则 调用 executeCommandAndObserve(_cmd)  继续包装 HystrixCommand执行。

 

    3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要,根据用户指定的隔离级别(线程,信号量),包装相应的执行环境,执行Command

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

       	// 。。。。。。

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) { // 如果开启超时功能,则包装超时中断能力
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

        } else { //否则不需要包装中断能力
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
}

 

分析(删除了无用的代码):

(1)本方法包括两部分

(a)注册观察者,监听Command执行事件,用于更新统计信息,先不详细介绍,忽略了

(b)包装超时熔断能力:HystrixObservableTimeoutOperator

 

(2)超时熔断能力通过 Observable 的 left 操作 在原有的 HystrixObservable 上进行包装实现的,核心 HystrixObservableTimeoutOperator

execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));

 

RxJava left 操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 2)lift 操作符 

 

(3)最终调用  executeCommandWithSpecifiedIsolation(_cmd) 根据隔离级别选择是“线程方式”隔离执行还是“信号量方式”隔离执行;

 

 4)  executeCommandWithSpecifiedIsolation(_cmd)  具体的执行方法:

 

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //Hystrix 配置的隔离策略是线程 
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
          
            return Observable.defer(new Func0<Observable<R>>() {

            // 。。。。。。

            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else { // Hystrx配置的隔离策略是信号量
            // 。。。。。。
            return Observable.defer(new Func0<Observable<R>>() {
                // 。。。。。。
            });
        }
}

 

分析(删除了无用的代码):

(1)这里就两件事:

(a)如果隔离策略是线程,则“线程策略”执行

(b)如果隔离策略是信号量,则“信号量策略”执行

 

(2)咋启动的线程呢,RxJava本身就提供异步执行的能力,通过 Observable 的 subscribeOn 绑定执行方式,底层具体实现是 线程调度:Scheduler。

 

马上进入下一章节,详细讲解一下

 

四 异步执行:HystrixContextScheduler

 

1)概述

 

Hystrix 要完成超时中断,需要异步执行,调用线程才能隔离影响,该功能借助RxJava的 Scheduler 机制实现;

 

关于 RxJava Scheduler 可以提前阅读下:《RxJava2.x 操作Demo》:》 三 线程调度:Scheduler  做个了解;

 

Hystrix实现异步的类结构如下:


SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析
  

主要职能:

HystrixThreadPoolDefault:依赖BlockingQueue<Runnable> queue 和 ThreadPoolExecutor threadPool; 统一线程池和任务队列

 

(a)Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();

HystrixContextScheduler: 这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作

ThreadPoolScheduler:线程池调度器,负责产生ThreadPoolWorker 向线程池提交任务

 

(b)Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)

HystrixContextSchedulerWorker:代理ThreadPoolWorker

ThreadPoolWorker:负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力

 

各个实现组件做个比较

 

(1)执行代码

RxJava :observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())

 

Hystrix: 

.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
   @Override
        public Boolean call() {
            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
        }
}));

 

(2)核心组件

(a)RxJava: Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();

Hystrix:HystrixContextScheduler --》 依赖:ThreadPoolScheduler  返回的worker 即 ThreadPoolWorker

 

(b)RxJava: Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)

Hystrix:HystrixContextSchedulerWorker --》 依赖:ThreadPoolScheduler --》 ThreadPoolWorker

 

2)具体实现 

(1)Hystrix指定Scheduler:

通过executeCommandWithSpecifiedIsolation(_cmd)  里的subscribeOn()方法完成;

 
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
	        @Override
	        public Boolean call() {
	            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
	        }
    }));
 

分析:这里使用 HystrixThreadPool 获得 Scheduler(调度器) ; 默认实现 HystrixThreadPoolDefault:维护线程池和任务队列。

 

(2)HystrixThreadPoolDefault:HystrixThreadPool 内部类
static class HystrixThreadPoolDefault implements HystrixThreadPool {

  	// 。。。。。。

 	// Hystrix线程池配置,demo中有指定
        private final HystrixThreadPoolProperties properties;

        //任务队列
        private final BlockingQueue<Runnable> queue;

        //线程池
        private final ThreadPoolExecutor threadPool;

        //线程监控
        private final HystrixThreadPoolMetrics metrics;

        // 队列大小
        private final int queueSize;

        public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            // 。。。。。。
        }

       	// 。。。。。。

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // 。。。。。。
}

 

分析(删除了无用的代码):HystrixThreadPoolDefault 主要维护一个线程池和任务队列,用于异步操作,最后会传给 HystrixContextScheduler 调度器使用。

 

(3)HystrixContextScheduler :上下文调度器

 

public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;

    // 。。。。。。

    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

   	// 。。。。。。
}

 

分析(删除了无用的代码):这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作。

 

(4)HystrixContextSchedulerWorker :

 

上下文Worker,只代理,具体的调度工作由传入的actualWorker 完成(即 ThreadPoolWorker)

 

    private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }

        //。。。。。。。

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }

}

 

分析(删除了无用的代码):HystrixContextSchedulerWorker,这个调度器除了封装线程池和同步策略外,只一个HystrixContextSchedulerWorker 外,没有特殊的操作。

    

(5) ThreadPoolScheduler:

真正的线程池调度Scheduler,创建一个 ThreadPoolWorker

 

private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }

 }

 

(6) ThreadPoolWorker:

负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力;

    

private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

      	// 。。。。。。

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }

        // 。。。。。。
}

 

 分析(删除了无用的代码):schedule() 方法向线程池提交任务,关键返回一个Subscription,用于调用端使用,或中断当前任务执行;

 

五 超时中断:HystrixObservableTimeoutOperator

 

1)概述:

 

Hystrix实现异步的类结构如下:


SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析
 

 

其中:

HystrixObservableTimeoutOperator:包装超时中断的主流程,借助下面的组件实现超时中断逻辑

(1)HystrixTimer:依赖ScheduledExecutor,最终使用JDK的ScheduledThreadPoolExecutor 实现指定时间后的回调

(2)TimerListener:实现超时的回调逻辑

(3)CompositeSubscription:RxJava 1.x 提供,通过unsubscribe()方法可以中断当前任务的执行。

 

2)看一下细节:

(1)回到之前讲解的主流程:

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》 3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行

 

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
   	// 。。。。。。

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
        		// 添加超时中断逻辑
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

    } else {
    	// 。。。。。。
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
 

 

分析(删除了无用的代码):超时中断主要通过:lift 操作,使用new HystrixObservableTimeoutOperator<R>(_cmd) 包装 HystrixObservable实现的。

 

(2)HystrixObservableTimeoutOperator 实现

 

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {

    	// 创建一个组合订阅,用于添加一组Disposable,快速解除订阅,这里用于超时,中断任务
        final CompositeSubscription s = new CompositeSubscription();

        child.add(s);

        //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

        //创建一个超时回调类
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                //设置TimedOutStatus
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // 报告超时
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                    // 通过 CompositeSubscription 的 unsubscribe 中断正在异步执行的HystrixCommand
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });

                    //启动timeoutRunnable,就是执行上面的 child.onError(new HystrixTimeoutException()); 发送error时间,通知观察者
                    timeoutRunnable.run();
                }
            }

            //获取超时时间
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

       	// 设置当前时间,用于比对超时
        originalCommand.timeoutTimer.set(tl);

       Subscriber<R> parent = new Subscriber<R>() {

            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onError(e);
                }
            }

            @Override
            public void onNext(R v) {
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }

            private boolean isNotTimedOut() {
                // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }

        };

        // if s is unsubscribed we want to unsubscribe the parent
        s.add(parent);
}
 

分析(删除了无用的代码):

(1)创建 TimerListener 实例,添加监听到超时的回调逻辑,指定超时时间(用户通过executionTimeoutInMilliseconds 配置的超时时间)

主要逻辑:

(a)设置HystrixCommand状态为超时

(b)创建 CompositeSubscription 绑定之前异步的Subscription 使 调用端可以根据超时中断 当前任务

(c)若超时,异步发送超时事件,通知HystrixCommand执行的观察者

 

(2)CompositeSubscription 绑定父子Subscription ,用于中断任务

 

RxJava CompositeSubscription使用,可参见 《RxJava2.x 操作Demo》:》 四 解除订阅,中断任务:Disposable 》 (2) CompositeDisposable 尽管 1.x 和2.x 有差别

 

(3) HystrixTimer 和 HystrixTimerListener:比较简单,就是ScheduledThreadPoolExecutor 定时触发

 

public static interface TimerListener {
  
    //指定时间周期触发回调
    public void tick();

    //时间周期
    public int getIntervalTimeInMilliseconds();
}

public class HystrixTimer {

	private static final Logger logger = LoggerFactory.getLogger(HystrixTimer.class);

	private static HystrixTimer INSTANCE = new HystrixTimer();

	// 。。。。。。

	public Reference<TimerListener> addTimerListener(final TimerListener listener) {
	    startThreadIfNeeded();
	    // add the listener

	    Runnable r = new Runnable() {

	        @Override
	        public void run() {
	            try {
	                listener.tick();
	            } catch (Exception e) {
	                logger.error("Failed while ticking TimerListener", e);
	            }
	        }
	    };

	    // 使用
	    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
	    return new TimerReference(listener, f);
	}


	static class ScheduledExecutor {
	    volatile ScheduledThreadPoolExecutor executor;
	    
	    // 。。。。。。

	    public void initialize() {

	    	// 。。。。。。
	        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
	        initialized = true;
	    }

	   // 。。。。。。
	}
}

 

六 断路器:HystrixCircuitBreaker

 

1)概述

 

HystrixCircuitBreaker 提供了状态维护功能,其使用体现在如下部分:

(1)HystrixCommand 执行前 通过 HystrixCircuitBreaker 根据状态判断是否允许执行:

open:禁止执行;

close 可以执行;

half_close:进入open状态已超过等待重试时间,则可以执行,否则禁止执行。

 

(2)订阅HystrixCommandMetrics 滑动窗口的数据统计更新事件,根据统计数据(错误率),决定是否进入open状态。

 

HystrixCircuitBreaker的类结构如下:

 
SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析

 

 2)看看细节:

(1)HystrixCircuitBreaker接口:定义了主要服务

 

 

public interface HystrixCircuitBreaker {

    /**
     * 是否允许HystrixCommand请求执行,如果断路器是half-open(半开)状态,则根据逻辑允许部分请求执行和改变状态。
     */
    boolean allowRequest();

    /**
     * 断路器当前是否打开状态
     */
    boolean isOpen();

    /**
     * half-open(半开)状态的反馈机制,执行成功时调用
     */
    void markSuccess();

    /**
     * half-open(半开)状态的反馈机制,执行失败调用
     */
    void markNonSuccess();

    /**
     * HystrixCommand执行行前,判断是否尝试执行
     */
    boolean attemptExecution();
}
  

 

 分析:这里就几个方法,简单做了注释;

 
(2)内部类 HystrixCircuitBreakerImpl 断路器实现(重要):

 

HystrixCircuitBreakerImpl实现断路器的核心逻辑:状态定义,状态维护。

 

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
	// 用户配置定义
    private final HystrixCommandProperties properties;

    // HystrixCommand 执行指标数据
    private final HystrixCommandMetrics metrics;

    // 断路器状态定义
    enum Status {
        CLOSED, OPEN, HALF_OPEN;
    }

    // 断路器当前状态,利用原子操作保证线程安全
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

    // 断路器是否打开,利用原子操作保证线程安全,circuitOpened > 0 则为打开状态,同时该变量记录了打开的时间,用于间隔“circuitBreakerSleepWindowInMilliseconds”重试,
    private final AtomicLong circuitOpened = new AtomicLong(-1);

    //指标数据订阅
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //订阅 metrics 指标数据,一旦HystrixCommand执行结束,触发监控更新,“断路器” HystrixCircuitBreaker 重新计算并进入“关闭”或“打开“状态
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }

    /**
    * 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
    */ 
    private Subscription subscribeToStream() {

        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                   
                    // 。。。。。。

                    @Override
                    public void onNext(HealthCounts hc) {
                        
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

                        // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
                        // 指标由用户通过“statisticalWindowVolumeThreshold”定义;  

                        } else {
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                                // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
                                // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;  

                            } else {

                                // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间  
                                // 特别注意:这里使用了 compareAndSet ,不会重复打开
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {                                   
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }

    @Override
    public void markSuccess() {
    	// 半开状态下,尝试执行成功,断路器关闭
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {                
            // 。。。。。。
        }
    }

    @Override
    public void markNonSuccess() {
    	// 半开状态,尝试执行失败,断路器打开
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            // 。。。。。。
        }
    }

    // 判断断路器是否打开,很简单,不解释
    @Override
    public boolean isOpen() {
        if (properties.circuitBreakerForceOpen().get()) {
            return true;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return false;
        }
        //circuitOpened > 0 打开状态
        return circuitOpened.get() >= 0;
    }

    //HystrixCommand执行行前,判断是否允许请求
    @Override
    public boolean allowRequest() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        
        if (circuitOpened.get() == -1) { 
        	// 断路器关闭,可以执行
            return true;
        } else {
        	//断路器半开状态,不允许执行
            if (status.get().equals(Status.HALF_OPEN)) {
                return false;
            } else {
            	//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
                return isAfterSleepWindow();
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    @Override
    public boolean attemptExecution() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
        	//断路器半开状态,不允许执行
            return true;
        } else {
        	//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
            if (isAfterSleepWindow()) { 
            	//等待重试时间已到
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    // 首次请求,断路器状态进入“半开状态”,可以尝试执行HystrixCommand
                    return true;
                } else {
                	// 非首次进入“半开状态”,禁止执行
                    return false;
                }
            } else {
            	// 等待重试时间未到,继续禁止执行
                return false;
            }
        }
    }
}
 

 

 分析:这里逻辑较多,直接在代码做相应的解释,这里就不再赘述了

 

 七 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()

 

1)概述

上面介绍 HystrixCircuitBreakerImpl 的实现,比较关键的一步,在构造函数中调用了subscribeToStream()方法;细看该方法,HystrixCircuitBreakerImpl 订阅了metrics.getHealthCountsStream(), 通过判断滑动窗口内的统计数据 HystrixCommandMetrics.HealthCounts 完成自身状态的更新;

 

/**
* 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
*/ 
private Subscription subscribeToStream() {

	return metrics.getHealthCountsStream()
	        .observe()
	        .subscribe(new Subscriber<HealthCounts>() {
	           
	            // 。。。。。。

	            @Override
	            public void onNext(HealthCounts hc) {
	                
	                if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

	                // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
	                // 指标由用户通过“statisticalWindowVolumeThreshold”定义;  

	                } else {
	                    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

	                        // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
	                        // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;  

	                    } else {

	                        // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间  
	                        // 特别注意:这里使用了 compareAndSet ,不会重复打开
	                        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {                                   
	                            circuitOpened.set(System.currentTimeMillis());
	                        }
	                    }
	                }
	            }
	        });
}

 

下面介绍下Hystrix滑动窗口设计(类图):

 
SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析
 
 

(1)健康统计数据:

(a)HealthCounts:具体的统计信息,包括totalCount:执行总数;errorCount:错误数;errorPercentage:错误率;配合上面的代码供状态转换使用

 

(2)滑动窗口:

(a)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅

(b)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出

(c)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据

 

2)看看细节

(1)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
	// 桶数量
    protected final int numBuckets;
    // 基于Bucket的观察流
    protected final Observable<Bucket> bucketedStream;
    // 
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;

    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;

       	// Event事件按照一个bucket的时间周期的合并逻辑,其实是定义了一个开放两个抽象方法getEmptyBucketSummary() , appendRawEventToBucket 用于聚合
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        //借助windows()操作缓存Event事件,积累到一个bucket的时间周期,统一处理
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) 
                        //聚合一个bucket的时间周期Event数据
                        .flatMap(reduceBucketToSummary)               
                        .startWith(emptyEventCountsToStart);           
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    // 。。。。。。
}

 

分析(删除了无用的代码):BucketedCounterStream 提供了基于Bucket的抽象,核心就是构造函数

1)构造函数可以看到 numBuckets 提供了桶的数量,bucketSizeInMs 提供一个桶的时间周期

2)然后 针对源 inputEventStream 是通过 windows() 操作指定了聚合事件的周期,flatMap()操作指定 聚合方法

3)最后通过抽象方法 appendRawEventToBucket 参数提供调用端指定具体的聚合策略

 

注:这里只抽象到桶的级别和聚合,具体输出类型 Output 需要子类继续重写实现

 

(2)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出

 

public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        this.sourceStream = bucketedStream
        		// 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定窗口数量
                .window(numBuckets, 1) 
                // 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定聚合策略      
                .flatMap(reduceWindowToSummary) 
                .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
                .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
                .share()                       
                .onBackpressureDrop();  
    }
    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
    
    // 。。。。。。
}

  

分析(删除了无用的代码):BucketedRollingCounterStream提供了滑动窗口的抽象,核心还是在构造函数中

1)然后 针对bucket源 bucketedStream 是通过 windows() 操作对最近的 numBuckets 个 bucket数据进行聚合,flatMap()操作指定 聚合方法

2)最后通过抽象方法 reduceBucket 参数提供调用端指定具体的聚合策略

 
(3)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据

 

/**
* 看泛型:HystrixCommandCompletion:command执行结束事件;long[]:bucket 类型;HystrixCommandMetrics.HealthCounts:滑动时间窗口内的执行统计数据
*/
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {

    private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>();

    private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;

    private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
        @Override
        public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
            return healthCounts.plus(bucketEventCounts);
        }
    };

    // 。。。。。。

    private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
        // 特别注意 HystrixCommandCompletionStream 这里直接绑死了 这个事件流
        super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
    }

   // 。。。。。。
}

 

分析(删除了无用的代码):

HealthCountsStream实现具体的滑动窗口业务,构造函数绑定 HystrixCommandCompletionStream 事件流,直接观察 HystrixCommandCompletion:HystrixCommand执行结束事件,聚合成 类型;滑动窗口内 HystrixCommandMetrics 更新统计需要的 HealthCounts

 

(4)HealthCounts:具体的统计信息 没有特殊的,不解释了
 public static class HealthCounts {
        private final long totalCount;
        private final long errorCount;
        private final int errorPercentage;

        HealthCounts(long total, long error) {
            this.totalCount = total;
            this.errorCount = error;
            if (totalCount > 0) {
                this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
            } else {
                this.errorPercentage = 0;
            }
        }

 		//。。。。。。

        public HealthCounts plus(long[] eventTypeCounts) {
            long updatedTotalCount = totalCount;
            long updatedErrorCount = errorCount;

            long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
            long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
            long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
            long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
            long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

            updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            return new HealthCounts(updatedTotalCount, updatedErrorCount);
        }

  		//。。。。。。
}

 

八 事件处理机制:HystrixEventStream

 

1)概述:

 

 六 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream() 讲述了 “断路器”如何根据HystrixCommand处理结果更新状态。

核心:HealthCountsStream 是在构造函数中绑定了 HystrixCommandCompletionStream 事件流 ,关注HystrixCommand执行结束事件,那么 HystrixEventStream 事件流式如何产生的,我们下面就简单介绍下;

 

HystrixEventStream 的集成体系如下:

 
SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析
 

 

2)看看细节:

(1)回到之前的主流程:

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()

》 1)流程中的(1)(2)步HystrixCommand的所有执行方法execute(),queue(),observe(),toObservable()最终都是调用的都是 AbstractCommand<R> toObservable(),直接看该方法。

 

 public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    // 。。。。。。

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // 。。。。。。

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

 

分析(删除了无用的代码):订阅了HystrixCommand的 结束事件,会执行 terminateCommandCleanup ;主要代码在 handleCommandEnd() 方法中;

 

(2)handleCommandEnd(): HystrixCommand 命令执行结束,根据执行结果做相应的处理

 

private void handleCommandEnd(boolean commandExecutionStarted) {

	// 。。。。。。

    // 记录用户执行时间
    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }

    // 。。。。。。
}

 

分析(删除了无用的代码):主要是调用 HystrixCommandMetrics 的 markCommandDone 方法

 

(3)HystrixCommandMetrics:通过HystrixThreadEventStream 执行 executionDone 执行命令结束的相关操作

 

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}

 

(4)HystrixThreadEventStream:线程事件流,事件和事件监听器采用观察者模式,互相进行了隔离

 

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    	//根据处理结果,构造一个 HystrixCommandCompletion 事件
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);

        //通知事件完成主题的监听者
        writeOnlyCommandCompletionSubject.onNext(event);
}

 

分析(删除了无用的代码):最终调用了 HystrixThreadEventStream 完成事件的生成和绑定客户端的情况

 

public class HystrixThreadEventStream {
    private final long threadId;
    private final String threadName;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;

    // 。。。。。。

    HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();

        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCollapserSubject
                .onBackpressureBuffer()
                .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }

    // 。。。。。。

    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        //根据处理结果,构造一个 HystrixCommandCompletion 事件
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);

        //通知事件完成主题的监听者
        writeOnlyCommandCompletionSubject.onNext(event);
    }

    // 。。。。。。
}

 

分析(删除了无用的代码):

HystrixThreadEventStream 本身维护各种关注事件的Subject ,Subject在Rxjava 作为主题,提供观察者双向操作的能力。最终其实是向HystrixCommandCompletionStream 中 提交 HystrixCommandCompletion 事件,供观察者使用。

 

(5)事件流:HystrixEventStream

Hystrix 命令执行过程会触发响应的事件,用户观察者订阅使用,HystrixCommandCompletionStream 就是用于订阅 HystrixCommandCompletion 命令完成事件。

 

这里简单看看类结果,详情不再赘述,如开始,不再赘述;

 

九 getFallback() 执行

 

1)概述:

关于 getFallback() 方法的执行,之前在

《Hystrix:断路器》:》 三 Hystrix :》 4)处理流程分析 :》 8)执行fallback  做过总结,触发方式如下:

 

(1)断路器状态:断路器已经打开的时候

 

(2)由construct() or run()抛出了一个异常

没有空闲的线程池和队列或者信号量:RejectedExecutionException

一次命令执行超时:HystrixTimeoutException

 

2)这里分析下具体实现

(1) 断路器状态判断-断路器已经打开的时候:在HystrixCommand执行前,根据 circuitBreaker 状态判断是否执行还是快速失效

回到之前讲解的主流程:

》三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》(2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();

 

 

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
   	  	
	//。。。。。。

    if (circuitBreaker.attemptExecution()) { //判断是否可执行

		//。。。。。。 
		// 继续包装HystrixCommand Observable,注册观察者,执行
        return executeCommandAndObserve(_cmd)
            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);          
    } else { // 不可执行,快速失效,执行getFallBack()方法
        return handleShortCircuitViaFallback();
    }
}

 

(2) construct() or run()抛出了一个异常或超时:
在HystrixCommand执行后,根据结果处理接收到 RejectedExecutionException;HystrixTimeoutException,及非 HystrixBadRequestException 异常 ,执行getFallback()

 

回到之前讲解的主流程:

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

   	// 。。。。。。

   	final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    // 。。。。。。

    Observable<R> execution;
   
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

 

分析(删除了无用的代码):主要是:handleFallback ,因为涉及fallback 链的情况这里只简单概述 

1)异常类型 RejectedExecutionException;HystrixTimeoutException,及非HystrixBadRequestException 会通过内部方法 getFallbackOrThrowException() 完成对getFallback() 的调用

2)异常类型 HystrixBadRequestException 不会调用 getFallback()