NIO.2 入门,第 1 部分: 异步通道 API

时间:2022-08-27 15:32:42

异步通道 提供支持连接、读取、以及写入之类非锁定操作的连接,并提供对已启动操作的控制机制。Java 7 中用于 Java Platform(NIO.2)的 More New I/O APIs,通过在 java.nio.channels 包中增加四个异步通道,从而增强了 Java 1.4 中的 New I/O APIs(NIO):

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

这些类在风格上与 NIO 通道 API 很相似。他们共享相同的方法与参数结构体,并且大多数对于 NIO 通道类可用的参数,对于新的异步版本仍然可用。主要区别在于新通道可使一些操作异步执行。

异步通道 API 提供两种对已启动异步操作的监测与控制机制。第一种是通过返回一个 java.util.concurrent.Future 对象来实现,它将会建模一个挂起操作,并可用于查询其状态以及获取结果。第二种是通过传递给操作一个新类的对象,java.nio.channels.CompletionHandler,来完成,它会定义在操作完毕后所执行的处理程序方法。每个异步通道类为每个操作定义 API 副本,这样可采用任一机制。

在本文中,关于 NIO.2 的 两部分系列文章 中的第一部分,介绍了每个通道,并提供一些简单的例子来演示它们的使用方法。这些例子都处于可运行状态(见 下载),您可在 Oracle 以及 IBM®(在本文写作期间,都还处于开发阶段;见 参见资料) 所提供的 Java 7 版中运行这些例子。在 第二部分 中,您将有机会了解 NIO.2 文件系统 API。

异步套接字通道及特性

首先,我们将了解 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel 类。我们将看到的第一个例子将演示如何利用这些新的类来实施简单的客户端/服务器。第一步,我们要设置服务器。

设置服务器

打开 AsychronousServerSocketChannel 并将其绑定到类似于 ServerSocketChannel 的地址:

AsynchronousServerSocketChannel server =
    AsynchronousServerSocketChannel.open().bind(null);

方法 bind() 将一个套接字地址作为其参数。找到空闲端口的便利方法是传递一个 null 地址,它会自动将套接字绑定到本地主机地址,并使用空闲的 临时 端口。

接下来,可以告诉通道接受一个连接:

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

这是与 NIO 的第一个不同之处。接受调用总会立刻返回,并且,—— 不同于 ServerSocketChannel.accept(),它会返回一个SocketChannel —— 它返回一个 Future<AsynchronousSocketChannel> 对象,该对象可在以后用于检索AsynchronousSocketChannel。 Future 对象的通用类型是实际操作的结果。比如,读取或写入操作会因为操作返回读或写的字节数,而返回一个 Future<Integer>

利用 Future 对象,当前线程可阻塞来等待结果:

AsynchronousSocketChannel worker = future.get();

此处,其阻塞超时时间为 10 秒:

AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS);

或者轮询操作的当前状态,还可取消操作:

if (!future.isDone()) {
    future.cancel(true);
}

cancel() 方法可利用一个布尔标志来指出执行接受的线程是否可被中断。这是个很有用的增强;在以前的 Java 版本中,只能通过关闭套接字来中止此类阻塞 I/O 操作。

客户端设置

接下来,要通过打开并连接与服务器之间的 AsynchronousSocketChannel,来设置客户端:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
    client.connect(server.getLocalAddress()).get();

一旦客户端与服务器建立连接,可通过使用字节缓存的通道来执行读写操作,如清单 1 所示:

清单 1. 使用读写字节缓存
// send a message to the server
ByteBuffer message = ByteBuffer.wrap("ping".getBytes());
client.write(message).get();

// read a message from the client
worker.read(readBuffer).get(10, TimeUnit.SECONDS);
System.out.println("Message: " + new String(readBuffer.array()));

还支持异步地分散读操作与写操作,该操作需要大量字节缓存。

新异步通道的 API 完全从底层套接字中抽取掉:无法直接获取套接字,而以前可以调用 socket() ,例如,SocketChannel。引入了两个新的方法 —— getOption 和 setOption —— 来在异步网络通道中查询并设置套接字选项。例如,可通过channel.getOption(StandardSocketOption.SO_RCVBUF) 而不是 channel.socket().getReceiveBufferSize(); 来检索接收缓存大小。

完成处理程序

使用 Future 对象的替代机制,是向异步操作注册一个 callback 。接口 CompletionHandler 有两个方法:

  • void completed(V result, A attachment) 在任务完成结果中具有类型 V 时执行。
  • void failed(Throwable e, A attachment) 在任务由于 Throwable e 而失败时执行。

