vertx模块DeploymentManager部署管理器

时间:2022-06-07 10:15:41

DeploymentManager

public DeploymentManager(VertxInternal vertx) {
this.vertx = vertx;
loadVerticleFactories();
} /**
* ServiceHelper本质对jdk ServiceLoader的封装,vertx大量使用 SPI 扩展功能实现
*/
private void loadVerticleFactories() {
Collection<VerticleFactory> factories = ServiceHelper.loadFactories(VerticleFactory.class);
factories.forEach(this::registerVerticleFactory);
VerticleFactory defaultFactory = new JavaVerticleFactory();
defaultFactory.init(vertx);
defaultFactories.add(defaultFactory);
}

verticle部署

/**
 * Deploys a verticle.
 *
 * @param identifier        fully-qualified class name of the verticle
 * @param options           deployment options
 * @param completionHandler callback that receives the deployment result
 * @throws IllegalArgumentException if multi-threaded is requested without worker
 */
public void deployVerticle(String identifier,
                           DeploymentOptions options,
                           Handler<AsyncResult<String>> completionHandler) {
  if (options.isMultiThreaded() && !options.isWorker()) {
    throw new IllegalArgumentException("If multi-threaded then must be worker too");
  }
  // Get (or create) the context of the caller
  ContextImpl callingContext = vertx.getOrCreateContext();
  // Author's note: isolationGroup is not supported on JDK 9+ and is normally not used
  ClassLoader cl = getClassLoader(options, callingContext);
  // Author's note: the deployment ID is a UUID
  doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
}

/**
 * Deployment logic: resolves the candidate factories for the identifier and
 * hands their iterator to the iterator-based overload.
 */
