NIO 源码分析(02-1) BIO 源码分析 - binarylei

时间:2024-01-31 12:50:49

NIO 源码分析(02-1) BIO 源码分析

NIO 源码分析(02-1) BIO 源码分析

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

一、BIO 最简使用姿势

(1) JDK BIO 启动服务典型场景

// 1. 绑定端口
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress((InetAddress) null, PROT), BACKLOG);

while (true) {
    // 2. 获取客户端请求的Socket,没有请求就阻塞
    Socket socket = serverSocket.accept();
    // 3. 开启一个线程执行客户端的任务
    new Thread(new ServerHandler(socket)).start();
}

// 绑定端口,开启服务
public void bind(SocketAddress endpoint, int backlog) throws IOException {
    getImpl().bind(epoint.getAddress(), epoint.getPort());
    getImpl().listen(backlog);
}

ok,代码已经完成!!!下面我们和 Linux 下的网络编程进行对比。

(2) Linux BIO 启动服务典型场景

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
listen(listenfd, BACKLOG);

socklen_t cliaddr_len = sizeof(client_addr);
int clientfd = accept(listenfd, (struct sockaddr*)&client_addr, &cliaddr_len);

对比 Linux 上网络编程,我们会发现 JDK Socket 的编程逻辑是一模一样的。实时上也是这样,JDK 网络编程也没有做很多事,主要还是调用了 Linux 相关的函数。唯一的不同是 Linux 是面向过程程序,socket 函数返回的是一个句柄,bind 和 listen 都是对这个句柄的操作;而 JDK 是面向对象编程,new ServerSocket() 返回了一个对象,我们可以调用这个 serverSocket 对象的各种方法。

Linux NIO 系列(02) 阻塞式 IO 网络编程

二、ServerSocket 源码分析

JDK 为我们提供了 ServerSocket 类作为服务端套接字的实现,通过它可以让主机监听某个端口而接收其他端的请求,处理完后还可以对请求端做出响应。它的内部真正实现是通过 SocketImpl 类来实现的,它提供了工厂模式,所以如果自己想要其他的实现也可以通过工厂模式来改变的。默认的实现类是 SocksSocketImpl,ServerSocket 和 Socket 只是一个门面模式。

2.1 相关类图

前面说到 ServerSocket 类真正的实现是通过 SocketImpl 类,于是可以看到它使用了 SocketImpl 类,但由于 windows 和 unix-like 系统有差异,而 windows 不同的版本也需要做不同的处理,所以两类系统的类不尽相同。

SocketImpl类图

说明:

  1. SocketImpl 类实现了 SocketOptions 接口,接着还派生出了一系列的子类,其中 AbstractPlainSocketImpl 是原始套接字的实现的一些抽象,而 PlainSocketImpl 类是一个代理类。

  2. windows 下 PlainSocketImpl 代理 TwoStacksPlainSocketImpl 和 DualStackPlainSocketImpl 两种不同实现。存在两种实现的原因是一个用于处理 Windows Vista 以下的版本,另一个用于处理 Windows Vista 及以上的版本。

  3. unix-like 不存在版本的问题,所以它直接由 PlainSocketImpl 类实现。

  4. 这两类操作系统都还存在一个 SocksSocketImpl 类,它其实主要是实现了防火墙安全会话转换协议,包括 SOCKS V4 和 V5 。

根据上面可以看到其实对于不同系统就是需要做差异处理,基本都是大同小异,下面涉及到套接字实现均以 Windows Vista 及以上的版本为例进行分析,即 DualStackPlainSocketImpl。

2.2 主要属性

private boolean created = false;
private boolean bound = false;
private boolean closed = false;
private Object closeLock = new Object();
private SocketImpl impl;
private boolean oldImpl = false;
  • created 表示是否已经创建了 SocketImpl 对象,ServerSocket 需要依赖该对象实现套接字操作。
  • bound 是否已绑定地址和端口。
  • closed 是否已经关闭套接字。
  • closeLock 关闭套接字时用的锁。
  • impl 真正的套接字实现对象。
  • oldImpl 是不是使用旧的实现。

下面我们看一下 ServerSocket 的主要方法。

