netty实现简单聊天室

时间:2020-12-07 19:58:19

1.

package com.xiaogang.netty.chat.server;

import java.util.Date;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

/**
 * Server-side chat handler.
 *
 * <p>Tracks every connected client channel in a shared {@link ChannelGroup} and
 * relays each received text message to all channels in that group. A message
 * consisting of {@code quit} (case-insensitive) closes the sending channel
 * instead of being relayed.
 */
public class ServerHandler extends SimpleChannelHandler {
    /**
     * All currently registered client channels. Created exactly once so that
     * constructing another handler instance cannot discard the channels that
     * are already registered.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup("client-channel-group");

    public ServerHandler() {
        super();
    }

    /**
     * Handles a message sent by a client.
     *
     * <p>The raw message is checked for the {@code quit} command first; only
     * afterwards is it prefixed with the sender's channel id and written to
     * every channel in the group (the sender included).
     *
     * @param ctx the handler context
     * @param e   the event carrying the decoded {@code String} message
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        final Channel sender = e.getChannel();
        final String content = (String) e.getMessage();
        System.out.println(
                "服务器收到" + sender.getId() + " 的消息 时间:" + new Date().toString() + " 消息内容:\n" + content);

        // The command must be tested on the raw text: once the "<id>:" prefix is
        // prepended the message can never equal "quit". Surrounding whitespace
        // (e.g. a trailing line break) is ignored for the comparison only.
        if (content != null && "quit".equalsIgnoreCase(content.trim())) {
            sender.close();
            channelGroup.remove(sender);
            return;
        }

        final String outgoing = sender.getId() + ":" + content;
        System.out.println("开始转发到其他客户端!:size=" + channelGroup.size());
        for (final Channel ch : channelGroup) {
            System.out.println("开始转发到其他客户端!:" + ch.getId());
            ch.write(outgoing);
        }

        super.messageReceived(ctx, e);
    }

    /**
     * Prints the failure, closes the affected channel and removes it from the
     * group if it was registered.
     *
     * @param ctx the handler context
     * @param e   the event carrying the cause and the affected channel
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        final Channel channel = e.getChannel();
        channel.close();
        if (channelGroup.contains(channel)) {
            System.out.println("一个客户端退出:" + channel.getId());
            channelGroup.remove(channel);
        }
        super.exceptionCaught(ctx, e);
    }

    /**
     * Registers a newly connected client channel in the group.
     *
     * @param ctx the handler context
     * @param e   the connection event
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
        System.out.println("新的客户端连入:" + e.getChannel().getId());
        channelGroup.add(e.getChannel());
        super.channelConnected(ctx, e);
    }

    /**
     * Removes a disconnected client channel from the group.
     *
     * @param ctx the handler context
     * @param e   the disconnection event
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
        System.out.println("老的客户端退出:" + e.getChannel().getId());
        channelGroup.remove(e.getChannel());
        super.channelDisconnected(ctx, e);
    }

}


2.

package com.xiaogang.netty.chat.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * Chat server bootstrap: builds the Netty server pipeline, binds the listening
 * port and releases the resources again on shutdown.
 *
 * <p>Expected call order: {@link #setHandler(ServerHandler)}, {@link #init()},
 * {@link #start()}, and finally {@link #stop()}.
 */
public class ChartServer {
    /** Port the server binds to; also read by the client as its default port. */
    public static int port = 9090;
    private ServerBootstrap bootstrap;
    private ServerHandler handler;

