Java使用NIO包实现Socket通信的实例代码

时间:2022-05-02 02:13:05

前面几篇文章介绍了使用java.io和java.net类库实现的Socket通信,下面介绍一下使用java.nio类库实现的Socket。

java.nio包是Java在1.4之后增加的,用来提高I/O操作的效率。在nio包中主要包括以下几个类或接口:

  •  Buffer:缓冲区,用来临时存放输入或输出数据。
  •  Charset:用来把Unicode字符编码和其它字符编码互转。
  •  Channel:数据传输通道,用来把Buffer中的数据写入到数据源,或者把数据源中的数据读入到Buffer。
  •  Selector:用来支持异步I/O操作,也叫非阻塞I/O操作。

nio包中主要通过下面两个方面来提高I/O操作效率:

  •  通过Buffer和Channel来提高I/O操作的速度。
  •  通过Selector来支持非阻塞I/O操作。

下面来看一下程序中是怎么通过这些类库实现Socket功能。

首先介绍一下几个辅助类

辅助类SerializableUtil,这个类用来把java对象序列化成字节数组,或者把字节数组反序列化成java对象。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.googlecode.garbagecan.test.socket;
 
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
 
public class SerializableUtil {
   
  public static byte[] toBytes(Object object) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = null;
    try {
      oos = new ObjectOutputStream(baos);
      oos.writeObject(object);
      byte[] bytes = baos.toByteArray();
      return bytes;
    } catch(IOException ex) {
      throw new RuntimeException(ex.getMessage(), ex);
    } finally {
      try {
        oos.close();
      } catch (Exception e) {}
    }
  }
   
  public static Object toObject(byte[] bytes) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    ObjectInputStream ois = null;
    try {
      ois = new ObjectInputStream(bais);
      Object object = ois.readObject();
      return object;
    } catch(IOException ex) {
      throw new RuntimeException(ex.getMessage(), ex);
    } catch(ClassNotFoundException ex) {
      throw new RuntimeException(ex.getMessage(), ex);
    } finally {
      try {
        ois.close();
      } catch (Exception e) {}
    }
  }
}

辅助类MyRequestObject和MyResponseObject,这两个类是普通的java对象,实现了Serializable接口。MyRequestObject类是Client发出的请求,MyResponseObject是Server端作出的响应。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.googlecode.garbagecan.test.socket.nio;
 
import java.io.Serializable;
 
public class MyRequestObject implements Serializable {
 
  private static final long serialVersionUID = 1L;
 
  private String name;
   
  private String value;
 
  private byte[] bytes;
   
  public MyRequestObject(String name, String value) {
    this.name = name;
    this.value = value;
    this.bytes = new byte[1024];
  }
   
  public String getName() {
    return name;
  }
 
  public void setName(String name) {
    this.name = name;
  }
 
  public String getValue() {
    return value;
  }
 
  public void setValue(String value) {
    this.value = value;
  }
   
  @Override
  public String toString() {
    StringBuffer sb = new StringBuffer();
    sb.append("Request [name: " + name + ", value: " + value + ", bytes: " + bytes.length+ "]");
    return sb.toString();
  }
}
 
package com.googlecode.garbagecan.test.socket.nio;
 
import java.io.Serializable;
 
public class MyResponseObject implements Serializable {
 
  private static final long serialVersionUID = 1L;
 
  private String name;
   
  private String value;
 
  private byte[] bytes;
   
  public MyResponseObject(String name, String value) {
    this.name = name;
    this.value = value;
    this.bytes = new byte[1024];
  }
   
  public String getName() {
    return name;
  }
 
  public void setName(String name) {
    this.name = name;
  }
 
  public String getValue() {
    return value;
  }
 
  public void setValue(String value) {
    this.value = value;
  }
   
  @Override
  public String toString() {
    StringBuffer sb = new StringBuffer();
    sb.append("Response [name: " + name + ", value: " + value + ", bytes: " + bytes.length+ "]");
    return sb.toString();
  }
}

下面主要看一下Server端的代码,其中有一些英文注释对理解代码很有帮助,注释主要是来源jdk的文档和例子,这里就没有再翻译

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.googlecode.garbagecan.test.socket.nio;
 
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
 
import com.googlecode.garbagecan.test.socket.SerializableUtil;
 
public class MyServer3 {
 
  private final static Logger logger = Logger.getLogger(MyServer3.class.getName());
   