2.3 构造函数

有五类构造函数,可以什么参数都不传,也可以传入 SocketImpl、端口、backlog 和地址等。主要看一下最后一个构造函数,setImpl 方法用于设置实现对象,然后检查端口大小是否正确,检查 backlog 小于 0 就让它等于 50,最后进行端口和地址绑定操作。

public ServerSocket() throws IOException {
    setImpl();
}  

总结: 在 new ServerSocket() 时会通过 setImpl 方法创建一个 SocketImpl 的实现类,以 window 下 DualStackPlainSocketImpl 为例。时序图如下:

ServerSocket创建时序图

2.3.1 setImpl 方法

设置套接字实现对象,这里提供了工厂模式可以方便的对接其他的实现,而默认是没有工厂对象的,所以模式的实现为 SocksSocketImpl 对象。

private void setImpl() {
    if (factory != null) {
        impl = factory.createSocketImpl();
        checkOldImpl();
    } else {
        impl = new SocksSocketImpl();
    }
    if (impl != null)
        impl.setServerSocket(this);
}

总结: 从上面的方法可以看出构造方法只是创建了 SocksSocketImpl 对象,这些都只是 JDK 层面的东西,并没直接创建网络连接,bind 方法则会直接创建网络连接。

2.4 bind 方法

该方法用于将套接字绑定到指定的地址和端口上,如果 SocketAddress 为空,即代表地址和端口都不指定,此时系统会将套接字绑定到所有有效的本地地址,且动态生成一个端口。逻辑如下:

// 绑定端口,开启服务
public void bind(SocketAddress endpoint, int backlog) throws IOException {
    // 省略...
    if (endpoint == null)
        endpoint = new InetSocketAddress(0);
    if (backlog < 1)
          backlog = 50;
    getImpl().bind(epoint.getAddress(), epoint.getPort());
    getImpl().listen(backlog);
}

总结: ServerSocket.bind 方法调用 impl.bind 和 impl.listen 创建网络连接,下面我们就重点分析这两个方法都做了些什么事。

注意: 地址是否为空,为空则创建一个 InetSocketAddress,默认是所有有效的本地地址,对应的为
0.0.0.0,而端口默认为0,由操作系统动态生成,backlog 如果小于 1 则设为 50。

ServerSocket bind

2.4.1 socketCreate 方法

先看一下 createImpl 方法。

// ServerSocket
void createImpl() throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(true);
        created = true;
    } catch (IOException e) {
        throw new SocketException(e.getMessage());
    }
}

总结: createImpl 将创建 socket 网络套接字的任务直接委任给了对应的 impl 实现类,在这里也就是 DualStackPlainSocketImpl。

// AbstractPlainSocketImpl
protected synchronized void create(boolean stream) throws IOException {
    this.stream = stream;
    if (!stream) {  // UDP
        ResourceManager.beforeUdpCreate();
        // only create the fd after we know we will be able to create the socket
        fd = new FileDescriptor();
        try {
            socketCreate(false);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            fd = null;
            throw ioe;
        }
    } else {        // TCP
        fd = new FileDescriptor();
        socketCreate(true);
    }
    if (socket != null)
        socket.setCreated();
    if (serverSocket != null)
        serverSocket.setCreated();
}

总结: 在 AbstractPlainSocketImpl 抽象类中 create 方法对 UDP 和 TCP 协议分别做了处理,创建 socket 套接字的代码就一句 socketCreate(true),由具体的实现类完成。下面我们再看一下 DualStackPlainSocketImpl 是如何进行网络连接的。

// DualStackPlainSocketImpl
void socketCreate(boolean stream) throws IOException {
    if (fd == null)
        throw new SocketException("Socket closed");
    int newfd = socket0(stream, false /*v6 Only*/);
    fdAccess.set(fd, newfd);
}

总结: socket0 是一个 native 方法,也就是上面 Linux 的 Socket 函数完成的。然后将返回的 Socket 句柄设置到 fd 对象中(FileDescriptor 是 JDK 对句柄的抽象)。

