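In the previous post, "[Netty Source Code Study] EventLoopGroup", we introduced EventLoopGroup. In practice, an EventLoopGroup is a collection of EventLoops, and each EventLoop is a thread pool with a single thread. Its interfaces and implementing classes are related as follows: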
Next we focus on the operations implemented in the concrete class NioEventLoop. Its inheritance hierarchy shows that it is simply a single-threaded thread pool.
First, let's look at the NioEventLoop constructor.
The main thing the constructor does is obtain the Selector used for NIO operations: selector = openSelector();
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
}
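What selector = openSelector(); actually does is obtain a Selector from the provider. Unless the optimization is disabled, it also tries to replace the selector's internal selected-key sets with Netty's own SelectedSelectionKeySet via reflection: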
private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                return e;
            } catch (SecurityException e) {
                return e;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
        if (maybeSelectorImplClass instanceof Exception) {
            Exception e = (Exception) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        }
        return selector;
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);

                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
    } else {
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", selector);
    }

    return selector;
}
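The first step of openSelector() can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not Netty code: the class name OpenSelectorSketch and the helper open are made up for illustration, and IllegalStateException stands in for Netty's ChannelException. It leaves out the reflective key-set replacement entirely.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper class, JDK only.
class OpenSelectorSketch {
    // Ask the provider for a Selector and wrap the checked IOException
    // in an unchecked exception, as openSelector() does with ChannelException.
    static Selector open(SelectorProvider provider) {
        try {
            return provider.openSelector();
        } catch (IOException e) {
            throw new IllegalStateException("failed to open a new selector", e);
        }
    }

    public static void main(String[] args) throws IOException {
        Selector selector = open(SelectorProvider.provider());
        System.out.println("open: " + selector.isOpen());
        // A fresh selector has no registered keys and nothing selected yet.
        System.out.println("registered keys: " + selector.keys().size());
        System.out.println("selected keys: " + selector.selectedKeys().size());
        selector.close();
    }
}
```

SelectorProvider.provider() returns the system-wide default provider; passing the provider in as a parameter mirrors how the constructor above receives selectorProvider rather than calling Selector.open() directly.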
Another operation in NioEventLoop is registering a Channel with the Selector. The implementation is as follows:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    if (ch == null) {
        throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
        throw new NullPointerException("task");
    }

    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    try {
        ch.register(selector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}
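The validation in register can be reproduced with plain JDK NIO to see what the interestOps check rejects. This is a sketch under assumptions: RegisterSketch is a hypothetical class, and a plain String attachment stands in for the NioTask. A ServerSocketChannel only supports OP_ACCEPT, so asking for OP_READ fails the (interestOps & ~ch.validOps()) test.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper class, JDK only.
class RegisterSketch {
    // Same checks as the register method above; the attachment plays the role of the NioTask.
    static SelectionKey register(Selector selector, SelectableChannel ch,
                                 int interestOps, Object attachment) throws IOException {
        if (interestOps == 0) {
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }
        if ((interestOps & ~ch.validOps()) != 0) {
            throw new IllegalArgumentException(
                    "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
        }
        return ch.register(selector, interestOps, attachment);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // a channel must be non-blocking before register()
            SelectionKey key = register(selector, server, SelectionKey.OP_ACCEPT, "accept-task");
            System.out.println("attachment: " + key.attachment());
            try {
                register(selector, server, SelectionKey.OP_READ, "bad");
            } catch (IllegalArgumentException expected) {
                System.out.println("rejected: " + expected.getMessage());
            }
        }
    }
}
```

The attachment is handed back through SelectionKey.attachment() when the key becomes ready, which is how a selector loop can find the task associated with a ready channel.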
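The constructor calls the superclass constructor. The superclass constructors and their implementation deal mostly with the task queue.
The superclass also overrides the thread pool's execute method: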
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
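The core idea of this execute method (queue the task, and start the single worker thread lazily when the caller is not already on it) can be sketched in a few lines. TinyEventExecutor below is a hypothetical class written for illustration, not Netty's SingleThreadEventExecutor; it omits shutdown handling, rejection, and selector wakeup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-threaded executor; a simplified stand-in, not Netty code.
class TinyEventExecutor implements Executor {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    // True when the caller is the executor's own worker thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // lazily start the worker on first outside submission
        }
        taskQueue.add(task);
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "tiny-event-loop");
            t.setDaemon(true); // let the JVM exit without an explicit shutdown
            thread = t;
            t.start();
        }
    }

    // The worker keeps taking tasks from the queue and running them one by one.
    private void runLoop() {
        for (;;) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            } catch (Throwable t) {
                // A failing task must not kill the loop.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TinyEventExecutor executor = new TinyEventExecutor();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            int n = i;
            executor.execute(() -> {
                System.out.println("task " + n + " on " + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
    }
}
```

Every task runs on the same thread regardless of which thread submitted it, which is the property that lets code running inside an event loop avoid locking around per-channel state.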
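In NioEventLoop's run method we can see how the task queue is executed. Each iteration first processes the selected I/O keys and then runs queued tasks, with ioRatio controlling how time is split between the two: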
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
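The expression ioTime * (100 - ioRatio) / ioRatio in the run method is easier to read with concrete numbers. The sketch below only reproduces that arithmetic; IoRatioSketch and taskBudgetNanos are hypothetical names, and the 8 ms I/O time is an invented input. The ioRatio == 100 case is excluded because run handles it in a separate branch with no time limit.

```java
// Hypothetical helper class illustrating the time-budget arithmetic only.
class IoRatioSketch {
    // Time allowed for queued tasks, given how long I/O processing took
    // and the percentage of loop time that I/O is meant to receive.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume I/O processing took 8 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000  (tasks get as long as I/O did)
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000  (tasks get a quarter of the I/O time)
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000 (tasks get four times the I/O time)
    }
}
```

A higher ioRatio therefore shrinks the share of each loop iteration spent on queued tasks, and a lower one grows it.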
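From the analysis above, we now have a basic understanding of how EventLoop, Executor, and threads relate to one another. In effect, Netty implements its own thread pool to run tasks. The rough flow is: a channel is registered with the EventLoop's selector, and the channel's event handling is then carried out as tasks that are first placed in the task queue (this works because the execute method is overridden; the overriding implementation can be seen in SingleThreadEventExecutor). The thread then keeps taking tasks from the queue and running them.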