Java NIO就已经将底层socket链路抽象为,Netty也将其抽象为。如果将Netty比如为一条高速运转的生产线,那么EventLoop是这条生产线的动力装置,而Channel则是一个个的机械臂,EventLoop驱动着Channel不断地执行IO操作。
Channel
可以认为Channel是对socket连接的抽象,一个支持IO操作的通道,Netty对Channel的定义如下:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
//全局唯一Id
ChannelId id();
//注册的EventLoop
EventLoop eventLoop();
//父Channel,可为null
Channel parent();
//Channel的配置抽象
ChannelConfig config();
//channel打开,应该是指底层IO通道资源打开
boolean isOpen();
//是否注册到EventLoop
boolean isRegistered();
//是否活跃的,也即处于连接状态
boolean isActive();
//另外一个channel信息的抽象,暂时不管它
ChannelMetadata metadata();
//本地地址
SocketAddress localAddress();
//远端地址
SocketAddress remoteAddress();
//监视关闭状态的Future
ChannelFuture closeFuture();
//当前是否可写,是指能否立即将数据写入底层,否则的话写数据会进入队列
boolean isWritable();
//再写入多少byte数据,isWritable将会返回false
long bytesBeforeUnwritable();
//还需要从底层缓存减少多少数据,isWritable将会返回true
long bytesBeforeWritable();
//返回一个内部Unsafe对象,Channel的底层IO操作将会封装在Unsafe里
Unsafe unsafe();
//Channel的Pipeline
ChannelPipeline pipeline();
//指定给Channel的ByteBuf分配器
ByteBufAllocator alloc();
//请求从底层信道读入数据到缓冲区
@Override
Channel read();
//请求将写缓冲数据数据刷入底层信道
@Override
Channel flush();
//内部类接口Unsafe的定义(不用看仔细,瞄一眼即可)
interface Unsafe {
recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
}
//Channel父接口ChannelOutboundInvoker定义(不用看仔细,瞄一眼即可)
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
}
虽然只是几个接口,它们包含的信息量不少。Netty的Channel比Nio的Channel要复杂得多,上面的接口定义体现Channel和EventLoop、ChannelPipeline之间的关联。Channel的配置信息被独立出来成为一个抽象ChannelConfig,方便外部访问,体现了接口隔离原则。
从功能的角度看,Channel在Netty中有两个角色,对外它要提供一个IO操作的接口,方便用户使用;对内它要封装好底层IO的实现。在这,Channel再次采用了接口分离的原则,前一个角色由父接口ChannelOutboundInvoker担任,后一个角色则由内部接口Unsafe担任。用户可以在任意线程调用ChannelOutboundInvoker的方法,最终转换为EventLoop内对Unsafe的调用。
从设计模式角度看,Channel和Unsafe之间的关系构成是桥接模式,每种具体Channel实现类都需要有对应的Unsafe实现类。
另外Channel读写完全是无阻塞的,它内部肯定有缓冲区,Channel与ByteBufAllocator、ChannelOutboundBuffer的关联提供了线索。
最后AttributeMap是方便用户在Channel内存储key&value属性,Spring MVC的HttpRequest也有类似功能,后面不再讨论它。
AbstractChannel
AbstractChannel是直接实现Channel接口的抽象类,为所有具体Channel提供一个基石,它的成员字段定义如下:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
private Throwable initialCloseCause;
}
上面的代码还是比较好理解的,就是将Channel接口定义的一些属性具体化为相应类型的成员字段。
AbstractChannel构造方法
再看构造相关方法:
protected AbstractChannel(Channel parent) {
= parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected ChannelId newId() {
return ();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected abstract AbstractUnsafe newUnsafe();
channelId由静态工厂DefaultChannelId创建,保证全局的唯一性;pipeline的实现类型为DefaultChannelPipeline;这两个字段的创建方法虽然都声明为protected,子类一般不需要覆盖。最后unsafe的工厂方法声明为abstract,由子类来实现,对照上一节的分析,这是必然的。
AbstractChannel如何实现ChannelOutboundInvoker
AbstractChannel实现了ChannelOutboundInvoker定义的IO操作方法,摘录一部分如下:
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return (localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return (remoteAddress);
}
@Override
public ChannelFuture disconnect() {
return ();
}
@Override
public ChannelFuture close() {
return ();
}
@Override
public ChannelFuture deregister() {
return ();
}
@Override
public Channel read() {
();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return (msg);
}
可以看到,面向远端的IO操作全部都转发给了pipeline来执行。,所以pipeline里面一定别有洞天。
AbstractChannel直接调用unsafe的场景
原则上Channel接口方法实现不应该直接调用unsafe(这相当于在非eventLoop线程内调用),不过在向外发布缓冲区的一些状态时,不可避免地要同步访问unsafe的成员:
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = ();
return buf != null && ();
}
@Override
public long bytesBeforeUnwritable() {
ChannelOutboundBuffer buf = ();
return buf != null ? () : 0;
}
@Override
public long bytesBeforeWritable() {
ChannelOutboundBuffer buf = ();
return buf != null ? () : Long.MAX_VALUE;
}
ChannelOutboundBuffer是Channel内部的写缓冲区,它有几个方法是线程安全的,包括isWritable,size,还有bytesBeforeUnwritable和bytesBeforeWritable。
这几个方法的语义在本章第一节已经介绍过
AbstractUnsafe
前面说过,每个Channel背后有一个对应的Unsafe,在AbstractChannel内有,也有一个AbstractUnsafe,为子类的Unsafe实现打下基础:
//为了节省篇幅,异常处理代码全忽略
public abstract class AbstractChannel {
protected abstract class AbstractUnsafe implements Unsafe {
//写缓冲区
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer();
//读缓冲区的工厂
private recvHandle;
//一个flush标记,属于实现细节,暂时不要care
private boolean inFlush0;
//channel是否从未注册过
private boolean neverRegistered = true;
//注册操作
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
= eventLoop;
//确保注册操作发生在EventLoop线程
if (()) {
register0(promise);
} else {
(new Runnable() {
public void run() {
register0(promise);
}
});
}
}
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
//调用Channel的doRegister方法
doRegister();
neverRegistered = false;
registered = true;
//触发pipeline事件HandlerAdded
();
safeSetSuccess(promise);
//触发pipeline事件ChannelRegistered
();
//Channel注册时可能已经处于连接状态
if (isActive()) {
if (firstRegistration) {
//触发pipeline事件ChannelActive
();
} else if (config().isAutoRead()) {
//开始读请求
beginRead();
}
}
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {...}
@Override
public final void disconnect(final ChannelPromise promise) {...}
@Override
public final void close(final ChannelPromise promise) {...}
@ Override
public final void beginRead() {...}
}
protected void doRegister() throws Exception { // NOOP }
protected abstract void doBind(SocketAddress localAddress) throws Exception;
protected abstract void doDisconnect() throws Exception;
protected abstract void doClose() throws Exception;
protected abstract void doBeginRead() throws Exception;
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
}
我们可以看到AbstractUnsafe的register实现,它主要做了三件事:
- 确保的注册操作在对应的EventLoop内执行;
- 调用了对应Channel的doRegister方法;
- 触发必要的pipline事件;
AbstractUnsafe的设计是典型的策略模式,父类实现了操作流程的骨架,将某些环节留给子类;,disconnect,close,beginRead都是这个套路。
但是也有例外,比如操作不是这样的,我们来看一下:
public abstract class AbstractChannel {
//消息过滤器,在将消息写入buffer之前,转换一下,默认doNothing
protected Object filterOutboundMessage(Object msg) throws Exception {
return msg;
}
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void write(Object msg, ChannelPromise promise) {
//忽略异常检查代码
...
int size;
try {
msg = filterOutboundMessage(msg);
size = ().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
//忽略异常处理代码
}
//将消息写入outboundBuffer
(msg, size, promise);
}
@Override
public final void flush() {
();
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
return;
}
inFlush0 = true;
//Channel已经不活跃,直接失败
if (!isActive()) {
//忽略错误状态处理
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
//忽略异常处理错误
} finally {
inFlush0 = false;
}
}
}
}
这段代码展示非常重要的信息:
- Unsafe的write操作,仅仅是将数据写入缓冲区outboundBuffer;
- Unsafe的flush操作,才调用方法将缓冲数据写入底层socket;
- 至于面向用户的操作,是将数据传入pipeline,我们暂时还没有把它和上面两个操作串起来,相信在pipeline的实现里面肯定另有玄机。
最后我们发现AbstractUnsafe没有实现connect方法,说明该操作不好抽象,只能完全由子类来实现。
AbstractNioChannel
让我们沿着Nio的方向继续前行,来到AbstractNioChannel;NIO有两种Channel,一种监听channel,一种数据传输channel,所以有一个公共的抽象基类是自然的。
AbstractNioChannel成员字段和构造方法如下:
public abstract class AbstractNioChannel extends AbstractChannel {
//包装一个NioChannel(SelectableChannel是NioChannel的抽象类型)
private final SelectableChannel ch;
//NioChanne读操作码,为什么直接是SelectionKey.OP_READ,而是一个变量呢?
//因为Netty将OP_READ和OP_ACCEPT都当做read操作,前者是通信Channel的read,后者是监听Channel的read
protected final int readInterestOp;
//在NioChannel注册到Selector之后产生的SelectionKey
volatile SelectionKey selectionKey;
//状态标记,是否一个读操作正在挂起
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
//同步连接操状态作的Future对象
private ChannelPromise connectPromise;
//同步连接超时状态的Future对象
private ScheduledFuture<?> connectTimeoutFuture;
//暂存正在连接的远端地址
private SocketAddress requestedRemoteAddress;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
= ch
= readInterestOp;
//配置NioChannel非阻塞模式
(false);
}
//返回包装的NioChannel
protected SelectableChannel javaChannel() {
return ch;
}
}
AbstractNioChannel已经和Java Nio相关类型关联上了,SelectableChannel,SelectionKey,readInterestOp都是Java NIO的概念。方法返回底层的Nio Channel,这个方法会被频繁调用。
doRegister
上节介绍AbstractChannel时,我们已经知道Unsafe的register操作需要AbstractChannel子类来提供一个doRegister方法来完成底层操作,我们现在就来看看。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//这里的eventLoop必然是NioEventLoop
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
//如果抛出了CancelledKeyException异常,立即执行一次操作;这应该为了解决JDK NIO的Bug。
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
有了前面的信息,我们很容易知道,doRegister所做的就是将Nio Channel注册到Selector;而doDeregister执行反向操作。
而上面对CancelledKeyException处理,再一次体现了Netty为我们屏蔽底层bug的事实。
doBeginRead
@Override
protected void doBeginRead() throws Exception {
// () or () was called
final SelectionKey selectionKey = ;
if (!()) {
return;
}
readPending = true;
final int interestOps = ();
if ((interestOps & readInterestOp) == 0) {
(interestOps | readInterestOp);
}
}
而doBeginRead则将Channel的读操作码(readInterestOp),添加到注册的SelectionKey里,表示需要Selector关注该Channel的可读事件。
doClose
@Override
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
(new ClosedChannelException());
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
(false);
connectTimeoutFuture = null;
}
}
发现仅仅修改了相关Future对象的状态,没有实质性动作,说明子类会覆盖它。
其他doXXX操作
我们发现doBind,doDisconnect,doWrite在这一层都没有实现,留给了子类。
AbstractNioUnsafe
分析任何一个Channel类型都绕不开对应的Unsafe,而且我们知道类的AbstractUnsafe并没有实现connect功能,需要特别关注下。
//AbstractNioChannel扩充了Unsafe接口
public interface NioUnsafe extends Unsafe {
//返回底层nio channel
SelectableChannel ch();
//完成连接后的回调
void finishConnect();
//向底层发出读请求
void read();
//将缓存刷入底层信道
void forceFlush();
}
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
@Override
public final SelectableChannel ch() {
return javaChannel();
}
@Override
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//这里忽略可重入检查代码
...
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
//这里忽略了失败检查代码
}
} catch (Throwable t) {
//这里忽略了异常处理代码
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
boolean active = isActive();
boolean promiseSet = ();
if (!wasActive && active) {
//触发pipline ChannelActive事件
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
@Override
public final void finishConnect() {
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
//忽略异常处理
}
}
@Override
protected final void flush0() {
if (!isFlushPending()) {
super.flush0();
}
}
@Override
public final void forceFlush() {
super.flush0();
}
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return () && (() & SelectionKey.OP_WRITE) != 0;
}
}
AbstractNioUnsafe其实还是用策略模式将底层connect操作委托给了底层doConnect,和AbstractUnsafe的套路一样。
finishConnect是AbstractNioUnsafe新引入的接口,它的调用在(NioEventLoop里),doConnect建立起连接后,还要等Selector返回OP_CONNECT信号,这个只能发生在EventLoop里;finishConnect使用策略模式,将底层逻辑委托给子类实现doFinishConnect。
另外在flush操作上,AbstractNioUnsafe只做了一点点优化,对父类的flush操作增加了isFlushPending检查(是否正在等待底层的OP_WRITE信号);如果要强行flush则要执行forceFlush方法。
AbstractNioMessageChannel
从AbstractNioChanne往下,又分成了两个方向,一个通往NioServerSocketChannel(监听Channel),一个通往NioSocketChannel(通信Channel)。我们先往第一个方向前进,来到AbstractNioMessageChannel。
从名字上看,AbstractNioMessageChannel处理的是message,而不是底层字节流,它实现了AbstractChannel层声明的doWrite方法:
//注意ChannelOutboundBuffer待写入底层信道的输出缓冲区
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = ();
for (;;) {
Object msg = ();
//如果没有待写入数据,暂时不关注该channel的OP_WRITE信号,优化eventLoop性能。
if (msg == null) {
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
//WriteSpinCount是ChannelConfig的属性,表示写入失败时,忙等的循环次数
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
//如果写入成功将当前数据对象从缓存删除
if (done) {
();
} else {
//否则,应该监听OP_WRITE状态变化,由eventLoop通知我们继续写入
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
//忽略了异常处理
}
}
}
NioMessageUnsafe
这一层的Unsafe实现不再是抽象的,它实现了AbstractNioUnsafe引入的read方法。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//allocHandle是操作读缓冲区的句柄对象
final allocHandle = unsafe().recvBufAllocHandle();
//这一步主要重置allocHandle内部的读消息计数器
(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//将底层读操作,委派给了方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
//如果读取的数量<0,说明底层信道已经关闭了
if (localRead < 0) {
closed = true;
break;
}
(localRead);
} while (());
} catch (Throwable t) {
exception = t;
}
int size = ();
for (int i = 0; i < size; i ++) {
readPending = false;
//读到的的消息,一个一个作为参数,触发pipeline ChannelRead事件
((i));
}
();
();
//触发pipeline ChannelReadComplete事件
();
//忽略了异常处理和closed状态处理代码
} finally {
//忽略了收尾代码
}
}
}
实现过程也向我们透露了一个信息:ChannelReadComplete事件触发意味着,在一个eventLoop循环中,底层读缓冲区全部处理掉了。另外需要AbstractNioMessageChannel的子类实现doReadMessages方法来实现底层的读消息逻辑。
NioServerSocketChannel
终于来到了Channel类族的最底部,NioServerSocketChannel是负责监听的Channel,它的字段和构造方法如下:
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = ();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return ();
} catch (IOException e) {
...
}
}
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public ServerSocketChannelConfig config() {
return config;
}
}
NioServerSocketChannel的ChannelMetadata是常量,初始化config为NioServerSocketChannelConfig(ChannelConfig暂时不研究),它通过SelectorProvider打开一个Nio ServerSocketChannel,并且明确表明只关注SelectionKey.OP_ACCEPT状态。
Nio的()方法内部也是调用SelectorProvider,所以直接使用能节省一次调用。
doRead方法
AbstractNioMessageChannel留了一个doRead方法由NioServerSocketChannel来实现:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = (javaChannel());
try {
if (ch != null) {
(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//忽略异常处理逻辑
}
return 0;
}
NioServerSocketChannel的doReadMessages执行的就是Nio ,而输出的结果就是一个新创建的NioSocketChannel。
其他IO操作
由于监听socket不支持其他IO操作,所以相关方法的实现如下:
@Override
protected boolean doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
AbstractNioByteChannel
现在退回去两步,再向Nio Channel的另一方向NioSocketChannel探索,来到AbstractNioByteChannel,从字面上看这是一个处理字节数据的Channel。
AbstractNioByteChannel的成员字段和构造方法如下:
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
//如果写关闭了,那么也不允许读
private boolean inputClosedSeenErrorOnRead;
//该Channel,初始只关注OP_READ状态
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = ();
if (msg == null) {
//如果没有写数据,那么清除SelectionKey.OP_WRITE操作码
clearOpWrite();
return;
}
//嗲用doWriteInternal来将数据写入底层
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
//如果执行到这,说明数据没有写完,执行incompleteWrite
incompleteWrite(writeSpinCount < 0);
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
//ByteBuf写入分支
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!()) {
();
return 0;
}
//调用doWriteBytes写入ByteBuf
final int localFlushedAmount = doWriteBytes(buf);
//localFlushedAmount是写入的字节数,需要告诉ChannelOutboundBuffer,显然后者需要维护一个写的进度
if (localFlushedAmount > 0) {
(localFlushedAmount);
//=fasle,说明当前ByteBuf写完了,删除之
if (!()) {
();
}
return 1;
}
} else if (msg instanceof FileRegion) {
//写入FileRegion的代码忽略
} else {
//其他数据类型不支持
throw new Error();
}
//如果执行到这,说明底层写不进去了,WRITE_STATUS_SNDBUF_FULL=,告诉外层停止写入
return WRITE_STATUS_SNDBUF_FULL;
}
//doWriteBytes由子类来实现
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
AbstractNioByteChannel的写入操作,声明了抽象方法doWriteBytes留给子类实现。
NioByteUnsafe
再来关注NioByteUnsafe,主要关注它的read方法:
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
final ChannelConfig config = config();
//这个应该是指:channel的写通道关闭之后,也不能读了
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = ();
final allocHandle = recvBufAllocHandle();
(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分配一个ByteBuf缓冲区
byteBuf = (allocator);
//调用doReadBytes将数据读入byteBuf,同时由allocHandle记录读入的字节数
(doReadBytes(byteBuf));
if (() <= 0) {
//没有读到任何数据,释放byteBuf
();
byteBuf = null;
close = () < 0;
//读到的字节数<0,意味着通道已经关闭
if (close) {
readPending = false;
}
break;
}
(1);
readPending = false;
//触发pipeline ChannelRead事件,参数是byteBuf
(byteBuf);
byteBuf = null;
} while (());
();
();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
//忽略异常处理
}
}
}
NioSocketChannel
NioSocketChannel是最终和业务代码交互的类型,我们先看它的成员和构造方法:
public class NioSocketChannel extends AbstractNioByteChannel implements {
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = ();
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return ();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
private final SocketChannelConfig config;
//客户端使用这个
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
//在Server端使用该方法
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
}
Server端的NioSocketChannel是由NioServerSocketChannel创建的,底层的Nio SocketChannel是接受客户端连接请求产生的。而客户端的NioSocketChannel,内部通过SelectorProvider打开底层SocketChannel对象。
doBind
doBind的底层调用的就是,底层方法。
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
doBind0(localAddress);
}
private void doBind0(SocketAddress localAddress) throws Exception {
if (() >= 7) {
(javaChannel(), localAddress);
} else {
(javaChannel().socket(), localAddress);
}
}
connect和close
doConnect,doFinishConnect,doDisconnect,doClose都是执行对应的java nio操作,其中doDisconnect和doClose是一样的。
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = (javaChannel(), remoteAddress);
if (!connected) {
//如果connect没有立即成功,通过Selector来监视OP_CONNECT状态
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doFinishConnect() throws Exception {
//收到Selector监视到OP_CONNECT状态后,要调用,这是java nio的要求
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
();
javaChannel().close();
}
doWrite
本来以为AbstractNioByteChannel的doWrite是最终实现,NioSocketChannel又覆盖了它:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
//writeSpinCount = 连续尝试底层写操作的次数
int writeSpinCount = config().getWriteSpinCount();
do {
//没有要写的数据了,所以清楚SelectionKey上的OP_WRITE操作码
if (()) {
clearOpWrite();
return;
}
//从写缓冲区取出数据,注意:这里要求ChannelOutboundBuffer将内部的ByteBuf转换成ByteBuffer
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = (1024, maxBytesPerGatheringWrite);
int nioBufferCnt = ();
switch (nioBufferCnt) {
case 0:
//没有ByteBuffer,但是有可能有其他类型数据,比如FileRegion
writeSpinCount -= doWrite0(in);
break;
case 1: {
//一个ByteBuffer的写法
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = ();
final int localWrittenBytes = (buffer);
//如果写不进去,显然不需要再试
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
//依据写入的情况,动态调整配置,见adjustMaxBytesPerGatheringWrite解释
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
//删掉已经写入的数据
(localWrittenBytes);
//这里只写了一次
--writeSpinCount;
break;
}
default: {
//其实和上面的case差不多,调用ch另一个方法而已
long attemptedBytes = ();
final long localWrittenBytes = (nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,maxBytesPerGatheringWrite);
(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
//只有执行case 0才可能到这,参照实现,在底层写不进去的时候返回,导致writeSpinCount<0
incompleteWrite(writeSpinCount < 0);
}
//虽然可以通过SO_SENDBUF选项来追踪底层socket的一次写入数据块大小,但是有些操作系统会动态修改这个值,所以Netty也动态地追踪一次写入数据合适的上限
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
//如果 尝试写入=成功写入,那么下次扩大两倍
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
}
//如果 尝试写入> 成功写入*2,且attempted大于一个固定门限值,则下次缩小一般。
else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
经过上面的分析,我们可知,NioSocketChannel之所要覆盖基类的doWrite方法,是因为它要发挥Java NIO GatheringWrite的性能优势。
总结
Channel的继承层次很深,因此代码阅读比较困难(应该说是Netty的核心概念里面,最绕的一部分)。总结起来,Channel就是负责底层socket操作,屏蔽了底层接口细节;并将相关的事件(包括socket状态和读写相关)通过Channel的pipline传递。
而且我们也清清楚楚地看到,通过写入的数据会暂存到ChannelOutboundBuffer里面,才会真正写入底层socket。