Hystrix Plugins插件机制
Hystrix通过SPI提供了插件扩展机制,有如下几种插件:
- HystrixConcurrencyStrategy:并发
- HystrixEventNotifier:通知
- HystrixMetricsPublisher:度量
- HystrixPropertiesStrategy:Properties配置
- HystrixCommandExecutionHook:回调函数
- HystrixDynamicProperties:配置
以上插件都由HystrixPlugins统一管理,负责加载和实例化。
SPI插件机制
Hystrix插件机制实现
举例HystrixConcurrencyStrategy来进行讲解
HystrixConcurrencyStrategy插件讲解
插件实现的相关类如下
- 实现默认的抽象类HystrixConcurrencyStrategy
- 官方仅提供了HystrixConcurrencyStrategyDefault的默认实现,继承了1中的抽象类
HystrixConcurrencyStrategy类的作用解析
- Hystrix只有一个作用,实例化线程执行器。其核心方法为getThreadPool(),其本质就是把参数传进来,然后实例化一个ThreadPoolExecutor
- 实际上HystrixConcurrencyStrategyDefault是直接继承HystrixConcurrencyStrategy,并且没有重写getThreadPool()方法,所以直接调用到下面的getThreadPool()逻辑的
/**
* 目前只有这个使用,只分析这个就好
* @param threadPoolKey key
* @param threadPoolProperties 参数
* @return 线程池执行器
*/
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
// 获取ThreadFactory
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
// 获取请求参数
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
// allowMaximumSizeToDivergeFromCoreSize = true,就允许核心线程起作用
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
// allowMaximumSizeToDivergeFromCoreSize=true情况下,核心线程数必须小于等于最大线程数
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
怎么加载HystrixConcurrencyStrategy插件
初始化HystrixThreadPool时,默认会调用到HystrixThreadPoolDefault,并进行实例化
class HystrixThreadPoolDefault implements HystrixThreadPool {
// ....其他逻辑
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
// !!!实例化HystrixConcurrencyStrategy!!
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
// 实例化了一个线程池:concurrencyStrategy.getThreadPool(threadPoolKey, properties)
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
// 最后线程池在这里赋值给this.threadPool
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
// ....其他逻辑
}
可见最终调用了HystrixPlugins.getInstance().getConcurrencyStrategy()来及性能获取具体的策略实例
- 首先从属性"hystrix.plugin.HystrixConcurrencyStrategy.implementation=xxx“,看看有没自定义的策略类,如果有就用Class.forName加载
1.1 举个参数例子:hystrix.plugin.HystrixConcurrencyStrategy.implementation=com.lds.hystrix.MyHystrixConcurrencyStrategy - 如果没有返回null,就会用CAS来吧策略实例替换为默认的HystrixConcurrencyStrategyDefault,否则用自定义的
public class HystrixPlugins {
// ....其他逻辑
public HystrixConcurrencyStrategy getConcurrencyStrategy() {
// 获取策略实例
if (concurrencyStrategy.get() == null) {
// 走到这部说明不存在
// check for an implementation from Archaius first
// 不存在则通过getPluginImplementation获取,这个会从"hystrix.plugin.HystrixConcurrencyStrategy.implementation“属性值里面获取
// 如果没有设置对应的实例返回,就会返回null
Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);
if (impl == null) {
// 返回null,就会默认采用HystrixConcurrencyStrategyDefault来进行实例化对应的线程池
// nothing set via Archaius so initialize with default
concurrencyStrategy.compareAndSet(null, HystrixConcurrencyStrategyDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from Archaius so use it
// 否则用自定义的类来实例化线程池
concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);
}
}
return concurrencyStrategy.get();
}
// ....其他逻辑
}