    /**
     * Creates the server bootstrap, installs the pipeline factory and sets the
     * socket options. Does not bind any port yet.
     */
    public void init() {
        // Boss executor: listens for incoming connections and hands them to the workers.
        final Executor bossExecutor = Executors.newCachedThreadPool();
        // Worker executor: processes the requests of the accepted connections.
        final Executor workerExecutor = Executors.newCachedThreadPool();
        bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor));

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                final ChannelPipeline pipeline = Channels.pipeline();
                /* Typical filter-chain style processing: string codec first. */
                pipeline.addLast("encode", new StringEncoder());
                pipeline.addLast("decode", new StringDecoder());
                /* Custom handler that processes the decoded requests. */
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        });

        /* Use long-lived TCP connections. */
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
    }

    /**
     * Binds the configured port and thereby starts the Netty service.
     *
     * @throws IllegalStateException if {@link #init()} has not been called or
     *                               no handler has been set
     */
    public void start() {
        if (bootstrap == null) {
            throw new IllegalStateException("init() must be called before start()");
        }
        // Without a handler every accepted connection would fail while its
        // pipeline is being built, so fail early with a clear message instead.
        if (handler == null) {
            throw new IllegalStateException("setHandler() must be called before start()");
        }
        bootstrap.bind(new InetSocketAddress(port));
        System.out.println("服务器启动,端口:" + port);
    }

    /**
     * Shuts Netty down and releases its resources. Does nothing when the
     * server was never initialised, so it is safe to call from a shutdown hook
     * that was registered before {@link #init()}.
     */
    public void stop() {
        if (bootstrap == null) {
            return;
        }
        bootstrap.releaseExternalResources();
    }

    /**
     * Sets the handler that is appended to the pipeline of every connection.
     *
     * @param handler the chat handler to install
     */
    public void setHandler(final ServerHandler handler) {
        this.handler = handler;
    }
}


3.

package com.xiaogang.netty.chat.server;

/**
 * Entry point of the chat server: wires a {@link ServerHandler} into a
 * {@link ChartServer}, registers a shutdown hook that stops the server, then
 * initialises and starts it.
 */
public class ServerDemo {
    public static void main(final String[] args) {
        final ChartServer server = new ChartServer();
        server.setHandler(new ServerHandler());

        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                try {
                    server.stop();
                } catch (final Exception e) {
                    System.out.println("run main stop error!");
                    // Keep the cause visible instead of silently discarding it.
                    e.printStackTrace();
                }
            }

        });
        server.init();
        server.start();
    }

}


4.

package com.xiaogang.netty.chat.client;

import java.util.Date;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * Client-side handler that prints every received text message to standard
 * output, preceded by a line holding the current date and time.
 */
public class ClientHandler extends SimpleChannelHandler {

    /**
     * Prints the received message together with a timestamp.
     *
     * @param ctx the handler context (unused)
     * @param e   the event carrying the decoded {@code String} message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        final String text = (String) e.getMessage();
        final String timestamp = new Date().toString();
        System.out.println(timestamp + "\n" + text);
    }
}



5.

package com.xiaogang.netty.chat.client;

import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import com.xiaogang.netty.chat.server.ChartServer;

/**
 * Chat client bootstrap: builds the Netty client pipeline, connects to the
 * chat server and releases the resources again on shutdown.
 *
 * <p>Expected call order: {@link #setHandler(ClientHandler)}, {@link #init()},
 * {@link #start()}, and finally {@link #stop()}.
 */
public class ChartClient {
    /** Server address used by the no-argument constructor. */
    private static final String DEFAULT_HOST = "10.40.30.189";

    private final int port;
    private final String host;
    private ClientBootstrap bootstrap;
    private ClientHandler handler;
    private ChannelFuture channelFuture;

    /**
     * Creates a client for the default host and the port published by
     * {@link ChartServer#port}.
     */
    public ChartClient() {
        this(DEFAULT_HOST, ChartServer.port);
    }

