1.
package com.xiaogang.netty.chat.server;
import java.util.Date;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
/**
 * Server-side chat handler.
 *
 * <p>Keeps a registry of connected client channels and relays every text message it
 * receives to all of them, prefixed with the sender's channel id. A message whose
 * trimmed text is {@code quit} (case-insensitive) disconnects the sender instead of
 * being relayed.
 *
 * <p>The registry is a static field, so it is shared by every instance of this class.
 */
public class ServerHandler extends SimpleChannelHandler {
    /**
     * Registry of connected client channels, used to fan messages out to everybody.
     * Created exactly once: re-creating it per handler instance (as a constructor
     * assignment would) silently drops every channel registered so far.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup("client-channel-group");

    public ServerHandler() {
        super();
    }

    /**
     * Handles one message from a client: logs it, then either disconnects the sender
     * (on {@code quit}) or forwards the message to every registered channel.
     *
     * @param ctx the handler context of the receiving pipeline
     * @param e   the event carrying the message; the payload is cast to {@code String}
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        final Channel sender = e.getChannel();
        final String content = (String) e.getMessage();
        System.out.println(
                "服务器收到" + sender.getId() + " 的消息 时间:" + new Date().toString() + " 消息内容:\n" + content);

        // The command must be tested on the raw text. Testing it after the "<id>:" prefix
        // has been added can never match. Trimming tolerates a trailing line break.
        if (content.trim().equalsIgnoreCase("quit")) {
            channelGroup.remove(sender);
            sender.close();
            return;
        }

        final String outgoing = sender.getId() + ":" + content;
        System.out.println("开始转发到其他客户端!:size=" + channelGroup.size());
        for (final Channel ch : channelGroup) {
            System.out.println("开始转发到其他客户端!:" + ch.getId());
            ch.write(outgoing);
        }
        super.messageReceived(ctx, e);
    }

    /**
     * Prints the failure, deregisters the affected channel and closes it.
     *
     * @param ctx the handler context of the pipeline that raised the exception
     * @param e   the event carrying the cause and the affected channel
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        final Channel channel = e.getChannel();
        // Deregister before closing: the group may drop a channel by itself once it is
        // closed, which would make a membership check performed afterwards unreliable.
        if (channelGroup.remove(channel)) {
            System.out.println("一个客户端退出:" + channel.getId());
        }
        channel.close();
        super.exceptionCaught(ctx, e);
    }

    /**
     * Registers a newly connected client so it receives relayed messages.
     *
     * @param ctx the handler context of the new connection's pipeline
     * @param e   the state event carrying the connected channel
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
        System.out.println("新的客户端连入:" + e.getChannel().getId());
        channelGroup.add(e.getChannel());
        super.channelConnected(ctx, e);
    }

    /**
     * Deregisters a client that has disconnected.
     *
     * @param ctx the handler context of the closing connection's pipeline
     * @param e   the state event carrying the disconnected channel
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
        System.out.println("老的客户端退出:" + e.getChannel().getId());
        channelGroup.remove(e.getChannel());
        super.channelDisconnected(ctx, e);
    }
}
2.
package com.xiaogang.netty.chat.server;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
 * Chat server bootstrap: configures a Netty NIO server with string encoding/decoding
 * and the injected {@link ServerHandler}, then binds it to {@link #port}.
 *
 * <p>Expected call order: {@link #setHandler}, {@link #init()}, {@link #start()} and
 * finally {@link #stop()}.
 */
public class ChartServer {
    /** TCP port the server listens on; also read by the client. */
    public static int port = 9090;
    private ServerBootstrap bootstrap;
    private ServerHandler handler;

