JAVA异步的TCP 通讯-服务端

时间:2025-02-07 08:44:15

一、服务端代码示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AdvancedAsyncTCPServer {
    private static final int PORT = 8888;
    private static final int BUFFER_SIZE = 1024;
    private final AsynchronousServerSocketChannel serverSocketChannel;
    private final ExecutorService threadPool;

    public AdvancedAsyncTCPServer() throws IOException {
        // 创建异步服务器套接字通道
        serverSocketChannel = AsynchronousServerSocketChannel.open();
        // 绑定到指定端口
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        // 创建一个固定大小的线程池,用于处理业务逻辑
        threadPool = Executors.newFixedThreadPool(10);
        System.out.println("Server started on port " + PORT);
    }

    public void start() {
        // 开始接受客户端连接
        acceptConnections();
    }

    private void acceptConnections() {
        // 异步接受客户端连接
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                // 继续接受下一个连接
                acceptConnections();
                // 处理新连接
                handleConnection(clientChannel);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Failed to accept connection: " + exc.getMessage());
            }
        });
    }

    private void handleConnection(AsynchronousSocketChannel clientChannel) {
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        // 异步读取客户端数据
        clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer buffer) {
                if (bytesRead > 0) {
                    buffer.flip();
                    byte[] data = new byte[buffer.remaining()];
                    buffer.get(data);
                    String message = new String(data);
                    System.out.println("Received message from client: " + message);

                    // 使用线程池处理业务逻辑
                    threadPool.submit(() -> {
                        try {
                            // 模拟业务处理
                            String responseMessage = "Server processed: " + message;
                            ByteBuffer responseBuffer = ByteBuffer.wrap(responseMessage.getBytes());
                            // 异步发送响应给客户端
                            clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                                @Override
                                public void completed(Integer bytesWritten, ByteBuffer buffer) {
                                    System.out.println("Response sent to client");
                                    try {
                                        // 继续读取客户端数据
                                        buffer.clear();
                                        clientChannel.read(buffer, buffer, this);
                                    } catch (Exception e) {
                                        closeChannel(clientChannel);
                                    }
                                }

                                @Override
                                public void failed(Throwable exc, ByteBuffer buffer) {
                                    System.err.println("Failed to send response: " + exc.getMessage());
                                    closeChannel(clientChannel);
                                }
                            });
                        } catch (Exception e) {
                            closeChannel(clientChannel);
                        }
                    });
                } else if (bytesRead == -1) {
                    // 客户端关闭连接
                    closeChannel(clientChannel);
                } else {
                    // 继续读取客户端数据
                    buffer.clear();
                    clientChannel.read(buffer, buffer, this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                System.err.println("Failed to read data: " + exc.getMessage());
                closeChannel(clientChannel);
            }
        });
    }

    private void closeChannel(AsynchronousSocketChannel channel) {
        try {
            System.out.println("Closing client connection");
            channel.close();
        } catch (IOException e) {
            System.err.println("Error closing channel: " + e.getMessage());
        }
    }

    public void stop() {
        try {
            // 关闭服务器套接字通道
            serverSocketChannel.close();
            // 关闭线程池
            threadPool.shutdown();
        } catch (IOException e) {
            System.err.println("Error stopping server: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        try {
            AdvancedAsyncTCPServer server = new AdvancedAsyncTCPServer();
            server.start();
        } catch (IOException e) {
            System.err.println("Error starting server: " + e.getMessage());
        }
    }
}

二、代码分析

AdvancedAsyncTCPServer 类

  1. 构造函数:创建 AsynchronousServerSocketChannel 并绑定到指定端口,同时创建一个固定大小的线程池用于处理业务逻辑。

  2. start() 方法:开始接受客户端连接。
  3. acceptConnections() 方法:异步接受客户端连接,使用 CompletionHandler 处理连接结果。
  4. handleConnection() 方法:处理新连接,异步读取客户端数据,并使用线程池处理业务逻辑。
  5. closeChannel() 方法:关闭客户端通道。
  6. stop() 方法:关闭服务器套接字通道和线程池。

CompletionHandler

  1. 用于处理异步操作的完成结果,包括连接、读取和写入操作。
  2. 在 completed() 方法中处理成功的操作,在 failed() 方法中处理失败的操作。

线程池

  1. 使用 Executors.newFixedThreadPool(10) 创建一个固定大小的线程池,用于处理业务逻辑,避免阻塞 I/O 操作。

三、优点

  • 异步 I/O:使用 Java NIO 2 的异步 I/O 功能,提高了服务器的并发处理能力。
  • 线程池:使用线程池处理业务逻辑,避免了创建过多线程导致的性能问题。
  • 异常处理:对各种异常情况进行了处理,提高了代码的健壮性。
  • 资源管理:在关闭服务器时,正确关闭服务器套接字通道和线程池,避免资源泄漏。