-
1. 概述
-
2. 构建 Netty 服务端与客户端
-
3. 通信协议
-
4. 消息分发
-
5. 断开重连
-
6. 心跳机制与空闲检测
-
7. 认证逻辑
-
8. 单聊逻辑
-
9. 群聊逻辑
本文在提供完整代码示例,可见 https://github.com/kaixuanzhang123/springboot-netty.git。
原创不易,给点个 Star 嘿,一起冲鸭!
Netty 是一个 Java 开源框架。
Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。
Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。
下面,我们来新建三个项目,如下图所示:
测试demo:localhost:8081/test/mock?type=CHAT_SEND_TO_ALL_REQUEST&message={"msgId":"1","content":"sss"}
项目总结:
通信粘包与拆包
我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写。
在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。
我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求,对应的类如下:
产生原因
产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。
-
如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。
“
例如说,在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。
-
如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。
如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:
-
A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送。
-
A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端。
-
B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。
解决方案
对于粘包和拆包问题,常见的解决方案有三种:
???? ① 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。
???? ② 客户端在每个包的末尾使用固定的分隔符。例如 \r\n
,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n
,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。
具体的案例,有 HTTP、WebSocket、Redis。
???? ③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。
“友情提示:方案 ③ 是 ① 的升级版,动态长度。
本文,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。如下图所示:
消息分发
在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。
在 项目的 dispatcher
包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。
那么胖友试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。
因此,我们在这里创建了 executor
线程池,进行 MessageHandler 的逻辑执行,避免阻塞Channel 的数据读取。
可能会有胖友说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。
心跳机制与空闲检测
在上文中,艿艿推荐胖友阅读《TCP Keepalive 机制刨根问底》文章,我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。
但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。
因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。实现逻辑如下:
-
服务端发现 180 秒未从客户端读取到消息,主动断开连接。
-
客户端发现 180 秒未从服务端读取到消息,主动断开连接。
考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制:
-
客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息。
-
服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。
如果看到这里,说明你喜欢这篇文章,请转发、点赞。扫描下方二维码关注我们,您会收到更多优质文章推送
关注「Java源码进阶」,获取海量java,大数据,机器学习资料!