Netty 3.x 和Netty4.x 开发示例

时间:2022-08-05 20:49:05

Netty 4.x 示例:

服务器端:
Server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootStrap = new ServerBootstrap();
            bootStrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture f = bootStrap.bind(port).sync();
            System.out.println("The server opened the port:" + port);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 30000;
        }
        new Server(port).run();
    }
}

ServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.UnsupportedEncodingException;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        ByteBuf in = (ByteBuf) msg;
        byte[] req = new byte[in.readableBytes()];
        in.readBytes(req);
        String body = new String(req, "utf-8");
        System.out.println("Receive message from client:" + body);
        //回写响应消息
        ctx.write(Unpooled.copiedBuffer("Success".getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端:
Client

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;

public class Client implements Runnable {
    private String host;
    private int port;
    static ClientHandler client = new ClientHandler();

    public static void main(String[] args) throws Exception {
        new Thread(new Client("localhost",30000)).start();
        Scanner scanner = new Scanner(System.in);
        while (client.sendMsg(scanner.nextLine())) ;
    }

    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootStrap = new Bootstrap();
            bootStrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(client);
                }
            });
            ChannelFuture f = bootStrap.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

ClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.UnsupportedEncodingException;

public class ClientHandler extends ChannelInboundHandlerAdapter {
   private ChannelHandlerContext ctx;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    public boolean sendMsg(String msg) {
        System.out.println("The client send data: :" + msg);
        byte[] req = msg.getBytes();
        ByteBuf m = Unpooled.buffer(req.length);
        m.writeBytes(req);
        ctx.writeAndFlush(m);
        return msg.equals("q") ? false : true;
    }

