实现socketChannel 通信例子(Demo)

时间:2021-03-25 16:50:52

前言:本篇文章并没有深刻介绍nio相关的npi或者详细的使用说明,适合刚刚了解nio并且没写过例子的同学查看。如有错误,还请多多指正。

想入门学习nio技术的话请戳这里 并发编程网nio

最近看了下nio相关的知识,但还是有很多不明白的地方。写了一个小小的demo,可以实现nio通信。

SocketChannel通信代码

注: 以下代码改编自网络

service

public class MyService {
    public static Selector selector = null;

    public static void main(String[] args) {
        MyService.init();
    }

    public static void init() {
        initSelector();// 初始化selector
        initServerSocketChannel(); // 初始化serverSocketChannel
        run();
    }

    // first
    public static void initSelector() {
        try {
            selector = Selector.open();// 打开selector
        } catch (IOException e) {
            // 初始化selector失败
            e.printStackTrace();
        }
    }

    public static void initServerSocketChannel() {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(7777), 1024);
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // 初始化serverSocket失败
            e.printStackTrace();
        }
    }

    public static void run() {
        while (true) {
            try {
                selector.select(1000); // 阻塞selector
                // ================如果有新连接
                Set<SelectionKey> selectedKeys = selector.selectedKeys();// 获得事件集合;
                // ================遍历selectedKeys
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                SelectionKey key = null;
                while (iterator.hasNext()) {
                    key = iterator.next();// 获得到当前的事件
                    // ===============处理事件
                    handle(key);
                    // ===============
                    iterator.remove(); // 移除事件
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 初始化seclector和serverSocket
    // 当一个selector上有新的事件有反应后 select();
    // 获得获得事件集合
    // 遍历集合事件
    // 处理事件
    public static void handle(SelectionKey key) {
        try {
            // 连接就绪
            if (key.isAcceptable()) {
                handleAcceptable(key);
            }
            // 读就绪
            if (key.isReadable()) {
                handelReadable(key);
            }
        } catch (IOException e) {
            key.cancel();
            if (key.channel() != null) {
                try {
                    key.channel().close();
                } catch (IOException e1) {
                }
            }
        }
    }

    // 处理读事件
    public static void handelReadable(SelectionKey key) throws IOException {
        // ==================我们要将数据从通道读到buffer里
        SocketChannel ssc = (SocketChannel) key.channel(); // TODO:
                                                            // 为什么这里是socketChannel
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        String content = "服务端返回了: ";
        int readBytes = ssc.read(byteBuffer);// channel ==> buffer
        if (readBytes > 0) {// 代表读完毕了,准备写(即打印出来)
            byteBuffer.flip(); // 为write()准备
            // =====取出buffer里的数据
            byte[] bytes = new byte[byteBuffer.remaining()]; // 创建字节数组
            byteBuffer.get(bytes);// 将数据取出放到字节数组里
            content += new String(bytes);
            content += "__________";
            doWrite(ssc, content);
        }
    }

    // 处理连接事件
    public static void handleAcceptable(SelectionKey key) throws IOException {
        // 获得对应的ServerSocketChannel TODO: 这里为什么是socketChannel
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        // 得到对应的SocketChannel TODO:accpet是什么意思
        SocketChannel channel = ssc.accept();// 在非阻塞模式下,accept()可能为null
        // 处理socketChannel
        channel.configureBlocking(false); // TODO: 为什么设置非阻塞
        channel.register(selector, SelectionKey.OP_READ); // TODO: 将准备状态转化为读状态

        // 将key对应Channel设置为准备接受其他请求
        key.interestOps(SelectionKey.OP_ACCEPT);// TODO:
    }

    // ============= 发送消息
    private static void doWrite(SocketChannel sc, String data) throws IOException {
        byte[] req = data.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        if (!byteBuffer.hasRemaining()) {
            System.out.println(data + " Send 2 Service successed");
        }
    }
}

client

public class MyClient {

    public static void main(String[] args) {
        MyClient.init();
    }

// public ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);
// static Charset charset = Charset.forName("UTF-8");
    private static Selector selector = null;
    private volatile static boolean stop = false;
    private static SocketChannel channel = null;

    public static void init() {
        initSelector();// 初始化selector
        initSocketChannel(); // 初始化serverSocketChannel
        run();
    }

    // 初始化selector
    public static void initSelector() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    // 初始化SocketChannel
    public static void initSocketChannel() {
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("127.0.0.1", 7777));
            channel.register(selector, SelectionKey.OP_CONNECT);
        } catch (ClosedChannelException e) {
            System.out.println("client: 失去主机连接");
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void run() {
        try {
            while (!stop) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    handle(key);
                    iterator.remove();
                }

            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static void handle(SelectionKey key) throws IOException {
        // 连接就绪
        try {
            if (key.isConnectable()) {
                handleConnectable(key);
            }
            // 读就绪
            if (key.isReadable()) {
                handelReadable(key);
            }
        } catch (Exception e) {
            key.cancel();
            if (key.channel() != null) {
                try {
                    key.channel().close();
                } catch (IOException e1) {
                }
            }
        }
    }

    private static void handelReadable(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int temp = sc.read(buffer); // 从channel读到buffer
        String content = "来自服务端的: ";
        if (temp > 0) {// 代表读完毕了,准备写(即打印出来)
            buffer.flip(); // 为write()准备
            // =====取出buffer里的数据
            byte[] bytes = new byte[buffer.remaining()]; // 创建字节数组
            buffer.get(bytes);// 将数据取出放到字节数组里
            content += new String(bytes);
            content += "============";
            System.out.println(content);
            // doWrite(sc, content);
        }
        // key.interestOps(SelectionKey.OP_READ);// TODO:
    }

    private static void handleConnectable(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        if (sc.finishConnect()) {
            // 将关注的事件变成read
            sc.register(selector, SelectionKey.OP_READ);
            doWrite(sc, "dddddd");
        }
    }

    private static void doWrite(SocketChannel sc, String data) throws IOException {
        byte[] req = data.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.wrap(req);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        // if (!byteBuffer.hasRemaining()) {
        // System.out.println("Send successed : " + data);
        // }
    }
}

总结:
1. byteBuffer 用于数据的读取。从channel中获得是读,放到channel里是写。
大概是这种感觉 buffer —->channel——>buffer
2. socketChannel通信里有很多代码的格式是重复的。 将相同部分抽象之后,还是很有规律的。
- 初始化函数: 初始化 selector 和 ServerSocketChannel(或SocketChannel)
- 运行函数:死循环,用途在于等待新的连接,并且收到新的连接之后,调用总体处理函数。处理后将事件移除
- 总体处理函数:判断事件类型,并交给相应的处理函数处理,并且处理异常,在异常里取消key,并把channel关闭
- 工具函数:即将String变成byte构建buffer,再写入到channel里。