Vertx.vertx()实例
一、构造方法
1. VertxImpl构造方法 选择 transports protocol , default select 模型
if (options.getPreferNativeTransport()) {
Transport nativeTransport = Transport.nativeTransport();
if (nativeTransport != null && nativeTransport.isAvailable()) {
transport = nativeTransport;// 使用netty-transport-native epoll-ET 或 Kqueue
} else {
transport = Transport.JDK;// java.nio (linux下 epoll-LT)
}
} else {
transport = Transport.JDK;// java.nio (linux下 epoll-LT)
}
native transports protocol
transports | TCP | UDP | SCTP | UDT |
---|---|---|---|---|
jdk-NIO | X | X | X | X |
netty-native-epoll | X | X | — | — |
OIO | X | X | X | X |
PS : select 模型是线性扫描所有 FD , 时间复杂度O(n), 轮训机制.
epoll 模型基于callback机制 , k 活跃数 时间复杂度O(k).
试用场景: 大量FD 基本都是活跃状态,select 比 epoll 优秀,没有epollctl的回調开销,
反之有大量idle , dead状态 , epoll更优秀
2. BlockedThreadChecker线程,检查所有线程阻塞, 观察者模式
//默认Timer每 1 seconds检查一次 , 默认warn时间 5 seconds
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval()
, options.getWarningExceptionTime()); //守护线程,监听所有阻塞线程,遍历扫描(container) WeakHashMap 弱引用,
//ContextImpl.runCloseHooks 执行结束置为null,使GC线程自动回收资源, 防止内存泄漏
timer = new Timer("vertx-blocked-thread-checker", true);
timer.schedule(...);
3. eventloop线程池
eventloop线程本质是对netty eventloop的封装.
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize()
,eventLoopThreadFactory, NETTY_IO_RATIO);
// 默认 2 * number of cores ,linux下依赖 /proc/self/status 做计算 acceptorEventLoopGroup = transport.eventLoopGroup(
, acceptorEventLoopThreadFactory, );
//负责客户端连接,线程1,单进程服务线程为1即可,否则造成资源浪费
note: eventloop线程checker默认最大阻塞时间 2 seconds,处理时间越长,eventloop事件堆积 就越多,影响服务正常响应,切不可阻塞,阻塞任务可换成worker线程池运行
4. worker线程池
worker线程池是J.U.C包下的FixedThreadPool,默认default 20
ExecutorService workerExec=Executors.newFixedThreadPool(
options.getWorkerPoolSize(),new VertxThreadFactory("vert.x-worker-thread-"
,checker, true, options.getMaxWorkerExecuteTime()));
note: worker主要用来处理耗时任务, 阻塞同步的调用而设计,checker默认超时时间 60 seconds, 执行行为两种, 串行(采用数据结构队列,FIFO) ; 并行 .
5.internal 线程池
internal线程池是J.U.C包下的FixedThreadPool,默认default 20
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(
options.getInternalBlockingPoolSize()
,new VertxThreadFactory("vert.x-internal-blocking-"
,checker, true, options.getMaxWorkerExecuteTime()));
note: 内部使用的线程池,主要用来将阻塞代码异步化
5.初始化加载模块
//创建文件解析器,会创建一个 cache 目录,默认当前执行目录建立隐藏文件夹.vertx,
//使用 kill -15 <pid> 使进程触发ShutdownHook清理 cache目录文件和目录
fileResolver = new FileResolver(this, options.isFileResolverCachingEnabled()); /**
* 创建地址解析器,会读取/etc/resolv.conf DNS配置文件,
* resolv.conf手册地址:http://www.man7.org/linux/man-pages/man5/resolv.conf.5.html
*/
addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); //创建部署Verticle管理器
deploymentManager = new DeploymentManager(this); //启用集群cluster,创建集群管理器
clusterManager = getClusterManager(options); //集群状态下创建HA管理器(高可用)
haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(), options.getHAGroup(), haEnabled); //创建并且启动事件总线,local和cluster模式实现不同
createAndStartEventBus(options, resultHandler); //创建sharedData实例,共享数据,local和cluster模式实现不同
sharedData = new SharedDataImpl(this, clusterManager); //创建抽象层的文件系统模块,AsyncFileImpl 依赖 java nio AsynchronousFileChannel
// private final FileSystem fileSystem = getFileSystem();
//不同操作系统call调用有兼容问题,最好开发环境和发布环境一致
Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this) //初始化度量指标
metrics = initialiseMetrics(options);
5. static 块
static {
// 关闭netty的资源泄露器检查,有性能的一定开销
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
// 默认使用 Jdk deflater/inflater
System.setProperty("io.netty.noJdkZlibDecoder", "false");
}
二、其它模块
//创建DNSclient
DnsResolverProvider provider = new
DnsResolverProvider(this,addressResolverOptions); //创建异步的httpClient,协议包括 HTTP_1_0 , HTTP_1_1 , HTTP_2 和 HTTPS
new HttpClientImpl(this, options); //创建httpServer, 默认协议协商版本 HTTP/2 , HTTP/1.1 , 支持 https
//(SNI:多证书单服务需要集成KeyCertOptions自己实现)
new HttpServerImpl(this, serverOptions); //创建tcp服务端,支持 tls
new NetServerImpl(this, options); //创建tcp客户端,支持 tls
new NetClientImpl(this, options); //创建udp服务
DatagramSocketImpl.create(this, options); //额外 CLI(command-line interface) , cli 模块