    /**
     * Creates the bootstrap, installs the pipeline factory and sets the socket options.
     * Does not bind; call {@link #start()} for that.
     */
    public void init() {
        // Boss threads accept incoming connections and hand them over to the workers.
        final Executor bossExecutor = Executors.newCachedThreadPool();
        // Worker threads perform the I/O for accepted connections.
        final Executor workerExecutor = Executors.newCachedThreadPool();
        bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossExecutor, workerExecutor));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                // Fail with an explicit message instead of a bare NullPointerException
                // raised from inside the pipeline when the handler was never injected.
                if (handler == null) {
                    throw new IllegalStateException("setHandler() must be called before clients connect");
                }
                final ChannelPipeline pipeline = Channels.pipeline();
                // Codec stages: outgoing String -> bytes, incoming bytes -> String.
                pipeline.addLast("encode", new StringEncoder());
                pipeline.addLast("decode", new StringDecoder());
                // Application handler that processes the decoded messages.
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        });
        // Options for long-lived TCP connections.
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
    }

    /**
     * Binds the configured port and starts serving.
     *
     * @throws IllegalStateException if {@link #init()} has not been called
     */
    public void start() {
        if (bootstrap == null) {
            throw new IllegalStateException("init() must be called before start()");
        }
        bootstrap.bind(new InetSocketAddress(port));
        System.out.println("服务器启动,端口:" + port);
    }

    /**
     * Shuts Netty down and releases its resources. Does nothing when the server was
     * never initialised, so it is safe to call from a shutdown hook at any time.
     */
    public void stop() {
        if (bootstrap == null) {
            return;
        }
        bootstrap.releaseExternalResources();
    }

    /**
     * Injects the handler that is appended to every client pipeline.
     *
     * @param handler the handler processing decoded client messages
     */
    public void setHandler(final ServerHandler handler) {
        this.handler = handler;
    }
}
3.
package com.xiaogang.netty.chat.server;
/**
 * Entry point of the chat server: wires a {@link ServerHandler} into a
 * {@link ChartServer}, starts it and releases its resources on JVM shutdown.
 */
public class ServerDemo {
    /**
     * Starts the chat server.
     *
     * @param args ignored
     */
    public static void main(final String[] args) {
        final ChartServer server = new ChartServer();
        server.setHandler(new ServerHandler());
        // Initialise first, so the shutdown hook below can never run against a server
        // whose bootstrap has not been created yet.
        server.init();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    server.stop();
                } catch (final Exception e) {
                    System.out.println("run main stop error!");
                    // Keep the cause visible; the message alone says nothing about what failed.
                    e.printStackTrace();
                }
            }
        });
        server.start();
    }
}
4.
package com.xiaogang.netty.chat.client;
import java.util.Date;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
 * Client-side handler that prints every message received from the server to
 * standard output, preceded by the current local time.
 */
public class ClientHandler extends SimpleChannelHandler {
    /**
     * Prints the current time and, on the next line, the received text.
     *
     * @param ctx the handler context (unused)
     * @param e   the event carrying the message; the payload is cast to {@code String}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        final String text = (String) e.getMessage();
        final String timestamp = new Date().toString();
        System.out.println(timestamp + "\n" + text);
    }
}
5.
package com.xiaogang.netty.chat.client;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import com.xiaogang.netty.chat.server.ChartServer;
/**
 * Chat client bootstrap: configures a Netty NIO client with string encoding/decoding
 * and the injected {@link ClientHandler}, and connects it to the chat server.
 *
 * <p>Expected call order: {@link #setHandler}, {@link #init()}, {@link #start()} and
 * finally {@link #stop()}.
 */
public class ChartClient {
    private final int port = ChartServer.port;
    private final String host = "10.40.30.189";
    private ClientBootstrap bootstrap;
    private ClientHandler handler;
    private ChannelFuture channelFuture;

