SprigBoot以Web方式集成Netty

时间:2021-08-14 01:14:59

初次发博客,如有错误,欢迎指出,谢谢!!!


这边以一个简单的案例来展示

集成方式有两种,

  • 一种以web的形式集成,正常SpringBoot占用一个端口,Netty占用一个端口,两个端口不相同
  • 另一种以非Web集成,SpringBoot不占用端口,Netty占用一个端口,也就是Netty作为一个独立的项目去运行,然后Netty希望用到一些Spring容器的一些功能,启动加web(WebApplicationType.NONE)

我在这里说Web的集成方式

首先先在pom引入相关依赖

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
 </dependency>
 <!-- 整合netty开发依赖 -->
 <dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.39.Final</version>
 </dependency>

编写Netty的服务器,这里取名为ChatServer,端口我设置的是9090,具体值我写在了application.properties中,用@Value获取

(Netty的端口要和SpringBoot端口不同)

netty.port=9090

把服务器写成了两个方法,start()和destroy()并用@PostConstruct和@PreDestroy注解,这样可以在启动springboot项目的同时也启动Netty服务。

除了用注解的方式外,还有另一种方式和SpringBoot一起启动,创建NettyBooter.java,实现ApplicationListener,并重写onApplicationEvent方法

import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.stereotype.Component;
 
 /**
  * 在IOC的容器启动过程,当所有的Bean都已经处理完毕之后,spring ioc容器会有一个事件发布的动作,
  * 在我们的Bean实现ApplicationListener接口,这样当事件发布的时候,ioc容器就会以容器的实例对象作为事件源类,并从中找到监听 者,此时ApplicationListener接口的实例onApplicationEvent方法就会被调用
  */
 
 @Component
 public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
     @Override
     public void onApplicationEvent(ContextRefreshedEvent event) {
 
         if (event.getApplicationContext().getParent() == null) {
             try {
                 ChatServer.getInstance().start();
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
 }

为了方便,我选择使用注解的方式。

其中@PostConstruct是表示在springboot项目启动后执行,并且在@Component,@PreDestroy是表示关闭后执行。

我们在无参构造里先进行主(boss)和从(worker)的两个线程组和handler的一些初始化操作。其中boss专门负责连接业务,worker负责读写业务。

ChatServer.java

import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import org.springframework.beans.factory.annotation.Value;
 
 @Slf4j
 @Component
 public class ChatServer {
     
     @Value("${netty.port}")
     private Integer port;
 
     private NioEventLoopGroup boss;
     private NioEventLoopGroup worker;
     private ServerBootstrap server;
     private ChannelFuture future;
 
     public ChatServer() {
 
         //socket 连接处理循环组
         boss = new NioEventLoopGroup();
         //socket 业务处理循环组
         worker = new NioEventLoopGroup();
         server = new ServerBootstrap();
         server.group(boss, worker)
                 .channel(NioServerSocketChannel.class)
                 //自定义初始化
                 .childHandler(new WSServerInitializer());
 
     }
     // @PostConstruct,springboot项目启动后执行
     @PostConstruct
     public void start() {
         this.future = server.bind(port);
         System.out.println("Netty webSocket 启动完毕....");
     }
 
     @PreDestroy
     public void destroy() {
         boss.shutdownGracefully();
         worker.shutdownGracefully();
         System.out.println("Netty webSocket 关闭....");
     }
 }

接下来编写自定义初始化Handler,WSServerInitializer.java

import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.handler.timeout.IdleStateHandler;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class WSServerInitializer extends ChannelInitializer<SocketChannel> {
 
     @Override
     protected void initChannel(SocketChannel channel) throws Exception {
         //获取pipeline
         ChannelPipeline pipeline = channel.pipeline();
         //基于http协议的编解码处理器
         pipeline.addLast("HttpServerCodec", new HttpServerCodec());
         //大数据处理
         pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());
         //对HttpMessage进行聚合 聚合成FullHttpRequest或FullHttpResponse
         pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(1024 * 64));
 
         /**
          * 8秒读,10秒写,12秒读写
          * 主要是增加心跳支持
          * 针对客户端,如果在x秒没有向服务器读写心跳(All),则主动断开连接
          * 如果读空闲和写空闲,则不做处理
          */
         pipeline.addLast(new IdleStateHandler(8, 10, 12));
         //自定义心跳处理
         pipeline.addLast(new HeartBeanHandler());
 
         /**
          * websocket服务器处理的协议  并且用于指定给客户端连接访问的路由:/ws
          * 会帮你处理握手动作,handshakiing(close,ping,pang)
          * 对于websocket来说,都是以frame进行传输的,不同的数据类型对应的frame也不同
          */
         pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
         pipeline.addLast(new ChatHandler());
 
     }
 }

