IO流--转载

时间:2024-01-18 10:55:08

第 1 部分 从输出流中读取

http://www.ibm.com/developerworks/cn/java/j-io1/

自早期基于浏览器的 applet 和简单应用程序以来,Java 平台已有了巨大的发展。现在,我们有多个平台和概要及许多新的 API,并且还在制作的差不多有数百种之多。尽管 Java 语言的复杂程度在不断增加,但它对于日常的编程任务而言仍是一个出色的工具。虽然有时您会陷入那些日复一日的编程问题中,但偶尔您也能够回过头去,发现一个很棒的解决方案来处理您以前曾多次遇到过的问题。

就在前几天,我想要压缩一些通过网络连接读取的数据(我以压缩格式将 TCP 数据中继到一个 UDP 套接字)。记得 Java 平台自版本 1.1 开始就支持压缩,所以我直接求助于 java.util.zip 包,希望能找到一个适合于我的解决方案。然而,我发现一个问题:构造的类都适用于常规情况,即在读取时对数据解压缩而在写入时压缩它们,没有其它变通方法。虽然绕过 I/O 类是可能的,但我希望构建一个基于流的解决方案,而不想偷懒直接使用压缩程序。

不久以前,我在另一种情况下也遇到过完全相同的问题。我有一个 base-64 转码库,与使用压缩包一样,它支持对从流中读取的数据进行译码,并对写入流中的数据进行编码。然而,我需要的是一个在我从流中读取数据的同时可以进行编码的库。

在我着手解决该问题时,我认识到我在另一种情况下也遇到过该问题:当序列化 XML 文档时,通常会循环遍历整个文档,将节点写入流中。然而,我遇到的情况是需要读取序列化格式的文档,以便将子集重新解析成一个新文档。

回过头想一下,我意识到这些孤立事件表示了一个共性的问题:如果有一个递增地将数据写入输出流的数据源,那么我需要一个输入流使我能够读取这些数据,每当需要更多数据时,都能透明地访问数据源。

在本文中,我们将研究对这一问题的三种可能的解决方案,同时决定一个实现最佳解决方案的新框架。然后,我们将针对上面列出的每个问题,检验该框架。我们将扼要地谈及性能方面的问题,而把对此的大量讨论留到下一篇文章中。

I/O 流基础知识

首先,让我们简单回顾一下 Java 平台的基本流类,如图 1 所示。 OutputStream 表示对其写入数据的流。通常,该流将直接连接至诸如文件或网络连接之类的设备,或连接至另一个输出流(在这种情况下,它称为 过滤器(filter))。通常,输出流过滤器在转换了写入其中的数据之后,才将转换后产生的数据写入相连的流中。 InputStream 表示可以从中读取数据的流。同样,该流也直接连接至设备或其它流。输入流过滤器从相连的流中读取数据,转换该数据,然后允许从中读取转换后的数据。

图 1. I/O 流基础知识

IO流--转载

就我最初的问题看, GZIPOutputStream 类是一个输出流过滤器,它压缩写入其中的数据,然后将该压缩数据写入相连的流。我需要的输入流过滤器应该能从流中读取数据,压缩数据,然后让我读取结果。

Java 平台,版本 1.4 已引入了一个新的 I/O 框架 java.nio 。不过,该框架在很大程度上与提供对操作系统 I/O 资源的有效访问有关;而且,虽然它确实为一些传统的 java.io 类提供了类似功能,并可以表示同时支持输入和输出的双重用途的资源,但它并不能完全替代标准流类,并且不能直接处理我需要解决的问题。

回页首

蛮力解决方案

在着手寻找解决我问题的工程方案前,我根据标准 Java API 类的精致和有效性,研究了基于这些类的解决方案。

该问题的蛮力解决方案就是简单地从输入源中读取所有数据,然后通过转换程序(即,压缩流、编码流或 XML 序列化器)将它们推进内存缓冲区中。然后,我可以从该内存缓冲区中打开要读取的流,这样我就解决了问题。

首先,我需要一个通用的 I/O 方法。清单 1 中的方法利用一个小缓冲区将 InputStream 中的所有数据复制到 OutputStream 。当到达输入的结尾( read() 函数的返回值小于零)时,该方法就返回,但不关闭这两个流。

清单 1. 通用的 I/O 方法
public static void io (InputStream in, OutputStream out)
throws IOException {
byte[] buffer = new byte[8192];
int amount;
while ((amount = in.read (buffer)) >= 0)
out.write (buffer, 0, amount);
}

清单 2 显示蛮力解决方案如何使我读取压缩格式的输入流。我打开写入内存缓冲区的 GZIPOutputStream (使用 ByteArrayOutputStream)。接着,将输入流复制到压缩流中,这样将压缩数据填入内存缓冲区中。然后,我返回 ByteArrayInputStream ,它让我从输入流中读取,如图 2 所示。

图 2. 蛮力解决方案

IO流--转载

清单 2. 蛮力解决方案
public static InputStream bruteForceCompress (InputStream in)
throws IOException {
ByteArrayOutputStream sink = new ByteArrayOutputStream ():
OutputStream out = new GZIPOutputStream (sink);
io (in, out);
out.close ();
byte[] buffer = sink.toByteArray ();
return new ByteArrayInputStream (buffer);
}

这个解决方案有一个明显的缺点,它将整个压缩文档都存储在内存中。如果文档很大,那么这种方法将不必要地浪费系统资源。使用流的主要特性之一是它们允许您操作比所用系统内存要大的数据:您可以在读取数据时处理它们,或在写入数据时生成数据,而无需始终将所有数据保存在内存中。

从效率上,让我们对在缓冲区之间复制数据进行更深入研究。

通过 io() 方法,将数据从输入源读入至一个缓冲区中。然后,将数据从缓冲区写入 ByteArrayOutputStream 中的缓冲区(通过我忽略的压缩过程)。然而, ByteArrayOutputStream 类对扩展的内部缓冲区进行操作;每当缓冲区变满时,就会分配一个大小是原来两倍的新缓冲区,接着将现有的数据复制到该缓冲区中。平均下来,这一过程每个字节复制两次。(算术计算很简单:当进入 ByteArrayOutputStream 时,对数据平均复制两次;所有数据至少复制一次;有一半数据至少复制两次;四分之一的数据至少复制三次,依次类推。)然后,将数据从该缓冲区复制到 ByteArrayInputStream 的一个新缓冲区中。现在,应用程序可以读取数据了。总之,这个解决方案将通过四个缓冲区写数据。这对于估计其它技术的效率是一个有用的基准。

回页首

管道式流解决方案

管道式流 PipedOutputStream 和 PipedInputStream 在 Java 虚拟机的线程之间提供了基于流的连接。一个线程将数据写入PipedOutputStream 中的同时,另一个线程可以从相关联的 PipedInputStream 中读取该数据。

就这样,这些类提供了一个针对我问题的解决方案。清单 3 显示了使用一个线程通过 GZIPOutputStream 将数据从输入流复制到PipedOutputStream 的代码。然后,相关联的 PipedInputStream 将提供对来自另一个线程的压缩数据的读取权,如图 3 所示:

