I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.
我需要构建一个处理大型CSV文件的函数,以便在bluebird.map()调用中使用。考虑到文件的潜在大小,我想使用流媒体。
This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).
此函数应接受流(CSV文件)和函数(处理流中的块),并在读取文件结束(已解决)或错误(拒绝)时返回承诺。
So, I start with:
所以,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Now, I have two inter-related issues:
现在,我有两个相互关联的问题:
- I need to throttle the actual amount of data being processed, so as to not create memory pressures.
- 我需要限制正在处理的实际数据量,以免产生内存压力。
- The function passed as the
processor
param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now:pg-promise
). As such, it will create a promise in memory and move on, repeatedly. - 作为处理器参数传递的函数通常是异步的,例如通过基于承诺的库(现在:pg-promise)将文件的内容保存到数据库。因此,它将在记忆中创造一个承诺并反复继续前进。
The pg-promise
library has functions to manage this, like page(), but I'm not able to wrap my ahead around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable
section after each read()
, which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.
pg-promise库具有管理它的功能,如page(),但我无法将如何将流事件处理程序与这些promise方法混合在一起。现在,我在每个read()之后在处理程序中返回一个可读部分的promise,这意味着我创建了大量的承诺数据库操作并最终因为我达到进程内存限制而出错。
Does anyone have a working example of this that I can use as a jumping point?
有没有人有一个这样的工作例子,我可以用作跳跃点?
UPDATE: Probably more than one way to skin the cat, but this works:
更新:可能有不止一种方法给猫皮肤,但这有效:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Anyone sees a potential problem with this approach?
有人发现这种方法存在潜在问题吗?
3 个解决方案
#1
3
Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.
在下面找到一个完整的应用程序,它可以正确地执行相同类型的任务:它将文件作为流读取,将其解析为CSV并将每行插入数据库。
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});
const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');
const db = pgp(cn);
function receiver(_, data) {
function source(index) {
if (index < data.length) {
// here we insert just the first column value that contains a prime number;
return this.none('insert into primes values($1)', data[index][0]);
}
}
return this.sequence(source);
}
db.task(t => {
return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
.then(data => {
console.log('DATA:', data);
}
.catch(error => {
console.log('ERROR:', error);
});
Note that the only thing I changed: using library csv-parse
instead of csv
, as a better alternative.
请注意,我唯一改变的是:使用库csv-parse而不是csv,作为更好的选择。
Added use of method stream.read from the spex library, which properly serves a Readable stream for use with promises.
添加了spex库中方法stream.read的使用,该方法正确地提供了一个可读流用于promises。
#2
7
You might want to look at promise-streams
您可能希望查看promise-streams
var ps = require('promise-streams');
passedStream
.pipe(csv.parse({trim: true}))
.pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
.wait().then(_ => {
console.log("All done!");
});
Works with backpressure and everything.
适用于背压和一切。
#3
1
So to say you don't want streaming but some kind of data chunks? ;-)
那么说你不想要流媒体而是某种数据块? ;-)
Do you know https://github.com/substack/stream-handbook?
你知道https://github.com/substack/stream-handbook吗?
I think the simplest approach without changing your architecture would be some kind of promise pool. e.g. https://github.com/timdp/es6-promise-pool
我认为在不改变你的架构的情况下最简单的方法就是某种承诺池。例如https://github.com/timdp/es6-promise-pool
#1
3
Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.
在下面找到一个完整的应用程序,它可以正确地执行相同类型的任务:它将文件作为流读取,将其解析为CSV并将每行插入数据库。
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});
const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');
const db = pgp(cn);
function receiver(_, data) {
function source(index) {
if (index < data.length) {
// here we insert just the first column value that contains a prime number;
return this.none('insert into primes values($1)', data[index][0]);
}
}
return this.sequence(source);
}
db.task(t => {
return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
.then(data => {
console.log('DATA:', data);
}
.catch(error => {
console.log('ERROR:', error);
});
Note that the only thing I changed: using library csv-parse
instead of csv
, as a better alternative.
请注意,我唯一改变的是:使用库csv-parse而不是csv,作为更好的选择。
Added use of method stream.read from the spex library, which properly serves a Readable stream for use with promises.
添加了spex库中方法stream.read的使用,该方法正确地提供了一个可读流用于promises。
#2
7
You might want to look at promise-streams
您可能希望查看promise-streams
var ps = require('promise-streams');
passedStream
.pipe(csv.parse({trim: true}))
.pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
.wait().then(_ => {
console.log("All done!");
});
Works with backpressure and everything.
适用于背压和一切。
#3
1
So to say you don't want streaming but some kind of data chunks? ;-)
那么说你不想要流媒体而是某种数据块? ;-)
Do you know https://github.com/substack/stream-handbook?
你知道https://github.com/substack/stream-handbook吗?
I think the simplest approach without changing your architecture would be some kind of promise pool. e.g. https://github.com/timdp/es6-promise-pool
我认为在不改变你的架构的情况下最简单的方法就是某种承诺池。例如https://github.com/timdp/es6-promise-pool