4.1 服务端
超过 timeout 毫秒没有连接,关闭服务端
public class ChatServer {
public static void main(String[] args) throws IOException {
new ChatServer().startServer();
}
/**
* 服务器端启动的方法
*/
public void startServer() throws IOException {
// 1、创建Selector选择器
Selector selector = Selector.open();
// 2、创建服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3、为channel通道绑定端口、设置为非阻塞模式
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
// 4、把channel通道注册到选择器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器已启动成功!");
// 5、循环查询就绪状态
while(selector.select() > 0){
// 6、得到选择键集合,并遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey sk : selectionKeys) {
// 6.1、可接收状态:表示服务端已做好准备,可以接收客户的连接了
if(sk.isAcceptable()){
// 处理可接收时的操作
acceptOperator(serverSocketChannel,selector);
}
// 6.2、可读状态:表示客户端已发送完毕,可以在服务端读取数据了
if(sk.isReadable()){
// 处理可读时的操作
readOperator(selector,sk);
}
}
// 清理选择键集合:为下次轮询查询做准备
selectionKeys.clear();
}
}
/**
* 可接收状态时的处理操作
* @param serverSocketChannel 服务端通道
* @param selector 选择器
* @throws IOException
*/
private static void acceptOperator(ServerSocketChannel serverSocketChannel,Selector selector) throws IOException {
// 1、获取客户端通道 SocketChannel
SocketChannel sc = serverSocketChannel.accept();
// 2、设置为非阻塞模式
sc.configureBlocking(false);
// 3、把通道注册到选择器上,监听可读状态
sc.register(selector,SelectionKey.OP_READ);
// 4、回复客户端
ByteBuffer replyStr = Charset.forName("utf8").encode("欢迎进入聊天室!");
sc.write(replyStr);
}
/**
* 可读状态时的处理操作
* @param selector 选择器
* @param sk 选择键
* @throws IOException
*/
private static void readOperator(Selector selector,SelectionKey sk) throws IOException {
// 1、从选择键SelectionKey获取已经就绪的客户端通道
SocketChannel socketChannel = (SocketChannel) sk.channel();
// 2、创建Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、循环读取客户端消息
String message = "";
while(socketChannel.read(buf) > 0){
// 切换buf读写模式
// 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
buf.flip();
message += Charset.forName("utf8").decode(buf);
}
// 4、把通道再次注册到选择器,监听可读状态
socketChannel.register(selector,SelectionKey.OP_READ);
// 5、把客户端消息广播到其它客户端
if(message.length() > 0){
System.out.println(message);
castOtherClient(message,selector,socketChannel);
}
}
/**
* 给其它客户端广播消息
* @param message 消息
* @param selector 选择器
* @param sc 自已的通道
* @throws IOException
*/
private static void castOtherClient(String message,Selector selector,SocketChannel sc) throws IOException {
// 1、获取所有已经接入的通道的选择键
Set<SelectionKey> keys = selector.keys();
// 2、循环遍历:找出除了自已之外的其它客户端通道,并发送消息
for (SelectionKey key : keys) {
//System.out.println(key);
// 获取当前选择键的通道
Channel targetChannel = key.channel();
// 向除了自已之外的其它客户端通道发送消息
if(targetChannel instanceof SocketChannel && targetChannel != sc){
// 发送消息
((SocketChannel)targetChannel).write(Charset.forName("utf8").encode(message));
}
}
}
}
4.2 客户端
- ChatClient.java
Scanner 会阻塞等待
public class ChatClient {
/**
* 启动方法
*/
public void startClient(String name) {
System.out.print(name + ",你好,");
SocketChannel sc = null;
try {
// 1、创建选择器
Selector selector = Selector.open();
// 2、创建客户端通道,连接服务端
sc = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8000));
// 3、设置为非阻塞模式
sc.configureBlocking(false);
// 4、把通道注册到选择器
sc.register(selector, SelectionKey.OP_READ);
// 5、创建新线程,接收消息
new Thread(new ClientThread(selector)).start();
// 6、向服务端发送消息
Scanner scanner = new Scanner(System.in);
String msg = "";
while (scanner.hasNextLine()) {
msg = scanner.nextLine();
if (msg.length() > 0) {
sc.write(Charset.forName("utf8").encode(name + ":" + msg));
}
}
} catch (IOException e) {
//e.printStackTrace();
} finally {
// 关闭
try {
if (sc != null) {
sc.close();
}
System.out.println("客户端已关闭!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- ClientThread.java
public class ClientThread implements Runnable{
/**
* 选择器
*/
private Selector selector;
/**
* 构造器
* @param selector
*/
public ClientThread(Selector selector){
this.selector = selector;
}
@Override
public void run() {
try{
while(selector.select() > 0){
// 得到选择键集合,并遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey sk : selectionKeys) {
// 可读状态:表示可以读取服务器端发送的数据了
if(sk.isReadable()){
// 处理可读时的操作
readOperator(selector,sk);
}
}
// 清理选择键集合:为下次轮询查询做准备
selectionKeys.clear();
}
}catch (IOException e){
e.printStackTrace();
}
}
/**
* 可读状态时的处理操作
* @param selector 选择器
* @param sk 选择键
* @throws IOException
*/
private void readOperator(Selector selector,SelectionKey sk) throws IOException {
// 1、从选择键SelectionKey获取已经就绪的客户端通道
SocketChannel sc = (SocketChannel) sk.channel();
// 2、创建Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、循环读取客户端消息
String message = "";
while(sc.read(buf) > 0){
// 切换buf读写模式
// 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
buf.flip();
message += Charset.forName("utf8").decode(buf);
}
// 4、把通道再次注册到选择器,监听可读状态。好像不用再次注册
sc.register(selector,SelectionKey.OP_READ);
// 5、输出消息
if(message.length() > 0){
System.out.println(message);
}
}
}
-
模拟聊天用户,每个用户是独立的;startClient() 不能用静态方法,否则将共用客户端通道
-
用户A
public class AClient { public static void main(String[] args) { new ChatClient().startClient("lucy"); } }
-
用户B
public class BClient { public static void main(String[] args) { new ChatClient().startClient("mack"); } }
-