What is the best way to limit concurrency when using ES6's Promise.all()?

时间:2022-08-23 11:58:56

I have some code that iterates over a list queried out of a database and makes an HTTP request for each element in that list. That list can sometimes be reasonably large (in the thousands), and I would like to make sure I am not hitting a web server with thousands of concurrent HTTP requests.


An abbreviated version of this code currently looks something like this...


function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

This code is running on Node 4.3.2. To reiterate, can Promise.all be managed so that only a certain number of Promises are in progress at any given time?


6 Answers

#1


24  

Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.


With that in mind, one solution would be to check, whenever a promise resolves, whether a new promise should be started or whether you're already at the limit.

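As a sketch of that idea (no library needed; `promisePool` and the fake tasks below are illustrative names, not part of the question's code): start up to `limit` tasks, and whenever one settles, pull the next one from the queue.

```javascript
// Minimal hand-rolled pool: run tasks (functions returning promises)
// with at most `limit` of them in flight at any time.
function promisePool(tasks, limit) {
  const results = [];
  let next = 0;

  function runNext() {
    if (next >= tasks.length) return Promise.resolve();
    const index = next++;
    // Start the task, record its result, then pull the next one.
    return Promise.resolve()
      .then(() => tasks[index]())
      .then(value => { results[index] = value; })
      .then(runNext);
  }

  // Launch up to `limit` parallel "lanes"; each lane drains the queue.
  const lanes = Array.from({ length: Math.min(limit, tasks.length) }, runNext);
  return Promise.all(lanes).then(() => results);
}

// Example: 5 fake "requests", at most 2 concurrent.
const sleep = ms => new Promise(r => setTimeout(r, ms));
let active = 0, peak = 0;
const tasks = [1, 2, 3, 4, 5].map(n => () => {
  active++;
  peak = Math.max(peak, active);
  return sleep(20).then(() => { active--; return n * 10; });
});

promisePool(tasks, 2).then(results => {
  console.log(results); // [10, 20, 30, 40, 50]
  console.log('peak concurrency:', peak); // 2
});
```

Results come back in input order because each task writes to its own index, even though tasks finish out of order.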

However, there is really no need to reinvent the wheel here. One library that you could use for this purpose is es6-promise-pool. From their examples:


// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})

#2


6  

bluebird's Promise.map can take a concurrency option to control how many promises should be running in parallel. Sometimes it is easier than .all because you don't need to create the promise array.


const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
      });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}

ref: http://bluebirdjs.com/docs/api/promise.map.html


#3


4  

Instead of using promises to limit HTTP requests, use Node's built-in http.Agent.maxSockets. This removes the need to use a library or write your own pooling code, and has the added advantage of more control over what you're limiting.


agent.maxSockets


By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.


For example:


var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

If making multiple requests to the same origin, it might also benefit you to set keepAlive to true (see docs above for more info).


#4


2  

If you know how iterators work and how they are consumed, you wouldn't need any extra library, since it becomes very easy to build the concurrency yourself. Let me demonstrate:


/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notice how this inner loop continues the same iterator
  // and consumes the rest of it, so the
  // outer loop doesn't log any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

We can use the same iterator and share it across workers.
If you had used .entries() instead of .values(), you would have gotten an iterator of [index, value] pairs, which I will demonstrate below with a concurrency of 2.


const sleep = n => new Promise(rs => setTimeout(rs,n))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const arr = Array.from('abcdefghij')
const worker = new Array(2).fill(arr.entries()).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.all(worker).then(() => console.log('done'))

#5


0  

I tried to adapt some of the examples shown here to my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me.


From its README:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

#6


-3  

This becomes relatively trivial with async/await. Depending on what you want, it translates nicely to a delayed map or forEach; here is the map implementation.


const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const delayMap = async (ms, arr, f) => {
  const results = []
  let i = 0
  for (const item of arr) {
    results.push(await f(item, i++, arr))
    await sleep(ms)
  }
  return results
}

// Example use - delaying 1 second between each call
delayMap(1000, [ 1, 2, 3 ], id => 
  fetch(`https://jsonplaceholder.typicode.com/posts/${id}`)
)
  .then(posts => posts.map(post => post.json()))
  .then(Promise.all.bind(Promise))
  .then(posts => console.log('Posts', posts))