图 3. 管道式流解决方案

IO流--转载

清单 3. 管道式流解决方案
private static InputStream pipedCompress (final InputStream in)
throws IOException {
PipedInputStream source = new PipedInputStream ();
final OutputStream out =
new GZIPOutputStream (new PipedOutputStream (source));
new Thread () {
public void run () {
try {
Streams.io (in, out);
out.close ();
} catch (IOException ex) {
ex.printStackTrace ();
}
}
}.start ();
return source;
}

理论上,这可能是个好技术:通过使用线程(一个执行压缩,另一个处理产生的数据),应用程序可以从硬件 SMP(对称多处理)或 SMT(对称多线程)中受益。另外,这一解决方案仅涉及两个缓冲区写操作:I/O 循环将数据从输入流读入缓冲区,然后通过压缩流写入PipedOutputStream 。接着,输出流将数据存储在内部缓冲区中,与 PipedInputStream 共享缓冲区以供应用程序读取。而且,因为数据通过固定缓冲区流动,所以从不需要将它们完全读入内存中。事实上,在任何给定时刻,缓冲区都只存储小部分的工作集。

不过,实际上,它的性能很糟糕。管道式流需要利用同步,从而引起两个线程之间激烈争夺同步。它们的内部缓冲区太小,无法有效地处理大量数据或隐藏锁争用。其次,持久共享缓冲区会阻碍许多简单的高速缓存策略共享 SMP 机器上的工作负载。最后,线程的使用使得异常处理极其困难:没有办法将可能出现的任何 IOException 下推到管道中以便阅读器处理。总之,这一解决方案太难处理,根本不实际。

工程解决方案

现在,我们将研究另一种解决该问题的工程方案。这种解决方案提供了一个特地为解决这类问题而设计的框架,该框架提供了对数据的InputStream 访问,这些数据是从递增地向 OutputStream 写入数据的源中产生的。递增地写入数据这一事实很重要。如果源在单个原子操作中将所有数据都写入 OutputStream ,而且如果不使用线程,则我们基本上又回到了蛮力技术的老路上。不过,如果可以访问源以递增地写入其数据,则我们就实现了在蛮力和管道式流解决方案之间的良好平衡。该解决方案不仅提供了在任何时候只在内存中保存少量数据的管道式优点,同时也提供了避免线程的蛮力技术的优点。

图 4 演示了完整的解决方案。我们将在本文的剩余部分研究 该解决方案的源代码

图 4. 工程解决方案

IO流--转载

输出引擎

清单 4 提供了一个描述数据源的接口 OutputEngine 。正如我所说的,这些源递增地将数据写入输出流:

