一、Java I/O 演进之路
1.1 Old I/O
Java1.4之前的早期版本,Java对I/O的支持并不完善,在开发高性能的I/O程序的时候,会面临一些巨大的挑战和困难,主要问题如下:
-
没有数据缓冲区,I/O性能存在问题
数据缓冲区:Buffer, 如:ByteBuffer、CharBuffer、IntBuffer等。
为什么没有数据缓冲区,I/O性能就存在问题?
Java1.4之前是面向流的,而NIO是基于Buffer进行操作的。
-
没有C或者C++中的Channel概念,只有输入和输出流
Channel的特点:
1)通道可以读也可以写,流一般来说是单向的(只能读或者写)
2)通道可以异步读写
3)通道总是基于缓冲区Buffer来读写
-
同步阻塞式I/O通信(BIO)
Java IO的各种流是阻塞的,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。
Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。
-
支持的字符集有限,硬件可移植性不好
在Java支持异步I/O之前的很长一段时间里,高性能服务端开发领域一直被C++和C长期占据,Java的同步阻塞I/O被大家所诟病。
1.2 Linux网络I/O模型
网络I/O的本质是Socket的读写,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
那这样数据多次拷贝,会有性能问题吧?为什么要这样?那我们要谈一下用户空间和内核空间:
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。
那么当进行一次read操作(Socket)时,会经历两个阶段:
-
等待网络上的数据分组到达,然后被复制到内核的某个缓冲区
-
把数据从内核缓冲区复制到应用进程缓冲区
网络I/O模型包括以下五种:
-
阻塞I/O模型
-
非阻塞I/O模型
-
I/O复用模型
-
信号驱动I/O模型
-
异步I/O
1.2.1 阻塞I/O模型
在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待。进程在从调用recvfrom开始到它返回的整段时间内都是被阻塞的。
或者更准确点,我们可以称这种模型为同步阻塞I/O模型。
阻塞I/O模型是最常用,也是最简单的网络I/O模型。
优点:
-
能够及时返回数据,无延迟
-
对开发者来说简单
缺点:
-
付出性能的代价:如果需要同时处理多个Socket,会阻塞多个进程,并且进程切换成本高
1.2.2 非阻塞I/O模型
在linux下,可以通过设置socket使其变为non-blocking。
当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error(如EWOULDBLOCK错误)。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,非阻塞I/O模型的特点是用户进程需要不断的主动询问kernel数据好了没有。
需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
与阻塞I/O模型对比,优点:
-
可以在等待任务完成的时间里干其他活了
缺点:
-
任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成
1.2.3 I/O复用模型
上面讲到的非阻塞I/O模型一个重要的问题是:需要不断主动轮询,轮询会消耗大量的CPU时间。那是不是可以有个专门的后台任务,循环查询多个read操作的完成状态,只要有一个或者多个完成,就去处理。
而且这个轮询如果不是进程的用户态,而是内核态进程完成岂不是更好了?这就是所谓的“I/O复用模型”。
UNIX/Linux 下的 select、poll、epoll 就是做这个的事情的(epoll 比 poll、select 效率高,做的事情是一样的)。后面我们会专门讲下select、epoll间的区别。
Select调用是内核级别的,select轮询相对非阻塞的轮询的区别在于:前者可以等待多个socket,能实现同时对多个IO端口进行监听,当其中任何一个socket的数据准好了,就能返回可读状态,然后用户进程再进行recvform系统调用,将数据由内核拷贝到用户进程,当然这个过程是阻塞的。如何知道socket的数据准备好了呢?监视的事情交给了内核,内核负责数据到达的处理。
具体的流程是:当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
多路复用的特点是通过一种机制一个进程能同时等待IO文件描述符,内核监视这些文件描述符(套接字描述符),其中的任意一个进入读就绪状态,select, poll,epoll函数就可以返回。对于监视的方式,又可以分为 select, poll, epoll三种方式。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
Java NIO的核心类库多路复用器Selector就是基于epoll的多路复用技术实现。
select、poll、epoll
在《Netty权威指南》中,关于select/poll、epoll的描述如下:
select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollback。
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select/poll顺序扫描fd是否就绪,是实现机制的限制,当要扫描的fd数量很多时,会花费很多CPU时间。
关于select支持的fd数量有限,请参考Linux 用户手册中的说明:
POSIX allows an implementation to define an upper limit, advertised via the constant FD_SETSIZE, on the range of file descriptors that can be specified in a file descriptor set.
The Linux kernel imposes no fixed limit, but the glibc implementation makes fd_set a fixed-size type, with FD_SETSIZE defined as 1024, and the FD_*() macros operating according to that limit. To monitor file descriptors greater than 1023, use poll(2) instead.
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
int epoll_create(int size); // 创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 注册要监听的事件类型 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); // 等待事件的产生
所能打开的最大连接数
说明:epoll连接数的上限是操作系统的最大文件句柄数。
FD剧增后带来的IO效率问题
消息传递方式
1.2.4 信号驱动I/O模型
首先开启Socket信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据。
1.2.5 异步I/O
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作,异步I/O模型由内核通知我们I/O操作何时已经完成。
1.3 New I/O
JDK1.4,新增了java.nio包,提供了很多进行异步I/O开发的API和类库,主要的类和接口如下:
-
进行异步I/O操作的缓冲区ByteBuffer等;
-
进行异步I/O操作的管道Pipe;
-
进行各种I/O操作的Channel,包括ServerSocketChanel和SocketChannel;
-
各种字符集的编码能力和解码能力;
-
实现非阻塞I/O操作的多路复用器selector;
-
文件通道FileChannel。
2011年7月28日,JDK1.7正式发布。它的一个比较大的亮点是将原来的NIO类库进行了升级,被称为NIO2.0,主要提供了如下三个方面的改进:
-
提供能够批量获取文件属性的API,这些API具有平台无关性,不与特性的文件系统相耦合;
-
提供AIO功能,支持基于文件的异步I/O操作和针对网络套接字的异步操作;
-
完成JSR-51定义的通道功能,包括对配置和多播数据报的支持等。
二、NIO编程
2.1 传统的BIO编程
在基于传统同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。
2.1.1 BIO通信模型
采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,销毁线程。这就是典型的一请求一应答通信模型。
2.1.2 伪异步I/O通信模型
采用线程池和任务队列可以实现一种叫做伪异步的I/O通信框架:当有新的客户端接入时,将客户端的Socket封装成一个Task投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程,对消息队列中的任务进行处理。
public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket("127.0.0.1", port); in = new BufferedReader(new InputStreamReader( socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("QUERY TIME ORDER"); System.out.println("Send order 2 server succeed."); String resp = in.readLine(); System.out.println("Now is : " + resp); } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); out = null; } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } ServerSocket server = null; try { server = new ServerSocket(port); System.out.println("The time server is start in port : " + port); Socket socket = null; TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool( 50, 10000);// 创建IO任务线程池 while (true) { socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } } finally { if (server != null) { System.out.println("The time server close"); server.close(); server = null; } } } }
public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) { executor = new ThreadPoolExecutor(Runtime.getRuntime() .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<java.lang.Runnable>(queueSize)); } public void execute(java.lang.Runnable task) { executor.execute(task); } }
public class TimeServerHandler implements Runnable { private Socket socket; public TimeServerHandler(Socket socket) { this.socket = socket; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader( this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String currentTime = null; String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("The time server receive order : " + body); currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; out.println(currentTime); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); out = null; } if (this.socket != null) { try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket = null; } } } }
伪异步I/O实际上仅仅是对之前I/O线程模型的一个简单优化,它无法从根本上解决同步I/O导致的通信线程阻塞问题。下面我们就简单分析下通信对方返回应答时间过长会引起的级联故障。
(1)服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms
(2)采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,它将会被同步阻塞60s
(3)假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队
(4)由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞
(5)由于只有一个Accepor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时
(6)由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息
2.2 NIO编程
Java NIO编程的基础是多路复用器Selector,多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel的集合,进行后续的I/O操作。
一个多路复用器Selector可以同时处理多个Channel,由于JDK使用了epoll代替传统的select实现,所以只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。
public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路复用器、绑定监听端口 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 处理新接入的请求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001") .start(); } }
public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判断是否连接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else System.exit(1);// 连接失败,进程退出 } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } private void doConnect() throws IOException { // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector, SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } }
2.3 不同I/O模型对比
2.4 选择Netty的理由
2.4.1 不选择Java原生NIO编程的理由
不建议开发者直接使用JDK的NIO类库进行开发的原因:
(1) NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
(2) 需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程、Reactor模式
(3) 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作量和难度都非常大
(4) JDK NIO的Bug,例如臭名昭著的epoll bug,会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该Bug发生概率降低了一些而已,它并没有得到根本性解决。
2.4.2 为什么选择Netty
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,已经得到成百上千的商业项目验证。
通过对Netty的分析,将它的优点总结如下:
-
API使用简单,开发门槛低;
-
功能强大,预置了多种编解码功能,支持多种主流协议;
-
定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
-
性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
-
成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的Bug而烦恼;
-
社区活跃,版本迭代周期短,发现的Bug可以及时修复,同时,更多的新功能会加入;
-
经历了大规模的商业应用考验,质量得到验证。
三、Reactor模型
Reactor模式:由一个不断等待和循环的单独进程(线程),它接受所有handler的注册,并负责向操作系统查询IO是否就绪,在就绪后就调用指定handler进行处理,这个角色的名字就叫做Reactor。
3.1 单线程模型
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:
1)一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
2)当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
3)可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
3.2 多线程模型
Reactor多线程模型的特点:
1)有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;
2)网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
3)1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。
在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。
3.3 主从多线程模型
利用主从NIO线程模型,可以解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题。
它的工作流程总结如下:
-
从主线程池中随机选择一个Reactor线程作为Acceptor线程,用于绑定监听端口,接收客户端连接;
-
Acceptor线程接收客户端连接请求之后创建新的SocketChannel,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作;
-
步骤2完成之后,业务层的链路正式建立,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,用于处理I/O的读写操作。
四、Netty Demo
先回顾一下使用NIO进行服务端开发的步骤:
1.创建ServerSocketChannel,配置它为非阻塞模式;
2.绑定监听,配置TCP参数,例如backlog大小;
3.创建一个独立的I/O线程,用于轮询多路复用器Selector;
4.创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT;
5.启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
6.当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
7.设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
8.将SocketChannel注册到Selector,监听OP_READ / OP_WRITE操作位;
9.如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
10.如果轮询的Channel为OP_WRITE,说明还有数据没有发送完成,需要继续发送。
public class TimeServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new TimeServer().bind(port); } }
public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new TimeClient().connect(port, "127.0.0.1"); } }
public class TimeClientHandler extends ChannelHandlerAdapter { private static final Logger logger = Logger .getLogger(TimeClientHandler.class.getName()); private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public TimeClientHandler() { byte[] req = "QUERY TIME ORDER".getBytes(); firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is : " + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 释放资源 logger.warning("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
五、Netty高性能之道
5.1 异步非阻塞通信
Netty的I/O线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端SocketChannel。由于读写操作都是非阻塞的,这就可以充分提升I/O线程的运行效率,避免由频繁的I/O阻塞导致的线程挂起。另外,由于Netty采用了异步通信模式,一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
5.2 高效的Reactor线程模型
Netty可以灵活的切换到不同的Reactor线程模型上:
-
单线程模型
-
多线程模型
-
主从多线程模型
5.3 无锁化的串行设计
Netty采用了串行无锁化设计,在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降。Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeLine的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。
5.4 高效的并发编程
Netty的高效并发编程主要体现在以下几点:
-
volatile的大量、正确使用
-
CAS和原子类的广泛使用
-
线程安全容器的使用
-
通过读写锁提升并发性能
5.5 高性能的序列化框架
Netty默认提供了对Google Protobuf的支持,通过扩展Netty的编解码接口,用户可以实现其他的高性能序列化框架。
5.6 零拷贝
Netty的“零拷贝”主要体现在如下三个方面:
-
Netty的接收和发送Buffer采用Direct Buffer,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(Heap Buffer)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket
-
CompositeByteBuf,对外将多个ByteBuf封装成一个ByteBuf,对外提供统一封装后的ByteBuf接口
3.Netty文件传输类DefaultFileRegion通过transferTo方法将文件发送到目标Channel中,而不需要通过循环拷贝的方式,这是一种更加高效的传输方式,提升了传输性能,降低了CPU和内存占用,实现了文件传输的“零拷贝”
5.7 内存池
对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于内存池的缓冲区重用机制。
5.8 灵活的TCP参数配置能力
合理设置TCP参数在某些场景下对于性能的提升可以起到显著的效果,例如:SO_RCVBUF和SO_SNDBUF。Netty在启动辅助类中可以灵活的配置TCP参数,满足不同的用户场景。
参考资料
-
《Netty权威指南》
-
Linux帮助文档: http://man7.org/linux/man-pages/man2/select.2.html
-
聊聊Linux 五种IO模型: https://www.jianshu.com/p/486b0965c296
-
聊聊IO多路复用之select、poll、epoll详解 :https://www.jianshu.com/p/dfd940e7fca2
-
Linux IO模式及 select、poll、epoll详解:https://segmentfault.com/a/1190000003063859
-
Java NIO Pipe使用示例: https://blog.csdn.net/u013063153/article/details/76563131
-
Netty系列之Netty线程模型: http://www.infoq.com/cn/articles/netty-threading-model/
-
Netty内存池实现: https://www.jianshu.com/p/8d894e42b6e6