编写不易,转载请注明(http://shihlei.iteye.com/blog/2429846)!
一 概述
书接前篇,《Hystrix:断路器》 对断路器和Hystrix做了简单的介绍,旨在帮助读者做个简单入门。本文简单分下Hystrix的源码实现,帮助读者更好的了解Hystrix。
分析版本:
<dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>1.5.12</version> </dependency>
如之前所述:Hystrix 底层基于 RxJava,RxJava 是响应式编程开发库,因此Hystrix的整个实现策略简单说即:把一个HystrixCommand封装成一个Observable(待观察者),针对自身要实现的核心功能,对Observable进行各种装饰,并在订阅各步装饰的Observable,以便在指定事件到达时,添加自己的业务。
阅读之前,建议对RxJava做个了解,Hystrix 实现基于RxJava,大量使用RxJava的相关操作,推荐之前的文章《响应式编程 RxJava》,《RxJava2.x 操作Demo 》;
二 Hystrix的执行流程
流程详细说明参见《Hystrix:断路器》处理流程分析部分;
根据执行流程,将实现分为如下几块:
(1)主流程HystrixCommand 包装成 Observable 执行:AbstractCommand
(2)异步执行:HystrixContextScheduler
(3)超时中断:HystrixObservableTimeoutOperator
(4)断路器:HystrixCircuitBreaker
(5)滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()
(6)事件处理流:HystrixEventStream
(7)getFallback() 执行
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()
1)AbstractCommand<R> toObservable()
流程中的(1)(2)步HystrixCommand的所有执行方法 execute( ) , queue() , observe() , toObservable() 最终都归结于 AbstractCommand<R> toObservable() 方法调用,直接看该方法。
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// 。。。。。。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
// 。。。。。。
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 。。。。。。
// 返回一个冷Observable,有订阅才会执行,并且数据是最新的
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 把observable 放进缓存 。。。。。。
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
分析(删除了无用的代码):
(1)重点方法 applyHystrixSemantics() ,包装HystrixCommand; 最终的返回值,通过 Observable 的 defer 操作做延迟绑定,保证观察者获取HystrixCommand执行的最新数据,包装后的Observable:hystrixObservable
(2)hystrixObservable 再进行缓存包装 ,也是通过defer操作完成,提供了requestCache能力,因为不是本篇的重点,不做分析。
RxJava defer操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 1) defer操作符
2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//。。。。。。
if (circuitBreaker.attemptExecution()) { //判断是否可执行
//。。。。。。
if (executionSemaphore.tryAcquire()) { // 判断是否具有信号量资源
try {
//设置开始时间,用于监控执行超时。
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 继续包装HystrixCommand Observable,注册观察者,执行
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else { //信号量资源不足,拒绝执行,执行getFallBack()方法
return handleSemaphoreRejectionViaFallback();
}
} else { // 不可执行,快速失效,执行getFallBack()方法
return handleShortCircuitViaFallback();
}
}
分析(删除了无用的代码):
(1)circuitBreaker.attemptExecution() 断路器状态判断,是否可以执行;可执行则继续包装;不可执行,直接调用getFallBack();关于 HystrixCircuitBreaker 后面详细介绍
(2)executionSemaphore.tryAcquire() 判断是否有信号量资源,没有则直接调用getFallBack();
(3)如果条件都满足,则 调用 executeCommandAndObserve(_cmd) 继续包装 HystrixCommand执行。
3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要,根据用户指定的隔离级别(线程,信号量),包装相应的执行环境,执行Command
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 。。。。。。
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) { // 如果开启超时功能,则包装超时中断能力
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else { //否则不需要包装中断能力
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
分析(删除了无用的代码):
(1)本方法包括两部分
(a)注册观察者,监听Command执行事件,用于更新统计信息,先不详细介绍,忽略了
(b)包装超时熔断能力:HystrixObservableTimeoutOperator
(2)超时熔断能力通过 Observable 的 left 操作 在原有的 HystrixObservable 上进行包装实现的,核心 HystrixObservableTimeoutOperator
execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
RxJava left 操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 2)lift 操作符
(3)最终调用 executeCommandWithSpecifiedIsolation(_cmd) 根据隔离级别选择是“线程方式”隔离执行还是“信号量方式”隔离执行;
4) executeCommandWithSpecifiedIsolation(_cmd) 具体的执行方法:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //Hystrix 配置的隔离策略是线程
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
// 。。。。。。
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else { // Hystrx配置的隔离策略是信号量
// 。。。。。。
return Observable.defer(new Func0<Observable<R>>() {
// 。。。。。。
});
}
}
分析(删除了无用的代码):
(1)这里就两件事:
(a)如果隔离策略是线程,则“线程策略”执行
(b)如果隔离策略是信号量,则“信号量策略”执行
(2)咋启动的线程呢,RxJava本身就提供异步执行的能力,通过 Observable 的 subscribeOn 绑定执行方式,底层具体实现是 线程调度:Scheduler。
马上进入下一章节,详细讲解一下
四 异步执行:HystrixContextScheduler
1)概述
Hystrix 要完成超时中断,需要异步执行,调用线程才能隔离影响,该功能借助RxJava的 Scheduler 机制实现;
关于 RxJava Scheduler 可以提前阅读下:《RxJava2.x 操作Demo》:》 三 线程调度:Scheduler 做个了解;
Hystrix实现异步的类结构如下:
主要职能:
HystrixThreadPoolDefault:依赖BlockingQueue<Runnable> queue 和 ThreadPoolExecutor threadPool; 统一线程池和任务队列
(a)Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();
HystrixContextScheduler: 这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作
ThreadPoolScheduler:线程池调度器,负责产生ThreadPoolWorker 向线程池提交任务
(b)Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)
HystrixContextSchedulerWorker:代理ThreadPoolWorker
ThreadPoolWorker:负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力
各个实现组件做个比较
(1)执行代码
RxJava :observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())
Hystrix:
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
(2)核心组件
(a)RxJava: Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();
Hystrix:HystrixContextScheduler --》 依赖:ThreadPoolScheduler 返回的worker 即 ThreadPoolWorker
(b)RxJava: Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)
Hystrix:HystrixContextSchedulerWorker --》 依赖:ThreadPoolScheduler --》 ThreadPoolWorker
2)具体实现
(1)Hystrix指定Scheduler:
通过executeCommandWithSpecifiedIsolation(_cmd) 里的subscribeOn()方法完成;
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
分析:这里使用 HystrixThreadPool 获得 Scheduler(调度器) ; 默认实现 HystrixThreadPoolDefault:维护线程池和任务队列。
(2)HystrixThreadPoolDefault:HystrixThreadPool 内部类
static class HystrixThreadPoolDefault implements HystrixThreadPool {
// 。。。。。。
// Hystrix线程池配置,demo中有指定
private final HystrixThreadPoolProperties properties;
//任务队列
private final BlockingQueue<Runnable> queue;
//线程池
private final ThreadPoolExecutor threadPool;
//线程监控
private final HystrixThreadPoolMetrics metrics;
// 队列大小
private final int queueSize;
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
// 。。。。。。
}
// 。。。。。。
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
// 。。。。。。
}
分析(删除了无用的代码):HystrixThreadPoolDefault 主要维护一个线程池和任务队列,用于异步操作,最后会传给 HystrixContextScheduler 调度器使用。
(3)HystrixContextScheduler :上下文调度器
public class HystrixContextScheduler extends Scheduler {
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
// 。。。。。。
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
// 。。。。。。
}
分析(删除了无用的代码):这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作。
(4)HystrixContextSchedulerWorker :
上下文Worker,只代理,具体的调度工作由传入的actualWorker 完成(即 ThreadPoolWorker)
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
private HystrixContextSchedulerWorker(Worker actualWorker) {
this.worker = actualWorker;
}
//。。。。。。。
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
}
分析(删除了无用的代码):HystrixContextSchedulerWorker,这个调度器除了封装线程池和同步策略外,只一个HystrixContextSchedulerWorker 外,没有特殊的操作。
(5) ThreadPoolScheduler:
真正的线程池调度Scheduler,创建一个 ThreadPoolWorker
private static class ThreadPoolScheduler extends Scheduler {
private final HystrixThreadPool threadPool;
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
}
(6) ThreadPoolWorker:
负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力;
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
// 。。。。。。
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
// 。。。。。。
}
分析(删除了无用的代码):schedule() 方法向线程池提交任务,关键返回一个Subscription,用于调用端使用,或中断当前任务执行;
五 超时中断:HystrixObservableTimeoutOperator
1)概述:
Hystrix实现异步的类结构如下:
其中:
HystrixObservableTimeoutOperator:包装超时中断的主流程,借助下面的组件实现超时中断逻辑
(1)HystrixTimer:依赖ScheduledExecutor,最终使用JDK的ScheduledThreadPoolExecutor 实现指定时间后的回调
(2)TimerListener:实现超时的回调逻辑
(3)CompositeSubscription:RxJava 1.x 提供,通过unsubscribe()方法可以中断当前任务的执行。
2)看一下细节:
(1)回到之前讲解的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》 3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 。。。。。。
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
// 添加超时中断逻辑
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
// 。。。。。。
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
分析(删除了无用的代码):超时中断主要通过:lift 操作,使用new HystrixObservableTimeoutOperator<R>(_cmd) 包装 HystrixObservable实现的。
(2)HystrixObservableTimeoutOperator 实现
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
// 创建一个组合订阅,用于添加一组Disposable,快速解除订阅,这里用于超时,中断任务
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
//capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
//创建一个超时回调类
TimerListener listener = new TimerListener() {
@Override
public void tick() {
//设置TimedOutStatus
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 报告超时
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// 通过 CompositeSubscription 的 unsubscribe 中断正在异步执行的HystrixCommand
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
//启动timeoutRunnable,就是执行上面的 child.onError(new HystrixTimeoutException()); 发送error时间,通知观察者
timeoutRunnable.run();
}
}
//获取超时时间
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// 设置当前时间,用于比对超时
originalCommand.timeoutTimer.set(tl);
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
// if s is unsubscribed we want to unsubscribe the parent
s.add(parent);
}
分析(删除了无用的代码):
(1)创建 TimerListener 实例,添加监听到超时的回调逻辑,指定超时时间(用户通过executionTimeoutInMilliseconds 配置的超时时间)。
主要逻辑:
(a)设置HystrixCommand状态为超时
(b)创建 CompositeSubscription 绑定之前异步的Subscription 使 调用端可以根据超时中断 当前任务。
(c)若超时,异步发送超时事件,通知HystrixCommand执行的观察者
(2)CompositeSubscription 绑定父子Subscription ,用于中断任务
RxJava CompositeSubscription使用,可参见 《RxJava2.x 操作Demo》:》 四 解除订阅,中断任务:Disposable 》 (2) CompositeDisposable 尽管 1.x 和2.x 有差别
(3) HystrixTimer 和 HystrixTimerListener:比较简单,就是ScheduledThreadPoolExecutor 定时触发
public static interface TimerListener {
//指定时间周期触发回调
public void tick();
//时间周期
public int getIntervalTimeInMilliseconds();
}
public class HystrixTimer {
private static final Logger logger = LoggerFactory.getLogger(HystrixTimer.class);
private static HystrixTimer INSTANCE = new HystrixTimer();
// 。。。。。。
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
// add the listener
Runnable r = new Runnable() {
@Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
// 使用
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
static class ScheduledExecutor {
volatile ScheduledThreadPoolExecutor executor;
// 。。。。。。
public void initialize() {
// 。。。。。。
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
initialized = true;
}
// 。。。。。。
}
}
六 断路器:HystrixCircuitBreaker
1)概述
HystrixCircuitBreaker 提供了状态维护功能,其使用体现在如下部分:
(1)HystrixCommand 执行前 通过 HystrixCircuitBreaker 根据状态判断是否允许执行:
open:禁止执行;
close 可以执行;
half_close:进入open状态已超过等待重试时间,则可以执行,否则禁止执行。
(2)订阅HystrixCommandMetrics 滑动窗口的数据统计更新事件,根据统计数据(错误率),决定是否进入open状态。
HystrixCircuitBreaker的类结构如下:
2)看看细节:
(1)HystrixCircuitBreaker接口:定义了主要服务
public interface HystrixCircuitBreaker {
/**
* 是否允许HystrixCommand请求执行,如果断路器是half-open(半开)状态,则根据逻辑允许部分请求执行和改变状态。
*/
boolean allowRequest();
/**
* 断路器当前是否打开状态
*/
boolean isOpen();
/**
* half-open(半开)状态的反馈机制,执行成功时调用
*/
void markSuccess();
/**
* half-open(半开)状态的反馈机制,执行失败调用
*/
void markNonSuccess();
/**
* HystrixCommand执行行前,判断是否尝试执行
*/
boolean attemptExecution();
}
分析:这里就几个方法,简单做了注释;
(2)内部类 HystrixCircuitBreakerImpl 断路器实现(重要):
HystrixCircuitBreakerImpl实现断路器的核心逻辑:状态定义,状态维护。
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
// 用户配置定义
private final HystrixCommandProperties properties;
// HystrixCommand 执行指标数据
private final HystrixCommandMetrics metrics;
// 断路器状态定义
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
// 断路器当前状态,利用原子操作保证线程安全
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
// 断路器是否打开,利用原子操作保证线程安全,circuitOpened > 0 则为打开状态,同时该变量记录了打开的时间,用于间隔“circuitBreakerSleepWindowInMilliseconds”重试,
private final AtomicLong circuitOpened = new AtomicLong(-1);
//指标数据订阅
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//订阅 metrics 指标数据,一旦HystrixCommand执行结束,触发监控更新,“断路器” HystrixCircuitBreaker 重新计算并进入“关闭”或“打开“状态
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
/**
* 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
*/
private Subscription subscribeToStream() {
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
// 。。。。。。
@Override
public void onNext(HealthCounts hc) {
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
// 指标由用户通过“statisticalWindowVolumeThreshold”定义;
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
// 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;
} else {
// 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间
// 特别注意:这里使用了 compareAndSet ,不会重复打开
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
@Override
public void markSuccess() {
// 半开状态下,尝试执行成功,断路器关闭
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
// 。。。。。。
}
}
@Override
public void markNonSuccess() {
// 半开状态,尝试执行失败,断路器打开
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
// 。。。。。。
}
}
// 判断断路器是否打开,很简单,不解释
@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
//circuitOpened > 0 打开状态
return circuitOpened.get() >= 0;
}
//HystrixCommand执行行前,判断是否允许请求
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
// 断路器关闭,可以执行
return true;
} else {
//断路器半开状态,不允许执行
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
return isAfterSleepWindow();
}
}
}
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
//断路器半开状态,不允许执行
return true;
} else {
//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
if (isAfterSleepWindow()) {
//等待重试时间已到
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
// 首次请求,断路器状态进入“半开状态”,可以尝试执行HystrixCommand
return true;
} else {
// 非首次进入“半开状态”,禁止执行
return false;
}
} else {
// 等待重试时间未到,继续禁止执行
return false;
}
}
}
}
分析:这里逻辑较多,直接在代码做相应的解释,这里就不再赘述了
七 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()
1)概述
上面介绍 HystrixCircuitBreakerImpl 的实现,比较关键的一步,在构造函数中调用了subscribeToStream()方法;细看该方法,HystrixCircuitBreakerImpl 订阅了metrics.getHealthCountsStream(), 通过判断滑动窗口内的统计数据 HystrixCommandMetrics.HealthCounts 完成自身状态的更新;
/**
* 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
*/
private Subscription subscribeToStream() {
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
// 。。。。。。
@Override
public void onNext(HealthCounts hc) {
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
// 指标由用户通过“statisticalWindowVolumeThreshold”定义;
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
// 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;
} else {
// 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间
// 特别注意:这里使用了 compareAndSet ,不会重复打开
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
下面介绍下Hystrix滑动窗口设计(类图):
(1)健康统计数据:
(a)HealthCounts:具体的统计信息,包括totalCount:执行总数;errorCount:错误数;errorPercentage:错误率;配合上面的代码供状态转换使用
(2)滑动窗口:
(a)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
(b)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出
(c)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据
2)看看细节
(1)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
// 桶数量
protected final int numBuckets;
// 基于Bucket的观察流
protected final Observable<Bucket> bucketedStream;
//
protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);
private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;
private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());
protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
this.numBuckets = numBuckets;
// Event事件按照一个bucket的时间周期的合并逻辑,其实是定义了一个开放两个抽象方法getEmptyBucketSummary() , appendRawEventToBucket 用于聚合
this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
@Override
public Observable<Bucket> call(Observable<Event> eventBucket) {
return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
}
};
final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
for (int i = 0; i < numBuckets; i++) {
emptyEventCountsToStart.add(getEmptyBucketSummary());
}
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
@Override
public Observable<Bucket> call() {
return inputEventStream
.observe()
//借助windows()操作缓存Event事件,积累到一个bucket的时间周期,统一处理
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
//聚合一个bucket的时间周期Event数据
.flatMap(reduceBucketToSummary)
.startWith(emptyEventCountsToStart);
}
});
}
abstract Bucket getEmptyBucketSummary();
abstract Output getEmptyOutputValue();
// 。。。。。。
}
分析(删除了无用的代码):BucketedCounterStream 提供了基于Bucket的抽象,核心就是构造函数
1)构造函数可以看到 numBuckets 提供了桶的数量,bucketSizeInMs 提供一个桶的时间周期
2)然后 针对源 inputEventStream 是通过 windows() 操作指定了聚合事件的周期,flatMap()操作指定 聚合方法
3)最后通过抽象方法 appendRawEventToBucket 参数提供调用端指定具体的聚合策略
注:这里只抽象到桶的级别和聚合,具体输出类型 Output 需要子类继续重写实现
(2)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
this.sourceStream = bucketedStream
// 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定窗口数量
.window(numBuckets, 1)
// 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定聚合策略
.flatMap(reduceWindowToSummary)
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share()
.onBackpressureDrop();
}
@Override
public Observable<Output> observe() {
return sourceStream;
}
// 。。。。。。
}
分析(删除了无用的代码):BucketedRollingCounterStream提供了滑动窗口的抽象,核心还是在构造函数中
1)然后 针对bucket源 bucketedStream 是通过 windows() 操作对最近的 numBuckets 个 bucket数据进行聚合,flatMap()操作指定 聚合方法
2)最后通过抽象方法 reduceBucket 参数提供调用端指定具体的聚合策略
(3)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据
/**
* 看泛型:HystrixCommandCompletion:command执行结束事件;long[]:bucket 类型;HystrixCommandMetrics.HealthCounts:滑动时间窗口内的执行统计数据
*/
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {
private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>();
private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
@Override
public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
return healthCounts.plus(bucketEventCounts);
}
};
// 。。。。。。
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
// 特别注意 HystrixCommandCompletionStream 这里直接绑死了 这个事件流
super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}
// 。。。。。。
}
分析(删除了无用的代码):
HealthCountsStream实现具体的滑动窗口业务,构造函数绑定 HystrixCommandCompletionStream 事件流,直接观察 HystrixCommandCompletion:HystrixCommand执行结束事件,聚合成 类型;滑动窗口内 HystrixCommandMetrics 更新统计需要的 HealthCounts
(4)HealthCounts:具体的统计信息 没有特殊的,不解释了
public static class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
HealthCounts(long total, long error) {
this.totalCount = total;
this.errorCount = error;
if (totalCount > 0) {
this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
} else {
this.errorPercentage = 0;
}
}
//。。。。。。
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount;
long updatedErrorCount = errorCount;
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
//。。。。。。
}
八 事件处理机制:HystrixEventStream
1)概述:
六 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream() 讲述了 “断路器”如何根据HystrixCommand处理结果更新状态。
核心:HealthCountsStream 是在构造函数中绑定了 HystrixCommandCompletionStream 事件流 ,关注HystrixCommand执行结束事件,那么 HystrixEventStream 事件流式如何产生的,我们下面就简单介绍下;
HystrixEventStream 的集成体系如下:
2)看看细节:
(1)回到之前的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()
》 1)流程中的(1)(2)步HystrixCommand的所有执行方法execute(),queue(),observe(),toObservable()最终都是调用的都是 AbstractCommand<R> toObservable(),直接看该方法。
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};
// 。。。。。。
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 。。。。。。
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
分析(删除了无用的代码):订阅了HystrixCommand的 结束事件,会执行 terminateCommandCleanup ;主要代码在 handleCommandEnd() 方法中;
(2)handleCommandEnd(): HystrixCommand 命令执行结束,根据执行结果做相应的处理
private void handleCommandEnd(boolean commandExecutionStarted) {
// 。。。。。。
// 记录用户执行时间
long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
if (executionResultAtTimeOfCancellation == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
} else {
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
}
// 。。。。。。
}
分析(删除了无用的代码):主要是调用 HystrixCommandMetrics 的 markCommandDone 方法
(3)HystrixCommandMetrics:通过HystrixThreadEventStream 执行 executionDone 执行命令结束的相关操作
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
if (executionStarted) {
concurrentExecutionCount.decrementAndGet();
}
}
(4)HystrixThreadEventStream:线程事件流,事件和事件监听器采用观察者模式,互相进行了隔离
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
//根据处理结果,构造一个 HystrixCommandCompletion 事件
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
//通知事件完成主题的监听者
writeOnlyCommandCompletionSubject.onNext(event);
}
分析(删除了无用的代码):最终调用了 HystrixThreadEventStream 完成事件的生成和绑定客户端的情况
public class HystrixThreadEventStream {
private final long threadId;
private final String threadName;
private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;
// 。。。。。。
HystrixThreadEventStream(Thread thread) {
this.threadId = thread.getId();
this.threadName = thread.getName();
writeOnlyCommandStartSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCollapserSubject = PublishSubject.create();
writeOnlyCommandStartSubject
.onBackpressureBuffer()
.doOnNext(writeCommandStartsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeOnlyCommandCompletionSubject
.onBackpressureBuffer()
.doOnNext(writeCommandCompletionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeOnlyCollapserSubject
.onBackpressureBuffer()
.doOnNext(writeCollapserExecutionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
}
// 。。。。。。
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
//根据处理结果,构造一个 HystrixCommandCompletion 事件
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
//通知事件完成主题的监听者
writeOnlyCommandCompletionSubject.onNext(event);
}
// 。。。。。。
}
分析(删除了无用的代码):
HystrixThreadEventStream 本身维护各种关注事件的Subject ,Subject在Rxjava 作为主题,提供观察者双向操作的能力。最终其实是向HystrixCommandCompletionStream 中 提交 HystrixCommandCompletion 事件,供观察者使用。
(5)事件流:HystrixEventStream
Hystrix 命令执行过程会触发响应的事件,用户观察者订阅使用,HystrixCommandCompletionStream 就是用于订阅 HystrixCommandCompletion 命令完成事件。
这里简单看看类结果,详情不再赘述,如开始,不再赘述;
九 getFallback() 执行
1)概述:
关于 getFallback() 方法的执行,之前在
《Hystrix:断路器》:》 三 Hystrix :》 4)处理流程分析 :》 8)执行fallback 做过总结,触发方式如下:
(1)断路器状态:断路器已经打开的时候
(2)由construct() or run()抛出了一个异常
没有空闲的线程池和队列或者信号量:RejectedExecutionException
一次命令执行超时:HystrixTimeoutException
2)这里分析下具体实现
(1) 断路器状态判断-断路器已经打开的时候:在HystrixCommand执行前,根据 circuitBreaker 状态判断是否执行还是快速失效
回到之前讲解的主流程:
》三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》(2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//。。。。。。
if (circuitBreaker.attemptExecution()) { //判断是否可执行
//。。。。。。
// 继续包装HystrixCommand Observable,注册观察者,执行
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else { // 不可执行,快速失效,执行getFallBack()方法
return handleShortCircuitViaFallback();
}
}
(2) construct() or run()抛出了一个异常或超时:
在HystrixCommand执行后,根据结果处理接收到 RejectedExecutionException;HystrixTimeoutException,及非 HystrixBadRequestException 异常 ,执行getFallback()
回到之前讲解的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 。。。。。。
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
// 。。。。。。
Observable<R> execution;
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
分析(删除了无用的代码):主要是:handleFallback ,因为涉及fallback 链的情况这里只简单概述
1)异常类型 RejectedExecutionException;HystrixTimeoutException,及非HystrixBadRequestException 会通过内部方法 getFallbackOrThrowException() 完成对getFallback() 的调用
2)异常类型 HystrixBadRequestException 不会调用 getFallback()