NIO学习总结(二)——Selector、FileLock、Path、Files、聊天室实现-四、多人聊天室

时间:2024-03-15 16:34:32

image-20240310145517265

image-20240310150125897

4.1 服务端

超过 timeout 毫秒没有连接,关闭服务端

public class ChatServer {

    public static void main(String[] args) throws IOException {
        new ChatServer().startServer();
    }

    /**
     * 服务器端启动的方法
     */
    public void startServer() throws IOException {
        // 1、创建Selector选择器
        Selector selector = Selector.open();
        // 2、创建服务端通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 3、为channel通道绑定端口、设置为非阻塞模式
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);

        // 4、把channel通道注册到选择器
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器已启动成功!");

        // 5、循环查询就绪状态
        while(selector.select() > 0){
            // 6、得到选择键集合,并遍历
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey sk : selectionKeys) {
                // 6.1、可接收状态:表示服务端已做好准备,可以接收客户的连接了
                if(sk.isAcceptable()){
                    // 处理可接收时的操作
                    acceptOperator(serverSocketChannel,selector);
                }
                // 6.2、可读状态:表示客户端已发送完毕,可以在服务端读取数据了
                if(sk.isReadable()){
                    // 处理可读时的操作
                    readOperator(selector,sk);
                }
            }
            // 清理选择键集合:为下次轮询查询做准备
            selectionKeys.clear();
        }
    }

    /**
     * 可接收状态时的处理操作
     * @param serverSocketChannel 服务端通道
     * @param selector 选择器
     * @throws IOException
     */
    private static void acceptOperator(ServerSocketChannel serverSocketChannel,Selector selector) throws IOException {
        // 1、获取客户端通道 SocketChannel
        SocketChannel sc = serverSocketChannel.accept();

        // 2、设置为非阻塞模式
        sc.configureBlocking(false);

        // 3、把通道注册到选择器上,监听可读状态
        sc.register(selector,SelectionKey.OP_READ);

        // 4、回复客户端
        ByteBuffer replyStr = Charset.forName("utf8").encode("欢迎进入聊天室!");
        sc.write(replyStr);
    }

    /**
     * 可读状态时的处理操作
     * @param selector 选择器
     * @param sk 选择键
     * @throws IOException
     */
    private static void readOperator(Selector selector,SelectionKey sk) throws IOException {
        // 1、从选择键SelectionKey获取已经就绪的客户端通道
        SocketChannel socketChannel = (SocketChannel) sk.channel();

        // 2、创建Buffer
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 3、循环读取客户端消息
        String message = "";
        while(socketChannel.read(buf) > 0){
            // 切换buf读写模式
            // 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
            buf.flip();
            message += Charset.forName("utf8").decode(buf);
        }

        // 4、把通道再次注册到选择器,监听可读状态
        socketChannel.register(selector,SelectionKey.OP_READ);

        // 5、把客户端消息广播到其它客户端
        if(message.length() > 0){
            System.out.println(message);
            castOtherClient(message,selector,socketChannel);
        }
    }

    /**
     * 给其它客户端广播消息
     * @param message 消息
     * @param selector 选择器
     * @param sc 自已的通道
     * @throws IOException
     */
    private static void castOtherClient(String message,Selector selector,SocketChannel sc) throws IOException {
        // 1、获取所有已经接入的通道的选择键
        Set<SelectionKey> keys = selector.keys();

        // 2、循环遍历:找出除了自已之外的其它客户端通道,并发送消息
        for (SelectionKey key : keys) {
            //System.out.println(key);
            // 获取当前选择键的通道
            Channel targetChannel = key.channel();
            // 向除了自已之外的其它客户端通道发送消息
            if(targetChannel instanceof SocketChannel && targetChannel != sc){
                // 发送消息
                ((SocketChannel)targetChannel).write(Charset.forName("utf8").encode(message));
            }
        }
    }

}

4.2 客户端

  • ChatClient.java

Scanner 会阻塞等待

public class ChatClient {

    /**
     * 启动方法
     */
    public void startClient(String name) {
        System.out.print(name + ",你好,");
        SocketChannel sc = null;
        try {
            // 1、创建选择器
            Selector selector = Selector.open();
            // 2、创建客户端通道,连接服务端
            sc = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8000));
            // 3、设置为非阻塞模式
            sc.configureBlocking(false);
            // 4、把通道注册到选择器
            sc.register(selector, SelectionKey.OP_READ);
            // 5、创建新线程,接收消息
            new Thread(new ClientThread(selector)).start();

            // 6、向服务端发送消息
            Scanner scanner = new Scanner(System.in);
            String msg = "";
            while (scanner.hasNextLine()) {
                msg = scanner.nextLine();
                if (msg.length() > 0) {
                    sc.write(Charset.forName("utf8").encode(name + ":" + msg));
                }
            }
        } catch (IOException e) {
            //e.printStackTrace();
        } finally {
            // 关闭
            try {
                if (sc != null) {
                    sc.close();
                }
                System.out.println("客户端已关闭!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • ClientThread.java
public class ClientThread implements Runnable{

    /**
     * 选择器
     */
    private Selector selector;

    /**
     * 构造器
     * @param selector
     */
    public ClientThread(Selector selector){
        this.selector = selector;
    }
    @Override
    public void run() {
        try{
            while(selector.select() > 0){
                // 得到选择键集合,并遍历
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey sk : selectionKeys) {
                    // 可读状态:表示可以读取服务器端发送的数据了
                    if(sk.isReadable()){
                        // 处理可读时的操作
                        readOperator(selector,sk);
                    }
                }
                // 清理选择键集合:为下次轮询查询做准备
                selectionKeys.clear();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    /**
     * 可读状态时的处理操作
     * @param selector 选择器
     * @param sk 选择键
     * @throws IOException
     */
    private void readOperator(Selector selector,SelectionKey sk) throws IOException {
        // 1、从选择键SelectionKey获取已经就绪的客户端通道
        SocketChannel sc = (SocketChannel) sk.channel();

        // 2、创建Buffer
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 3、循环读取客户端消息
        String message = "";
        while(sc.read(buf) > 0){
            // 切换buf读写模式
            // 调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值:即readLength
            buf.flip();
            message += Charset.forName("utf8").decode(buf);
        }

        // 4、把通道再次注册到选择器,监听可读状态。好像不用再次注册
        sc.register(selector,SelectionKey.OP_READ);

        // 5、输出消息
        if(message.length() > 0){
            System.out.println(message);
        }
    }
}
  • 模拟聊天用户,每个用户是独立的;startClient() 不能用静态方法,否则将共用客户端通道

    • 用户A

      public class AClient {
      
          public static void main(String[] args) {
              new ChatClient().startClient("lucy");
          }
      
      }
      
    • 用户B

      public class BClient {
      
          public static void main(String[] args) {
              new ChatClient().startClient("mack");
          }
      }