    /** * 收到服务器消息后调用 */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "utf-8");
        System.out.println("The server response message:" + body);
    }

    /** * 发生异常时调用 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty3.x 示例

服务器端:

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

public class Server {
    public static ChannelGroup channelGroup = new DefaultChannelGroup();
    private ChannelFactory factory;
    private String host;
    private int port;

    public Server(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        Server server = new Server("localhost", 30000);
        server.start();
    }

    public void start() {
        factory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),    // boss线程池
                Executors.newCachedThreadPool(),    // worker线程池
                8); // worker线程数

        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setOption("reuseAddress", true);

        ServerChannelPipelineFactory channelPiplineFactory =
                new ServerChannelPipelineFactory();
        bootstrap.setPipelineFactory(channelPiplineFactory);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("writeBufferLowWaterMark", 64 * 1024);//默认是32k
        bootstrap.setOption("writeBufferHighWaterMark", 128 * 1024);
        // 这里绑定服务端监听的IP和端口
        Channel channel = bootstrap.bind(new InetSocketAddress(host, port));
        Server.channelGroup.add(channel);

        System.out.println("Server is started...");
    }

    public void stop() {
        // ChannelGroup为其管理的Channels提供一系列的批量操作
        // 关闭的Channel会自动从ChannelGroup中移除
        ChannelGroupFuture channelGroupFuture = Server.channelGroup.close();
        channelGroupFuture.awaitUninterruptibly();
        factory.releaseExternalResources();
        System.out.println("Server is stopped.");
    }
}

ServerChannelPipelineFactory

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import java.util.concurrent.Executor;

public class ServerChannelPipelineFactory implements ChannelPipelineFactory {

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ServerReadDecoder serverReadDecoder = new ServerReadDecoder();
        ServerWriteEncoder serverWriteEncoder = new ServerWriteEncoder();
        Executor executor =
                new OrderedMemoryAwareThreadPoolExecutor(4, 200, 200);
        ServerExecutionHandler serverExecutionHandler =
                new ServerExecutionHandler(executor);
        ServerLogicHandler serverLogicHandler = new ServerLogicHandler();


        ChannelPipeline channelPipeline = Channels.pipeline();
        channelPipeline.addLast("1", serverReadDecoder);
        channelPipeline.addLast("2", serverWriteEncoder);
        channelPipeline.addLast("3", serverExecutionHandler);
        channelPipeline.addLast("4", serverLogicHandler);

        return channelPipeline;
    }

}

Handler

import org.jboss.netty.handler.execution.ExecutionHandler;

import java.util.concurrent.Executor;

// 提供一个线程池
public class ServerExecutionHandler extends ExecutionHandler {

    public ServerExecutionHandler(Executor executor) {
        super(executor);
    }
}
public class ServerLogicHandler extends SimpleChannelHandler {
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        System.out.println("Server Channel Connected");
        // channel group is thread safe
        Server.channelGroup.add(e.getChannel());
// System.out.println(e.getChannel().toString());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Receive message from client.....");

        String clientData = (String) e.getMessage();
        System.out.println("The message receive from client is : " + clientData);

        Channel ch = e.getChannel();
        String responseMessage = "success";
        ctx.getChannel().write(responseMessage);
        super.messageReceived(ctx, e);
// ChannelFuture channelFuture = ch.write(responseMessage);
// channelFuture.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future)
// throws Exception {
// Channel channel = future.getChannel();
// channel.close();
// }
// });
// System.out.println("Response message has sent to client.");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }
}
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.util.CharsetUtil;

public class ServerReadDecoder extends StringDecoder {

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            Object msg) throws Exception {
        System.out.println("Server Read Decoder");
        return new String(((ChannelBuffer) msg).array(), CharsetUtil.UTF_8);
    }
}
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.util.CharsetUtil;

public class ServerWriteEncoder extends StringEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel,
                            Object msg) throws Exception {
        System.out.println("Server Write Encoder");

        return ChannelBuffers.copiedBuffer((String) msg, CharsetUtil.UTF_8);
    }
}

客户端:

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.Executors;

public class Client implements Runnable{
    private String host;
    private int port;

    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) {
        // 同服务端相同,只是这里使用的是NioClientSocketChannelFactory
// start();
        Thread thead = new Thread(new Client("localhost", 30000));
        thead.start();
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        while (ClientLogicHandler.sendMsg(scanner.nextLine())) ;

    }


    @Override
    public void run() {
        final ChannelFactory factory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool(),
                8);

        // ClientBootstrap用于帮助客户端启动
        ClientBootstrap bootstrap = new ClientBootstrap(factory);
        // 由于客户端不包含ServerSocketChannel,所以参数名不能带有child.前缀
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", false);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline channelPipeline =
                        Channels.pipeline(new ClientReadDecoder(),
                                new ClientWriteEncoder(), new ClientLogicHandler());

                return channelPipeline;
            }
        });
        // 这里连接服务端绑定的IP和端口
        bootstrap.connect(new InetSocketAddress(host, port));
        System.out.println("Client is started...");
    }
}

Handler

import org.jboss.netty.channel.*;

import java.util.Scanner;

public class ClientLogicHandler extends SimpleChannelHandler {
    private static ChannelHandlerContext ctx;

    // private Channel channel;
//
// public void writeData(String messge) {
// channel.write(messge);
// }
    public static boolean sendMsg(String msg) {
        System.out.println("The client send data: :" + msg);

        ctx.getChannel().write(msg);
        return msg.equals("q") ? false : true;
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        this.ctx = ctx;
        System.out.println("Client Channel Connected");
// Channel ch = e.getChannel();
// Scanner scanner = new Scanner(System.in);
// System.out.println("请输入");
// ch.write(scanner.next());

    }

    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
            throws Exception {
        System.out.println("Client write Complete");
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("Receive response message from server.... ");

        String msg = (String) e.getMessage();
        System.out.println("The message gotten from server is : " + msg);
        super.messageReceived(ctx, e);
// ChannelFuture channelFuture = e.getChannel().close();
// channelFuture.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        Channel ch = e.getChannel();
        ch.close();
    }
}
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.util.CharsetUtil;

public class ClientReadDecoder extends StringDecoder {

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            Object msg) throws Exception {
        System.out.println("Client Read Decoder");

        return new String(((ChannelBuffer) msg).array(), CharsetUtil.UTF_8);
    }
}
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.util.CharsetUtil;

public class ClientWriteEncoder extends StringEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel,
                            Object msg) throws Exception {
        System.out.println("Client Write Encoder");

        return ChannelBuffers.copiedBuffer((String) msg, CharsetUtil.UTF_8);
    }
}