NIO 多线程处理异步队列

时间:2025-03-21 22:39:58

本文使用java的NIO简单实现server-client模式,处理异步队列。

缓存队列类

public class Buffer {
    private static Queue<Object> queue = new LinkedList<Object>();

    private static int INITSIZE = 2;

    private Lock mutex;

    private Condition condition;

    private Buffer(){
        mutex = new ReentrantLock();
        condition = ();
    }

    public static Buffer getIntance(){
        return ;
    }

    static class QueueBuffer{
        private static Buffer instance = new Buffer();
    }

    public void setInitSize(int size){
        INITSIZE = size;
    }

    public void produce(String msg){
        mutex.lock();
        try {
            while(() >= INITSIZE ){
                System.out.println("queue wait to consume");
                condition.await();
            }

            (msg);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            ();
        } finally {
            ();
            ();
        }

    }

    public Object consume(){
        mutex.lock();
        try {
            while (() == 0) {
                System.out.println("queue wait to produce");
                condition.await();
            }

            return ();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            ();
            return null;
        } finally {
            ();
            ();
        }
    }

    public int getQueueSize(){
        return ();
    }
}

任务类

public class Task implements Runnable{

    private static ConcurrentLinkedQueue<SelectionKey> clq = new ConcurrentLinkedQueue<>();

    private NIOServer server;

    public Task(NIOServer server){
        this.server = server;
    }

    public static void push(SelectionKey key){
        (key);
    }

    @Override
    public void run() {
        try {
                SelectionKey key = ();
                (key);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            ();
        }
    }

}

服务端类

public class NIOServer {
    private ServerSocketChannel serverSocketChannel;

    int port = 8080;

    private Selector selector;

    ByteBuffer recBuffer = (1024);

    ByteBuffer sendBuffer = (1024);

    Map<SelectionKey, String> clientMsgs = new HashMap<>();

    private static int client_no = 0;

    ExecutorService serviceExecutor = (10);

    Task task = null;

    public NIOServer(int port) throws IOException {
        this.port = port;
        serverSocketChannel = ();
        ().bind(new InetSocketAddress(this.port));
        (false);
        selector = ();
        (selector, SelectionKey.OP_ACCEPT);
        System.out.println("init finish");
        task = new Task(this);
    }

    public void listen() throws IOException {
        while (true) {
            System.out.println("server scanning");
            int event = selector.select();
            if (event > 0) {
                Set<SelectionKey> keys = ();
                Iterator<SelectionKey> iterator = ();
                while (()) {
                    SelectionKey key = ();
                    ();
                    process(key);
                }
            }
        }
    }

    private void process(SelectionKey selectionKey) {
        SocketChannel client = null;
        try {
            if (() && ()) {
                client = ();
                ++client_no;
                (false);
                (selector, SelectionKey.OP_READ);
            } else if (() && ()) {
                readHandle(selectionKey);
            } else if (() && ()) {
                if ((selectionKey)) {
                    writeMsgHanle(selectionKey);
                }
            }
        } catch (Exception e) {
            ();
            try {
                ().close();
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                ();
            }

        }

    }

    private void readHandle(SelectionKey selectionKey) throws IOException, InterruptedException {
        SocketChannel client;
        client = (SocketChannel) ();
        int len = 0;
        while ((len = (recBuffer)) > 0) {
            ();
            String sb = new String((), 0, len);
            (selectionKey, ());
            System.out.println("Thread:" + ().getId() + ",NO." + client_no + " client send msg:"
                    + ());
            ();

        }
        (selectionKey);
        (task);
    }

    public void msgHandle(SelectionKey key) throws IOException {
        String msg = clientMsgs.get(key);
        String serverMsg = null;
        if (msg != null && !"".equals(msg)) {
            if (("add")) {
                msg = (("add") + 4);
                ().produce(msg);
                serverMsg = "server:add " + msg + " to queue successfully";
            } else if (("poll")) {
                String consumeMsg = (String) ().consume();
                serverMsg = "server:remove " + consumeMsg + " from queue successfully";
            } else if (("size")) {
                serverMsg = "server:size is " + ().getQueueSize();
            } else {
                serverMsg = "server:no such command";
            }
        } else {
            serverMsg = "server:blank message";
        }
        (key,serverMsg);
        writeMsgHanle(key);
    }

    private void writeMsgHanle(SelectionKey key) throws IOException {
        String msg = clientMsgs.get(key);
        SocketChannel client;
        ();
        (());
        client = (SocketChannel) ();
        ();// write in range of text length
        while (()) {
            (sendBuffer);
        }
        (SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        new NIOServer(8080).listen();
    }
}

客户端类

public class NIOClient {
    private SocketChannel client;

    private Selector selector;

    private ByteBuffer sendBuffer = (1024);

    private ByteBuffer receiveBuffer = (1024);

    public NIOClient(String ip, int port) throws IOException{
        client = ();
        selector = ();
        (false);
        (selector, SelectionKey.OP_READ);
        (new InetSocketAddress(ip, port));
        while (!()) {
            System.out.println("finish connect");
        }
    }

    public void run(){
        while(true){
            try {
                while (!writeHandle()) {
                    System.out.println("please input a not empty command");
                }
                int event = selector.select();
                if (event > 0) {
                    Set<SelectionKey> keys = ();
                    Iterator<SelectionKey> iterator = ();
                    while (()) {
                        SelectionKey key = ();
                        if (() && ()) {
                            readHandle(key);
                        }
                        ();
                    }
                }
            } catch (Exception e) {
                ();
                if (()) {
                    try {
                        ();
                    } catch (IOException e1) {
                        ();
                    }
                }
            }
        }
    }

    private void readHandle(SelectionKey key) throws IOException {
        SocketChannel scChannel = null;
        scChannel = (SocketChannel) ();
        ();
        int len = 0;
        if ((len = (receiveBuffer)) > 0) {
            ();
            String msg = new String((),0,len);
            System.out.println(msg);
        }
    }

    public boolean writeHandle() throws IOException{
        Scanner scanner = new Scanner(System.in);
        String line = ();
        if (line != null && () > 0) {
            ();
            (());
            ();
            while (()) {
                (sendBuffer);
            }
            return true;
        }else{
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        new NIOClient("127.0.0.1", 8080).run();
    }
}