补充1:socket0 在 JVM 中的实现
// windows/native/java/net/DualStackPlainSocketImpl.c
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_socket0
  (JNIEnv *env, jclass clazz, jboolean stream, jboolean v6Only /*unused*/) {
    int fd, rv, opt=0;

    // 最关键的一句代码,怎么样,是不是和 Linux 网络编程的 socket 函数一模一样的
    fd = NET_Socket(AF_INET6, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);
    if (fd == INVALID_SOCKET) {
        NET_ThrowNew(env, WSAGetLastError(), "create");
        return -1;
    }

    rv = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &opt, sizeof(opt));
    if (rv == SOCKET_ERROR) {
        NET_ThrowNew(env, WSAGetLastError(), "create");
    }

    SetHandleInformation((HANDLE)(UINT_PTR)fd, HANDLE_FLAG_INHERIT, FALSE);
    return fd;
}

2.4.2 socketBind 方法

// AbstractPlainSocketImpl
protected synchronized void bind(InetAddress address, int lport) throws IOException {
   synchronized (fdLock) {
        if (!closePending && (socket == null || !socket.isBound())) {
            NetHooks.beforeTcpBind(fd, address, lport);
        }
    }
    socketBind(address, lport);     // 绑定端口
    if (socket != null)
        socket.setBound();
    if (serverSocket != null)
        serverSocket.setBound();
}

总结: 和 create 方法一样 AbstractPlainSocketImpl 也把绑定端口 socketBind 交给子类 DualStackPlainSocketImpl 实现。

// DualStackPlainSocketImpl
void socketBind(InetAddress address, int port) throws IOException {
    int nativefd = checkAndReturnNativeFD();
    if (address == null)
        throw new NullPointerException("inet address argument is null.");

    bind0(nativefd, address, port, exclusiveBind);
    if (port == 0) {
        localport = localPort0(nativefd);
    } else {
        localport = port;
    }

    this.address = address;
}

总结: bind0 也是一个 native 方法,下面看一下 Winidow 上的实现。

补充2:bind0 在 JVM 中的实现
JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_bind0
  (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port,
   jboolean exclBind) {
    SOCKETADDRESS sa;
    int rv;
    int sa_len = sizeof(sa);

    if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                 &sa_len, JNI_TRUE) != 0) {
      return;
    }
    // 最关键的一句代码,绑定端口
    rv = NET_WinBind(fd, (struct sockaddr *)&sa, sa_len, exclBind);

    if (rv == SOCKET_ERROR)
        NET_ThrowNew(env, WSAGetLastError(), "JVM_Bind");
}

2.4.3 socketListen 方法

// DualStackPlainSocketImpl
void socketListen(int backlog) throws IOException {
    int nativefd = checkAndReturnNativeFD();
    listen0(nativefd, backlog);
}
补充3:listen0 在 JVM 中的实现
JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_listen0
  (JNIEnv *env, jclass clazz, jint fd, jint backlog) {
    // 关键的一句代码 listen 启动服务
    if (listen(fd, backlog) == SOCKET_ERROR) {
        NET_ThrowNew(env, WSAGetLastError(), "listen failed");
    }
}

2.5 accept 方法

accept方法

该方法用于接收套接字连接,套接字开启监听后会阻塞等待套接字连接,一旦有连接可接收了则通过该方法进行接收操作。

2.5.1 ServerSocket.accept

public Socket accept() throws IOException {
    if (isClosed())
        throw new SocketException("Socket is closed");
    if (!isBound())
        throw new SocketException("Socket is not bound yet");
    Socket s = new Socket((SocketImpl) null);
    implAccept(s);
    return s;
}

总结: accept 做了三件事:一是判断套接字是否已经关闭。 二是判断套接字是否已经绑定。三是创建 Socket 对象,并调用 implAccept 接收连接。

2.5.2 ServerSocket.implAccept