    /**
     * Initialises the client: creates the bootstrap, installs the pipeline factory and
     * sets the socket options. Does not connect; call {@link #start()} for that.
     */
    public void init() {
        // Boss threads establish the connection and hand it over to the workers.
        final Executor bossExecutor = Executors.newCachedThreadPool();
        // Worker threads perform the I/O for the connection.
        final Executor workerExecutor = Executors.newCachedThreadPool();
        bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossExecutor, workerExecutor));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                // Fail with an explicit message instead of a bare NullPointerException
                // raised from inside the pipeline when the handler was never injected.
                if (handler == null) {
                    throw new IllegalStateException("setHandler() must be called before start()");
                }
                final ChannelPipeline channelPipeline = Channels.pipeline();
                channelPipeline.addLast("encode", new StringEncoder());
                channelPipeline.addLast("decode", new StringDecoder());
                channelPipeline.addLast("handler", handler);
                return channelPipeline;
            }
        });
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
    }

    /**
     * Connects to the server and blocks until the attempt has finished, then reports
     * the actual outcome. The connect call itself is asynchronous, so announcing
     * success right after issuing it would be premature.
     *
     * <p>NOTE(review): this waits on a future and is therefore meant to be called from
     * an application thread, not from a Netty I/O thread.
     *
     * @throws IllegalStateException if {@link #init()} has not been called
     */
    public void start() {
        if (bootstrap == null) {
            throw new IllegalStateException("init() must be called before start()");
        }
        channelFuture = bootstrap.connect(new InetSocketAddress(host, port));
        channelFuture.awaitUninterruptibly();
        if (channelFuture.isSuccess()) {
            System.out.println("连接远程服务器" + host + ":" + port + "端口成功,现在可以开始发消息了。");
        } else {
            System.out.println("连接远程服务器" + host + ":" + port + "端口失败:" + channelFuture.getCause());
        }
    }

    /**
     * Closes the connection (if one was attempted) and releases Netty's resources.
     * Safe to call when {@link #init()} or {@link #start()} never ran.
     */
    public void stop() {
        if (channelFuture != null) {
            channelFuture.awaitUninterruptibly();
            if (!channelFuture.isSuccess()) {
                // The cause is absent when the attempt did not fail with an exception.
                final Throwable cause = channelFuture.getCause();
                if (cause != null) {
                    cause.printStackTrace();
                }
            }
            // Close the channel ourselves and wait for that to finish. Merely waiting
            // for the close future blocks forever unless the remote side disconnects.
            channelFuture.getChannel().close().awaitUninterruptibly();
        }
        // Release the connection resources.
        if (bootstrap != null) {
            bootstrap.releaseExternalResources();
        }
    }

    /**
     * Injects the handler that is appended to the client pipeline.
     *
     * @param handler the handler processing decoded server messages
     */
    public void setHandler(final ClientHandler handler) {
        this.handler = handler;
    }

    /**
     * @return the future of the connect attempt, or {@code null} before {@link #start()}
     */
    public ChannelFuture getChannelFuture() {
        return channelFuture;
    }
}
6.
package com.xiaogang.netty.chat.client;
import java.util.Scanner;
import org.jboss.netty.channel.Channel;
/**
 * Console input loop of the chat client: reads lines from standard input and sends
 * each one to the server until the user types {@code quit} or the input ends.
 */
public class ClientRunable implements Runnable {
    private ChartClient client;
    private final Scanner scanner = new Scanner(System.in);

    /**
     * Initialises and starts the underlying client. {@link #setClient} must have been
     * called first.
     */
    public void init() {
        client.init();
        client.start();
    }

    /**
     * Runs the input loop.
     *
     * <p>Whole lines are sent, matching the "Enter sends" prompt; blank lines are
     * skipped. On {@code quit} the command is sent first, then the connection is
     * closed, the client is stopped and the loop ends. End of input ends the loop the
     * same way, without sending anything.
     */
    @Override
    public void run() {
        // Connecting is asynchronous: wait for the outcome before writing anything.
        if (!client.getChannelFuture().awaitUninterruptibly().isSuccess()) {
            client.stop();
            return;
        }
        final Channel channel = client.getChannelFuture().getChannel();
        while (true) {
            System.out.println("发送消息(Enter发送):");
            if (!scanner.hasNextLine()) {
                // Standard input was closed; leave instead of failing on the next read.
                break;
            }
            final String msg = scanner.nextLine().trim();
            if (msg.isEmpty()) {
                continue;
            }
            if (msg.equals("quit")) {
                System.out.println("wait, you will quit..");
                // Let the server see the command before the connection goes away.
                channel.write(msg).awaitUninterruptibly();
                break;
            }
            channel.write(msg);
        }
        // Close the channel here so that stopping the client cannot wait indefinitely
        // for the remote side to disconnect.
        channel.close();
        client.stop();
    }

    /**
     * Injects the client used for sending.
     *
     * @param client the chat client driven by this loop
     */
    public void setClient(final ChartClient client) {
        this.client = client;
    }
}
7.
package com.xiaogang.netty.chat.client;
/**
 * Entry point of the chat client: builds a {@link ChartClient} with its handler,
 * hands it to a {@link ClientRunable}, initialises it and runs the console input loop
 * on a separate thread.
 */
public class ClientDemo {
    /**
     * Starts the chat client.
     *
     * @param args ignored
     */
    public static void main(final String[] args) {
        // Network side: the client plus the handler that prints incoming messages.
        final ChartClient chatClient = new ChartClient();
        chatClient.setHandler(new ClientHandler());

        // Console side: the input loop that drives the client.
        final ClientRunable inputLoop = new ClientRunable();
        inputLoop.setClient(chatClient);
        inputLoop.init();

        new Thread(inputLoop).start();
    }
}