Netty实现简单聊天室
服务端:
/**
 * Minimal Netty chat-room server.
 *
 * <p>Accepts TCP connections on the configured port and installs a
 * String encoder/decoder pair plus a {@link ServerHandler} on every accepted channel.
 */
public class Server {
    /** TCP port the server binds to. */
    int port;

    /**
     * Creates the server and starts it immediately.
     *
     * <p>This constructor blocks until the listening channel is closed, because it
     * delegates to {@link #start_Server()}.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Server(int port) throws InterruptedException {
        this.port = port;
        start_Server();
    }

    /**
     * Binds the listening socket and blocks until the server channel is closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while binding
     *                              or while waiting for the channel to close
     */
    public void start_Server() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel arg0) throws Exception {
                            arg0.pipeline().addLast(new StringEncoder());
                            arg0.pipeline().addLast(new StringDecoder());
                            arg0.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("Server.start_Server()");
            // Park the calling thread until the listening channel goes away.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Always release the event loop threads; otherwise a failed bind
            // (e.g. port already in use) leaves them running and the JVM never exits.
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * Starts the chat server on port 9978.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while the server runs
     */
    public static void main(String[] args) throws InterruptedException {
        new Server(9978);
    }
}
服务端处理:
/**
 * Server-side chat handler.
 *
 * <p>Keeps every connected channel in a shared {@link ChannelGroup} and relays each
 * received text message to all members of that group.
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {
    /** All channels currently in the chat room; shared by every handler instance. */
    public static final ChannelGroup group = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    /**
     * Relays a received message to every channel in the room.
     *
     * <p>The sender gets the message echoed with a "[你说]:" prefix; every other member
     * gets it prefixed with the sender's remote address.
     *
     * @param arg0 context of the channel that sent the message
     * @param arg1 the received text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, String arg1) throws Exception {
        Channel channel = arg0.channel();
        for (Channel ch : group) {
            // Write to the channel being iterated, not to the sender's context;
            // otherwise the sender receives every copy and nobody else sees anything.
            if (ch == channel) {
                ch.writeAndFlush("[你说]:" + arg1 + "\n");
            } else {
                ch.writeAndFlush("[" + channel.remoteAddress() + "]" + arg1 + "\n");
            }
        }
    }

    /** Logs that a client connection became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("[" + channel.remoteAddress() + "]" + "线上");
    }

    /** Logs that a client connection became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("[" + channel.remoteAddress() + "]" + "下线");
    }

    /**
     * Logs the failure and closes the offending channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(
                "[" + ctx.channel().remoteAddress() + "]" + "exit the room");
        // Report the reason instead of silently dropping it.
        System.err.println("[" + ctx.channel().remoteAddress() + "] " + cause);
        // Close asynchronously: this method runs on the channel's event loop, and
        // waiting on the close future from that thread is a blocking call there.
        ctx.close();
    }

    /**
     * Announces the new channel to the members already in the room, then adds it.
     *
     * @param ctx context of the channel that just joined
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Broadcast before adding so the newcomer does not receive its own notice.
        // The trailing newline matches the chat lines sent from channelRead0.
        group.writeAndFlush("[" + channel.remoteAddress() + "] " + "上线了" + "\n");
        group.add(channel);
    }

    /**
     * Removes the channel from the room and tells the remaining members it left.
     *
     * @param ctx context of the channel that is leaving
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Remove first so the notice is not written to the departing channel.
        group.remove(channel);
        group.writeAndFlush("[" + channel.remoteAddress() + "] " + "下线了" + "\n");
    }
}
客户端:
/**
 * Minimal Netty chat-room client.
 *
 * <p>Connects to a server on 127.0.0.1, forwards each line typed on standard input
 * to the server, and prints incoming messages through {@link ClientHandler}.
 */
public class Client {
    /** TCP port of the server to connect to. */
    int port;

    /**
     * Creates the client and connects immediately.
     *
     * <p>This constructor blocks until the user quits or standard input ends, because
     * it delegates to {@link #connect()}.
     *
     * @param port TCP port of the server on 127.0.0.1
     * @throws InterruptedException if the calling thread is interrupted while connecting or closing
     * @throws IOException          if reading from standard input fails
     */
    public Client(int port) throws InterruptedException, IOException {
        this.port = port;
        connect();
    }

    /**
     * Connects to the server and relays console input until "quit" is typed or
     * standard input reaches end-of-stream.
     *
     * @throws InterruptedException if the calling thread is interrupted while connecting or closing
     * @throws IOException          if reading from standard input fails
     */
    public void connect() throws InterruptedException, IOException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel arg0) throws Exception {
                            ChannelPipeline pipeline = arg0.pipeline();
                            pipeline.addLast("stringD", new StringDecoder());
                            pipeline.addLast("stringC", new StringEncoder());
                            // No HTTP codec here: the chat protocol is plain text,
                            // so only the String codecs are needed.
                            pipeline.addLast("chat", new ClientHandler());
                        }
                    });
            Channel channel = bootstrap.connect("127.0.0.1", port).sync().channel();

            // One reader for the whole session: a fresh BufferedReader per line can
            // discard input that the previous reader had already buffered.
            // It is intentionally not closed, since that would close System.in.
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(System.in));
            String input;
            // readLine() returns null at end-of-stream; stop instead of spinning.
            while ((input = reader.readLine()) != null) {
                if ("quit".equals(input)) {
                    break;
                }
                channel.writeAndFlush(input);
            }
            channel.close().sync();
        } finally {
            // Release the I/O threads so the JVM can exit normally after quitting.
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the chat client against port 9978.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while connecting or closing
     * @throws IOException          if reading from standard input fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        new Client(9978);
    }
}
客户端处理:
/**
 * Client-side inbound handler that prints every received String message
 * to standard output.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message on its own line.
     *
     * @param ctx     context of the channel the message arrived on (unused)
     * @param message the received text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
        System.out.println(message);
    }
}