【Netty源码学习】ChannelPipeline(一)

时间:2021-11-14 01:51:50

ChannelPipeline类似于一个管道,管道中存放的是一系列对读取数据进行业务操作的ChannelHandler。

1、ChannelPipeline的结构图:

【Netty源码学习】ChannelPipeline(一)


在之前的博客【Netty源码学习】入门示例我们看到了如下的代码:

future.channel().writeAndFlush("Hello Netty Server ,I am a common client");  
其实就是Client向Server发送数据,其具体实现是在AbstractChannel类中

@Override    public ChannelFuture writeAndFlush(Object msg) {        return pipeline.writeAndFlush(msg);    }
上面这段代码出现了我们这篇博客要分析的类DefaultChannelPipeline,其是ChannelPipeline的实现类。DefaultChannelPipeline中writeAndFlush(msg)的实现如下:

@Override    public final ChannelFuture writeAndFlush(Object msg) {        return tail.writeAndFlush(msg);    }
tail对象的定义如下final AbstractChannelHandlerContext tail,接口为ChannelHandlerContext我们会接下来进行分析,其进一步封装了数据读写操作。现在看好像 DefaultChannelPipeline的使命已经结束了,发送数据的操作在AbstractChannelHandlerContext中实现。

接下来我们详细介绍一下DefaultChannelPipeline,一开始我们说它是一个管道,是因为他的名称是pipe,其实现在看来他更像一个链表(其实是一个双向循环链表),队列的单位是AbstractChannelHandlerContext,因为其定义了两个变量。

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

head为链表的头,tail为链表的尾,接下来的很多操作都与head和tail相关的。添加ChannelHandler操作。

@Override    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {        return addFirst(null, name, handler);    }    @Override    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            name = filterName(name, handler);            newCtx = newContext(group, name, handler);            addFirst0(newCtx);            // If the registered is false it means that the channel was not registered on an eventloop yet.            // In this case we add the context to the pipeline and add a task that will call            // ChannelHandler.handlerAdded(...) once the channel is registered.            if (!registered) {                newCtx.setAddPending();                callHandlerCallbackLater(newCtx, true);                return this;            }            EventExecutor executor = newCtx.executor();            if (!executor.inEventLoop()) {                newCtx.setAddPending();                executor.execute(new Runnable() {                    @Override                    public void run() {                        callHandlerAdded0(newCtx);                    }                });                return this;            }        }        callHandlerAdded0(newCtx);        return this;    }    private void addFirst0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext nextCtx = head.next;        newCtx.prev = head;        newCtx.next = nextCtx;        head.next = newCtx;        nextCtx.prev = newCtx;    }
在addFirst0中我们可以清晰的看到,创建双向循环对列,对列的基本单位是AbstractChannelHanndlerContext。ChannelPipeline继承ChannelInboundInvoker和ChannelOutboundInvoker,它既是一个inboundinvoke,又是一个outboundinvoke,同时也是ChannelChandler的管理者,提供了很多对handler进行的操作。


我们介绍一下channelActive操作channelActive函数

@Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            ctx.fireChannelActive();            readIfIsAutoRead();        }
其具体实现在我们继承的ChannelHandlerContext类中,比如我们入门教程中的ClientHandler中
@Override      public void channelActive(ChannelHandlerContext ctx) {          System.out.println("HelloWorldClientHandler Active");      }  


由于ChannelHandlerContext是实现数据读写的具体操作类,DefaultChannelPipeline实现了一个ChannelHandlerContext的双向链表,ChannelHandlerContext中封装了我们对消息数据的具体操作比如ClientHandler打印数据,因此DefaultChannelPipeplie的双向链表就是对数据的各种操作,所以在很多地方都会看到pipeline的身影。