清单 4. 输出引擎
package org.merlin.io;
import java.io.*;
/**
* An incremental data source that writes data to an OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute
* it and/or modify it under the terms of the GNU
* General Public License as published by the Free
* Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public interface OutputEngine {
public void initialize (OutputStream out) throws IOException;
public void execute () throws IOException;
public void finish () throws IOException;
}

initialize() 方法向该引擎提供一个流,应该向这个流写入数据。然后,重复调用 execute() 方法将数据写入该流中。当数据写完时,引擎会关闭该流。最后,当引擎应该关闭时,将调用 finish() 。这会发生在引擎关闭其输出流的前后。

I/O 流引擎

输出引擎解决了让我费力处理的问题,它是一个通过输出流过滤器将数据从输入流复制到目标输出流的引擎。这满足了递增性的特性,因为它可以一次读写单个缓冲区。

清单 5 到 10 中的代码实现了这样的一个引擎。通过输入流和输入流工厂来构造它。清单 11 是一个生成过滤后的输出流的工厂;例如,它会返回包装了目标输出流的 GZIPOutputStream 。

清单 5. I/O 流引擎
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from an InputStream through
* a FilterOutputStream to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class IOStreamEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private InputStream in;
private OutputStreamFactory factory;
private byte[] buffer;
private OutputStream out;

该类的构造器只初始化各种变量和将用于传输数据的缓冲区。

清单 6. 构造器
  public IOStreamEngine (InputStream in, OutputStreamFactory factory) {
this (in, factory, DEFAULT_BUFFER_SIZE);
}
public IOStreamEngine
(InputStream in, OutputStreamFactory factory, int bufferSize) {
this.in = in;
this.factory = factory;
buffer = new byte[bufferSize];
}

在 initialize() 方法中,该引擎调用其工厂来封装与其一起提供的 OutputStream 。该工厂通常将一个过滤器连接至 OutputStream 。

清单 7. initialize() 方法
  public void initialize (OutputStream out) throws IOException {
if (this.out != null) {
throw new IOException ("Already initialised");
} else {
this.out = factory.getOutputStream (out);
}
}

在 execute() 方法中,引擎从 InputStream 中读取一个缓冲区的数据,然后将它们写入已封装的 OutputStream ;或者,如果输入结束,它会关闭 OutputStream 。

清单 8. execute() 方法
  public void execute () throws IOException {
if (out == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = in.read (buffer);
if (amount < 0) {
out.close ();
} else {
out.write (buffer, 0, amount);
}
}
}

最后,当关闭引擎时,它就关闭其 InputStream 。

清单 9. 关闭 InputStream
  public void finish () throws IOException {
in.close ();
}

内部 OutputStreamFactory 接口(下面清单 10 中所示)描述可以返回过滤后的 OutputStream 的类。

清单 10. 内部输出流工厂接口
  public static interface OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException;
}
}

清单 11 显示将提供的流封装到 GZIPOutputStream 中的一个示例工厂:

清单 11. GZIP 输出流工厂
public class GZIPOutputStreamFactory
implements IOStreamEngine.OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException {
return new GZIPOutputStream (out);
}
}

该 I/O 流引擎及其输出流工厂框架通常足以支持大多数的输出流过滤需要。

输出引擎输入流

最后,我们还需要一小段代码来完成这个解决方案。清单 12 到 16 中的代码提供了读取由输出引擎所写数据的输入流。事实上,这段代码有两个部分:主类是一个从内部缓冲区读取数据的输入流。与此紧密耦合的是一个输出流(如清单 17 所示),它把输出引擎所写的数据填充到内部读缓冲区。

主输入流类将用其内部输出流来初始化输出引擎。然后,每当它的缓冲区为空时,它会自动执行该引擎来接收更多数据。输出引擎将数据写入其输出流中,这将重新填充输入流的内部缓冲区,以允许需要内部缓冲区数据的应用程序高效地读取数据。

清单 12. 输出引擎输入流
package org.merlin.io;
import java.io.*;
/**
* An input stream that reads data from an OutputEngine.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class OutputEngineInputStream extends InputStream {
private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;
private OutputEngine engine;
private byte[] buffer;
private int index, limit, capacity;
private boolean closed, eof;

该输入流的构造器获取一个输出引擎以从中读取数据和一个可选的缓冲区大小。该流首先初始化其本身,然后初始化输出引擎。

清单 13. 构造器
public OutputEngineInputStream (OutputEngine engine) throws IOException {
this (engine, DEFAULT_INITIAL_BUFFER_SIZE);
}
public OutputEngineInputStream (OutputEngine engine, int initialBufferSize)
throws IOException {
this.engine = engine;
capacity = initialBufferSize;
buffer = new byte[capacity];
engine.initialize (new OutputStreamImpl ());
}

代码的主要读部分是一个相对简单的基于字节数组的输入流,与 ByteArrayInputStream 类非常相似。然而,每当需要数据而该流为空时,它都会调用输出引擎的 execute() 方法来重新填写读缓冲区。然后,将这些新数据返回给调用程序。因而,这个类将对输出引擎所写的数据反复读取,直到它读完为止,此时将设置 eof 标志并且该流将返回已到达文件末尾的信息。

清单 14. 读取数据
  private byte[] one = new byte[1];
public int read () throws IOException {
int amount = read (one, 0, 1);
return (amount < 0) ? -1 : one[0] & 0xff;
}
public int read (byte data[], int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (closed) {
throw new IOException ("Stream closed");
} else {
while (index >= limit) {
if (eof)
return -1;
engine.execute ();
}
if (limit - index < length)
length = limit - index;
System.arraycopy (buffer, index, data, offset, length);
index += length;
return length;
}
}
public long skip (long amount) throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else if (amount <= 0) {
return 0;
} else {
while (index >= limit) {
if (eof)
return 0;
engine.execute ();
}
if (limit - index < amount)
amount = limit - index;
index += (int) amount;
return amount;
}
}
public int available () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
return limit - index;
}
}

当操作数据的应用程序关闭该流时,它调用输出引擎的 finish() 方法,以便可以释放其正在使用的任何资源。

清单 15. 释放资源
  public void close () throws IOException {
if (!closed) {
closed = true;
engine.finish ();
}
}

当输出引擎将数据写入其输出流时,调用 writeImpl() 方法。它将这些数据复制到读缓冲区,并更新读限制索引;这将使新数据可自动地用于读方法。

在单次循环中,如果输出引擎写入的数据比缓冲区中可以保存的数据多,则缓冲区的容量会翻倍。然而,这不能频繁发生;缓冲区应该快速扩展到足够的大小,以便进行状态稳定的操作。

清单 16. writeImpl() 方法
  private void writeImpl (byte[] data, int offset, int length) {
if (index >= limit)
index = limit = 0;
if (limit + length > capacity) {
capacity = capacity * 2 + length;
byte[] tmp = new byte[capacity];
System.arraycopy (buffer, index, tmp, 0, limit - index);
buffer = tmp;
limit -= index;
index = 0;
}
System.arraycopy (data, offset, buffer, limit, length);
limit += length;
}

下面清单 17 中显示的内部输出流实现表示了一个流将数据写入内部输出流缓冲区。该代码验证参数都是可接受的,并且如果是这样的话,它调用 writeImpl() 方法。

清单 17. 内部输出流实现
  private class OutputStreamImpl extends OutputStream {
public void write (int datum) throws IOException {
one[0] = (byte) datum;
write (one, 0, 1);
}
public void write (byte[] data, int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (eof) {
throw new IOException ("Stream closed");
} else {
writeImpl (data, offset, length);
}
}

最后,当输出引擎关闭其输出流,表明它已写入了所有的数据时,该输出流设置输入流的 eof 标志,表明已经读取了所有的数据。

清单 18. 设置输入流的 eof 标志
    public void close () {
eof = true;
}
}
}

敏感的读者可能注意到我应该将 writeImpl() 方法的主体直接放在输出流实现中:内部类有权访问所有包含类的私有成员。然而,对这些字段的内部类访问比由包含类的直接方法的访问在效率方面稍许差一些。所以,考虑到效率以及为了使类之间的相关性最小化,我使用额外的助手方法。

回页首

应用工程解决方案:在读取期间压缩数据

清单 19 演示了这个类框架的使用来解决我最初的问题:在我读取数据时压缩它们。该解决方案归结为创建一个与输入流相关联的IOStreamEngine 和一个 GZIPOutputStreamFactory ,然后将 OutputEngineInputStream 与这个 GZIPOutputStreamFactory 相连。自动执行流的初始化和连接,然后可以直接从结果流中读取压缩数据。当处理完成且关闭流时,输出引擎自动关闭,并且它关闭初始输入流。

清单 19. 应用工程解决方案
  private static InputStream engineCompress (InputStream in)
throws IOException {
return new OutputEngineInputStream
(new IOStreamEngine (in, new GZIPOutputStreamFactory ()));
}

虽然为解决这类问题而设计的解决方案应该产生十分清晰的代码,这一点没有什么可惊奇的,但是通常要充分留意以下教训:无论问题大小,应用良好的设计技术都几乎肯定会产生更为清晰、更便于维护的代码。

回页首

测试性能

从效率看, IOStreamEngine 将数据读入其内部缓冲区,然后通过压缩过滤器将它们写入 OutputStreamImpl 。这将数据直接写入OutputEngineInputStream ,以便它们可供读取。总共只执行两次缓冲区复制,这意味着我应该从管道式流解决方案的缓冲区复制效率和蛮力解决方案的无线程效率的结合中获益。

要测试实际的性能,我编写了一个简单的测试工具(请参阅所附 资源中的 test.PerformanceTest ),它使用这三个推荐的解决方案,通过使用一个空过滤器来读取一块哑元数据。在运行 Java 2 SDK,版本 1.4.0 的 800 MHz Linux 机器上,达到了下列性能:

管道式流解决方案 
15KB:23ms;15MB:22100ms 
蛮力解决方案 
15KB:0.35ms;15MB:745ms 
工程解决方案 
15KB:0.16ms;15MB:73ms

该问题的工程解决方案很明显比基于标准 Java API 的另两个方法都更有效。

顺便提一下,考虑到如果输出引擎能够遵守这样的约定:在将数据写入其输出流后,它不修改从中写入数据的数组而返回,那么我能提供一个只使用一次缓冲区复制操作的解决方案。可是,输出引擎很少会遵守这种约定。如果需要,输出引擎只要通过实现适当的标记程序接口,就能宣称它支持这种方式的操作。

回页首

应用工程解决方案:读取编码的字符数据

任何可以用“提供对将数据反复写入 OutputStream 的实体的读访问权”表述的问题,都可以用这一框架解决。在这一节和下一节中,我们将研究这样的问题示例及其有效的解决方案。

首先,考虑要读取 UTF-8 编码格式的字符流的情况: InputStreamReader 类让您将以二进制编码的字符数据作为一系列 Unicode 字符读取;它表示了从字节输入流到字符输入流的关口。 OutputStreamWriter 类让您将一系列二进制编码格式的 Unicode 字符写入输出流;它表示从字符输出流到字节输入流的关口。 String 类的 getBytes() 方法将字符串转换成经编码的字节数组。然而,这些类中没有一个能直接让您读取 UTF-8 编码格式的字符流。

清单 20 到 24 中的代码演示了以与 IOStreamEngine 类极其相似的方式使用 OutputEngine 框架的一种解决方案。我们并不是从输入流读取和通过输出流过滤器进行写操作,而是从字符流读取,并通过所选的字符进行编码的 OutputStreamWriter 进行写操作。

清单 20. 读取编码的字符数据
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from a Reader through
* a OutputStreamWriter to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class ReaderWriterEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private Reader reader;
private String encoding;
private char[] buffer;
private Writer writer;

该类的构造器接受要从中读取的字符流、要使用的编码以及可选的缓冲区大小。

清单 21. 构造器
  public ReaderWriterEngine (Reader in, String encoding) {
this (in, encoding, DEFAULT_BUFFER_SIZE);
}
public ReaderWriterEngine
(Reader reader, String encoding, int bufferSize) {
this.reader = reader;
this.encoding = encoding;
buffer = new char[bufferSize];
}

当该引擎初始化时,它将以所选编码格式写字符的 OutputStreamWriter 连接至提供的输出流。

清单 22. 初始化输出流写程序
  public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}

当执行该引擎时,它从输入字符流中读取数据,然后将它们写入 OutputStreamWriter ,接着 OutputStreamWriter 将它们以所选的编码格式传递给相连的输出流。至此,该框架使数据可供读取。

清单 23. 读取数据
  public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = reader.read (buffer);
if (amount < 0) {
writer.close ();
} else {
writer.write (buffer, 0, amount);
}
}
}

当引擎执行完时,它关闭其输入。

清单 24. 关闭输入
  public void finish () throws IOException {
reader.close ();
}
}

在这种与压缩不同的情况中,Java I/O 包不提供对 OutputStreamWriter 之下的字符编码类的低级别访问。因此,这是在 Java 平台 1.4 之前的发行版上读取编码格式的字符流的唯一有效解决方案。从版本 1.4 开始, java.nio.charset 包确实提供了与流无关的字符编码和译码能力。然而,这个包不能满足我们对基于输入流的解决方案的要求。

回页首

应用工程解决方案:读取序列化的 DOM 文档

最后,让我们研究该框架的最后一种用法。清单 25 到 29 中的代码提供了一个用来读取序列化格式的 DOM 文档或文档子集的解决方案。这一代码的潜在用途可能是对部分 DOM 文档执行确认性重新解析。

清单 25. 读取序列化的 DOM 文档
package org.merlin.io;
import java.io.*;
import java.util.*;
import org.w3c.dom.*;
import org.w3c.dom.traversal.*;
/**
* An output engine that serializes a DOM tree using a specified
* character encoding to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class DOMSerializerEngine implements OutputEngine {
private NodeIterator iterator;
private String encoding;
private OutputStreamWriter writer;

构造器获取要在上面进行循环的 DOM 节点,或预先构造的节点迭代器(这是 DOM 2 的一部分),以及一个用于序列化格式的编码。

清单 26. 构造器
  public DOMSerializerEngine (Node root) {
this (root, "UTF-8");
}
public DOMSerializerEngine (Node root, String encoding) {
this (getIterator (root), encoding);
}
private static NodeIterator getIterator (Node node) {
DocumentTraversal dt= (DocumentTraversal)
(node.getNodeType () ==
Node.DOCUMENT_NODE) ? node : node.getOwnerDocument ();
return dt.createNodeIterator (node, NodeFilter.SHOW_ALL, null, false);
}
public DOMSerializerEngine (NodeIterator iterator, String encoding) {
this.iterator = iterator;
this.encoding = encoding;
}

初始化期间,该引擎将适当的 OutputStreamWriter 连接至目标输出流。

清单 27. initialize() 方法
  public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}

在执行阶段,该引擎从节点迭代器中获得下一个节点,然后将其序列化至 OutputStreamWriter 。当获取了所有节点后,引擎关闭它的流。

清单 28. execute() 方法
  public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
Node node = iterator.nextNode ();
closeElements (node);
if (node == null) {
writer.close ();
} else {
writeNode (node);
writer.flush ();
}
}
}

当该引擎关闭时,没有要释放的资源。

清单 29. 关闭
  public void finish () throws IOException {
}
// private void closeElements (Node node) throws IOException ...
// private void writeNode (Node node) throws IOException ...
}

序列化每个节点的其它内部细节不太有趣;这一过程主要涉及根据节点的类型和 XML 1.0 规范写出节点,所以我将在本文中省略这一部分的代码。请参阅附带的 源代码,获取完整的详细信息。

回页首

结束语

我所提供的是一个有用的框架,它利用标准输入流 API 让您能有效读取由只能写入输出流的系统产生的数据。它让我们读取经压缩或编码的数据及序列化文档等。虽然可以使用标准 Java API 实现这一功能,但使用这些类的效率根本不行。应该充分注意到,这种解决方案比最简单的蛮力解决方案更有效(即使在数据不大的情况下)。将数据写入 ByteArrayOutputStream 以便进行后续处理的任何应用程序都可能从这一框架中受益。

字节数组流的拙劣性能和管道式流难以置信的蹩脚性能,实际上都是我下一篇文章的主题。在那篇文章中,我将研究重新实现这些类,并比这些类的原创者更加关注它们的性能。只要 API 约定稍微宽松一点,性能就可能改进一百倍了。

我讨厌洗碗。不过,正如大多数我自认为是较好(虽然常常还是微不足道)的想法一样,这些类背后的想法都是在我洗碗时冒出来的。我时常发现撇开实际代码,回头看看并且把问题的范围考虑得更广些,可能会得出一个更好的解决方案,它最终为您提供的方法可能比您找出的容易方法更好。这些解决方案常常会产生更清晰、更有效而且更可维护的代码。

我真的担心我们有了洗碗机的那一天。

彻底转变流,第 2 部分:优化 Java 内部 I/O

http://www.ibm.com/developerworks/cn/java/j-io2/

在 本系列的第一篇文章中,您学习了解决从只能写出数据的源读取数据的问题的一些不同方法。在可能的解决方案中,我们研究了怎样使用字节数组流、管道流以及直接处理该问题的定制框架。定制方法显然是最有效率的解决方案;但是,分析其它几种方法有助于看清标准 Java 流的一些问题。具体地说,字节数组输出流并不提供可提供对它的内容进行只读访问的高效机制,管道流的性能通常很差。

为了处理这些问题,我们将在本文中实现功能同样齐全的替换类,但在实现时更强调性能。让我们先来简要地讨论一下同步问题,因为它与 I/O 流有关。

同步问题

一般来说,我推荐在不是特别需要同步的情况下避免不必要地使用同步。显然,如果多个线程需并发地访问一个类,那么这个类需确保线程安全。但是,在许多情况下并不需要并发的访问,同步成了不必要的开销。例如,对流的并发访问自然是不确定的 ― 您无法预测哪些数据被先写入,也无法预测哪个线程读了哪些数据 ― 也就是说,在多数情况下,对流的并发访问是没用的。所以,对所有的流强制同步是不提供实际好处的花费。如果某个应用程序要求线程安全,那么通过应用程序自己的同步原语可以强制线程安全。

事实上,Collection 类的 API 作出了同样的选择:在缺省的情况下,set、list 等等都不是线程安全的。如果应用程序想使用线程安全的 Collection,那么它可以使用 Collections 类来创建一个线程安全的包装器来包装非线程安全的 Collection。如果这种作法是有用的,那么应用程序可以使用完全相同的机制来包装流,以使它线程安全;例如, OutputStream out = Streams.synchronizedOutputStream (byteStream) 。请参阅附带的 源代码中的 Streams 类,这是一个实现的示例。

所以,对于我所认为的多个并发线程无法使用的类,我没用同步来为这些类提供线程安全。在您广泛采用这种方式前,我推荐您研究一下 Java 语言规范(Java Language Specification)的 Threads and Locks那一章(请参阅 参考资料),以理解潜在的缺陷;具体地说,在未使用同步的情况下无法确保读写的顺序,所以,对不同步的只读方法的并发访问可能导致意外的行为,尽管这种访问看起来是无害的。

回页首

更好的字节数组输出流

当您需要把未知容量的数据转储到内存缓冲区时, ByteArrayOutputStream 类是使用效果很好的流。当我为以后再次读取而存储一些数据时,我经常使用这个类。但是,使用 toByteArray() 方法来取得对结果数据的读访问是很低效的,因为它实际返回的是内部字节数组的副本。对于小容量的数据,使用这种方式不会有太大问题;然而,随着容量增大,这种方式的效率被不必要地降低了。这个类必须复制数据,因为它不能强制对结果字节数组进行只读访问。如果它返回它的内部缓冲区,那么在一般的情况下,接收方无法保证该缓冲区未被同一数组的另一个接收方并发地修改。

StringBuffer 类已解决了类似的问题;它提供可写的字符缓冲区,它还支持高效地返回能从内部字符数组直接读取的只读 String 。因为StringBuffer 类控制着对它的内部数组的写访问,所以它仅在必要时才复制它的数组;也就是说,当它导出了 String 且后来调用程序修改了StringBuffer 的时候。如果没有发生这样的修改,那么任何不必要的复制都不会被执行。通过支持能够强制适当的访问控制的字节数组的包装器,新的 I/O 框架以类似的方式解决了这个问题。

我们可以使用相同的通用机制为需要使用标准流 API 的应用程序提供高效的数据缓冲和再次读取。我们的示例给出了可替代ByteArrayOutputStream 类的类,它能高效地导出对内部缓冲区的只读访问,方法是返回直接读取内部字节数组的只读 InputStream 。

我们来看一下代码。清单 1 中的构造函数分配了初始缓冲区,以存储写到这个流的数据。为了存储更多的数据,该缓冲区将按需自动地扩展。

清单 1. 不同步的字节数组输出流
package org.merlin.io;
import java.io.*;
/**
* An unsynchronized ByteArrayOutputStream alternative that efficiently
* provides read-only access to the internal byte array with no
* unnecessary copying.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*/
public class BytesOutputStream extends OutputStream {
private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192; // internal buffer
private byte[] buffer;
private int index, capacity;
// is the stream closed?
private boolean closed;
// is the buffer shared?
private boolean shared; public BytesOutputStream () {
this (DEFAULT_INITIAL_BUFFER_SIZE);
}
public BytesOutputStream (int initialBufferSize) {
capacity = initialBufferSize;
buffer = new byte[capacity];
}

清单 2 显示的是写方法。这些方法按需扩展内部缓冲区,然后把新数据复制进来。在扩展内部缓冲区时,我们使缓冲区的大小增加了一倍再加上存储新数据所需的容量;这样,为了存储任何所需的数据,缓冲区的容量成指数地增长。为了提高效率,如果您知道您将写入的数据的预期容量,那么您应该指定相应的初始缓冲区的大小。 close() 方法只是设置了一个合适的标志。

清单 2. 写方法
  public void write (int datum) throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
if (index >= capacity) {
// expand the internal buffer
capacity = capacity * 2 + 1;
byte[] tmp = new byte[capacity];
System.arraycopy (buffer, 0, tmp, 0, index);
buffer = tmp;
// the new buffer is not shared
shared = false;
}
// store the byte
buffer[index ++] = (byte) datum;
}
}
public void write (byte[] data, int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if ((offset < 0) || (offset + length > data.length)
|| (length < 0)) {
throw new IndexOutOfBoundsException ();
} else if (closed) {
throw new IOException ("Stream closed");
} else {
if (index + length > capacity) {
// expand the internal buffer
capacity = capacity * 2 + length;
byte[] tmp = new byte[capacity];
System.arraycopy (buffer, 0, tmp, 0, index);
buffer = tmp;
// the new buffer is not shared
shared = false;
}
// copy in the subarray
System.arraycopy (data, offset, buffer, index, length);
index += length;
}
}
public void close () {
closed = true;
}