两个方法的附件参数都是一个传递到异步操作的对象。如果相同的对象用于多个操作,其可用于追踪哪个操作已完成。

Open 命令

我们来看一个使用 AsynchronousFileChannel 类的例子。可通过将 java.nio.file.Path 对象传递到静态 open() 方法中,来创建一个新的通道:

AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("myfile"));

用于 FileChannel 的新 open 命令

用于异步通道的 open 命令格式已被移植到 FileChannel类。在 NIO 中,通过在FileInputStreamFileOutputStream、或者RandomAccessFile 上调用 getChannel() 来获取FileChannel。借助 NIO.2,可利用 open() 方法来直接创建 FileChannel,此处展示了相关的例子。

Path 是 Java 7 中的新类,可在 第 2 部分 中找到更多细节。可利用 Paths.get(String)实用方法,从代表文件名的 String 中创建 Path

默认情况下,该文件已打开以供读取。open() 方法可利用附加选项来指定如何打开该文件。例如,此调用打开文件以供读取或写入,如果必要将创建该文件,并在通道关闭或者 JVM 终止时尝试删除文件:

fileChannel = AsynchronousFileChannel.open(Paths.get("afile"),
    StandardOpenOption.READ, StandardOpenOption.WRITE,
    StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);

替代方法,open() 提供了对通道的更好的控制,允许设置文件属性。

实现一个完成处理程序

接下来,可将这些写入文件,写入完成后,就可执行一些操作。 首先要构造一个封装了 “ something ” 的 CompletionHandler ,如清单 2 所示:

清单 2. 创建完成处理程序
CompletionHandler<Integer, Object> handler =
    new CompletionHandler<Integer, Object>() {
    @Override
    public void completed(Integer result, Object attachment) {
        System.out.println(attachment + " completed with " + result + " bytes written");
    }
    @Override
    public void failed(Throwable e, Object attachment) {
        System.err.println(attachment + " failed with:");
        e.printStackTrace();
    }
};

现在可以进行写入:

fileChannel.write(ByteBuffer.wrap(bytes), 0, "Write operation 1", handler);

write() 方法采取:

  • 包含要写入内容的 ByteBuffer
  • 文件中的绝对位置
  • 要传递给完成处理程序方法的附件对象
  • 完成处理程序

操作必须给出进行读或写的文件中的绝对位置。文件具有内部位置标记,来指出读/写发生的位置,这样做没有意义,因为在上一个操作完成之前,就可以启动新操作,它们的发生顺序无法得到保证。由于相同的原因,在 AsynchronousFileChannel API 中没有用于设置或查询位置的方法,在 FileChannel 中同样也没有。

除了读写方法之外,还支持异步锁定方法,因此,如果当前有其他线程保持锁定时,可对文件进行执行访问锁定,而不必在当前线程中锁定(或者利用 tryLock 轮询)。

异步通道组

每个异步通道都属于一个通道组,它们共享一个 Java 线程池,该线程池用于完成启动的异步 I/O 操作。这看上去有点像欺骗,因为您可在自己的 Java 线程中执行大多数异步功能,来获得相同的表现,并且,您可能希望能够仅仅利用操作系统的异步 I/O 能力,来执行 NIO.2 ,从而获得更优的性能。然而,在有些情况下,有必要使用 Java 线程:比如,保证 completion-handler 方法在来自线程池的线程上执行。

默认情况下,具有 open() 方法的通道属于一个全局通道组,可利用如下系统变量对其进行配置:

  • java.nio.channels.DefaultThreadPoolthreadFactory,其不采用默认设置,而是定义一个java.util.concurrent.ThreadFactory
  • java.nio.channels.DefaultThreadPool.initialSize,指定线程池的初始规模

java.nio.channels.AsynchronousChannelGroup 中的三个实用方法提供了创建新通道组的方法:

  • withCachedThreadPool()
  • withFixedThreadPool()
  • withThreadPool()

这些方法或者对线程池进行定义,如 java.util.concurrent.ExecutorService,或者是java.util.concurrent.ThreadFactory。例如,以下调用创建了具有线程池的新的通道组,该线程池包含 10 个线程,其中每个都构造为来自 Executors 类的线程工厂:

AsynchronousChannelGroup tenThreadGroup =
    AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());

三个异步网络通道都具有 open() 方法的替代版本,它们采用给出的通道组而不是默认通道组。例如,当有异步操作请求时,此调用告诉channel 使用 tenThreadGroup 而不是默认通道组来获取线程:

AsynchronousServerSocketChannel channel = 
    AsynchronousServerSocketChannel.open(tenThreadGroup);

定义自己的通道组可更好地控制服务于操作的线程,并能提供关闭线程或者等待终止的机制。清单 3 展示了相关的例子:

清单 3. 利用通道组来控制线程关闭
// first initiate a call that won't be satisfied
channel.accept(null, completionHandler);
// once the operation has been set off, the channel group can
// be used to control the shutdown
if (!tenThreadGroup.isShutdown()) {
    // once the group is shut down no more channels can be created with it
    tenThreadGroup.shutdown();
}
if (!tenThreadGroup.isTerminated()) {
    // forcibly shutdown, the channel will be closed and the accept will abort
    tenThreadGroup.shutdownNow();
}
// the group should be able to terminate now, wait for a maximum of 10 seconds
tenThreadGroup.awaitTermination(10, TimeUnit.SECONDS);

AsynchronousFileChannel 在此处与其他通道不同,为了使用定制的线程池,open() 方法采用 ExecutorService 而不是AsynchronousChannelGroup

异步数据报通道与多播

最后的新通道是 AsynchronousDatagramChannel。它与 AsynchronousSocketChannel 很类似,但由于 NIO.2 API 在该通道级别增加了对多播的支持,而在 NIO 中只在 MulticastDatagramSocket 级别才提供这一支持,因此有必要将其单独提出。Java 7 中的java.nio.channels.DatagramChannel 也能提供这一功能。

作为服务器来使用的 AsynchronousDatagramChannel 可构建如下:

AsynchronousDatagramChannel server = AsynchronousDatagramChannel.open().bind(null);

接下来,可设置客户端来接收发往一个多播地址的数据报广播。首先,必须在多播地址范围内选择一个地址(从 224.0.0.0 到 239.255.255.255),还要选择一个所有客户端都可绑定的端口:

// specify an arbitrary port and address in the range
int port = 5239;
InetAddress group = InetAddress.getByName("226.18.84.25");

我们也需要一个到所使用网络接口的引用:

// find a NetworkInterface that supports multicasting
NetworkInterface networkInterface = NetworkInterface.getByName("eth0");

现在,打开数据报通道并设置多播选项,如清单 4 所示:

清单 4. 打开数据报通道并设置多播选项
// the channel should be opened with the appropriate protocol family,
// use the defined channel group or pass in null to use the default channel group
AsynchronousDatagramChannel client =
    AsynchronousDatagramChannel.open(StandardProtocolFamily.INET,  tenThreadGroup);
// enable binding multiple sockets to the same address
client.setOption(StandardSocketOption.SO_REUSEADDR, true);
// bind to the port
client.bind(new InetSocketAddress(port));
// set the interface for sending datagrams
client.setOption(StandardSocketOption.IP_MULTICAST_IF, networkInterface);

客户端可通过如下方式加入多播组:

MembershipKey key = client.join(group, networkInterface);

java.util.channels.MembershipKey 是提供对组成员控制的新类。利用该键,您可丢弃组成员、阻塞或者取消阻塞来自特定地址的数据报、以及返回有关组和通道的消息。

服务器可以向特定地址和端口发送数据报,供客户端接收,如清单 5 所示:

清单 5. 发送以及接收数据报
// send message
ByteBuffer message = ByteBuffer.wrap("Hello to all listeners".getBytes());
server.send(message, new InetSocketAddress(group, port));

// receive message
final ByteBuffer buffer = ByteBuffer.allocate(100);
client.receive(buffer, null, new CompletionHandler<SocketAddress, Object>() {
    @Override
    public void completed(SocketAddress address, Object attachment) {
        System.out.println("Message from " + address + ": " +
            new String(buffer.array()));
    }

    @Override
    public void failed(Throwable e, Object attachment) {
        System.err.println("Error receiving datagram");
        e.printStackTrace();
    }
});

可在同一端口上创建多个客户端,它们可加入多播组来接收来自服务器的数据报。

结束语

NIO.2 的异步通道 APIs 提供方便的、平*立的执行异步操作的标准方法。这使得应用程序开发人员能够以更清晰的方式来编写程序,而不必定义自己的 Java 线程,此外,还可通过使用底层 OS 所支持的异步功能来提高性能。如同其他 Java API 一样,API 可利用的 OS 自有异步功能的数量取决于其对该平台的支持程度。