ChannelPipeline类似于一个管道,管道中存放的是一系列对读取数据进行业务操作的ChannelHandler。
1、ChannelPipeline的结构图:
在之前的博客【Netty源码学习】入门示例我们看到了如下的代码:
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
其实就是Client向Server发送数据,其具体实现是在AbstractChannel类中
@Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
上面这段代码出现了我们这篇博客要分析的类DefaultChannelPipeline,其是ChannelPipeline的实现类。DefaultChannelPipeline中writeAndFlush(msg)的实现如下:
@Override public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }tail对象的定义如下final AbstractChannelHandlerContext tail,接口为ChannelHandlerContext我们会接下来进行分析,其进一步封装了数据读写操作。现在看好像 DefaultChannelPipeline的使命已经结束了,发送数据的操作在AbstractChannelHandlerContext中实现。
接下来我们详细介绍一下DefaultChannelPipeline,一开始我们说它是一个管道,是因为他的名称是pipe,其实现在看来他更像一个链表(其实是一个双向循环链表),队列的单位是AbstractChannelHandlerContext,因为其定义了两个变量。
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
head为链表的头,tail为链表的尾,接下来的很多操作都与head和tail相关的。添加ChannelHandler操作。
@Override public final ChannelPipeline addFirst(String name, ChannelHandler handler) { return addFirst(null, name, handler); } @Override public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); newCtx = newContext(group, name, handler); addFirst0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private void addFirst0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; }在addFirst0中我们可以清晰的看到,创建双向循环对列,对列的基本单位是AbstractChannelHanndlerContext。ChannelPipeline继承ChannelInboundInvoker和ChannelOutboundInvoker,它既是一个inboundinvoke,又是一个outboundinvoke,同时也是ChannelChandler的管理者,提供了很多对handler进行的操作。
我们介绍一下channelActive操作channelActive函数
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }其具体实现在我们继承的ChannelHandlerContext类中,比如我们入门教程中的ClientHandler中
@Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("HelloWorldClientHandler Active"); }
由于ChannelHandlerContext是实现数据读写的具体操作类,DefaultChannelPipeline实现了一个ChannelHandlerContext的双向链表,ChannelHandlerContext中封装了我们对消息数据的具体操作比如ClientHandler打印数据,因此DefaultChannelPipeplie的双向链表就是对数据的各种操作,所以在很多地方都会看到pipeline的身影。