清单 3 中的字节数组抽取方法返回内部字节数组的副本。因为我们无法防止调用程序把数据写到结果数组,所以我们无法安全地返回对内部缓冲区的直接引用。

清单 3. 转换成字节数组
  public byte[] toByteArray () {
// return a copy of the internal buffer
byte[] result = new byte[index];
System.arraycopy (buffer, 0, result, 0, index);
return result;
}

当方法提供对存储的数据的只读访问的时候,它们可以安全地高效地直接使用内部字节数组。清单 4 显示了两个这样的方法。 writeTo() 方法把这个流的内容写到输出流;它直接从内部缓冲区进行写操作。 toInputStream() 方法返回了可被高效地读取数据的输入流。它所返回的BytesInputStream (这是 ByteArrayInputStream 的非同步替代品。)能直接从我们的内部字节数组读取数据。在这个方法中,我们还设置了标志,以表示内部缓冲区正被输入流共享。这一点很重要,因为这样做可以防止在内部缓冲区正被共享时这个流被修改。

清单 4. 只读访问方法
  public void writeTo (OutputStream out) throws IOException {
// write the internal buffer directly
out.write (buffer, 0, index);
}
public InputStream toInputStream () {
// return a stream reading from the shared internal buffer
shared = true;
return new BytesInputStream (buffer, 0, index);
}