  public static void main(String[] args) {
    Selector selector = null;
    ServerSocketChannel serverSocketChannel = null;
     
    try {
      // Selector for incoming time requests
      selector = Selector.open();
 
      // Create a new server socket and set to non blocking mode
      serverSocketChannel = ServerSocketChannel.open();
      serverSocketChannel.configureBlocking(false);
       
      // Bind the server socket to the local host and port
      serverSocketChannel.socket().setReuseAddress(true);
      serverSocketChannel.socket().bind(new InetSocketAddress(10000));
       
      // Register accepts on the server socket with the selector. This
      // step tells the selector that the socket wants to be put on the
      // ready list when accept operations occur, so allowing multiplexed
      // non-blocking I/O to take place.
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
   
      // Here's where everything happens. The select method will
      // return when any operations registered above have occurred, the
      // thread has been interrupted, etc.
      while (selector.select() > 0) {
        // Someone is ready for I/O, get the ready keys
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
   
        // Walk through the ready keys collection and process date requests.
        while (it.hasNext()) {
          SelectionKey readyKey = it.next();
          it.remove();
           
          // The key indexes into the selector so you
          // can retrieve the socket that's ready for I/O
          execute((ServerSocketChannel) readyKey.channel());
        }
      }
    } catch (ClosedChannelException ex) {
      logger.log(Level.SEVERE, null, ex);
    } catch (IOException ex) {
      logger.log(Level.SEVERE, null, ex);
    } finally {
      try {
        selector.close();
      } catch(Exception ex) {}
      try {
        serverSocketChannel.close();
      } catch(Exception ex) {}
    }
  }
 
  private static void execute(ServerSocketChannel serverSocketChannel) throws IOException {
    SocketChannel socketChannel = null;
    try {
      socketChannel = serverSocketChannel.accept();
      MyRequestObject myRequestObject = receiveData(socketChannel);
      logger.log(Level.INFO, myRequestObject.toString());
       
      MyResponseObject myResponseObject = new MyResponseObject(
          "response for " + myRequestObject.getName(), 
          "response for " + myRequestObject.getValue());
      sendData(socketChannel, myResponseObject);
      logger.log(Level.INFO, myResponseObject.toString());
    } finally {
      try {
        socketChannel.close();
      } catch(Exception ex) {}
    }
  }
   
  private static MyRequestObject receiveData(SocketChannel socketChannel) throws IOException {
    MyRequestObject myRequestObject = null;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
     
    try {
      byte[] bytes;
      int size = 0;
      while ((size = socketChannel.read(buffer)) >= 0) {
        buffer.flip();
        bytes = new byte[size];
        buffer.get(bytes);
        baos.write(bytes);
        buffer.clear();
      }
      bytes = baos.toByteArray();
      Object obj = SerializableUtil.toObject(bytes);
      myRequestObject = (MyRequestObject)obj;
    } finally {
      try {
        baos.close();
      } catch(Exception ex) {}
    }
    return myRequestObject;
  }
 
  private static void sendData(SocketChannel socketChannel, MyResponseObject myResponseObject) throws IOException {
    byte[] bytes = SerializableUtil.toBytes(myResponseObject);
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    socketChannel.write(buffer);
  }
}

下面是Client的代码,代码比较简单就是启动了100个线程来访问Server

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.googlecode.garbagecan.test.socket.nio;
 
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
 
import com.googlecode.garbagecan.test.socket.SerializableUtil;
 
public class MyClient3 {
 
  private final static Logger logger = Logger.getLogger(MyClient3.class.getName());
   
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 100; i++) {
      final int idx = i;
      new Thread(new MyRunnable(idx)).start();
    }
  }
   
  private static final class MyRunnable implements Runnable {
     
    private final int idx;
 
    private MyRunnable(int idx) {
      this.idx = idx;
    }
 
    public void run() {
      SocketChannel socketChannel = null;
      try {
        socketChannel = SocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 10000);
        socketChannel.connect(socketAddress);
 
        MyRequestObject myRequestObject = new MyRequestObject("request_" + idx, "request_" + idx);
        logger.log(Level.INFO, myRequestObject.toString());
        sendData(socketChannel, myRequestObject);
         
        MyResponseObject myResponseObject = receiveData(socketChannel);
        logger.log(Level.INFO, myResponseObject.toString());
      } catch (Exception ex) {
        logger.log(Level.SEVERE, null, ex);
      } finally {
        try {
          socketChannel.close();
        } catch(Exception ex) {}
      }
    }
 
    private void sendData(SocketChannel socketChannel, MyRequestObject myRequestObject) throws IOException {
      byte[] bytes = SerializableUtil.toBytes(myRequestObject);
      ByteBuffer buffer = ByteBuffer.wrap(bytes);
      socketChannel.write(buffer);
      socketChannel.socket().shutdownOutput();
    }
 
    private MyResponseObject receiveData(SocketChannel socketChannel) throws IOException {
      MyResponseObject myResponseObject = null;
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
       
      try {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        byte[] bytes;
        int count = 0;
        while ((count = socketChannel.read(buffer)) >= 0) {
          buffer.flip();
          bytes = new byte[count];
          buffer.get(bytes);
          baos.write(bytes);
          buffer.clear();
        }
        bytes = baos.toByteArray();
        Object obj = SerializableUtil.toObject(bytes);
        myResponseObject = (MyResponseObject) obj;
        socketChannel.socket().shutdownInput();
      } finally {
        try {
          baos.close();
        } catch(Exception ex) {}
      }
      return myResponseObject;
    }
  }
}

最后测试上面的代码,首先运行Server类,然后运行Client类,就可以分别在Server端和Client端控制台看到发送或接收到的MyRequestObject或MyResponseObject对象了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://blog.csdn.net/kongxx/article/details/7288896