编写心跳处理Handler, HeartBeanHandler.java

先对evt事件类型进行判断是否为IdleStateEvent类型,在判断是具体的哪种类型

  • READER_IDLE,读
  • WRITER_IDLE,写
  • ALL_IDLE,读和写

我们可以用ChatHandler的clients的大小来更直观的看到channel数量的变化

import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 
 /**
  * 用于检测心跳Handler
  * 如果直接继承SimpleChannelInboundHandler的话还要重写channelRead,这边用不到channelRead,所以可以选择直接继承Simple的父类ChannelInboundHandlerAdapter
  */
 public class HeartBeanHandler extends ChannelInboundHandlerAdapter {
 
     //事件触发
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
     
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent event = (IdleStateEvent) evt;
             //读空闲状态
             if (event.state() == IdleState.READER_IDLE) {
                 System.out.println("进入读空闲");
             } else if (event.state() == IdleState.WRITER_IDLE) {
                 System.out.println("进入写空闲");
             } else if (event.state() == IdleState.ALL_IDLE) {
                 System.out.println("channel关闭之前,clients的数量是" + ChatHandler.clients.size());
                 Channel channel = ctx.channel();
                 //资源释放
                 channel.close();
                 System.out.println("channel关闭之后,clients的数量是" + ChatHandler.clients.size());
             }
         }
 
     }
 }

编写消息处理ChatHandler.java,这边我用MsgActionEnum枚举类定义了几个消息类型

public enum MsgActionEnum {
     CONNECT(1, "第一次初始化连接"),
     CHAT(2, "聊天消息"),
     SIGNED(3, "消息签收"),
     KEEP_ALIVE(4, "心跳类型");
 
     public final Integer TYPE;
     public final String CONTENT;
 
     MsgActionEnum(Integer type, String content) {
         this.TYPE = type;
         this.CONTENT = content;
     }
 }

如果想要在其中加入数据库操作,我们可以打一个SpringUtil的工具类来根据名称来获取Bean,然后调用自己的service方法即可。

@Component
 @Lazy(false)
 public class SpringUtil implements ApplicationContextAware {
     private static ApplicationContext applicationContext = null;
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         if (SpringUtil.applicationContext == null) {
             SpringUtil.applicationContext = applicationContext;
         }
     }
     public static ApplicationContext getApplicationContext() {
         return applicationContext;
     }
     public static Object getBean(String name) {
         return getApplicationContext().getBean(name);
     }
 }
//数据库操作
HelloService helloService = (HelloService) SpringUtil.getBean("helloServiceImpl");
helloService.sayHi("张三");

DataContent.java

@Data
 public class DataContent implements Serializable {
     //动作类型
     private Integer action;
     //内容
     private String content;
 }

ChatHandler.java

创建clinets来装多个channel,也可以创建一个UserChannelRel来存储每个用户和对应Channel的map,主要看具体业务。