可能会覆盖共享数据的唯一的一个方法是显示在清单 5 中的 reset() 方法,该方法清空了这个流。所以,如果 shared 等于 true 且 reset() 被调用,那么我们创建新的内部缓冲区,而不是重新设置写索引。

清单 5. 重新设置流
  public void reset () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
if (shared) {
// create a new buffer if it is shared
buffer = new byte[capacity];
shared = false;
}
// reset index
index = 0;
}
}
}

回页首

更好的字节数组输入流

用 ByteArrayInputStream 类来提供对内存中的二进制数据基于流的读访问是很理想的。但是,有时候,它的两个设计特点使我觉得需要一个替代它的类。第一,这个类是同步的;我已讲过,对于多数应用程序来说没有这个必要。第二,如果在执行 mark() 前调用它所实现的reset() 方法,那么 reset() 将忽略初始读偏移。这两点都不是缺陷;但是,它们不一定总是人们所期望的。

清单 6 中的 BytesInputStream 类是不同步的较为普通的字节数组输入流类。

清单 6. 不同步的字节数组输入流
package org.merlin.io;
import java.io.*;
/**
* An unsynchronized ByteArrayInputStream alternative.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*/
public class BytesInputStream extends InputStream {
// buffer from which to read
private byte[] buffer;
private int index, limit, mark;
// is the stream closed?
private boolean closed; public BytesInputStream (byte[] data) {
this (data, 0, data.length);
}
public BytesInputStream (byte[] data, int offset, int length) {
if (data == null) {
throw new NullPointerException ();
} else if ((offset < 0) || (offset + length > data.length)
|| (length < 0)) {
throw new IndexOutOfBoundsException ();
} else {
buffer = data;
index = offset;
limit = offset + length;
mark = offset;
}
}
public int read () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else if (index >= limit) {
return -1; // EOF
} else {
return buffer[index ++] & 0xff;
}
}
public int read (byte data[], int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if ((offset < 0) || (offset + length > data.length)
|| (length < 0)) {
throw new IndexOutOfBoundsException ();
} else if (closed) {
throw new IOException ("Stream closed");
} else if (index >= limit) {
return -1; // EOF
} else {
// restrict length to available data
if (length > limit - index)
length = limit - index;
// copy out the subarray
System.arraycopy (buffer, index, data, offset, length);
index += length;
return length;
}
}
public long skip (long amount) throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else if (amount <= 0) {
return 0;
} else {
// restrict amount to available data
if (amount > limit - index)
amount = limit - index;
index += (int) amount;
return amount;
}
}
public int available () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
return limit - index;
}
} public void close () {
closed = true;
}
public void mark (int readLimit) {
mark = index;
} public void reset () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
// reset index
index = mark;
}
} public boolean markSupported () {
return true;
}
}