protected final void implAccept(Socket s) throws IOException {
    // 1. 创建一个空的 Socket 对象,用于接收 Socket 连接
    SocketImpl si = null;
    try {
        if (s.impl == null)
          s.setImpl();
        else {
            s.impl.reset();
        }
        si = s.impl;
        s.impl = null;
        si.address = new InetAddress();
        si.fd = new FileDescriptor();

        // 2. 最关键的一步,接收请求
        getImpl().accept(si);

        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkAccept(si.getInetAddress().getHostAddress(),
                                 si.getPort());
        }
    } catch (IOException e) {
        if (si != null)
            si.reset();
        s.impl = si;
        throw e;
    } catch (SecurityException e) {
        if (si != null)
            si.reset();
        s.impl = si;
        throw e;
    }
    // 3. 调用 Socket.postAccept 通知连接已经完成
    s.impl = si;
    s.postAccept();
}

总结: ServerSocket.implAccept 调用逻辑如下:

  1. 传入的 Socket 对象里面的套接字实现如果为空,则通过 setImpl 方法设置套接字实现,如果非空就执行 reset 操作。

  2. 调用套接字实现对象的 accept 方法完成接收操作,做这一步是因为我们的 Socket 对象里面的 SocketImpl 对象还差操作系统底层的套接字对应的文件描述符。

  3. 得到完整的 SocketImpl 对象,赋值给 Socket 对象,并且调用 postAccept 方法将 Socket 对象设置为已创建、已连接、已绑定。

2.5.3 AbstractPlainSocketImpl.accept

// AbstractPlainSocketImpl
protected void accept(SocketImpl s) throws IOException {
    acquireFD();        // fdUseCount++
    try {
        socketAccept(s);// 接收请求  
    } finally {
        releaseFD();    // fdUseCount--
    }
}

总结: accept 直接委托给 socketAccept 方法。

2.5.4 DualStackPlainSocketImpl.socketAccept

void socketAccept(SocketImpl s) throws IOException {
    // 1. 获取操作系统的文件描述符。 
    int nativefd = checkAndReturnNativeFD();
    if (s == null)
        throw new NullPointerException("socket is null");

    int newfd = -1;
    InetSocketAddress[] isaa = new InetSocketAddress[1];
    // 2. timeout <= 0 则表示阻塞式接收连接
    if (timeout <= 0) {
        newfd = accept0(nativefd, isaa);
    // 3. 如果 timeout 大于0,即设置了超时,那么会先非阻塞式接收连接
    } else {
        // 3.1 serverSocket 设置成非阻塞模式
        configureBlocking(nativefd, false);
        try {
            // 3.2 本地方法,阻塞 timeout 时长用于获取新的 socket
            waitForNewConnection(nativefd, timeout);
            // 3.3 因为现在是非阻塞的,不管有没有连接都会马上返回
            newfd = accept0(nativefd, isaa);
            if (newfd != -1) {
                // 3.4 新建立的 Socket 设置为阻塞模式
                configureBlocking(newfd, true);
            }
        } finally {
            // 3.5 serverSocket 恢复为阻塞模式
            configureBlocking(nativefd, true);
        }
    }
    
    // 4. 将获取到的新文件描述符赋给 SocketImpl 对象,
    //    同时也将远程端口、远程地址、本地端口等都赋给它相关变量。
    fdAccess.set(s.fd, newfd);
    InetSocketAddress isa = isaa[0];
    s.port = isa.getPort();
    s.address = isa.getAddress();
    s.localport = localport;
}

总结: socketAccept 调用本地方法 accept0 接收连接。具体逻辑如下:

  1. 获取操作系统的文件描述符。
  2. SocketImpl 对象为空则抛出 NullPointerException("socket is null")。
  3. 如果 timeout 小于等于 0 则直接调用本地 accept0 方法,一直阻塞。
  4. 反之,如果 timeout 大于0,即设置了超时,那么会先调用 configureBlocking 本地方法,该方法用于将指定套接字设置为非阻塞模式。接着调用waitForNewConnection 本地方法,如果在超时时间内能获取到新的套接字,则调用 accept0 方法获取新套接字的句柄,获取成功后再次调用 configureBlocking 本地方法将新套接字设置为阻塞模式。最后,如果非阻塞模式失败了,则将原来的套接字设置会紫塞模式,这里使用了 finally,所以能保证就算发生异常也能被执行。
  5. 最后将获取到的新文件描述符赋给 SocketImpl 对象,同时也将远程端口、远程地址、本地端口等都赋给它相关变量。