    /**
     * Creates a client for an explicit server address.
     *
     * @param host host name or IP address of the chat server
     * @param port TCP port of the chat server
     */
    public ChartClient(final String host, final int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Initialises the client: creates the bootstrap, installs the pipeline
     * factory and sets the socket options. Does not connect yet.
     */
    public void init() {
        // Boss executor of the channel factory.
        final Executor bossExecutor = Executors.newCachedThreadPool();
        // Worker executor: processes the I/O of the connection.
        final Executor workerExecutor = Executors.newCachedThreadPool();

        bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossExecutor, workerExecutor));

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                final ChannelPipeline channelPipeline = Channels.pipeline();
                channelPipeline.addLast("encode", new StringEncoder());
                channelPipeline.addLast("decode", new StringDecoder());
                channelPipeline.addLast("handler", handler);
                return channelPipeline;
            }
        });
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
    }

    /**
     * Connects to the server and waits for the attempt to finish, so that the
     * printed message reflects the real outcome of the connection attempt.
     *
     * @throws IllegalStateException if {@link #init()} has not been called
     */
    public void start() {
        if (bootstrap == null) {
            throw new IllegalStateException("init() must be called before start()");
        }
        channelFuture = bootstrap.connect(new InetSocketAddress(host, port));
        // connect() is asynchronous; wait before claiming success.
        channelFuture.awaitUninterruptibly();
        if (channelFuture.isSuccess()) {
            System.out.println("连接远程服务器" + host + ":" + port + "端口成功,现在可以开始发消息了。");
        } else {
            System.err.println("连接远程服务器" + host + ":" + port + "失败");
            final Throwable cause = channelFuture.getCause();
            if (cause != null) {
                cause.printStackTrace();
            }
        }
    }

    /**
     * Closes the connection (if one was started) and releases the Netty
     * resources. Safe to call when {@link #start()} or {@link #init()} was
     * never invoked.
     */
    public void stop() {
        if (channelFuture != null) {
            channelFuture.awaitUninterruptibly();
            if (!channelFuture.isSuccess()) {
                final Throwable cause = channelFuture.getCause();
                if (cause != null) {
                    cause.printStackTrace();
                }
            }
            // Close the channel ourselves and wait for it, rather than blocking
            // until the remote side decides to close the connection.
            channelFuture.getChannel().close().awaitUninterruptibly();
        }
        // Release the connection resources.
        if (bootstrap != null) {
            bootstrap.releaseExternalResources();
        }
    }

    /**
     * Sets the handler that is appended to the client pipeline.
     *
     * @param handler the handler receiving the server's messages
     */
    public void setHandler(final ClientHandler handler) {
        this.handler = handler;
    }

    /**
     * Returns the future of the connection attempt.
     *
     * @return the connect future, or {@code null} before {@link #start()}
     */
    public ChannelFuture getChannelFuture() {
        return channelFuture;
    }

}


6.

package com.xiaogang.netty.chat.client;

import java.util.Scanner;

import org.jboss.netty.channel.Channel;

/**
 * Console loop of the chat client: reads tokens from standard input and writes
 * each one to the server channel until the user enters {@code quit} or the
 * input ends.
 */
public class ClientRunable implements Runnable {

    private ChartClient client;

    private final Scanner scanner = new Scanner(System.in);

    /**
     * Initialises the underlying client and starts its connection attempt.
     */
    public void init() {
        client.init();
        client.start();
    }

    /**
     * Reads whitespace-delimited tokens from standard input and sends each one
     * to the server. On {@code quit} the token is sent first, then the client
     * is stopped and the loop ends. The loop also ends, stopping the client,
     * when standard input is exhausted.
     */
    @Override
    public void run() {
        while (true) {

            final Channel channel = client.getChannelFuture().getChannel();
            System.out.println("发送消息(Enter发送):");
            if (!scanner.hasNext()) {
                // End of input: shut down instead of failing with NoSuchElementException.
                client.stop();
                return;
            }
            final String msg = scanner.next();
            if (msg.equals("quit")) {
                System.out.println("wait, you will quit..");
                // Send the command and wait for the write to finish BEFORE stopping;
                // stop() blocks and releases the connection resources, so a write
                // issued afterwards would never reach the server.
                channel.write(msg).awaitUninterruptibly();
                client.stop();
                return;
            }
            channel.write(msg);
        }
    }

    /**
     * Sets the client whose channel this loop writes to.
     *
     * @param client the chat client to drive
     */
    public void setClient(final ChartClient client) {
        this.client = client;
    }

}

7.

package com.xiaogang.netty.chat.client;

/**
 * Entry point of the chat client: wires a {@link ClientHandler} into a
 * {@link ChartClient}, hands that client to a {@link ClientRunable},
 * initialises it and runs the console loop on a new thread.
 */
public class ClientDemo {
    public static void main(final String[] args) {
        final ClientRunable consoleLoop = new ClientRunable();

        final ChartClient chatClient = new ChartClient();
        chatClient.setHandler(new ClientHandler());

        consoleLoop.setClient(chatClient);
        consoleLoop.init();

        // Run the input loop on its own thread.
        new Thread(consoleLoop).start();
    }
}


netty实现简单聊天室

netty实现简单聊天室