回页首

使用新的字节数组流

清单 7 中的代码演示了怎样使用新的字节数组流来解决第一篇文章中处理的问题(读一些压缩形式的数据):

清单 7. 使用新的字节数组流
public static InputStream newBruteForceCompress (InputStream in)
throws IOException {
BytesOutputStream sink = new BytesOutputStream ();
OutputStream out = new GZIPOutputStream (sink);
Streams.io (in, out);
out.close ();
return sink.toInputStream ();
}

回页首

更好的管道流

虽然标准的管道流既安全又可靠,但在性能方面不能令人满意。几个因素导致了它的性能问题:

  • 对于不同的使用情况,大小为 1024 字节的内部缓冲区并不都适用;对于大容量的数据,该缓冲区太小了。
  • 基于数组的操作只是反复调用低效的一个字节一个字节地复制操作。该操作本身是同步的,从而导致非常严重的锁争用。
  • 如果管道变空或变满而在这种状态改变时一个线程阻塞了,那么,即使仅有一个字节被读或写,该线程也被唤醒。在许多情况下,线程将使用这一个字节并立即再次阻塞,这将导致只做了很少有用的工作。

最后一个因素是 API 提供的严格的约定的后果。对于最通用的可能的应用程序中使用的流来说,这种严格的约定是必要的。但是,对于管道流实现,提供一种更宽松的约定是可能的,这个约定牺牲严格性以换取性能的提高:

  • 仅当缓冲区的可用数据(对阻塞的读程序而言)或可用空间(对写程序而言)达到指定的某个 滞后阈值或发生异常事件(例如管道关闭)时,阻塞的读程序和写程序才被唤醒。这将提高性能,因为仅当线程能完成适度的工作量时它们才被唤醒。
  • 只有一个线程可以从管道读取数据,只有一个线程可以把数据写到管道。否则,管道无法可靠地确定读程序线程或写程序线程何时意外死亡。

这个约定可完全适合典型应用程序情形中独立的读程序线程和写程序线程;需要立即唤醒的应用程序可以使用零滞后级别。我们将在后面看到,这个约定的实现的操作速度比标准 API 流的速度快两个数量级(100 倍)。

我们可以使用几个可能的 API 中的一个来开发这些管道流:我们可以模仿标准类,显式地连接两个流;我们也可以开发一个 Pipe 类并从这个类抽取输出流和输入流。我们不使用这两种方式而是使用更简单的方式:创建一个 PipeInputStream ,然后抽取关联的输出流。

这些流的一般操作如下:

  • 我们把内部数组用作环缓冲区(请看图 1):这个数组中维护着一个读索引和一个写索引;数据被写到写索引所指的位置,数据从读索引所指的位置被读取;当两个索引到达缓冲区末尾时,它们回绕到缓冲区起始点。任一个索引不能超越另一个索引。当写索引到达读索引时,管道是满的,不能再写任何数据。当读索引到达写索引时,管道是空的,不能再读任何数据。
  • 同步被用来确保两个协作线程看到管道状态的最新值。Java 语言规范对内存访问的顺序的规定是很宽容的,因此,无法使用无锁缓冲技术。
图 1. 环缓冲区

IO流--转载

在下面的代码清单中给出的是实现这些管道流的代码。清单 8 显示了这个类所用的构造函数和变量。您可以从这个 InputStream 中抽取相应的OutputStream (请看清单 17 中的代码)。在构造函数中您可以指定内部缓冲区的大小和滞后级别;这是缓冲区容量的一部分,在相应的读程序线程或写程序线程被立即唤醒前必须被使用或可用。我们维护两个变量, reader 和 writer ,它们与读程序线程和写程序线程相对应。我们用它们来发现什么时候一个线程已死亡而另一个线程仍在访问流。