补充4:configureBlocking 在 JVM 中的实现

本地方法逻辑很简单,如下,核心就是通过调用 Winsock 库的 ioctlsocket 函数来设置套接字为阻塞还是非阻塞,根据 blocking 标识。

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_configureBlocking
  (JNIEnv *env, jclass clazz, jint fd, jboolean blocking) {
    u_long arg;
    int result;

    if (blocking == JNI_TRUE) {
        arg = SET_BLOCKING;    // 0
    } else {
        arg = SET_NONBLOCKING;   // 1
    }

    result = ioctlsocket(fd, FIONBIO, &arg);
    if (result == SOCKET_ERROR) {
        NET_ThrowNew(env, WSAGetLastError(), "configureBlocking");
    }
}
补充5:waitForNewConnection 在 JVM 中的实现

通过 Winsock 库的 select 函数来实现超时的功能,它会等待 timeout 时间看指定的文件描述符是否有活动,超时了的话则会返回 0,此时向 Java 层抛出 SocketTimeoutException 异常。而如果返回了 -1 则表示套接字已经关闭了,抛出 SocketException 异常。如果返回-2则抛出 InterruptedIOException。

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
  (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
    int rv, retry;
    int optlen = sizeof(rv);
    fd_set wr, ex;
    struct timeval t;

    FD_ZERO(&wr);
    FD_ZERO(&ex);
    FD_SET(fd, &wr);
    FD_SET(fd, &ex);
    t.tv_sec = timeout / 1000;
    t.tv_usec = (timeout % 1000) * 1000;

    rv = select(fd+1, 0, &wr, &ex, &t);

    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                        "connect timed out");
        shutdown( fd, SD_BOTH );
        return;
    }

    if (!FD_ISSET(fd, &ex)) {
        return;         /* connection established */
    }

    for (retry=0; retry<3; retry++) {
        NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
                       (char*)&rv, &optlen);
        if (rv) {
            break;
        }
        Sleep(0);
    }

    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Unable to establish connection");
    } else {
        NET_ThrowNew(env, rv, "connect");
    }
}
补充6:accept0 在 JVM 中的实现
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_accept0
  (JNIEnv *env, jclass clazz, jint fd, jobjectArray isaa) {
    int newfd, port=0;
    jobject isa;
    jobject ia;
    SOCKETADDRESS sa;
    int len = sizeof(sa);

    memset((char *)&sa, 0, len);
    newfd = accept(fd, (struct sockaddr *)&sa, &len);

    if (newfd == INVALID_SOCKET) {
        if (WSAGetLastError() == -2) {
            JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                            "operation interrupted");
        } else {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                            "socket closed");
        }
        return -1;
    }

    ia = NET_SockaddrToInetAddress(env, (struct sockaddr *)&sa, &port);
    isa = (*env)->NewObject(env, isa_class, isa_ctorID, ia, port);
    (*env)->SetObjectArrayElement(env, isaa, 0, isa);

    return newfd;
}

总结: 本地方法 accept0 实现逻辑:

  1. 通过 C 语言的 memset 函数将 SOCKETADDRESS 联合体对应的结构体内的值设置为 0。
  2. 通过 Winsock 库的 accept 函数获取套接字地址。
  3. 判断接收的套接字描述符是否无效,分别可能抛 InterruptedIOException 或 SocketException 异常。
  4. 通过 SetHandleInformation 函数设置句柄的继承标志。
  5. NET_SockaddrToInetAddress 函数用于将得到的套接字转换成 Java 层的 InetAddress 对象。
  6. 将生成的 InetAddress 对象用于生成 Java 层的 InetSocketAddress 对象。
  7. 赋值给 Java 层的 InetSocketAddress 数组对象。
  8. 返回新接收的套接字的文件描述符。

2.6 总结

可以看到 ServerSocket 的核心方法都是 native 方法,是由 JVM 调用 linux 的内核函数完成的。想要对 Socket 网络编程有更详细的了解就必须进一步了解 Linux 网络编程

参考:

  1. JVM的ServerSocket是怎么实现的(上)

每天用心记录一点点。内容也许不重要,但习惯很重要!