当握手完成时,将channel加入到ChannelGroup中,当发生异常或失活的时候,将channel从ChannelGroup中移除。
ChannelGroup维护着所有活动着的连接,可以使用ChannelGroup向所有Channel推送消息。
package netty.pushmsg; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final ChannelGroup channelGroup; public TextWebSocketFrameHandler(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { channelGroup.remove(ctx.channel()); ctx.channel().close(); System.out.println("失活"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { channelGroup.remove(ctx.channel()); ctx.channel().close(); cause.printStackTrace(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { //完成握手 System.out.println("握手完成"); channelGroup.add(ctx.channel()); } else { super.userEventTriggered(ctx, evt); } } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println(msg.text()); System.out.println("收到消息,可以推送消息了"); //推送消息 channelGroup.writeAndFlush(msg.retain()); } }
package netty.pushmsg; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.util.concurrent.ImmediateEventExecutor; public final class WebSocketServer { private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final NioEventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start() { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .localAddress(8060) .childHandler(new WebSocketServerInitializer(channelGroup)); ChannelFuture future = bootstrap.bind(); future.syncUninterruptibly(); channel = future.channel(); return future; } public void destory() { if (channel != null) { channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) { WebSocketServer endpoint = new WebSocketServer(); ChannelFuture future = endpoint.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { endpoint.destory(); } }); future.channel().closeFuture().syncUninterruptibly(); } public static class WebSocketServerInitializer extends ChannelInitializer<Channel> { private final ChannelGroup channelGroup; public WebSocketServerInitializer(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast( new HttpServerCodec(), new HttpObjectAggregator(64 * 1024), new 
WebSocketServerProtocolHandler("/app/websocket"), new TextWebSocketFrameHandler(channelGroup)); } } }
握手需要HTTP协议,所以需要添加HttpServerCodec和HttpObjectAggregator。客户端我们使用浏览器,代码如下:
<html>
<head>
  <title>Web Socket Test</title>
</head>
<body>
  <script type="text/javascript">
    // Connection handle shared with send(); remains undefined when the
    // browser offers no WebSocket implementation.
    var socket;

    // Fall back to the Mozilla-prefixed constructor if the standard one is missing.
    if (!window.WebSocket) {
      window.WebSocket = window.MozWebSocket;
    }

    if (!window.WebSocket) {
      alert("Your browser does not support Web Socket.");
    } else {
      socket = new WebSocket("ws://localhost:8060/app/websocket");

      // Connection established: reset the output area.
      socket.onopen = function (evt) {
        var output = document.getElementById('responseText');
        output.value = "Web Socket opened!";
      };

      // Append each pushed message on its own line.
      socket.onmessage = function (evt) {
        var output = document.getElementById('responseText');
        output.value += '\n' + evt.data;
      };

      // Connection closed: note it at the end of the output.
      socket.onclose = function (evt) {
        var output = document.getElementById('responseText');
        output.value += "Web Socket closed";
      };
    }

    // Sends the given text over the socket when the connection is open.
    function send(message) {
      if (!window.WebSocket) {
        return;
      }
      if (socket.readyState !== WebSocket.OPEN) {
        alert("The socket is not open.");
        return;
      }
      socket.send(message);
    }
  </script>
  <form onsubmit="return false;">
    <input type="text" name="message" value="Hello, World!"><input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)">
    <h3>Output</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
  </form>
</body>
</html>