初次发博客,如有错误,欢迎指出,谢谢!!!
这边以一个简单的案例来展示
集成方式有两种,
- 一种以web的形式集成,正常SpringBoot占用一个端口,Netty占用一个端口,两个端口不相同
- 另一种以非Web集成,SpringBoot不占用端口,Netty占用一个端口,也就是Netty作为一个独立的项目去运行,然后Netty希望用到一些Spring容器的一些功能,启动加web(WebApplicationType.NONE)
我在这里说Web的集成方式
首先先在pom引入相关依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 整合netty开发依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
编写Netty的服务器,这里取名为ChatServer,端口我设置的是9090,具体值我写在了application.properties中,用@Value获取
(Netty的端口要和SpringBoot端口不同)
netty.port=9090
把服务器写成了两个方法,start()和destroy()并用@PostConstruct和@PreDestroy注解,这样可以在启动springboot项目的同时也启动Netty服务。
除了用注解的方式外,还有另一种方式和SpringBoot一起启动,创建NettyBooter.java,实现ApplicationListener,并重写onApplicationEvent方法
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
* 在IOC的容器启动过程,当所有的Bean都已经处理完毕之后,spring ioc容器会有一个事件发布的动作,
* 在我们的Bean实现ApplicationListener接口,这样当事件发布的时候,ioc容器就会以容器的实例对象作为事件源类,并从中找到监听 者,此时ApplicationListener接口的实例onApplicationEvent方法就会被调用
*/
@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
try {
ChatServer.getInstance().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
为了方便,我选择使用注解的方式。
其中@PostConstruct是表示在springboot项目启动后执行,并且在@Component,@PreDestroy是表示关闭后执行。
我们在无参构造里先进行主(boss)和从(worker)的两个线程组和handler的一些初始化操作。其中boss专门负责连接业务,worker负责读写业务。
ChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Value;
@Slf4j
@Component
public class ChatServer {
@Value("${netty.port}")
private Integer port;
private NioEventLoopGroup boss;
private NioEventLoopGroup worker;
private ServerBootstrap server;
private ChannelFuture future;
public ChatServer() {
//socket 连接处理循环组
boss = new NioEventLoopGroup();
//socket 业务处理循环组
worker = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(boss, worker)
.channel(NioServerSocketChannel.class)
//自定义初始化
.childHandler(new WSServerInitializer());
}
// @PostConstruct,springboot项目启动后执行
@PostConstruct
public void start() {
this.future = server.bind(port);
System.out.println("Netty webSocket 启动完毕....");
}
@PreDestroy
public void destroy() {
boss.shutdownGracefully();
worker.shutdownGracefully();
System.out.println("Netty webSocket 关闭....");
}
}
接下来编写自定义初始化Handler,WSServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WSServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//获取pipeline
ChannelPipeline pipeline = channel.pipeline();
//基于http协议的编解码处理器
pipeline.addLast("HttpServerCodec", new HttpServerCodec());
//大数据处理
pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());
//对HttpMessage进行聚合 聚合成FullHttpRequest或FullHttpResponse
pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(1024 * 64));
/**
* 8秒读,10秒写,12秒读写
* 主要是增加心跳支持
* 针对客户端,如果在x秒没有向服务器读写心跳(All),则主动断开连接
* 如果读空闲和写空闲,则不做处理
*/
pipeline.addLast(new IdleStateHandler(8, 10, 12));
//自定义心跳处理
pipeline.addLast(new HeartBeanHandler());
/**
* websocket服务器处理的协议 并且用于指定给客户端连接访问的路由:/ws
* 会帮你处理握手动作,handshakiing(close,ping,pang)
* 对于websocket来说,都是以frame进行传输的,不同的数据类型对应的frame也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new ChatHandler());
}
}
编写心跳处理Handler, HeartBeanHandler.java
先对evt事件类型进行判断是否为IdleStateEvent类型,在判断是具体的哪种类型
- READER_IDLE,读
- WRITER_IDLE,写
- ALL_IDLE,读和写
我们可以用ChatHandler的clients的大小来更直观的看到channel数量的变化
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 用于检测心跳Handler
* 如果直接继承SimpleChannelInboundHandler的话还要重写channelRead,这边用不到channelRead,所以可以选择直接继承Simple的父类ChannelInboundHandlerAdapter
*/
public class HeartBeanHandler extends ChannelInboundHandlerAdapter {
//事件触发
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
//读空闲状态
if (event.state() == IdleState.READER_IDLE) {
System.out.println("进入读空闲");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("进入写空闲");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel关闭之前,clients的数量是" + ChatHandler.clients.size());
Channel channel = ctx.channel();
//资源释放
channel.close();
System.out.println("channel关闭之后,clients的数量是" + ChatHandler.clients.size());
}
}
}
}
编写消息处理ChatHandler.java,这边我用MsgActionEnum枚举类定义了几个消息类型
public enum MsgActionEnum {
CONNECT(1, "第一次初始化连接"),
CHAT(2, "聊天消息"),
SIGNED(3, "消息签收"),
KEEP_ALIVE(4, "心跳类型");
public final Integer TYPE;
public final String CONTENT;
MsgActionEnum(Integer type, String content) {
this.TYPE = type;
this.CONTENT = content;
}
}
如果想要在其中加入数据库操作,我们可以打一个SpringUtil的工具类来根据名称来获取Bean,然后调用自己的service方法即可。
@Component
@Lazy(false)
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
}
//数据库操作
HelloService helloService = (HelloService) SpringUtil.getBean("helloServiceImpl");
helloService.sayHi("张三");
DataContent.java
@Data
public class DataContent implements Serializable {
//动作类型
private Integer action;
//内容
private String content;
}
ChatHandler.java
创建clinets来装多个channel,也可以创建一个UserChannelRel来存储每个用户和对应Channel的map,主要看具体业务。
import com.hui.utils.JSONUtil;
import com.hui.ws.pojo.DataContent;
import com.hui.ws.pojo.MsgActionEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.time.LocalDateTime;
/**
* 用来处理消息的Handler, 这个frame在netty中,用于websocket专门处理文本对象的,类叫TextWebSocket
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 用于记录和管理所有客户端的channel。
*/
public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//获取客户端传输的消息
String content = msg.text();
//通过JSON的工具类把string转成对应类
DataContent data = JSONUtil.parseObject(content, DataContent.class);
// //获取消息类型来作判断
Integer action = data.getAction();
// //根据action找枚举类中的不同业务....
// ChatMsg chatMsg = JSONUtil.parseObject(content, ChatMsg.class);
// Integer actionType = chatMsg.getActionType();
// String sendID = chatMsg.getSendID();
if (action == MsgActionEnum.CONNECT.TYPE) {
System.out.println("第一次连接建立...");
} else if (action == MsgActionEnum.KEEP_ALIVE.TYPE) {
//心跳类型
System.out.println("收到客户端" + ctx.channel().id().asShortText() + "发送的心跳");
} else if (action == MsgActionEnum.CHAT.TYPE) {
//发送聊天信息类型
System.out.println("接收到的数据:" + content);
clients.writeAndFlush(new TextWebSocketFrame("服务器在" + LocalDateTime.now() + "接收到的消息,消息内容为:" + content));
}
//数据库操作
// HelloService helloService = (HelloService) SpringUtil.getBean("helloServiceImpl");
// helloService.sayHi("张三");
}
//有新连接时触发
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//把新连接加入clients
clients.add(ctx.channel());
}
//连接断开的时候触发
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelID = ctx.channel().id().asShortText();
System.out.println("客户端断开,channel对应的长ID:" + ctx.channel().id().asLongText());
System.out.println("客户端断开,channel对应的短ID:" + ctx.channel().id().asShortText());
//把断开连接移除clients
clients.remove(ctx.channel());
}
//有异常时触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//打印异常信息
cause.printStackTrace();
//发生了异常后关闭连接,同时从client中移除
ctx.channel().close();
clients.remove(ctx.channel());
}
}
后端基本编写完毕,来看下前端,这边就用一个简单的html页面来展示。
这边用了setInterval("CHAT.keepAlive()", 5000);来持续发送心跳
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"></meta>
<title>Netty 实时通讯</title>
</head>
<body>
<div>
发送消息:<input type="text" />
<input type="button" onclick="CHAT.chat()" value="发送消息"/><br/>
<hr/>
接受消息:
<div >
</div>
</div>
<script>
const CONNECT_TYPE = 1;
const CHAT_TYPE = 2;
const SINGED_TYPE = 3;
const KEEP_ALIVE = 4;
window.CHAT = {
socket: null,
init: function () {
//判断浏览器是否支持websocket
if (window.WebSocket) {
if (CHAT.socket != null && CHAT.socket != undefined && CHAT.socket.readyState == WebSocket.OPEN) {
return false
}
//创建socket对象
window.CHAT.socket = new WebSocket("ws://127.0.0.1:9090/ws");
CHAT.socket.onopen = function () {
var DataContent = new Object()
DataContent.action = CONNECT_TYPE
DataContent.content = "连接建立成功...."
console.log("连接建立成功....")
CHAT.socket.send(JSON.stringify(DataContent))
setInterval("CHAT.keepAlive()", 5000);
},
CHAT.socket.close = function () {
console.log("连接关闭")
},
CHAT.socket.onerror = function () {
console.log("发生异常")
},
CHAT.socket.onmessage = function (e) {
console.log("接受消息:" + e.data)
var receive = document.getElementById("receiveMsg")
var html = receive.innerHTML;
receive.innerHTML = html + "<br/>" + e.data
}
} else {
alert("该浏览器不支持WebSocket协议")
}
},
chat: function () {
//获取输入框的内容弄
var sendMsg = document.getElementById("msgContent").value;
var DataContent = new Object()
DataContent.action = CHAT_TYPE
DataContent.content = sendMsg
CHAT.socket.send(JSON.stringify(DataContent))
},
keepAlive: function () {
var DataContent = new Object()
DataContent.action = KEEP_ALIVE
DataContent.content = "ping....."
console.log(DataContent)
CHAT.socket.send(JSON.stringify(DataContent))
}
}
CHAT.init()
</script>
</body>
</html>
启动SpringBoot服务,看看结果。
这边可以看到Netty随着SpringBoot服务一起启动了
打开F12,可以看到console打印连接成功,并且心跳正在打印
我这边开了两个客户端,可以看到在任意一边输入都可以在多端同时获取信息
后台也有两个channel的心跳信息
关闭其中一个客户端