清单 8. 一个替代的管道流实现
package org.merlin.io;
import java.io.*;
/**
* An efficient connected stream pair for communicating between
* the threads of an application. This provides a less-strict contract
* than the standard piped streams, resulting in much-improved
* performance. Also supports non-blocking operation.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*/
public class PipeInputStream extends InputStream {
// default values
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final float DEFAULT_HYSTERESIS = 0.75f;
private static final int DEFAULT_TIMEOUT_MS = 1000; // flag indicates whether method applies to reader or writer
private static final boolean READER = false, WRITER = true;
// internal pipe buffer
private byte[] buffer;
// read/write index
private int readx, writex;
// pipe capacity, hysteresis level
private int capacity, level;
// flags
private boolean eof, closed, sleeping, nonBlocking;
// reader/writer thread
private Thread reader, writer;
// pending exception
private IOException exception;
// deadlock-breaking timeout
private int timeout = DEFAULT_TIMEOUT_MS; public PipeInputStream () {
this (DEFAULT_BUFFER_SIZE, DEFAULT_HYSTERESIS);
}
public PipeInputStream (int bufferSize) {
this (bufferSize, DEFAULT_HYSTERESIS);
}
// e.g., hysteresis .75 means sleeping reader/writer is not
// immediately woken until the buffer is 75% full/empty
public PipeInputStream (int bufferSize, float hysteresis) {
if ((hysteresis < 0.0) || (hysteresis > 1.0))
throw new IllegalArgumentException ("Hysteresis: " + hysteresis);
capacity = bufferSize;
buffer = new byte[capacity];
level = (int) (bufferSize * hysteresis);
}

清单 9 中的配置方法允许您配置流的超时值和非阻塞模式。超时值的单位是毫秒,它表示阻塞的线程在过了这段时间后将被自动唤醒;这对于打破在一个线程死亡的情况下可能发生的死锁是必要的。在非阻塞模式中,如果线程阻塞,那么 InterruptedIOException 将被抛出。

清单 9. 管道配置
  public void setTimeout (int ms) {
this.timeout = ms;
}
public void setNonBlocking (boolean nonBlocking) {
this.nonBlocking = nonBlocking;
}

清单 10 中的读方法都遵循相当标准的模式:如果我们还没有读线程的引用,那么我们先取得它,然后我们验证输入参数,核对流未被关闭或没有异常待处理,确定可以读取多少数据,最后把数据从内部的环缓冲区复制到读程序的缓冲区。清单 12 中的 checkedAvailable() 方法在返回前自动地等待,直到出现一些可用的数据或流被关闭。

清单 10. 读数据
  private byte[] one = new byte[1];

  public int read () throws IOException {
// read 1 byte
int amount = read (one, 0, 1);
// return EOF / the byte
return (amount < 0) ? -1 : one[0] & 0xff;
} public synchronized int read (byte data[], int offset, int length)
throws IOException {
// take a reference to the reader thread
if (reader == null)
reader = Thread.currentThread ();
// check parameters
if (data == null) {
throw new NullPointerException ();
} else if ((offset < 0) || (offset + length > data.length)
|| (length < 0)) { // check indices
throw new IndexOutOfBoundsException ();
} else {
// throw an exception if the stream is closed
closedCheck ();
// throw any pending exception
exceptionCheck ();
if (length <= 0) {
return 0;
} else {
// wait for some data to become available for reading
int available = checkedAvailable (READER);
// return -1 on EOF
if (available < 0)
return -1;
// calculate amount of contiguous data in pipe buffer
int contiguous = capacity - (readx % capacity);
// calculate how much we will read this time
int amount = (length > available) ? available : length;
if (amount > contiguous) {
// two array copies needed if data wrap around the buffer end
System.arraycopy (buffer, readx % capacity, data, offset,
contiguous);
System.arraycopy (buffer, 0, data, offset + contiguous,
amount - contiguous);
} else {
// otherwise, one array copy needed
System.arraycopy (buffer, readx % capacity, data, offset,
amount);
}
// update indices with amount of data read
processed (READER, amount);
// return amount read
return amount;
}
}
}
public synchronized long skip (long amount) throws IOException {
// take a reference to the reader thread
if (reader == null)
reader = Thread.currentThread ();
// throw an exception if the stream is closed
closedCheck ();
// throw any pending exception
exceptionCheck ();
if (amount <= 0) {
return 0;
} else {
// wait for some data to become available for skipping
int available = checkedAvailable (READER);
// return 0 on EOF
if (available < 0)
return 0;
// calculate how much we will skip this time
if (amount > available)
amount = available;
// update indices with amount of data skipped
processed (READER, (int) amount);
// return amount skipped
return amount;
}
}

当数据从这个管道被读取或数据被写到这个管道时,清单 11 中的方法被调用。该方法更新有关的索引,如果管道达到它的滞后级别,该方法自动地唤醒阻塞的线程。

清单 11. 更新索引
  private void processed (boolean rw, int amount) {
if (rw == READER) {
// update read index with amount read
readx = (readx + amount) % (capacity * 2);
} else {
// update write index with amount written
writex = (writex + amount) % (capacity * 2);
}
// check whether a thread is sleeping and we have reached the
// hysteresis threshold
if (sleeping && (available (!rw) >= level)) {
// wake sleeping thread
notify ();
sleeping = false;
}
}

在管道有可用空间或可用数据(取决于 rw 参数)前,清单 12 中的 checkedAvailable() 方法一直等待,然后把空间的大小或数据的多少返回给调用程序。在这个方法内还核对流未被关闭、管道未被破坏等。

清单 12. 检查可用性
  public synchronized int available () throws IOException {
// throw an exception if the stream is closed
closedCheck ();
// throw any pending exception
exceptionCheck ();
// determine how much can be read
int amount = available (READER);
// return 0 on EOF, otherwise the amount readable
return (amount < 0) ? 0 : amount;
} private int checkedAvailable (boolean rw) throws IOException {
// always called from synchronized(this) method
try {
int available;
// loop while no data can be read/written
while ((available = available (rw)) == 0) {
if (rw == READER) { // reader
// throw any pending exception
exceptionCheck ();
} else { // writer
// throw an exception if the stream is closed
closedCheck ();
}
// throw an exception if the pipe is broken
brokenCheck (rw);
if (!nonBlocking) { // blocking mode
// wake any sleeping thread
if (sleeping)
notify ();
// sleep for timeout ms (in case of peer thread death)
sleeping = true;
wait (timeout);
// timeout means that hysteresis may not be obeyed
} else { // non-blocking mode
// throw an InterruptedIOException
throw new InterruptedIOException
("Pipe " + (rw ? "full" : "empty"));
}
}
return available;
} catch (InterruptedException ex) {
// rethrow InterruptedException as InterruptedIOException
throw new InterruptedIOException (ex.getMessage ());
}
}
private int available (boolean rw) {
// calculate amount of space used in pipe
int used = (writex + capacity * 2 - readx) % (capacity * 2);
if (rw == WRITER) { // writer
// return amount of space available for writing
return capacity - used;
} else { // reader
// return amount of data in pipe or -1 at EOF
return (eof && (used == 0)) ? -1 : used;
}
}

清单 13 中的方法关闭这个流;该方法还提供对读程序或写程序关闭流的支持。阻塞的线程被自动唤醒,该方法还检查各种其它情况是否正常。

清单 13. 关闭流
  public void close () throws IOException {
// close the read end of this pipe
close (READER);
}
private synchronized void close (boolean rw) throws IOException {
if (rw == READER) { // reader
// set closed flag
closed = true;
} else if (!eof) { // writer
// set eof flag
eof = true;
// check if data remain unread
if (available (READER) > 0) {
// throw an exception if the reader has already closed the pipe
closedCheck ();
// throw an exception if the reader thread has died
brokenCheck (WRITER);
}
}
// wake any sleeping thread
if (sleeping) {
notify ();
sleeping = false;
}
}

清单 14 中的方法检查这个流的状态。如果有异常待处理,那么流被关闭或管道被破坏(也就是说,读程序线程或写程序线程已死亡),异常被抛出。

清单 14. 检查流状态
  private void exceptionCheck () throws IOException {
// throw any pending exception
if (exception != null) {
IOException ex = exception;
exception = null;
throw ex; // could wrap ex in a local exception
}
}
private void closedCheck () throws IOException {
// throw an exception if the pipe is closed
if (closed)
throw new IOException ("Stream closed");
}
private void brokenCheck (boolean rw) throws IOException {
// get a reference to the peer thread
Thread thread = (rw == WRITER) ? reader : writer;
// throw an exception if the peer thread has died
if ((thread != null) && !thread.isAlive ())
throw new IOException ("Broken pipe");
}

当数据被写入这个管道时,清单 15 中的方法被调用。总的来说,它类似于读方法:我们先取得写程序线程的副本,然后检查流是否被关闭,接着进入把数据复制到管道的循环。和前面一样,该方法使用 checkedAvailable() 方法,checkedAvailable() 自动阻塞,直到管道中有可用的容量。

清单 15. 写数据
private synchronized void writeImpl (byte[] data, int offset, int length)
throws IOException {
// take a reference to the writer thread
if (writer == null)
writer = Thread.currentThread ();
// throw an exception if the stream is closed
if (eof || closed) {
throw new IOException ("Stream closed");
} else {
int written = 0;
try {
// loop to write all the data
do {
// wait for space to become available for writing
int available = checkedAvailable (WRITER);
// calculate amount of contiguous space in pipe buffer
int contiguous = capacity - (writex % capacity);
// calculate how much we will write this time
int amount = (length > available) ? available : length;
if (amount > contiguous) {
// two array copies needed if space wraps around the buffer end
System.arraycopy (data, offset, buffer, writex % capacity,
contiguous);
System.arraycopy (data, offset + contiguous, buffer, 0,
amount - contiguous);
} else {
// otherwise, one array copy needed
System.arraycopy (data, offset, buffer, writex % capacity,
amount);
}
// update indices with amount of data written
processed (WRITER, amount);
// update amount written by this method
written += amount;
} while (written < length);
// data successfully written
} catch (InterruptedIOException ex) {
// write operation was interrupted; set the bytesTransferred
// exception field to reflect the amount of data written
ex.bytesTransferred = written;
// rethrow exception
throw ex;
}
}
}

如清单 16 所示,这个管道流实现的特点之一是写程序可设置一个被传递给读程序的异常。

清单 16. 设置异常
  private synchronized void setException (IOException ex)
throws IOException {
// fail if an exception is already pending
if (exception != null)
throw new IOException ("Exception already set: " + exception);
// throw an exception if the pipe is broken
brokenCheck (WRITER);
// take a reference to the pending exception
this.exception = ex;
// wake any sleeping thread
if (sleeping) {
notify ();
sleeping = false;
}
}

清单 17 给出这个管道的有关输出流的代码。 getOutputStream() 方法返回 OutputStreamImpl ,OutputStreamImpl 是使用前面给出的方法来把数据写到内部管道的输出流。OutputStreamImpl 类继承了 OutputStreamEx ,OutputStreamEx 是允许为读线程设置异常的输出流类的扩展。

清单 17. 输出流
  public OutputStreamEx getOutputStream () {
// return an OutputStreamImpl associated with this pipe
return new OutputStreamImpl ();
} private class OutputStreamImpl extends OutputStreamEx {
private byte[] one = new byte[1]; public void write (int datum) throws IOException {
// write one byte using internal array
one[0] = (byte) datum;
write (one, 0, 1);
}
public void write (byte[] data, int offset, int length)
throws IOException {
// check parameters
if (data == null) {
throw new NullPointerException ();
} else if ((offset < 0) || (offset + length > data.length)
|| (length < 0)) {
throw new IndexOutOfBoundsException ();
} else if (length > 0) {
// call through to writeImpl()
PipeInputStream.this.writeImpl (data, offset, length);
}
}
public void close () throws IOException {
// close the write end of this pipe
PipeInputStream.this.close (WRITER);
}
public void setException (IOException ex) throws IOException {
// set a pending exception
PipeInputStream.this.setException (ex);
}
}
// static OutputStream extension with setException() method
public static abstract class OutputStreamEx extends OutputStream {
public abstract void setException (IOException ex) throws IOException;
}
}

