什么是 stream
Stream 借鉴自 Unix 编程哲学中的 pipe。
Unix shell 命令中,管道式的操作 | 将上一个命令的输出作为下一个命令的输入。Node.js stream 中则是通过 .pip() 方法来进行的。
来看一个 stream 的运用场景:从服务器读取文件并返回给页面。
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);
好处:
stream 的种类
分五种:
- readable
- writable
- duplex
- transform
- classic
readable
readable 类型的流产生的数据,可通过 .pip() 输送到能够消费(consume)流数据的地方,比如 writable,transform,duplex 类型的对象。
一个 readable stream 示例:
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
运行结果:
$ node read0.js
beep boop
_read 方法与按需输出
上面 rs.push(null) 表示没有更多数据了。直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费(示例中消费方为 process.stdout ,即输出到命令行。)。因为消费者有可能并不能立即消费这些内容,直接 push 数据后会消耗不必要的资源。
更好的做法是,让 readable 流只在消费者需要数据的时候再 push 。这是通过定义能 raedable 对象定义 ._read 方法来完成的。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
运行结果:
$ node read1.js
abcdefghijklmnopqrstuvwxyz
这种方式下,定义了 readable 流产生数据的方法 ._read ,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。
_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。
下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
<span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span> () {
<span class="pl-smi">rs</span>.<span class="pl-c1">push</span>(<span class="pl-c1">String</span>.<span class="pl-c1">fromCharCode</span>(<span class="pl-k">++</span>c));
}, <span class="pl-c1">100</span>);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
输出任意数据
上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })
消费 readable 流产生的数据
一般情况下, 我们会将 readable 产生的数据直接传递给其他的消费方,比如 through 或 concat-stream 创建的流对象。但直接操作来自 readable 的数据也是可以的。
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
上面的示例代码中,监听了来自命令行的输入,有数据时 stdin readable 事件便会触发,然后可通过调用 read() 方法获取到数据。数据结束时 read() 返回 null 表示没有更多数据了。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null
同时 read() 方法支持传递一个 size 指定一次获取多少的字节(bytes)。比如一次只获取 3 字节的数据:
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});
这将导致我们获取到的数据是不完整的,因为指定了尺寸后,超出的部分是拿不到的。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
此时可通过调用 read(0) 告诉 Node.js 我们不希望丢弃超出的部分。这样超出的部分会在后面的读取中陆续输出。
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
+ process.stdin.read(0);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer **0a** 64 65>
<Buffer 66 **0a** 67>
<Buffer 68 69 **0a**>
除了调用 read(0) ,还可调用 unshift() 方法将多余数据放回去。这样下次 read() 的时候数据都还在。比如一个解析换行的示例,遇到换行时将本行输出,剩下的数据塞回,以在下一行处理时使用。
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'
writable 流
writable 流可作为 .pip() 的对象。
创建 writable 流
需要实现 ._write(chunk, enc, next) 方法,其中:
-
chunk 为接收到的数据
-
enc 当 opts.decodeString 为 false 且收到的数据这字符串时,它表示字符串的编码
-
next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败
默认情况下,获取到的字符串数据会转为 Buffer ,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。
一个 writable 示例:
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
向 writable 流写入数据
通过调用 writable 流的 write 方法来写入。
process.stdout.write('beep boop\n');
通过调用 end() 来结束数据的写入。
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
duplex
双工类型的流,同时具有 writable 和 readable 流的功能。Node.js 内建的 zlib ,TCP sockets 以及 crypto 都是双工类型的。
所以可对双工类型的流进行如下操作:
transform
一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam 。
classic stream
这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。
classic readable stream
当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。
.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。
classic readable 流的创建
一个 classic readable 流的创建示例:
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
从 classic readable 流读取数据
数据读取是通过监听流上的 data 与 end 事件。
一个从 classic readable 流读取数据的示例:
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 through 或 concat-stream 来进行。
classic writable stream
只需要实现 .write(buf) , .end(buf) 及 .destroy() 方法即可,比较简单。
内建的流对象
总结
本质上,所有流都是 EventEmitter ,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。
使用 stream 可显著提高程序性能,特别是处理数据量大的情况下。它可以将数据分片处理,而不是粗暴地将数据看作一个整体。但掌握并熟练是需要一定时间练习的。
参考
|