目录
一、pom依赖
二、配置yml文件
三、服务端
四、客户端
五、粘包和拆包问题
一、pom依赖
<dependency>
<groupId></groupId>
<artifactId>netty-all</artifactId>
<version>4.1.</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>
二、配置yml文件
server:
port: 8001
servlet:
context-path: /netty
netty:
url: 0.0.0.0 #0.0.0.0表示绑定任意ip
port: 20004
三、服务端
package ;
import ;
import .*;
import ;
import ;
import ;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组bossGroup和workerGroup,含有的子线程NioEventLoop的个数默认是CPU的两倍
//bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
//创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
(bossGroup, workerGroup)//设置两个线程组
.channel()//使用NioServerSocketChannel作为服务器的通道实现
//初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//对workerGroup的SocketChannel设置处理器
().addLast(new NettyServerHandler());
}
});
("netty server start..");
//绑定一个端口并且同步生成一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定的端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = (9000).sync();
//给cf注册监听器,监听我们关心的事件
(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (()) {
("监听端口9000成功");
} else {
("监听端口9000失败");
}
}
});
//等待服务端监听端口关闭,closeFuture是异步操作
//通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
().closeFuture().sync();
} finally {
();
();
}
}
}
NettyServer类中的().addLast(new NettyServerHandler());对应以下的处理器。
package ;
import ;
import ;
import ;
import ;
import ;
import .slf4j.Slf4j;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
("客户端发送消息是:" + (CharsetUtil.UTF_8));
// 读取byteBuf
// 业务处理
// 回消息给客户端
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ("HelloClient".getBytes(CharsetUtil.UTF_8));
(buf);
}
//只要Netty抛出错误就会执行,Netty断会开连接会抛出连接超时的错误
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
("关闭通道");
();
();
}
}
四、客户端
package ;
import ;
import ;
import ;
import ;
import ;
import ;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是SocketBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
(group) //设置线程组
.channel()// 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
().addLast(new NettyClientHandler());
}
});
("netty client start..");
ChannelFuture cf = ("127.0.0.1", 9000).sync();
().closeFuture().sync();
}finally {
();
}
}
}
NettyClient类中().addLast(new NettyClientHandler());为处理器。
package ;
import ;
import ;
import ;
import ;
import ;
import .slf4j.Slf4j;
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接标识
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ("HelloServer".getBytes(CharsetUtil.UTF_8));
(buf);
}
//当通道建立后有事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
("收到服务端的消息是:" + (CharsetUtil.UTF_8));
("服务端地址是:" + ().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
("关闭通道");
();
();
}
}
五、粘包和拆包问题
客户端和服务端都是固定的框架,我们只需写处理器。
粘包和拆包问题,可以自己手写通过固定长度发送数据,或者使用Google的Protostuff。
<dependency>
<groupId></groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.8</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.8</version>
</dependency>
package ;
import ;
import ;
import ;
import ;
import ;
import ;
public class ProtostuffUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) (clazz);
if (schema == null) {
schema = (clazz);
if (schema != null) {
(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) ();
LinkedBuffer buffer = (LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return (obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException((), e);
} finally {
();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = ();
Schema<T> schema = getSchema(clazz);
(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException((), e);
}
}
}