How to throttle a large number of tasks without using all of the workers

Time: 2021-09-15 16:57:42

Imagine I have a dask grid with 10 workers and 40 cores in total. This is a shared grid, so I don't want to fully saturate it with my work. I have 1000 tasks to do, and I want to submit (and have actively running) a maximum of 20 tasks at a time.

To be concrete,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

If I setup a system of Queues

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

This will work, BUT, it will just dump ALL of my tasks onto the grid, saturating it. Ideally I could do something like:

e.scatter(input_q, max_submit=20)

It seems that the example from the docs here would allow me to use a maxsize queue, but it looks like, from a user's perspective, I would still have to deal with the backpressure myself. Ideally dask would take care of this automatically.
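
As a minimal sketch of what I mean (the bounded queue and the feeder thread here are my own illustration, not taken from the docs): because the Queue is bounded, put() blocks once 20 items are pending, so the code feeding the pipeline has to absorb the backpressure itself.

from queue import Queue
from threading import Thread

input_q = Queue(maxsize=20)        # bounded: put() blocks when full

def feed(data):
    for item in data:
        input_q.put(item)          # blocks here until the pipeline drains

Thread(target=feed, args=(range(1000),), daemon=True).start()

remote_q = e.scatter(input_q)      # same pipeline as before
inc_q = e.map(inc, remote_q)
double_q = e.map(double, inc_q)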

2 Answers

#1 (6 votes)

Use maxsize=

You're very close. All of scatter, gather, and map take the same maxsize= keyword argument that Queue takes. So a simple workflow might be as follows:

Example

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')       # Connect to cluster

futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

All of q, futures, and results are Python Queue objects. The q and results queues don't have a limit, so they'll greedily pull in as much as they can. The futures queue however has a maximum size of 20, so it will only allow 20 futures in flight at any given time. Once the leading future is complete it will immediately be consumed by the gather function and its result will be placed into the results queue. This frees up space in futures and causes another task to be submitted.

Note that this isn't exactly what you wanted. These queues are ordered so futures will only get popped off when they're in the front of the queue. If all of the in-flight futures have finished except for the first they'll still stay in the queue, taking up space. Given this constraint you might want to choose a maxsize= slightly more than your desired 20 items.
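
For example (the extra headroom here is a guess, not a tuned value):

futures = e.map(inc, q, maxsize=25)  # a little slack over the 20 in-flight target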

Extending this

Here we do a simple map->gather pipeline with no logic in between. You could also put other map computations in here or even pull futures out of the queues and do custom work with them on your own. It's easy to break out of the mold provided above.
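
For instance, here is a sketch that chains a second stage onto the same bounded pipeline, reusing the double function from the question (bounding the second stage with its own maxsize= is an assumption about how you would want to limit it):

futures = e.map(inc, q, maxsize=20)           # stage 1: at most 20 in flight
doubled = e.map(double, futures, maxsize=20)  # stage 2: consumes stage 1's queue
results = e.gather(doubled)                   # ordinary Queue of results

out = []
while not q.empty() or not futures.empty() or not doubled.empty() or not results.empty():
    out.append(results.get())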

#2 (1 vote)

The solution posted on GitHub was very useful: https://github.com/dask/distributed/issues/864

Solution:

from dask.distributed import as_completed

inputs = iter(inputs)
futures = [c.submit(func, next(inputs)) for _ in range(maxsize)]  # prime the pump
ac = as_completed(futures)

for finished_future in ac:
    # submit a new future to replace the one that just finished
    try:
        new_future = c.submit(func, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass  # inputs exhausted; let the remaining futures drain
    result = finished_future.result()
    ... # do stuff with result
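
For completeness, one way to bind the free names in that sketch (the scheduler address follows the question's example; func here is just a stand-in for real work):

from dask.distributed import Client

def func(x):                   # stand-in for real work
    return x + 1

c = Client('127.0.0.1:8786')   # assumed scheduler address
maxsize = 20                   # at most 20 tasks in flight
inputs = range(1000)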

Query:

However, to determine which workers are free so I can throttle the tasks, I am trying to use the client.has_what() API. It seems the load on workers is not reflected immediately, unlike what is shown on the status UI page. At times it takes quite a while for has_what to reflect any data.

Is there another API that can be used to determine the number of free workers, which could then be used to set the throttle range, similar to what the UI uses?
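
One thing to experiment with (a suggestion, not an authoritative answer; exact behaviour varies by distributed version) is Client.processing(), which reports the task keys currently assigned to each worker. A worker with an empty list is idle, so counting those gives a rough free-worker figure. Like has_what, it reflects the scheduler's view and may still lag slightly behind the status UI.

def free_worker_count(client):
    # processing() maps worker address -> keys currently assigned to it;
    # an empty collection means the worker has nothing to run right now
    return sum(1 for keys in client.processing().values() if not keys)

n_free = free_worker_count(c)   # e.g. use this to size the throttle window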
