The following is taken from David Beazley's slides on generators (here for anybody interested).
A Task class is defined which wraps a generator that yields futures. The Task class, in full (without error handling), follows:
class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)
In an example, the following recursive function is also defined:
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(max_workers=8)
def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    print("Tick :", n)
    Task(recursive(n+1)).step()
The following two cases play out:
- From the Python REPL, if we define these (or import them if we place them in a file) and then jump-start the recursion with:

  Task(recursive(0)).step()

  it starts printing away, seemingly past the point where the recursion limit should have been exceeded. It obviously doesn't exceed it, though: printing the stack level shows that it stays constant throughout execution (see the sketch after this list). Something else is going on which I don't quite understand.

  NOTE: You'll need to kill the Python process if you execute it like this.
- If we put all of the contents (Task, recursive) in a file along with:

  if __name__ == "__main__": Task(recursive(0)).step()

  and then run it with python myfile.py, it stops ticking at 7 (the number of max_workers, it seems).
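One way to check the constant-stack claim is to instrument recursive() to report the interpreter's frame count on every tick (my own hypothetical instrumentation, not part of the original snippet; it assumes the pool and Task definitions above):

import inspect

def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    # If real call-stack recursion were happening, this count of live
    # frames would grow on every tick; instead it stays constant.
    print("Tick :", n, "stack depth:", len(inspect.stack()))
    Task(recursive(n + 1)).step()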
My question is how does it seemingly surpass the recursion limit and why does it behave differently based on how you execute it?
The behavior appears on both Python 3.6.2 and Python 3.5.4 (and I'd guess others in the 3.6 and 3.5 families too).
2 Answers

#1 (score: 10)
The recursive generator you show isn't actually recursive in a way that would cause a problem with the system recursion limit.
To understand why, you need to pay attention to when the recursive generator's code runs. Unlike a normal function, just calling recursive(0) doesn't cause it to immediately run its code and make additional recursive calls. Instead, calling recursive(0) immediately returns a generator object. Only when you send() to the generator does the code run, and only after you send() to it a second time does it kick off another call.
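A minimal REPL illustration of that laziness (my own sketch, not from the original answer):

>>> def g():
...     print("body is running")
...     yield 1
...
>>> gen = g()        # nothing prints: the body hasn't started yet
>>> gen.send(None)   # the first send() runs the body up to the yield
body is running
1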
Let's examine the call stack as the code runs. At the top level, we run Task(recursive(0)).step(). That does three things in sequence:
1. recursive(0): This call returns a generator object immediately.

2. Task(_): The Task object is created, and its __init__ method stores a reference to the generator object created in the first step.

3. _.step(): A method on the task gets called. This is where the action really starts! Let's look at what happens inside the call:

   - fut = self._gen.send(value): Here we actually start the generator running, by sending it a value. Let's go deeper and see the generator code run:

     - yield pool.submit(time.sleep, 0.001): This schedules something to be done in another thread. We don't wait for it to happen, though. Instead, we get a Future that we can use to get notified when it's complete. We yield the future immediately back to the previous level of code.

   - fut.add_done_callback(self._wakeup): Here we ask for our _wakeup() method to be called when the future is ready. This always returns immediately!

   - The step method ends now. That's right, we're done (for the moment)! This is important for the second part of your question, which I'll discuss more later.

4. The call we made ended, so control flow returns to the REPL if we're running interactively. If we're running as a script, the interpreter instead will reach the end of the script and start shutting down (I'll discuss this more below). However, the other threads controlled by the thread pool are still running, and at some point one of them is going to do some stuff we care about! Let's see what that is.

5. When the scheduled function (time.sleep) has finished running, the thread it was running in will call the callback we set on the Future object. That is, it will call Task._wakeup() on the Task object we created earlier (which we don't have a reference to anymore at the top level, but the Future kept a reference, so it's still alive). Let's look at the method:

   - result = fut.result(): Store the result of the deferred call. This is irrelevant in this case, since we never look at the result (it's None anyway).

   - self.step(result): Step again! Now we're back to the code we care about. Let's see what it does this time:

     - fut = self._gen.send(value): Send to the generator again, so it takes over. It already yielded once, so this time we start just after the yield:

       - print("Tick :", n): This is pretty simple.

       - Task(recursive(n+1)).step(): This is where things get interesting. This line is just like what we started with. So, like before, this is going to run the logic of steps 1-4 I listed above (including their substeps). But instead of returning to the REPL or ending the script, when the step() method returns, it comes back here.

     - The recursive() generator (the original one, not the new one we just created) has reached its end. So, like any generator that reaches the end of its code, it raises StopIteration.

     - The StopIteration is caught and ignored by the try/except block, and the step() method ends.

   - The _wakeup() method ends too, so the callback is done.

6. Eventually the callback for the Task created in the earlier callback will be called as well. So we go back and repeat step 5, over and over, forever (if we're running interactively).
The call stack above explains why the interactive case prints forever. The main thread returns to the REPL (and you can do other stuff with it if you can see past the output from the other threads). But in the pool, each thread schedules another job from the callback of its own job. When the next job finishes, its callback schedules another job and so on.
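In other words, the "recursion" is flattened into a chain of callbacks: each step() returns before the next generator body runs. A threadless sketch of that control flow (my own distillation, not from the answer):

def tick(n):
    print("Tick :", n)
    return lambda: tick(n + 1)  # hand back the next step instead of calling it

# Each frame returns before the next one runs, so the stack never grows.
step = lambda: tick(0)
for _ in range(5):
    step = step()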
So why do you get only 8 printouts when you run the code as a script? The answer is hinted at in step 4 above. When running non-interactively, the main thread runs off the end of the script after the first call to Task.step returns. This prompts the interpreter to try to shut down.
The concurrent.futures.thread module (where ThreadPoolExecutor is defined) has some fancy logic that tries to clean up nicely when the program shuts down while an executor is still active. It's supposed to stop any idle threads, and signal any that are still running to stop when their current job is complete.
The exact implementation of that cleanup logic interacts with our code in a very odd way (which may or may not be buggy). The effect is that the first thread keeps giving itself more jobs to do, while the additional worker threads that are spawned keep exiting immediately after they are spawned. The first worker finally quits when the executor has started as many threads as it wanted to be using (8 in our case).
Here's the sequence of events, as I understand it.
1. We import (indirectly) the concurrent.futures.thread module, which uses atexit to tell the interpreter to run a function named _python_exit just before the interpreter shuts down.

2. We create a ThreadPoolExecutor with a maximum thread count of 8. It doesn't spawn its worker threads immediately, but will create one each time a job is scheduled until it has all 8.

3. We schedule our first job (in the deeply nested part of step 3 from the previous list).

4. The executor adds the job to its internal queue, then notices it doesn't have the maximum number of worker threads and starts a new one.

5. The new thread pops the job off the queue and begins running it. However, the sleep call takes much longer than the rest of the steps, so the thread is going to be stuck here for a bit.

6. The main thread finishes (it's reached step 4 in the previous list).

7. The _python_exit function gets called by the interpreter, since the interpreter wants to shut down. The function sets a global _shutdown variable in the module, and sends a None to the internal queue of the executor (it sends one None per thread, but there's just the one thread created so far, so it just sends one None). It then blocks the main thread until the thread it knows about has quit. This delays the interpreter shutdown.

8. The worker thread's call to time.sleep returns. It calls the callback function that is registered with its job's Future, which schedules another job.

9. Like in step 4 of this list, the executor queues up the job and starts another thread, since it doesn't have the desired number yet.

10. The new thread tries to grab a job off the internal queue, but gets the None value from step 7, which is a signal that it may be done. It sees that the _shutdown global is set and so it quits. Before it does, though, it adds another None to the queue.

11. The first worker thread finishes its callback. It looks for a new job, and finds the one it queued up itself in step 8. It starts running the job, which, like in step 5, takes a while.

12. Nothing else happens, though, since the first worker is the only active thread at the moment (the main thread is blocked waiting on the first worker to die, and the other worker shut itself down).

13. We now repeat steps 8-12 several times. The first worker thread queues up the third through eighth jobs, and the executor spawns a corresponding thread each time, since it doesn't have a full set. However, each new thread dies immediately, since it gets a None off the job queue instead of an actual job to complete. The first worker thread ends up doing all the actual work.

14. Finally, after the eighth job, something works differently. This time, when the callback schedules another job, no additional thread is spawned, since the executor knows it has started the requested 8 threads already (it doesn't know that 7 of them have shut down).

15. So this time, the None that's at the head of the internal job queue gets picked up by the first worker (instead of an actual job). That means it shuts down, rather than doing more work.

16. When the first worker shuts down, the main thread (which has been waiting for it to quit) can finally unblock, and the _python_exit function completes. This lets the interpreter shut down completely. We're done!
This explains the output we see! We get 8 outputs, all coming from the same worker thread (the first one spawned).
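For reference, here is a paraphrased sketch of the worker loop described above (simplified from what CPython 3.5/3.6's concurrent.futures.thread does; names abridged, so treat it as an approximation rather than the exact stdlib source):

_shutdown = False  # module-global flag, set by the atexit hook _python_exit

def _worker(executor_ref, work_queue):
    while True:
        work_item = work_queue.get(block=True)
        if work_item is not None:
            work_item.run()  # run the job; the Future's callbacks fire here
            continue
        executor = executor_ref()
        # Got the None sentinel: exit if the interpreter is shutting down
        # or the executor was shut down, and put the sentinel back so a
        # sibling worker also sees it (the cascade in steps 10 and 15).
        if _shutdown or executor is None or executor._shutdown:
            work_queue.put(None)
            return
        del executor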
I think there may be a race condition, however, in that code. If step 11 happens before step 10, things might break. If the first worker got a None off the queue and the other newly spawned worker got the real job, they'd swap roles (the first worker would die, and the other one would do the rest of the work, barring more race conditions in later iterations of those steps). However, the main thread would be unblocked as soon as the first worker died. Since it doesn't know about the other threads (they didn't exist when it made its list of the threads to wait on), it would close the interpreter prematurely.
I'm not sure if this race is ever likely to happen. I'd guess it's pretty unlikely, since the code path between a new thread starting and it grabbing a job from the queue is much shorter than the path for the existing thread to finish the callback (the part after it queued up the new job) and then look for another job in the queue.
I suspect it's a bug that the ThreadPoolExecutor lets us exit cleanly when we run our code as a script. The logic for queuing up a new job should probably check the module-global _shutdown flag in addition to the executor's own self._shutdown attribute. If it did, trying to queue up another job after the main thread had finished would raise an exception, as the sketch below shows.
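A hedged sketch of what that fix might look like, as a method inside ThreadPoolExecutor (my own adaptation of the 3.6-era submit(), not actual stdlib code; _base.Future, _WorkItem, and the other underscore names are the module's internals):

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        # The stdlib only checks self._shutdown here; also consulting the
        # module-global _shutdown (set by _python_exit) would reject jobs
        # submitted after the main thread has exited.
        if _shutdown or self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        self._adjust_thread_count()
        return f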
You can replicate what I think would be saner behavior by creating the ThreadPoolExecutor in a with statement:
# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()
This will crash soon after the main thread returns from the step() call. It will look something like this:
exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
    callback(self)
  File ".\task_coroutines.py", line 21, in _wakeup
    self.step(result)
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
    Task(recursive(n+1)).step()
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
#2 (score: 0)
Let's start with where the number 7 comes from. It is the number of workers, as you have already mentioned, labeled [0..7]. The Task class needs to be passed recursive in the form of a function identifier:
Task(recursive).step(n)
instead of
Task(recursive(n)).step()
This is because the recursive function needs to be called inside the pool environment, while in the current case recursive is evaluated in the main thread itself. time.sleep is the only function in the current code which is evaluated in the task pool.
A key facet where the code has a major issue is recursion. Each thread in the pool depends on an inner function, which puts an upper limit on execution equal to the number of workers available. A function is not able to finish, hence a new one cannot execute. Thus, it terminates well before the recursion limit is reached.