private void doDeployVerticle(String identifier,
                              String deploymentID,
                              DeploymentOptions options,
                              ContextImpl parentContext,
                              ContextImpl callingContext,
                              ClassLoader cl,
                              Handler<AsyncResult<String>> completionHandler) {
  // Look up the factories for the identifier; author's note: when none is
  // found the default JavaVerticleFactory is used
  List<VerticleFactory> verticleFactories = resolveFactories(identifier);
  Iterator<VerticleFactory> iter = verticleFactories.iterator();
  doDeployVerticle(iter, null, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
}

/**
 * Tries to deploy with the next factory of the iterator.
 * <p>
 * NOTE(review): this excerpt omits parts of the method (the "..." markers).
 * {@code fut}, {@code verticleFactory} and the final use of {@code err} are
 * defined in the omitted code, and the closing braces near the end do not
 * balance against the visible openings; compare with the Vert.x source.
 */
private void doDeployVerticle(Iterator<VerticleFactory> iter,
                              Throwable prevErr,
                              String identifier,
                              String deploymentID,
                              DeploymentOptions options,
                              ContextImpl parentContext,
                              ContextImpl callingContext,
                              ClassLoader cl,
                              Handler<AsyncResult<String>> completionHandler) {
  if (iter.hasNext()) {
    // Whether the identifier needs resolving (default false)
    ...
    // Run the verticle deployment logic
    fut.setHandler(ar -> {
      Throwable err;
      if (ar.succeeded()) {
        // Check whether the name is still the same after resolution
        ...
        if (verticleFactory.blockingCreate()) { // the factory creates verticles in a blocking way
          // Create options.getInstances() verticles via executeBlocking, then deploy them
          vertx.<Verticle[]>executeBlocking(createFut -> {
            try {
              Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
              createFut.complete(verticles);
            } catch (Exception e) {
              createFut.fail(e);
            }
          }, res -> {
            if (res.succeeded()) {
              doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
            } else {
              // On failure, carry on with the next factory
              doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
            }
          });
          return;
        } else {
          try {
            // Create options.getInstances() verticles directly, then deploy them
            Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
            doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
            return;
          } catch (Exception e) {
            // Remember the failure; it is consumed by the omitted code below
            err = e;
          }
        }
      }
    }
    ...
}

/**
 * Creates {@code instances} verticle instances with the given factory.
 *
 * @param verticleFactory factory used to create each instance
 * @param identifier      verticle identifier passed to the factory
 * @param instances       number of instances to create
 * @param cl              class loader passed to the factory
 * @return the created verticles, one per requested instance
 * @throws NullPointerException if the factory returns {@code null}
 * @throws Exception            if the factory fails to create a verticle
 */
private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
  Verticle[] verticles = new Verticle[instances];
  // The loop initializer was lost in the excerpt ("int i = ;"); indexing starts at 0
  for (int i = 0; i < instances; i++) {
    verticles[i] = verticleFactory.createVerticle(identifier, cl);
    if (verticles[i] == null) {
      throw new NullPointerException("VerticleFactory::createVerticle returned null");
    }
  }
  return verticles;
}

/**
 * Creates a context for each verticle, initialises and starts the verticle
 * on that context, and reports success once all instances have started, or
 * rolls the deployment back on the first failure.
 */
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
                      ContextImpl parentContext,
                      ContextImpl callingContext,
                      Handler<AsyncResult<String>> completionHandler,
                      ClassLoader tccl, Verticle... verticles) {
  // Work on a copy of the configuration
  JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy();
  // Name of the configured worker pool, if any
  String poolName = options.getWorkerPoolName();
  // This declaration was swallowed by the trailing comment of the previous
  // line in the excerpt, which left `parent` undeclared
  Deployment parent = parentContext.getDeployment();
  DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
  AtomicInteger deployCount = new AtomicInteger();
  AtomicBoolean failureReported = new AtomicBoolean();
  // One pass per verticle instance; author's note: multiple instances make use of multiple CPU cores
  for (Verticle verticle: verticles) {
    /*
     * Three context types, Standard by default:
     *  - Standard verticles (event loop)
     *  - Worker verticles (DeploymentOptions workerPoolName, workerPoolSize, worker)
     *  - Multi-threaded worker verticles (worker verticles plus DeploymentOptions multiThreaded)
     */
    WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime()) : null;
    WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
    ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
      vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
    if (workerExec != null) { // a named worker pool is in use: register it as a close hook
      context.addCloseHook(workerExec);
    }
    // Bind the deployment to this context
    context.setDeployment(deployment);
    deployment.addVerticle(new VerticleHolder(verticle, context));
    // Perform the deployment on the verticle's context
    context.runOnContext(v -> {
      try {
        // Initialise the verticle
        verticle.init(vertx, context);
        Future<Void> startFuture = Future.future();
        // Call the start method of the verticle being deployed
        verticle.start(startFuture);
        // Handler invoked when the verticle completes (or fails) its start future
        startFuture.setHandler(ar -> {
          if (ar.succeeded()) {
            if (parent != null) { // there is a parent deployment: record the parent/child relation
              if (parent.addChild(deployment)) {
                deployment.child = true;
              } else {
                // Orphan
                deployment.undeploy(null);
                return;
              }
            }
            VertxMetrics metrics = vertx.metricsSPI(); // verticle metrics, when available
            if (metrics != null) {
              metrics.verticleDeployed(verticle);
            }
            deployments.put(deploymentID, deployment); // register the deployment
            // Report success once every instance has been deployed
            if (deployCount.incrementAndGet() == verticles.length) {
              reportSuccess(deploymentID, callingContext, completionHandler);
            }
          } else if (failureReported.compareAndSet(false, true)) { // deployment failed; roll back once
            deployment.rollback(callingContext, completionHandler, context, ar.cause());
          }
        });
      } catch (Throwable t) {
        // init/start threw: roll back unless a failure was already reported
        if (failureReported.compareAndSet(false, true))
          deployment.rollback(callingContext, completionHandler, context, t);
      }
    });
  }
}

Verticle Types

Verticle 共有三种类型:一、Standard Verticles;二、Worker Verticles;三、Multi-threaded worker verticles。
无论配置哪种类型,本质上仍由 eventloop 处理 connection、read/write、encode/decode;
事件处理(eventHandler)则交给所配置的 verticle 类型对应的 context 执行。调度逻辑由 VertxHandler 类中配置的 context 完成,
效果与 vertx.executeBlocking->{...} 相同。

Context Class diagram 如图:

vertx模块DeploymentManager部署管理器

DeploymentOptions

// Configuration to add; shared by the verticles of the current deployment
private JsonObject config;
// Whether to use a worker context
private boolean worker;
/**
 * Whether to use a multi-threaded worker context. Not recommended: normally
 * event-loop or worker is used. Execution happens on different threads of
 * the worker pool, so thread safety has to be considered, and when enabled
 * most other modules cannot be used.
 */
private boolean multiThreaded;
// Isolation group (deployment isolation settings); not recommended and not supported on JDK 9+
private String isolationGroup;
// Name prefix of the custom worker pool
private String workerPoolName;
// Number of threads of the custom worker pool
private int workerPoolSize;
// Maximum worker execution time; a thread exceeding the threshold triggers a
// warn-level log message. Defaults to 60s when not configured
private long maxWorkerExecuteTime;
// Whether HA (high availability) is enabled; default false
private boolean ha;
// Extra classpath entries; ignored when isolationGroup is not set
private List<String> extraClasspath;
// Number of verticle instances to deploy
private int instances;
// Classes to isolate, given as fully-qualified names; the wildcard * may be used
private List<String> isolatedClasses;