如何用nodejs处理MongoDB查询结果?

时间:2022-11-15 03:03:31

I have been searching for an example of how I can stream the result of a MongoDB query to a nodejs client. All solutions I have found so far seem to read the query result at once and then send the result back to the server.

我一直在搜索一个示例,说明如何将MongoDB查询的结果流到nodejs客户机。到目前为止,我发现的所有解决方案似乎都立即读取了查询结果,然后将结果发送回服务器。

Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call that when the next chunk of the result set is available.

相反,我(显然)希望向查询方法提供回调,并在结果集的下一个块可用时调用MongoDB。

I have been looking at mongoose - should I probably use a different driver?

我一直在看猫鼬——我应该换一个司机吗?

Jan

1月

4 个解决方案

#1


25  

Streaming in Mongoose became available in version 2.4.0 which appeared three months after you've posted this question:

Mongoose的流媒体服务在2.4.0版本中出现,在你发布这个问题三个月后出现:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

More elaborated examples can be found on their documentation page.

更详细的示例可以在他们的文档页面中找到。

#2


24  

node-mongodb-driver (the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.

node-mongodb-driver(每个mongoDB客户机在node- js中使用的底层层)除了其他人提到的cursor API之外,还有一个不错的流API(#458)。不幸的是,我没有在其他地方找到它。

Update: there are docs also here.

更新:这里也有文档。

It can be used like this:

它可以这样使用:

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)

它实际上实现了ReadableStream接口,所以它有所有的好处(暂停/恢复等)

#3


10  

mongoose is not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native).

mongoose并不是真正的“驱动程序”,它实际上是围绕MongoDB驱动程序(node-mongodb-native)的ORM包装。

To do what you're doing, take a look at the driver's .find and .each method. Here's some code from the examples:

要做你正在做的事情,请查看驱动程序的.find和.each方法。下面是一些例子中的代码:

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each")
  cursor.each(function(err, doc) {
    if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  })                    
});

To stream the results, you're basically replacing that sys.puts with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush(), but you may also want to checkout socket.io.

为了实现流化结果,基本上就是替换了那个sys。使用你的“流”功能。不确定你打算如何发布结果。我认为您可以执行response.write() + response.flush(),但是您可能还想签出socket.com .io。

#4


2  

Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)

这里是我找到的解决方案(如果这是错误的方法,请纠正我):(也请原谅错误的编码——对我来说现在美化这个已经太晚了)

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new process.EventEmitter();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (!err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);

#1


25  

Streaming in Mongoose became available in version 2.4.0 which appeared three months after you've posted this question:

Mongoose的流媒体服务在2.4.0版本中出现,在你发布这个问题三个月后出现:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

More elaborated examples can be found on their documentation page.

更详细的示例可以在他们的文档页面中找到。

#2


24  

node-mongodb-driver (the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.

node-mongodb-driver(每个mongoDB客户机在node- js中使用的底层层)除了其他人提到的cursor API之外,还有一个不错的流API(#458)。不幸的是,我没有在其他地方找到它。

Update: there are docs also here.

更新:这里也有文档。

It can be used like this:

它可以这样使用:

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)

它实际上实现了ReadableStream接口,所以它有所有的好处(暂停/恢复等)

#3


10  

mongoose is not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native).

mongoose并不是真正的“驱动程序”,它实际上是围绕MongoDB驱动程序(node-mongodb-native)的ORM包装。

To do what you're doing, take a look at the driver's .find and .each method. Here's some code from the examples:

要做你正在做的事情,请查看驱动程序的.find和.each方法。下面是一些例子中的代码:

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each")
  cursor.each(function(err, doc) {
    if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  })                    
});

To stream the results, you're basically replacing that sys.puts with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush(), but you may also want to checkout socket.io.

为了实现流化结果,基本上就是替换了那个sys。使用你的“流”功能。不确定你打算如何发布结果。我认为您可以执行response.write() + response.flush(),但是您可能还想签出socket.com .io。

#4


2  

Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)

这里是我找到的解决方案(如果这是错误的方法,请纠正我):(也请原谅错误的编码——对我来说现在美化这个已经太晚了)

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new process.EventEmitter();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (!err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);