最近在研究Java NIO和netty,曾经一度感觉很吃力,根本原因还是对操作系统、TCP/IP、socket编程的理解不到位。
不禁感叹,还是当初逃的课太多。
假如上天给我一次机会,能够再回到意气风发的校园时代,我想那些逃过的课,应该还是会逃。
毕竟在那个躁动的年纪,有很多的事情都比上课有意思。
不扯闲篇了,进入正题。
先重新理解一下socket编程,主要是基于TCP协议。上一张我从《Unix网络编程》里面截取的一张图
通过这张图,能够大概理解socket编程的几个函数功能和调用顺序,更为关键的是可以看出TCP协议的3次握手发生的时机。
但是这张图并没有很好的揭示socket是怎样体现插座、插口的含义,所以我自己斗胆画了一张图,请多多指教。
借着这张图,说几个要点:
1、刚创建出来的socket,其实并没有server和client之分,只是socket调用了listen方法之后,角色才改变,处理逻辑也随之改变
2、client端的socket发送连接请求,server端的socket接收请求后,再创建一个socket与client端的socket传递数据,就像两个插口在通信
3、每个socket都有发送缓存和接收缓存,操作系统可以根据这些缓存来判断socket可读、可写、异常等状态
4、server端的socket保存着2种连接队列,后面还会说到
5、每个socket还会关联一个文件描述符(文件句柄),操作系统通过这个文件描述符(文件句柄)操作socket。图中并未画出。
再来说说Linux的IO多路复用。
Linux的多种IO模型以及select、poll、epoll等的详细介绍,我这里不赘述,主要也是因为段位不够。
我比较关注的是IO多路复用的那些IO事件。先看看jdk里面SelectionKey类里面的几个方法
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
} public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
} public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
} public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}
方法名很简单:可读、可写、可连接、可接收。从socket的缓存判断可读、可写倒是很好理解;可是什么时候socket是可连接或者可接收呢???
于是硬着头皮慢慢啃《TCP-IP详解:卷2》,终于找到了一些端倪。不得不说,欠的债迟早是要还的。
下面再引入书中的一段文字:
图 1 6 - 5 2 显 示 了 插 口 的 读 、 写 和 例 外 情 况 。 我 们 将 看 到 s o o _ s e l e c t 使用了 s o r e a d a b l e 和 s o w r i t e a b l e 宏 , 这 些 宏 在 s y s / s o c k e t v a r . h 中定义。
1. 插口可读吗
1 1 3 - 1 2 0 s o r e a d a b l e 宏的定义如下:
#define soreadable(so) \
((so)->so_rcv.sb_cc >= (so)->so_rcv.sb_lowat || \
((so)->so_state & SS_CANTRCVMORE) || \
(so)->so_qlen || (so)->so_error)
因为 U D P 和 T C P 的 接 收 下 限 默 认 值 为 1 ( 图 1 6 - 4 ) , 下 列 情 况 表 示 插 口 是 可 读 的 : 接 收 缓 存 中有数据,连接的读通道被关闭,可以接受任何连接或有挂起的差错。
2. 插口可写吗
1 2 1 - 1 2 8 s o w r i t e a b l e 宏的定义如下:
#define sowriteable(so) \
(sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
(((so)->so_state&SS_ISCONNECTED) || \
((so)->so_proto->pr_flags&PR_CONNREQUIRED)==0) || \
((so)->so_state & SS_CANTSENDMORE) || \
(so)->so_error)
T C P 和 U D P 默 认 的 发 送 低 水 位 标 记 是 2 0 4 8 。对于 U D P 而言, s o w r i t e a b l e 总 是 为 真 , 因 为 s b s p a c e 总是等于 s b _ h i w a t , 当 然 也 总 是 大 于 或 等 于 s o _ l o w a t , 且 不 要 求 连 接 。对于 T C P 而 言 , 当 发 送 缓 存 中 的 可 用 空 间 小 于 2 0 4 8 个 字 节 时 , 插 口 不 可 写 。 其 他 的 情 况在图 1 6 - 5 2 中讨论。
3. 还有挂起的例外情况吗
1 2 9 - 1 4 0 对于例外情况,需检查标志 s o _ o o b m a r k 和 S S _ R E C V A T M A R K 。 直 到 进 程 读 完 数 据流中的同步标记后,例外情况才可能存在。
原来,select调用的底层实现里面,把很多个事件都只是归并进了可读和可写这两种状态。比如在我之前看来,server端的socket已经将连接排队,就代表可连接状态,可是在select看来,这就是可读状态。
有了前面的一些基础,现在上一段Java NIO的代码
// 创建一个selector
Selector selector = Selector.open();
// 创建一个ServerSocketChannel
ServerSocketChannel servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
// 绑定端口号
servChannel.socket().bind(new InetSocketAddress(8080), 1024);
// 注册感兴趣事件
servChannel.register(selector, SelectionKey.OP_ACCEPT); // select系统调用
selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 接收客户端的连接,并创建一个SocketChannel
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 将SocketChannel和感兴趣事件注册到selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// 读数据的处理
}
}
}
分析这段代码之前,先搞清楚selector、SelectionKey、pollArray等几个数据结构以及相互持有关系。
pollArray干的是数组的活,但是并不是一个直接的数组。
selector诞生的时候,随之关联了一块内存(pollArray),然后用unsafe类来小心翼翼的按字节顺序写入数据,最终实现了数组结构的功能。这种看似怪异的实现方式,应该是处于效率的考虑吧。
selector并没有直接持有pollArray,而是持有一个pollArray的封装类PollArrayWrapper的引用。
// The poll fd array
PollArrayWrapper pollWrapper; // 在selector的父类里面 // The set of keys with data ready for an operation
protected Set<SelectionKey> selectedKeys;
selectedKeys是一个集合,代表poll系统调用后返回的所有就绪事件,里面存放的数据结构是SelectionKey。
final SelChImpl channel; // package-private
public final SelectorImpl selector; // Index for a pollfd array in Selector that this key is registered with
private int index; // pollArray里面的索引值,保存在这里是方便实现数组操作 private volatile int interestOps; // 注册的感兴趣事件掩码
private int readyOps; // 就绪事件掩码
SelectionKey不但持有channel,还持有selector;interestOps、readyOps与pollArray里面的eventOps、reventOps对应。
Java定义了一些针对文件描述符的事件,其实也是对底层操作系统poll定义的事件的一个映射。事件用掩码来表示,非常方便进行位操作。如下:
public static final short POLLIN = 0x0001; // 文件描述符可读
public static final short POLLOUT = 0x0004; // 文件描述符可写
public static final short POLLERR = 0x0008; // 文件描述符出现错误
public static final short POLLHUP = 0x0010; // 文件描述符挂断
public static final short POLLNVAL = 0x0020; // 文件描述符不对
public static final short POLLREMOVE = 0x0800; // 文件描述符移除 @Native static final short POLLCONN = 0x0002; // 可连接
我记得POLLCONN在之前的版本中直接被赋值成POLLOUT,这里改成了0x0002,这里我是真不知道为什么。希望高手来回复一下。
最终这些事件都会传递到内核的poll系统调用,去监控所有传递给poll的文件描述符。
回到之前的NIO代码
1、先看看 servChannel.register(selector, SelectionKey.OP_ACCEPT) 是如何实现注册的
一路调用后,会到一个关键方法
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k); // 这一步把channel的文件描述符fd添加到pollArray(见上图)
}
k.interestOps(ops); // 这一步把感兴趣事件eventOps添加到pollArray(见上图)
return k;
}
具体的逻辑肯定比注释要复杂。接下来看看pollArray的内存操作,以添加文件描述符fd为例
void putDescriptor(int i, int fd) {
int offset = SIZE_POLLFD * i + FD_OFFSET;
pollArray.putInt(offset, fd);
} final void putInt(int offset, int value) {
unsafe.putInt(offset + address, value);
}
最终还是用unsafe直接修改内存
2、再看看最核心的selector.select(1000)。次方法最终调用doSelect方法,而doSelect方法的实现有多种,我们就以poll版本进行探秘
// 做了很多删减
protected int doSelect(long timeout)
throws IOException
{
// 执行最核心的poll系统调用
pollWrapper.poll(totalChannels, 0, timeout);
// 将到来的就绪事件更新保存
int numKeysUpdated = updateSelectedKeys();
return numKeysUpdated;
}
poll系统调用会把用户空间的线程挂起,也就是阻塞调用,timeout指定多长时间后必须返回。
updateSelectedKeys方法根据poll返回的channel就绪事件,去更新pollArray对应fd的reventOps(见上图),以及selector的selectedKeys。
/**
* Copy the information in the pollfd structs into the opss
* of the corresponding Channels. Add the ready keys to the
* ready queue.
*/
protected int updateSelectedKeys() {
int numKeysUpdated = 0;
// Skip zeroth entry; it is for interrupts only
for (int i=channelOffset; i<totalChannels; i++) {
// 得到就绪事件的掩码
int rOps = pollWrapper.getReventOps(i);
if (rOps != 0) {
SelectionKeyImpl sk = channelArray[i];
pollWrapper.putReventOps(i, 0); // 重置为0,即为未就绪
if (selectedKeys.contains(sk)) {
// 把事件的掩码翻译成SelectionKey中定义的操作(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT)
if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
numKeysUpdated++;
}
} else {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
// 更新selectedKeys
selectedKeys.add(sk);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
把就绪事件的掩码进行翻译,感觉就像是Java做的一层适配,让我们用户不用去关注事件掩码等细节
看一下实现这一逻辑的一段代码,在ServerSocketChannel类里面:
/**
* Translates native poll revent set into a ready operation set
*/
public boolean translateReadyOps(int ops, int initialOps,
SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
int newOps = initialOps; if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
// This should only happen if this channel is pre-closed while a
// selection operation is in progress
// ## Throw an error if this channel has not been pre-closed
return false;
} if ((ops & (PollArrayWrapper.POLLERR
| PollArrayWrapper.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
// 这里将可连接当作可读来看待的
if (((ops & PollArrayWrapper.POLLIN) != 0) &&
((intOps & SelectionKey.OP_ACCEPT) != 0))
newOps |= SelectionKey.OP_ACCEPT; sk.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
通过上面的分析,大概有了一个清晰的思路:
Java NIO主要是基于底层操作系统提供的的IO多路复用功能,比如Linux下的select/poll、epoll等系统调用。Java层面为每个selector开辟了一块内存,用来保存用户注册的所有channel、所有感兴趣事件,并最终当作参数传递给底层的系统调用,最后将内核返回的结果封装成selectedKeys等数据结构。