回页首

使用新的管道流

清单 18 演示了怎样使用新的管道流来解决上一篇文章中的问题。请注意,写程序线程中出现的任何异常均可在流中被传递。

清单 18. 使用新的管道流
public static InputStream newPipedCompress (final InputStream in)
throws IOException {
PipeInputStream source = new PipeInputStream ();
final PipeInputStream.OutputStreamEx sink = source.getOutputStream ();
new Thread () {
public void run () {
try {
GZIPOutputStream gzip = new GZIPOutputStream (sink);
Streams.io (in, gzip);
gzip.close ();
} catch (IOException ex) {
try {
sink.setException (ex);
} catch (IOException ignored) {
}
}
}
}.start ();
return source;
}

回页首

性能结果

在下面的表中显示的是这些新的流和标准流的性能,测试环境是运行 Java 2 SDK,v1.4.0 的 800MHz Linux 机器。性能测试程序与我在上一篇文章中用的相同:

管道流 
15KB:21ms;15MB:20675ms 
新的管道流 
15KB:0.68ms;15MB:158ms 
字节数组流 
15KB:0.31ms;15MB:745ms 
新的字节数组流 
15KB:0.26ms;15MB:438ms

与上一篇文章中的性能差异只反映了我的机器中不断变化的环境负载。您可以从这些结果中看到,在大容量数据方面,新的管道流的性能远好于蛮力解决方案;但是,新的管道流的速度仍然只有我们分析的工程解决方案的速度的一半左右。显然,在现代的 Java 虚拟机中使用多个线程的开销远比以前小得多。

回页首

结束语

我们分析了两组可替代标准 Java API 的流的流: BytesOutputStream 和 BytesInputStream 是字节数组流的非同步替代者。因为这些类的预期的用例涉及单个线程的访问,所以不采用同步是合理的选择。实际上,执行时间的缩短(最多可缩短 40%)很可能与同步的消灭没有多大关系;性能得到提高的主要原因是在提供只读访问时避免了不必要的复制。第二个示例 PipeInputStream 可替代管道流;为了减少超过 99% 的执行时间,这个流使用宽松的约定、改进的缓冲区大小和基于数组的操作。在这种情况下无法使用不同步的代码;Java 语言规范排除了可靠地执行这种代码的可能性,否则,在理论上是可以实现最少锁定的管道。

字节数组流和管道流是基于流的应用程序内部通信的主要选择。虽然新的 I/O API 提供了一些其它选择,但是许多应用程序和 API 仍然依赖标准流,而且对于这些特殊用途来说,新的 I/O API 并不一定有更高的效率。通过适当地减少同步的使用、有效地采用基于数组的操作以及最大程度地减少不必要的复制,性能结果得到了很大的提高,从而提供了完全适应标准流框架的更高效的操作。在应用程序开发的其它领域中采用相同的步骤往往能取得类似地性能提升。