Java 中的io模型详解

时间:2021-09-17 22:31:10

1. BIO

我们先看一个 Java 例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package cn.bridgeli.demo;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
 
/**
 * @author bridgeli
 */
public class SocketBIO {
 
 
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(9090, 20);
 
        System.out.println("step1: new ServerSocket(9090) ");
 
        while (true) {
            Socket client = server.accept();
            System.out.println("step2:client: " + client.getPort());
 
            new Thread(new Runnable() {
                @Override
                public void run() {
                    InputStream inputStream = null;
                    BufferedReader reader = null;
                    try {
                        inputStream = client.getInputStream();
                        reader = new BufferedReader(new InputStreamReader(inputStream));
                        while (true) {
                            String dataLine = reader.readLine(); //阻塞2
                            if (null != dataLine) {
                                System.out.println(dataLine);
                            } else {
                                client.close();
                                break;
                            }
                        }
                        System.out.println("客户端断开");
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (null != reader) {
                            try {
                                reader.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (null!= inputStream) {
                            try {
                                inputStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
 
                }
 
 
            }).start();
 
        }
    }
 
}

BIO 是最初始的 IO 模型,该模型有两个大问题:1. accept 是阻塞的;2. read 也是阻塞的,也就是说我们的服务器起来之后,首先会在 accept 处阻塞,等待客户端连接,但有一个客户端连接的时候,我们可以从客户端处读取数据,这个时候也是阻塞的,所以我们的系统只能是单连接的,当有多个客户端连接的时候,只能一个一个的排着队连接,然后从客户端中读取数据,为了实现多连接,这就要求我们必须启用线程来解决,最开始等待客户端连接,然后有一个客户端连上了之后,启动一个线程读取客户端的数据,然后主线程继续等待客户端连接。

该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,Java 中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死掉了。当然不仅仅是 Java,我们直接设想假设有一万个客户端连接到服务端,服务端要开一万个线程,那么这个时候服务端光开线程要占用多少资源?需要多大内存?操作系统为了调度这些线程 CPU 是不是也要被占用完了?

为了解决此问题,有人对服务器的线程模型进行优化,服务端采用线程池来处理多个客户端请求。但是同样是有问题的,

1. 线程总数有限,又要等待;

2. 多余的连接会堆积在任务队列中,当任务队列满了,那么此时就开始启用拒绝策略了,所以还是没有从根本上解决问题。

2. NIO

BIO 最大的问题,在于 B,block,阻塞,所以只要解决了这个问题就可以,那么此时 NIO 应运而生,N 就是 non-block 的意思(Java 中是 new 的意思),同样先看一个例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.bridgeli.demo;
 
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
 
/**
 * @author bridgeli
 */
public class SocketNIO {
 
    public static void main(String[] args) throws Exception {
 
        LinkedList<SocketChannel> clients = new LinkedList<>();
 
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9090));
        serverSocketChannel.configureBlocking(false);
 
        while (true) {
            SocketChannel client = serverSocketChannel.accept();
 
            if (null != client) {
                client.configureBlocking(false);
                System.out.println("client port: " + client.socket().getPort());
                clients.add(client);
            }
 
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
 
            for (SocketChannel c : clients) {
                int num = c.read(buffer);
                if (num > 0) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);
 
                    String b = new String(aaa);
                    System.out.println(c.socket().getPort() + " : " + b);
                    buffer.clear();
                }
            }
        }
    }
 
}

这个时候我们会发现连接和读取都是非阻塞的了,由于都是非阻塞的,所以这就要求我们需要有一个集合,用来存储所有的连接,然后从连接中读取数据。这个模型解决了我们需要开线程的问题,没循环一次,如果有新连接过来,我们就把连接放到集合中,然后挨个读取连接中的数据,此时就不需要我们每连接每线程了,但是还是有一个问题,随着连接的增加,我们的队列会越来越大,而且我们每次都要遍历所有的连接读取数据,我们还假设有一万个连接,但是前 9999 个连接都没有数据,只有最后一个连接有数据,那前 9999 次读取都是浪费。

3. 多路复用

为了解决 NIO 中无效读取的问题,这个时候我们可以根据事件监听,告诉操作系统说,我们监听那些事件,然后当这些事件有数据到达时通知我们去读取,例子如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package cn.bridgeli.demo;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
/**
 * @author bridgeli
 */
public class SocketMultiplexingIO {
 
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;
 
    public void initServer() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public void start() {
        initServer();
        System.out.println("服务器启动了...");
        try {
            while (true) {
 
                while (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
 
            ByteBuffer buffer = ByteBuffer.allocate(8192);
 
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("新客户端:" + client.getRemoteAddress());
 
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
 
        }
    }
 
    public static void main(String[] args) {
        SocketMultiplexingIO service = new SocketMultiplexingIO();
        service.start();
    }
}

再多路复用中,有 poll、epoll、Selector 等实现方式,其中他们的区别是,poll 需要我们每次告诉操作系统说,我们都要关注哪些事件,而 epoll 是操作系统会开辟一块内存区域,存储下我们要关注的事件,不用每次都告诉操作系统我们关注哪些事件。

关于 BIO、NIO、多路复用,马士兵教育的周志磊老师有一个很形象的例子。BIO 是阻塞的,所以需要我们每连接每线程,就相当于我们为每一辆车在收费站修建一条路,每来一辆车就要修一条路,我们我们自己从车上卸下装的货;NIO 是非阻塞的,我们就需要我们每次都跑到收费站,然后看我们修好的路上面车来了没有,没有来的话,等下次在看,来的话,我们卸下货,再等下次看有没有新货;多路复用中的 poll,就是我们在收费站安装一个电话机,然后我们每次打电话,我关注的哪些路是否有车来了,需要我卸货,而 epoll 是我们不仅在收费站安装了一个电话机,我们还留下了一个本子,我们每次打电话的时候,会把我们新关注的路告诉收费站,收费站在本子上记下我们关注的那些路,假设我们关注一万条路,这样就不需要我们每次在电话中每次把这一万条路说一边,问这些路是否有车来了,需要我们卸货。

最后再说几个小问题

1. 我们学习 IO 模型,IO 模型是操作系统提供给我们的接口,属于系统调用,所以我们可以通过 strace 追踪到每一个程序所执行的系统调用。命令如下:

?
1
strace -ff -o out + 要追踪的进程

2. 当我们追踪 BIO 的时候,因为 JDK 的优化,所以如果使用高版本的 JDK,也不会看到阻塞,这个时候你可以通过 JDK1.4 编译运行(这也是为什么我们使用 lambda 表达式和 try-with-resource 的原因)

3. IO 调用属于系统调用,所以从 BIO -> NIO -> 多路复用,是操作系统的进步,而我们各种变成语言写的属于应用,所以有没有 异步非阻塞IO 模型,这样看操作系统底层有没有这样的模型,需要操作系统给我们提供 异步非阻塞IO 相关的接口,我们的应用才能进一步优化

4. 我们通过 strace 追踪到的每一个系统调用,都可以通过 man 命令查看文档(仅限 linux 系统,非 Windows 系统),如果没有 man 命令,安装一下就可以了。

以上就是Java 中的io模型详解的详细内容,更多关于Java io模型的资料请关注服务器之家其它相关文章!

原文链接:http://www.bridgeli.cn/archives/700