How to use the drain event of stream.Writable in Node.js

Date: 2021-12-12 20:56:51

In Node.js I'm using the fs.createWriteStream method to append data to a local file. In the Node documentation they mention the drain event when using fs.createWriteStream, but I don't understand it.

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

In the code above, how can I use the drain event? Is the event used properly below?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}

2 answers

#1

The drain event is emitted when a writable stream's internal buffer has been emptied.

This only happens after the internal buffer has reached its highWaterMark, a threshold (in bytes, unless the stream is in object mode) at which write() starts returning false. highWaterMark is not a hard limit: a write that crosses it is still accepted and buffered. The false return value is the signal for the producer to stop writing until drain is emitted.
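
A minimal sketch of that threshold, assuming an in-memory Writable as a stand-in for a file stream. The sink, its 10-byte highWaterMark and the setImmediate-based acknowledgement are illustrative choices, not anything from the question. writableLength reports how many bytes are currently buffered.

```javascript
const { Writable } = require('stream');

// In-memory sink (hypothetical stand-in for a file stream). It acknowledges
// each chunk on a later turn of the event loop, so data piles up in the buffer.
const sink = new Writable({
  highWaterMark: 10, // tiny threshold in bytes, chosen for illustration
  write(chunk, encoding, callback) {
    setImmediate(callback);
  },
});

const results = [];
for (const word of ['abcd', 'efgh', 'ijkl']) {
  const ok = sink.write(word); // false once buffered bytes reach highWaterMark
  results.push({ ok, buffered: sink.writableLength });
}

console.log(results);
```

The third chunk is accepted even though it pushes the buffer past highWaterMark. The false return value only tells the producer to stop for now.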

This typically happens when data is read from one source faster than it can be written to another. For example, take two streams:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

Now imagine that the file read is on an SSD that can read at 500 MB/s, while write is on an HDD that can only write at 150 MB/s. The write stream cannot keep up, so data accumulates in its internal buffer. Once the buffer reaches the highWaterMark, write() starts returning false and the stream records that a drain is needed. The default highWaterMark is 16 KB in the Node.js versions this answer describes; check write.writableHighWaterMark for the actual value. When the internal buffer's length drops back to 0, the drain event is emitted.
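
The scenario can be simulated without real disks. This sketch makes three assumptions. An in-memory Writable whose write callback fires after a 5 ms timer stands in for the slow HDD. A 16-byte highWaterMark replaces the real default. A loop that writes three 8-byte chunks back to back stands in for the fast reader.

```javascript
const { Writable } = require('stream');

// Stand-in for the slow HDD: each chunk takes about 5 ms to "write".
const slowDisk = new Writable({
  highWaterMark: 16, // illustrative; the real default is larger
  write(chunk, encoding, callback) {
    setTimeout(callback, 5);
  },
});

const log = [];
slowDisk.on('drain', () => {
  log.push(`drain (buffered: ${slowDisk.writableLength})`);
});

// Stand-in for the fast SSD reader: three 8-byte chunks, no waiting.
for (let i = 0; i < 3; i++) {
  const ok = slowDisk.write('12345678');
  log.push(`write ${i} -> ${ok} (buffered: ${slowDisk.writableLength})`);
}

process.on('exit', () => console.log(log.join('\n')));
```

Only one drain is emitted, and only after the whole backlog has been flushed. It does not fire as soon as the buffer dips below highWaterMark.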

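This is how drain gets emitted, taken from Node.js's internal Writable implementation (older versions). It runs after each buffered write completes: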

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

And this is where write() records that a drain is needed, in the internal writeOrBuffer function:

// state.length already includes the chunk just passed to write()
var ret = state.length < state.highWaterMark; // ret is what write() returns
state.needDrain = !ret;
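
To make the two halves concrete, here is a toy model of that bookkeeping. It is not Node.js's real code: ToyWritable, write(len) and flushed(len) are hypothetical names, and chunks are represented only by their byte lengths. Later Node.js versions record the flag as `if (!ret) state.needDrain = true;` so that a later small write never clears a pending drain. The model follows that form.

```javascript
const EventEmitter = require('events');

// Toy model of Writable's drain bookkeeping (hypothetical, not Node.js internals).
class ToyWritable extends EventEmitter {
  constructor(highWaterMark) {
    super();
    this.state = { length: 0, highWaterMark, needDrain: false };
  }

  // Like writeOrBuffer: count the chunk first, then compare with the threshold.
  write(len) {
    const state = this.state;
    state.length += len;
    const ret = state.length < state.highWaterMark;
    if (!ret) state.needDrain = true; // only ever set here, never cleared
    return ret;
  }

  // Like the post-write check: `len` bytes have reached the destination.
  flushed(len) {
    const state = this.state;
    state.length -= len;
    if (state.length === 0 && state.needDrain) {
      state.needDrain = false;
      this.emit('drain');
    }
  }
}

// Usage: threshold of 10 bytes, two 6-byte chunks.
const toy = new ToyWritable(10);
let drains = 0;
toy.on('drain', () => { drains += 1; });

const first = toy.write(6);  // length 6 < 10
const second = toy.write(6); // length 12 >= 10, needDrain set
toy.flushed(6);              // length 6: not empty yet, no drain
const drainsBeforeEmpty = drains;
toy.flushed(6);              // length 0 and needDrain: emits 'drain'
```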

To see how the drain event is used, consider this example from the Node.js documentation:

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

The function writes data to a writable stream 1,000,000 times. The variable ok starts as true, and the do...while loop keeps going only while ok is true and writes remain. On each iteration ok is set to the return value of writer.write(), which is false once the internal buffer has reached highWaterMark. At that point the loop stops and a one-time drain listener is registered with writer.once(). When drain fires, write() is called again and resumes where it left off.
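
The same pattern can be written with async/await. This is a sketch, not the documentation's code. writeManyTimes is a hypothetical helper. It relies on events.once(), which returns a promise that resolves on the next emission of the named event and rejects if 'error' is emitted first. The counting sink with a 64-byte highWaterMark is an in-memory stand-in for a real file stream.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Hypothetical async/await take on writeOneMillionTimes: keep writing while
// write() returns true, otherwise wait for a single 'drain' before going on.
async function writeManyTimes(writer, data, times) {
  for (let i = 0; i < times; i++) {
    if (!writer.write(data)) {
      // This chunk was accepted; just wait before writing the next one.
      await once(writer, 'drain'); // rejects if 'error' is emitted first
    }
  }
  const finished = once(writer, 'finish'); // listen before calling end()
  writer.end();
  await finished;
}

// In-memory stand-in for a file stream that counts the bytes it receives.
let bytes = 0;
const counter = new Writable({
  highWaterMark: 64,
  write(chunk, encoding, callback) {
    bytes += chunk.length;
    setImmediate(callback);
  },
});

let done = false;
writeManyTimes(counter, 'hello\n', 1000)
  .then(() => {
    done = true;
    console.log(`wrote ${bytes} bytes`);
  })
  .catch((err) => console.error('write failed:', err));
```

As in the callback version, the chunk whose write() returned false is not written again after drain, because it was already accepted.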

Regarding your code specifically, you don't need the drain event, because you write only once, right after opening the stream. The internal buffer is still empty at that point. So write() returns false (and drain fires) only if that single chunk is at least highWaterMark bytes long (16 KB by default at the time of writing). The drain event matters when you write many times and the buffered data can build up past your writable stream's highWaterMark.
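
Both cases can be checked with a small sketch. It assumes an in-memory Writable with a 16-byte highWaterMark in place of fs.createWriteStream, so the sizes scale down from kilobytes to bytes; 'this is my data' is 15 bytes.

```javascript
const { Writable } = require('stream');

// Hypothetical in-memory stand-in for fs.createWriteStream('fileName.txt'),
// with a 16-byte highWaterMark so the sizes scale down from KB to bytes.
function makeSink() {
  return new Writable({
    highWaterMark: 16,
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
}

const log = [];

// Case 1: the question's single small write ('this is my data' is 15 bytes).
const small = makeSink();
small.on('drain', () => log.push('small: drain'));
log.push(`small: write -> ${small.write('this is my data')}`);

// Case 2: a single write of at least highWaterMark bytes.
const large = makeSink();
large.on('drain', () => log.push('large: drain'));
log.push(`large: write -> ${large.write('x'.repeat(32))}`);

process.on('exit', () => console.log(log.join('\n')));
```

A single write is enough to trigger drain if its chunk reaches highWaterMark. What counts is how much data is buffered, not how many writes were made.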

#2

Imagine you're connecting two streams with very different bandwidths, say, uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.

In this situation, Node.js keeps the data in memory until the slow stream gets a chance to process it. This can become a problem if the file is very large.

To avoid this, writable.write() returns false once the stream's internal buffer has reached its highWaterMark. If you stop writing at that point, the stream will later emit a drain event to indicate that the buffer has emptied and it is safe to write again.

You can pause() the readable stream when write() returns false and resume() it on drain. This throttles the readable stream to the speed of the writable one.
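
A minimal sketch of that manual approach. Readable.from() builds a fast source from an array of hypothetical chunks. slowSink is an in-memory stand-in for the slow socket, with an assumed 32-byte highWaterMark and a 1 ms timer per chunk. maxBuffered tracks how large the writable's buffer ever gets.

```javascript
const { Readable, Writable } = require('stream');

// Fast source: 100 small chunks that are all available immediately.
const chunks = Array.from({ length: 100 }, (_, i) => `chunk-${i};`);
const source = Readable.from(chunks);

// Slow destination (stand-in for the socket): ~1 ms per chunk, 32-byte buffer.
const received = [];
const slowSink = new Writable({
  highWaterMark: 32,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 1);
  },
});

let maxBuffered = 0;
source.on('data', (chunk) => {
  const ok = slowSink.write(chunk);
  maxBuffered = Math.max(maxBuffered, slowSink.writableLength);
  if (!ok) {
    // Destination is backed up: stop reading until it drains.
    source.pause();
    slowSink.once('drain', () => source.resume());
  }
});
source.on('end', () => slowSink.end());

slowSink.on('finish', () => {
  console.log(`received ${received.length} chunks, max buffered ${maxBuffered} bytes`);
});
```

Reading stops whenever write() returns false, so the buffer never grows more than one chunk past highWaterMark, however large the source is.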

Better: use readable.pipe(writable), which does this for you.
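
For comparison, here is a sketch that lets Node.js handle the backpressure. It uses stream.pipeline() rather than .pipe(). As far as backpressure goes, the two behave alike. pipeline() also reports an error from either stream through one callback and destroys the streams on failure. The source (Readable.from() over an array) and the slow in-memory sink are assumptions for illustration.

```javascript
const { pipeline, Readable, Writable } = require('stream');

// Fast source and slow in-memory sink (hypothetical stand-ins).
const lines = Array.from({ length: 100 }, (_, i) => `line ${i}\n`);
const received = [];
const slowSink = new Writable({
  highWaterMark: 32,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 1);
  },
});

// The source is paused whenever slowSink.write() returns false and resumed on
// 'drain'; the callback runs once, with success or the first error.
let status = 'pending';
pipeline(Readable.from(lines), slowSink, (err) => {
  status = err ? `failed: ${err.message}` : 'done';
  console.log(status);
});
```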

EDIT: There's a bug in your code. Regardless of what write() returns, your data has already been accepted and will be written. A false return value only asks you to hold off on further writes, so you don't need to retry. As it stands, your code writes data twice.
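
A sketch that reproduces the duplicate write. The sink is an in-memory stand-in that records every chunk reaching it. Its 8-byte highWaterMark is an assumption, chosen so that the question's 15-byte string makes write() return false.

```javascript
const { Writable } = require('stream');

// Hypothetical sink that records every chunk reaching the destination.
const written = [];
const sink = new Writable({
  highWaterMark: 8, // small so that the 15-byte string below exceeds it
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    setImmediate(callback);
  },
});

const data = 'this is my data';

// The pattern from the question: "retry" the write once the stream drains.
const result = sink.write(data); // false, but data has already been accepted
if (!result) {
  sink.once('drain', () => {
    sink.write(data); // BUG: writes the same data a second time
  });
}

process.on('exit', () => console.log(written));
```

written ends up containing the string twice. Dropping the retry inside the drain listener fixes it.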

Something like this would work:

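var packets = […], // the chunks to write
    current = 0;

// Write as many packets as the stream will take, then wait for 'drain'.
// A loop (instead of recursing after every successful write) keeps the
// call stack flat when many writes succeed in a row.
function niceWrite() {
  while (current < packets.length) {
    var canContinue = stream.write(packets[current]);
    current += 1;

    // buffer reached highWaterMark: resume from the next packet on 'drain'
    if (!canContinue) {
      stream.once('drain', niceWrite);
      return;
    }
  }

  // every packet has been accepted: flush and close the stream
  stream.end();
}

niceWrite();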
