Spring Boot与Netty打造TCP服务端(解决粘包问题)

时间:2025-04-14 10:14:33
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; /** * I/O数据读写处理类 * * @author xiaobo */ @Slf4j public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { /** * 从客户端收到新的数据时,这个方法会在收到消息时被调用 * * @param ctx * @param msg */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { // 这里是在前面的DelimiterBasedFrameDecoder转为了ByteBuf,验证是否是ByteBuf if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; try { String receivedData = byteBuf.toString(CharsetUtil.UTF_8); // 接收完整数据 handleReceivedData(receivedData); } finally { // 释放 ByteBuf 占用的资源 byteBuf.release(); // 回复消息 ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8)); } } } private void handleReceivedData(String receivedData) { // 数据处理 // 这里如果想实现spring中bean的注入,可以用geBean的方式获取 } /** * 从客户端收到新的数据、读取完成时调用 * * @param ctx */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { log.info("channelReadComplete"); ctx.flush(); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException { cause.printStackTrace(); ctx.close();// 抛出异常,断开与客户端的连接 } /** * 客户端与服务端第一次建立连接时 执行 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // 此处不能使用(),否则客户端始终无法与服务端建立连接 System.out.println("channelActive:" + clientIp + ctx.name()); // 这里是向客户端发送回应 ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8)); } /** * 客户端与服务端 断连时 执行 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelInactive(ctx); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); // 断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机 ctx.close(); log.info("channelInactive:{}", clientIp); } /** * 服务端当read超时, 会调用这个方法 * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { super.userEventTriggered(ctx, evt); InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = socket.getAddress().getHostAddress(); ctx.close();// 超时时断开连接 log.info("userEventTriggered:" + clientIp); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { log.info("channelWritabilityChanged"); } }