备注:本文的分析基于netty4.0.9final版本
1、event总体结构图
event总体结构类图如下:
2、event关键类和接口分析
1)基于NioEventLoop对关键类和接口进行分析,下面是它的关系图:
EventExecutor
相当于只有一个EventExcutor的EventExecutorGroup,它的next方法返回的是自己的引用,并且它还提供了方法判断线程是否在eventloop中执行,它是一个任务执行器。
EventExecutorGroup它继承了ScheduledExecutorService, Iterable<EventExecutor>,可以被看作是任务的调度执行器和EventExecutor容器,主要是定义了一些submit和schedule方法(用于线程的执行),以及next方法(返回一个EventExecutor实例)。
EventLoopGroup
它继承EventExecutorGroup,额外提供3个方法,一个next返回空闲的EventLoop,register方法注册Channel到EventLoop中。
EventLoop接口同时继承了EventExecutor和EventLoopGroup,因此它既是一个执行器,又是容器,提供一个parent方法,返回它所属的EventLoopGroup。其实它相当于是只有一个EventLoop的EventLoopGroup。
AbstractEventExecutor
它继承AbstractExecutorService并且实现EventExecutor接口,提供submit,schedule,已经next等方法。
SingleThreadEventExecutor
它继承AbstractEventExecutor,具体实现代码如下:
- private final EventExecutorGroup parent;
- private final Queue<Runnable> taskQueue;
- final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
- private final Thread thread;
- <pre name="code" class="java"> protected SingleThreadEventExecutor(
- EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory");
- }
- this.parent = parent;
- this.addTaskWakesUp = addTaskWakesUp;
- thread = threadFactory.newThread(new Runnable() {
- @Override
- public void run() {
- boolean success = false;
- updateLastExecutionTime();
- try {
- SingleThreadEventExecutor.this.run();
- success = true;
- } catch (Throwable t) {
- logger.warn("Unexpected exception from an event executor: ", t);
- } finally {
- if (state < ST_SHUTTING_DOWN) {
- state = ST_SHUTTING_DOWN;
- }
- // Check if confirmShutdown() was called at the end of the loop.
- if (success && gracefulShutdownStartTime == 0) {
- logger.error(
- "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
- SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
- "before run() implementation terminates.");
- }
- try {
- // Run all remaining tasks and shutdown hooks.
- for (;;) {
- if (confirmShutdown()) {
- break;
- }
- }
- } finally {
- try {
- cleanup();
- } finally {
- synchronized (stateLock) {
- state = ST_TERMINATED;
- }
- threadLock.release();
- if (!taskQueue.isEmpty()) {
- logger.warn(
- "An event executor terminated with " +
- "non-empty task queue (" + taskQueue.size() + ')');
- }
- terminationFuture.setSuccess(null);
- }
- }
- }
- }
- });
- taskQueue = newTaskQueue();
- }
SingleThreadEventLoop
它继承了SingleThreadEventExecutor并且实现了EventLoop接口,提供注册Channel到事件循环中的函数,以及获取EventLoopGroup和EventLoop的函数。
NioEventLoop
它继承SingleThreadEventLoop,具体参考如下代码:
- /**
- * The NIO {@link Selector}.
- */
- Selector selector;
- private SelectedSelectionKeySet selectedKeys;
- private final SelectorProvider provider;
- <pre name="code" class="java"> NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
- super(parent, threadFactory, false);
- if (selectorProvider == null) {
- throw new NullPointerException("selectorProvider");
- }
- provider = selectorProvider;
- selector = openSelector();
- }
- private Selector openSelector() {
- final Selector selector;
- try {
- selector = provider.openSelector();
- } catch (IOException e) {
- throw new ChannelException("failed to open a new selector", e);
- }
- if (DISABLE_KEYSET_OPTIMIZATION) {
- return selector;
- }
- try {
- SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- Class<?> selectorImplClass =
- Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
- selectorImplClass.isAssignableFrom(selector.getClass());
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
- selectedKeysField.setAccessible(true);
- publicSelectedKeysField.setAccessible(true);
- selectedKeysField.set(selector, selectedKeySet);
- publicSelectedKeysField.set(selector, selectedKeySet);
- selectedKeys = selectedKeySet;
- logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
- } catch (Throwable t) {
- selectedKeys = null;
- logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
- }
- return selector;
- }
2)基于NioEventLoopGroup对关键类和接口进行分析,下面是它的关系图:
EventExecutorGroup
它继承了ScheduledExecutorService, Iterable<EventExecutor>,可以被看作是任务的调度执行器和EventExecutor容器
主要是定义了一些submit和schedule方法(用于线程的执行),以及next方法(返回一个EventExecutor实例)。
EventExecutor是一个特殊的EventExecutorGroup,它的next方法返回的是自己的引用,并且它还提供了方法判断线程是否在eventloop中执行,它是一个任务执行器。
EventLoopGroup
它继承EventExecutorGroup,提供3个方法,一个next返回空闲的EventLoop,register方法注册Channel到EventLoop中。
EventLoop接口同时继承了EventExecutor和EventLoopGroup,因此它既是一个执行器,又是容器,提供一个parent方法,返回它所属的EventLoopGroup。
它实现EventExecutorGroup接口的submit和schedule方法。
MultithreadEventExecutorGroup
它继承AbstractEventExecutorGroup类,具体实现代码如下
- /*
- * Copyright 2012 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
- package io.netty.util.concurrent;
- import java.util.Collections;
- import java.util.Iterator;
- import java.util.LinkedHashMap;
- import java.util.Set;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
- * the same time.
- */
- public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
- private final EventExecutor[] children;
- private final AtomicInteger childIndex = new AtomicInteger();
- private final AtomicInteger terminatedChildren = new AtomicInteger();
- private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
- /**
- * Create a new instance.
- *
- * @param nThreads the number of threads that will be used by this instance.
- * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
- * @param args arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
- */
- protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- }
- if (threadFactory == null) {
- threadFactory = newDefaultThreadFactory();
- }
- children = new SingleThreadEventExecutor[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- children[i] = newChild(threadFactory, args);
- success = true;
- } catch (Exception e) {
- // TODO: Think about if this is a good exception type
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- if (!success) {
- for (int j = 0; j < i; j ++) {
- children[j].shutdownGracefully();
- }
- for (int j = 0; j < i; j ++) {
- EventExecutor e = children[j];
- try {
- while (!e.isTerminated()) {
- e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
- }
- } catch (InterruptedException interrupted) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- }
- final FutureListener<Object> terminationListener = new FutureListener<Object>() {
- @Override
- public void operationComplete(Future<Object> future) throws Exception {
- if (terminatedChildren.incrementAndGet() == children.length) {
- terminationFuture.setSuccess(null);
- }
- }
- };
- for (EventExecutor e: children) {
- e.terminationFuture().addListener(terminationListener);
- }
- }
- protected ThreadFactory newDefaultThreadFactory() {
- return new DefaultThreadFactory(getClass());
- }
- @Override
- public EventExecutor next() {
- return children[Math.abs(childIndex.getAndIncrement() % children.length)];
- }
- @Override
- public Iterator<EventExecutor> iterator() {
- return children().iterator();
- }
- /**
- * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
- * 1:1 to the threads it use.
- */
- public final int executorCount() {
- return children.length;
- }
- /**
- * Return a safe-copy of all of the children of this group.
- */
- protected Set<EventExecutor> children() {
- Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
- Collections.addAll(children, this.children);
- return children;
- }
- /**
- * Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
- * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
- *
- */
- protected abstract EventExecutor newChild(
- ThreadFactory threadFactory, Object... args) throws Exception;
- }
NioEventLoopGroup
它继承MultithreadEventLoopGroup,提供了几个额外的方法,如rebuildSelectors(重新生成selector),setIoRatio(设置IO处理的时间)等,重写newChild方法,具体代码如下:
- @Override
- protected EventExecutor newChild(
- ThreadFactory threadFactory, Object... args) throws Exception {
- return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
- }
3、总结
其实整个event就是围绕着Loop和Excutor进行的,LoopGroup和ExcutorGroup相当于Loop和Excutor的容器,Group中包括了多个Loop和多个Excutor,所以单个Loop和Excutor也可以理解为一个Group,但其中只有一个Loop和Excutor。Loop用于事件循环,Excutor用于任务的提交调度执行。
备注:这里简单的对event事件的总体结构进行了分析,很多地方还不是很详细,具体细节还需要进一步分析代码。