netty常用使用方式

时间:2023-03-08 18:25:35

最近在重新看netty,在这里总结一些netty的一些常用的使用方式,类似于模板,方便速查。

以netty 4.1.x的API作记录,不同版本可能API有略微差异,需要注意netty5被废弃掉了(辨别方式是看SimpleChannelInboundHandler是否有一个messageReceived方法 有的话就是5),netty3是以org.jboss开头为包名。

统一模板

大多数情况下使用netty的步骤是定义好EventLoopGroup,定义好Bootstrap(ServerBootstrap)以及使用的channel类型(一般就是NioSocketChannel,服务端是NioServerSocketChannel)。

剩下是业务相关,使用的ChannelInitializer和具体的handler。

主要就是2+N(N为自定义的handler数量)个类。

服务端启动模板(也可以不区分boss和worker 用一个):

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyChannelInitializer());
ChannelFuture future = serverBootstrap.bind(8999).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

客户端启动模板:

EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new MyChannelInitializer());
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}

ChannelInitializer模板(继承ChannelInitializer即可):

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

	@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(...);
} }

接下来的例子就以这个模板为骨架,主要涉及到初始化器的代码,启动代码一致。

处理Http请求

netty自带了对用的codec类比较方便。

pipeline.addLast("httpServerCodec", new HttpServerCodec());

自己实现的handler最简单的方式用SimpleChannelInboundHandler接收HttpRequest方法即可

class MyHttpHandler extends SimpleChannelInboundHandler<HttpRequest>

这里简单说下SimpleChannelInboundHandler这个类,他是简化处理接受信息并处理的一个类,主要做两件事。

第一件事是根据泛型决定是否处理这个消息,能够处理就自己处理,不行就交给下一个(可以参考acceptInboundMessagechannelRead方法)。

第二件事是消息的自动回收(有构造函数支持 默认是true),消息的引用计数会减一(所以在其他地方还要使用记得再retain一下)。

使用它可以节省很多冗余代码的编写。

一个简单例子:

public class MyHttpHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
System.out.println(msg.getClass());
System.out.println(msg.uri());
System.out.println(msg.method().name());
System.out.println(ctx.channel().remoteAddress());
System.out.println("headers:");
msg.headers().forEach(System.out::println);
ByteBuf buf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
ctx.writeAndFlush(response);
// ctx.channel().close();
}
}

不过只用这个handler并不能拿到content,还需要配合ChunkedWriteHandlerHttpObjectAggregator得到FullHttpRequest对象。

处理WebSocket请求

只需要在上面的基础上增加一个WebSocketServerProtocolHandler即可,完整如下:

    pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8096));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

自己的处理器可以接收并处理WebSocketFrame的子类。

自定义文本协议

netty提供了几个比较方便的用于自定义文本协议的编解码器。

基于长度

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyHandler());

上述参数的意义直接搬了文档:

 lengthFieldOffset   = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field) BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+

基于分隔符

    ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

第二个参数需要一个ByteBuf[],自己指定下分隔符即可。

注意使用这种发消息的时候要带上那个分隔符,不然会处理失败(被当做未结束)。

广播实现

利用了netty提供的一个很便利的类,ChannelGroup.

首先要了解一下Channel生命周期函数的调用:

channel added -> channel registered -> channel active -> channel inactive -> channel unregistered -> channel removed

我们可以定义一个静态(保证共享和唯一)的ChannelGroup在channel added的时候把对应channel增加到ChannelGroup中即可(但不需要自己移除,这一点他自己实现了)

然后利用它的写方法就可以实现广播,或者forEach做下过滤做多播也可以。

这个实现和用什么协议无关,主要涉及到ChannelGroup使用。

心跳

netty自带的IdleStateHandler,超时后会向下一个handler发出IdleStateEvent消息,接收并处理即可。

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
ctx.channel().writeAndFlush("超时").addListener((ChannelFuture ch) -> ch.channel().close());
}
}

它细分为3中类型的超时,读、写、读写,通过IdleStateEvent的state属性可以获取,可以单独判断。

其他的一些模式会在之后复习和使用过程中不断补完~

2017年10月08日00点53分 init

2017年10月10日10点02分 更新websocket

相关文章