I have been searching for an example of how I can stream the result of a MongoDB query to a nodejs client. All solutions I have found so far seem to read the query result at once and then send the result back to the server.
我一直在搜索一个示例,说明如何将MongoDB查询的结果流到nodejs客户机。到目前为止,我发现的所有解决方案似乎都立即读取了查询结果,然后将结果发送回服务器。
Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call that when the next chunk of the result set is available.
相反,我(显然)希望向查询方法提供回调,并在结果集的下一个块可用时调用MongoDB。
I have been looking at mongoose - should I probably use a different driver?
我一直在看猫鼬——我应该换一个司机吗?
Jan
1月
4 个解决方案
#1
25
Streaming in Mongoose became available in version 2.4.0 which appeared three months after you've posted this question:
Mongoose的流媒体服务在2.4.0版本中出现,在你发布这个问题三个月后出现:
Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
More elaborated examples can be found on their documentation page.
更详细的示例可以在他们的文档页面中找到。
#2
24
node-mongodb-driver
(the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.
node-mongodb-driver(每个mongoDB客户机在node- js中使用的底层层)除了其他人提到的cursor API之外,还有一个不错的流API(#458)。不幸的是,我没有在其他地方找到它。
Update: there are docs also here.
更新:这里也有文档。
It can be used like this:
它可以这样使用:
var stream = collection.find().stream()
stream.on('error', function (err) {
console.error(err)
})
stream.on('data', function (doc) {
console.log(doc)
})
It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)
它实际上实现了ReadableStream接口,所以它有所有的好处(暂停/恢复等)
#3
10
mongoose
is not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native
).
mongoose并不是真正的“驱动程序”,它实际上是围绕MongoDB驱动程序(node-mongodb-native)的ORM包装。
To do what you're doing, take a look at the driver's .find
and .each
method. Here's some code from the examples:
要做你正在做的事情,请查看驱动程序的.find和.each方法。下面是一些例子中的代码:
// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
sys.puts("Printing docs from Cursor Each")
cursor.each(function(err, doc) {
if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
})
});
To stream the results, you're basically replacing that sys.puts
with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush()
, but you may also want to checkout socket.io
.
为了实现流化结果,基本上就是替换了那个sys。使用你的“流”功能。不确定你打算如何发布结果。我认为您可以执行response.write() + response.flush(),但是您可能还想签出socket.com .io。
#4
2
Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)
这里是我找到的解决方案(如果这是错误的方法,请纠正我):(也请原谅错误的编码——对我来说现在美化这个已经太晚了)
var sys = require('sys')
var http = require("http");
var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;
var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));
var products;
db.open(function (error, client) {
if (error) throw error;
products = new Collection(client, 'products');
});
function ProductReader(collection) {
this.collection = collection;
}
ProductReader.prototype = new process.EventEmitter();
ProductReader.prototype.do = function() {
var self = this;
this.collection.find(function(err, cursor) {
if (err) {
self.emit('e1');
return;
}
sys.puts("Printing docs from Cursor Each");
self.emit('start');
cursor.each(function(err, doc) {
if (!err) {
self.emit('e2');
self.emit('end');
return;
}
if(doc != null) {
sys.puts("doc:" + doc.name);
self.emit('doc',doc);
} else {
self.emit('end');
}
})
});
};
http.createServer(function(req,res){
pr = new ProductReader(products);
pr.on('e1',function(){
sys.puts("E1");
res.writeHead(400,{"Content-Type": "text/plain"});
res.write("e1 occurred\n");
res.end();
});
pr.on('e2',function(){
sys.puts("E2");
res.write("ERROR\n");
});
pr.on('start',function(){
sys.puts("START");
res.writeHead(200,{"Content-Type": "text/plain"});
res.write("<products>\n");
});
pr.on('doc',function(doc){
sys.puts("A DOCUMENT" + doc.name);
res.write("<product><name>" + doc.name + "</name></product>\n");
});
pr.on('end',function(){
sys.puts("END");
res.write("</products>");
res.end();
});
pr.do();
}).listen(8000);
#1
25
Streaming in Mongoose became available in version 2.4.0 which appeared three months after you've posted this question:
Mongoose的流媒体服务在2.4.0版本中出现,在你发布这个问题三个月后出现:
Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
More elaborated examples can be found on their documentation page.
更详细的示例可以在他们的文档页面中找到。
#2
24
node-mongodb-driver
(the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.
node-mongodb-driver(每个mongoDB客户机在node- js中使用的底层层)除了其他人提到的cursor API之外,还有一个不错的流API(#458)。不幸的是,我没有在其他地方找到它。
Update: there are docs also here.
更新:这里也有文档。
It can be used like this:
它可以这样使用:
var stream = collection.find().stream()
stream.on('error', function (err) {
console.error(err)
})
stream.on('data', function (doc) {
console.log(doc)
})
It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)
它实际上实现了ReadableStream接口,所以它有所有的好处(暂停/恢复等)
#3
10
mongoose
is not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native
).
mongoose并不是真正的“驱动程序”,它实际上是围绕MongoDB驱动程序(node-mongodb-native)的ORM包装。
To do what you're doing, take a look at the driver's .find
and .each
method. Here's some code from the examples:
要做你正在做的事情,请查看驱动程序的.find和.each方法。下面是一些例子中的代码:
// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
sys.puts("Printing docs from Cursor Each")
cursor.each(function(err, doc) {
if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
})
});
To stream the results, you're basically replacing that sys.puts
with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush()
, but you may also want to checkout socket.io
.
为了实现流化结果,基本上就是替换了那个sys。使用你的“流”功能。不确定你打算如何发布结果。我认为您可以执行response.write() + response.flush(),但是您可能还想签出socket.com .io。
#4
2
Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)
这里是我找到的解决方案(如果这是错误的方法,请纠正我):(也请原谅错误的编码——对我来说现在美化这个已经太晚了)
var sys = require('sys')
var http = require("http");
var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;
var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));
var products;
db.open(function (error, client) {
if (error) throw error;
products = new Collection(client, 'products');
});
function ProductReader(collection) {
this.collection = collection;
}
ProductReader.prototype = new process.EventEmitter();
ProductReader.prototype.do = function() {
var self = this;
this.collection.find(function(err, cursor) {
if (err) {
self.emit('e1');
return;
}
sys.puts("Printing docs from Cursor Each");
self.emit('start');
cursor.each(function(err, doc) {
if (!err) {
self.emit('e2');
self.emit('end');
return;
}
if(doc != null) {
sys.puts("doc:" + doc.name);
self.emit('doc',doc);
} else {
self.emit('end');
}
})
});
};
http.createServer(function(req,res){
pr = new ProductReader(products);
pr.on('e1',function(){
sys.puts("E1");
res.writeHead(400,{"Content-Type": "text/plain"});
res.write("e1 occurred\n");
res.end();
});
pr.on('e2',function(){
sys.puts("E2");
res.write("ERROR\n");
});
pr.on('start',function(){
sys.puts("START");
res.writeHead(200,{"Content-Type": "text/plain"});
res.write("<products>\n");
});
pr.on('doc',function(doc){
sys.puts("A DOCUMENT" + doc.name);
res.write("<product><name>" + doc.name + "</name></product>\n");
});
pr.on('end',function(){
sys.puts("END");
res.write("</products>");
res.end();
});
pr.do();
}).listen(8000);