服务端:
package com.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * netty5版本服务端 */ public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss和worker, netty5不是线程池,而是事件循环组,里面包含线程池。 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(boss, worker);//boss用来监听端口的 //设置socket工厂、 bootstrap.channel(NioServerSocketChannel.class); //设置管道工厂 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); //netty3中对应设置如下 //bootstrap.setOption("backlog", 1024); //bootstrap.setOption("tcpNoDelay", true); //bootstrap.setOption("keepAlive", true); //设置参数,TCP参数 bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小。tcp的服务端是有队列的。队列保存2048个客户端。2048后面的连接是拒绝的。 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送。发一包并不是马上发出去,而是积累到一定之后再发出去。 //绑定端口 ChannelFuture future = bootstrap.bind(10101); System.out.println("start"); //等待服务端关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //释放资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
package com.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 服务端消息处理 */ public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); ctx.channel().writeAndFlush("hi"); ctx.writeAndFlush("hi"); } /** * 新客户端接入 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } /** * 客户端断开 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } /** * 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
客户端:
package com.client; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * netty5版本的客户端 */ public class Client { public static void main(String[] args) { //服务类 Bootstrap bootstrap = new Bootstrap(); //worker EventLoopGroup worker = new NioEventLoopGroup();//boss用来监听端口,这里只创建worker try { //设置线程池 bootstrap.group(worker); //设置socket工厂、 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("请输入:"); String msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (Exception e) { e.printStackTrace(); } finally{ worker.shutdownGracefully(); } } }
package com.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客户端消息处理 */ public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("客户端收到消息:"+msg); } }
一个客户端启动多个连接:
package com.client; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** 多连接客户端,客户端保持一个连接不够,要保持多个连接。 线程池是有多个线程,每个线程里面有一个任务队列,线程run的时候会从任务队列取一个任务出来,执行任务的run方法, 队列里面没有任务就阻塞等待新的任务进来。 一个thread + 队列 == 一个单线程线程池 =====> 线程安全的,任务是线性串行执行的 对象池:首先初始化n个对象,把这些对象放入一个队列里面,需要对象的时候会出栈一个对象,有对象就出栈,使用完了归还对象池里面。 没有对象会阻塞等待有可用的对象。或者创建一个新的对象使用完之后归还线程池,归还的时候如果池子满了就销毁。 比如数据库连接池:使用完后要释放资源,就是把连接放回连接池里面。 对象组:首先初始化n个对象,把这些对象放入一个数组里面。使用的时候获取一个对象不移除,使用完之后不用归还。需要对象有并发的能力。 对象组:线程安全,不会产生阻塞效应 对象池:线程不安全,会产生阻塞效应 */ public class MultClient { /** * 服务类 */ private Bootstrap bootstrap = new Bootstrap(); /** * 会话,多个channel, */ private List<Channel> channels = new ArrayList<>(); /** * 引用计数 */ private final AtomicInteger index = new AtomicInteger(); /** * 初始化 * @param count */ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //设置线程池 bootstrap.group(worker); //设置socket工厂、 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); for(int i=1; i<=count; i++){ ChannelFuture future = bootstrap.connect("192.168.0.103", 10101); channels.add(future.channel()); } } /** * 获取会话 */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重连 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重连 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){//已经重连过了 return ; } Channel newChannel = bootstrap.connect("192.168.0.103", 10101).channel(); channels.set(channels.indexOf(channel), newChannel); } } }
package com.client; import java.io.BufferedReader; import java.io.InputStreamReader; /** * 启动类 */ public class Start { public static void main(String[] args) { MultClient client = new MultClient(); client.init(5);//初始化5个连接 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ try { System.out.println("请输入:"); String msg = bufferedReader.readLine(); client.nextChannel().writeAndFlush(msg); } catch (Exception e) { e.printStackTrace(); } } } }