Netty WebSocket 实现服务端推送消息

时间:2021-02-28 14:54:24

当握手完成时,将 channel 加入到 ChannelGroup 中;当发生异常或 channel 失活时,将其从 ChannelGroup 中移除。

ChannelGroup 维护着所有处于活动状态的连接,可以通过 ChannelGroup 向其中所有的 Channel 推送消息。

package netty.pushmsg;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
 * Inbound handler for WebSocket text frames that keeps a shared {@link ChannelGroup}
 * in sync with the connection lifecycle and pushes every received frame to all
 * channels in that group.
 *
 * <p>A channel is added to the group when the WebSocket handshake completes and is
 * removed (and closed) when it becomes inactive or an exception reaches this handler.
 */
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Group used both for membership tracking and for broadcasting. */
    private final ChannelGroup channelGroup;

    /**
     * @param channelGroup group this handler adds its channel to, removes it from,
     *                     and writes broadcast frames through
     */
    public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    /**
     * Prints the received text and broadcasts the frame to every channel in the group.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        final String received = msg.text();
        System.out.println(received);
        System.out.println("收到消息,可以推送消息了");
        // Push the message: retain() increments the frame's reference count before
        // the frame is handed over to the group write.
        channelGroup.writeAndFlush(msg.retain());
    }

    /**
     * Adds the channel to the group once the WebSocket handshake has completed;
     * every other user event is delegated to the superclass.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        final boolean handshakeDone = evt instanceof WebSocketServerProtocolHandler.HandshakeComplete;
        if (!handshakeDone) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        // Handshake finished: from now on this channel receives pushed messages.
        System.out.println("握手完成");
        channelGroup.add(ctx.channel());
    }

    /**
     * Drops the channel from the group, closes it, and then lets the superclass
     * continue the inactive notification.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        evict(ctx);
        System.out.println("失活");
        super.channelInactive(ctx);
    }

    /**
     * Drops the channel from the group, closes it, and prints the stack trace of
     * the failure.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        evict(ctx);
        cause.printStackTrace();
    }

    /** Removes the context's channel from the group and closes it. */
    private void evict(ChannelHandlerContext ctx) {
        channelGroup.remove(ctx.channel());
        ctx.channel().close();
    }
}
package netty.pushmsg;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.ImmediateEventExecutor;

/**
 * Minimal Netty WebSocket server that accepts connections on {@link #PORT} and
 * installs a {@link TextWebSocketFrameHandler} sharing one {@link ChannelGroup}
 * across all client channels.
 */
public final class WebSocketServer {

    /** TCP port the server binds to. */
    private static final int PORT = 8060;

    /** URI path on which the WebSocket handshake is accepted. */
    private static final String WEBSOCKET_PATH = "/app/websocket";

    /** Maximum aggregated HTTP message size (bytes) accepted during the handshake. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    /** Group handed to every per-connection handler; closed as a whole on shutdown. */
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

    /** Single event loop group used for both accepting and serving connections. */
    private final NioEventLoopGroup group = new NioEventLoopGroup();

    /** Bound server channel; {@code null} until {@link #start()} succeeds. */
    private Channel channel;

    /**
     * Binds the server socket and blocks until the bind attempt has finished.
     *
     * <p>If the bind fails, the event loop group is shut down before the failure
     * propagates, so that a failed start does not leave event loop threads behind.
     *
     * @return the completed, successful bind future
     */
    public ChannelFuture start() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(PORT)
                .childHandler(new WebSocketServerInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind();
        boolean bound = false;
        try {
            // Rethrows the bind failure, if any.
            future.syncUninterruptibly();
            channel = future.channel();
            bound = true;
            return future;
        } finally {
            if (!bound) {
                group.shutdownGracefully();
            }
        }
    }

    /**
     * Closes the server channel (if bound), closes every channel in the group, and
     * starts a graceful shutdown of the event loop group.
     */
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    /**
     * Misspelled original name, kept so existing callers continue to compile.
     *
     * @deprecated use {@link #destroy()} instead
     */
    @Deprecated
    public void destory() {
        destroy();
    }

    /**
     * Starts the server, registers a JVM shutdown hook that tears it down, and then
     * blocks until the server channel is closed.
     */
    public static void main(String[] args) {
        WebSocketServer endpoint = new WebSocketServer();
        ChannelFuture future = endpoint.start();
        Runtime.getRuntime().addShutdownHook(new Thread(endpoint::destroy));
        future.channel().closeFuture().syncUninterruptibly();
    }

    /**
     * Builds the pipeline for each accepted channel: HTTP codec and aggregator for
     * the handshake, the WebSocket protocol handler, then the text frame handler.
     */
    public static class WebSocketServerInitializer extends ChannelInitializer<Channel> {

        /** Group passed on to every {@link TextWebSocketFrameHandler} created here. */
        private final ChannelGroup channelGroup;

        /**
         * @param channelGroup group shared by all handlers this initializer creates
         */
        public WebSocketServerInitializer(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(
                    new HttpServerCodec(),
                    new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH),
                    new WebSocketServerProtocolHandler(WEBSOCKET_PATH),
                    new TextWebSocketFrameHandler(channelGroup));
        }
    }
}
WebSocket 握手基于 HTTP 协议,所以需要在 pipeline 中添加 HttpServerCodec 和 HttpObjectAggregator。客户端使用浏览器,页面代码如下:
<html>
<head>
    <title>Web Socket Test</title>
</head>
<body>
<script type="text/javascript">
    // Connection used by the handlers below and by send(); it is only assigned
    // when a WebSocket constructor is available.
    var socket;
    // Fall back to the vendor-prefixed MozWebSocket constructor when the standard
    // one is missing.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8060/app/websocket");
        // Append each pushed message on its own line.
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        // Opening the connection resets the output area.
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Web Socket opened!";
        };
        // Put the close notice on its own line, consistent with onmessage, so it
        // does not run into the last received message.
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + "Web Socket closed";
        };
    } else {
        alert("Your browser does not support Web Socket.");
    }

    /**
     * Sends the given text over the page's WebSocket connection when it is open;
     * otherwise alerts the user. Does nothing when the browser has no WebSocket
     * support.
     */
    function send(message) {
        if (!window.WebSocket) { return; }
        // socket is still undefined if constructing the WebSocket failed, so check
        // it before reading readyState.
        if (socket && socket.readyState === WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("The socket is not open.");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Hello, World!"><input
        type="button" value="Send Web Socket Data"
        onclick="send(this.form.message.value)">
    <h3>Output</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
</form>
</body>
</html>