import com.hui.utils.JSONUtil;
 import com.hui.ws.pojo.DataContent;
 import com.hui.ws.pojo.MsgActionEnum;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.time.LocalDateTime;
 
 /**
  * 用来处理消息的Handler, 这个frame在netty中,用于websocket专门处理文本对象的,类叫TextWebSocket
  */
 public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
     /**
      * 用于记录和管理所有客户端的channel。
      */
     public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
         //获取客户端传输的消息
         String content = msg.text();
         //通过JSON的工具类把string转成对应类
         DataContent data = JSONUtil.parseObject(content, DataContent.class);
         // //获取消息类型来作判断
         Integer action = data.getAction();
         // //根据action找枚举类中的不同业务....
         // ChatMsg chatMsg = JSONUtil.parseObject(content, ChatMsg.class);
         // Integer actionType = chatMsg.getActionType();
         // String sendID = chatMsg.getSendID();
         if (action == MsgActionEnum.CONNECT.TYPE) {
             System.out.println("第一次连接建立...");
         } else if (action == MsgActionEnum.KEEP_ALIVE.TYPE) {
             //心跳类型
             System.out.println("收到客户端" + ctx.channel().id().asShortText() + "发送的心跳");
         } else if (action == MsgActionEnum.CHAT.TYPE) {
             //发送聊天信息类型
             System.out.println("接收到的数据:" + content);
             clients.writeAndFlush(new TextWebSocketFrame("服务器在" + LocalDateTime.now() + "接收到的消息,消息内容为:" + content));
         }
         //数据库操作
         // HelloService helloService = (HelloService) SpringUtil.getBean("helloServiceImpl");
         // helloService.sayHi("张三");
 
     }
     //有新连接时触发
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
         //把新连接加入clients
         clients.add(ctx.channel());
     }
 
     //连接断开的时候触发
     @Override
     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
         String channelID = ctx.channel().id().asShortText();
         System.out.println("客户端断开,channel对应的长ID:" + ctx.channel().id().asLongText());
         System.out.println("客户端断开,channel对应的短ID:" + ctx.channel().id().asShortText());
         //把断开连接移除clients
         clients.remove(ctx.channel());
     }
 
     //有异常时触发
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         //打印异常信息
         cause.printStackTrace();
         //发生了异常后关闭连接,同时从client中移除
         ctx.channel().close();
         clients.remove(ctx.channel());
     }
 }

后端基本编写完毕,来看下前端,这边就用一个简单的html页面来展示。

这边用了setInterval("CHAT.keepAlive()", 5000);来持续发送心跳

<!DOCTYPE html>
 <html>
 <head>
     <meta charset="utf-8"></meta>
     <title>Netty 实时通讯</title>
 </head>
 
 <body>
 <div>
     发送消息:<input type="text" />
     <input type="button" onclick="CHAT.chat()" value="发送消息"/><br/>
     <hr/>
     接受消息:
     <div >
     </div>
 </div>
 
 <script>
     const CONNECT_TYPE = 1;
     const CHAT_TYPE = 2;
     const SINGED_TYPE = 3;
     const KEEP_ALIVE = 4;
 
     window.CHAT = {
         socket: null,
         init: function () {
             //判断浏览器是否支持websocket
             if (window.WebSocket) {
                 if (CHAT.socket != null && CHAT.socket != undefined && CHAT.socket.readyState == WebSocket.OPEN) {
                     return false
                 }
                 //创建socket对象
                 window.CHAT.socket = new WebSocket("ws://127.0.0.1:9090/ws");
 
                 CHAT.socket.onopen = function () {
                     var DataContent = new Object()
                     DataContent.action = CONNECT_TYPE
                     DataContent.content = "连接建立成功...."
                     console.log("连接建立成功....")
                     CHAT.socket.send(JSON.stringify(DataContent))
                     setInterval("CHAT.keepAlive()", 5000);
                 },
                     CHAT.socket.close = function () {
                         console.log("连接关闭")
                     },
                     CHAT.socket.onerror = function () {
                         console.log("发生异常")
                     },
                     CHAT.socket.onmessage = function (e) {
                         console.log("接受消息:" + e.data)
                         var receive = document.getElementById("receiveMsg")
                         var html = receive.innerHTML;
                         receive.innerHTML = html + "<br/>" + e.data
                     }
             } else {
                 alert("该浏览器不支持WebSocket协议")
             }
         },
         chat: function () {
             //获取输入框的内容弄
             var sendMsg = document.getElementById("msgContent").value;
             var DataContent = new Object()
             DataContent.action = CHAT_TYPE
             DataContent.content = sendMsg
             CHAT.socket.send(JSON.stringify(DataContent))
         },
         keepAlive: function () {
             var DataContent = new Object()
             DataContent.action = KEEP_ALIVE
             DataContent.content = "ping....."
             console.log(DataContent)
             CHAT.socket.send(JSON.stringify(DataContent))
         }
     }
     CHAT.init()
 </script>
 </body>
 </html>

启动SpringBoot服务,看看结果。

这边可以看到Netty随着SpringBoot服务一起启动了

SprigBoot以Web方式集成Netty

打开F12,可以看到console打印连接成功,并且心跳正在打印

SprigBoot以Web方式集成Netty

我这边开了两个客户端,可以看到在任意一边输入都可以在多端同时获取信息

SprigBoot以Web方式集成Netty

后台也有两个channel的心跳信息

SprigBoot以Web方式集成Netty

关闭其中一个客户端

SprigBoot以Web方式集成Netty