The following is taken from David Beazley's slides on generators (here for anybody interested).


A Task class is defined that wraps a generator which yields futures. The Task class, in full (without error handling), follows:


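class Task:
    def __init__(self, gen):
        self._gen = gen  # a generator that yields Futures

    def step(self, value=None):
        try:
            # Run the generator until it yields its next Future
            fut = self._gen.send(value)
            # Resume the generator once that Future has finished
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass  # the generator has finished

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)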

In an example, the following recursive function is also defined:


from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    print("Tick :", n)
    Task(recursive(n+1)).step()

The following two cases play out:


  1. From the Python REPL, if we define these (or import them if we place them in a file) and then jump-start the recursion with:


    Task(recursive(0)).step()


    it starts printing away, seemingly past the point where the recursion limit should have been exceeded. It obviously doesn't exceed it, though: printing the stack level shows that it stays constant throughout execution. Something else is going on that I don't quite understand.


    NOTE: You'll need to kill the python process if you execute it like this.


  2. If we put all contents (Task, recursive) in a file along with:


    if __name__ == "__main__":
        Task(recursive(0)).step()

    and then run it with python myfile.py, it stops ticking at 7 (the number of max_workers, it seems).

My question is how does it seemingly surpass the recursion limit and why does it behave differently based on how you execute it?


The behavior appears on both Python 3.6.2 and Python 3.5.4 (and I'd guess others in the 3.6 and 3.5 family too).


The recursive generator you show isn't actually recursive in a way that would cause a problem with the system recursion limit.


To understand why, you need to pay attention to when the recursive generator's code runs. Unlike a normal function, just calling recursive(0) doesn't cause it to immediately run its code and make additional recursive calls. Instead, calling recursive(0) immediately returns a generator object. Only when you send() to the generator does the code run, and only after you send() to it a second time does it kick off another call.
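The laziness described above is easy to check in isolation. The sketch below uses a hypothetical stand-in for recursive() that logs when its body runs instead of submitting work to a pool; it is only meant to show at which send() each part of a generator body executes.

```python
log = []

def recursive(n):
    # Hypothetical stand-in for the question's generator: it records when
    # its body runs instead of submitting work to a pool.
    log.append(("started", n))
    yield "placeholder future"
    log.append(("resumed", n))

gen = recursive(0)
print(log)              # [] : the call only created a generator object

value = gen.send(None)  # first send(): run up to the yield
print(value, log)       # placeholder future [('started', 0)]

try:
    gen.send(None)      # second send(): resume after the yield
except StopIteration:
    print(log)          # [('started', 0), ('resumed', 0)]
```

In the real code, the part after the yield is where Task(recursive(n+1)).step() lives, so the "recursive" call only happens on the second send().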


Let's examine the call stack as the code runs. At the top level, we run Task(recursive(0)).step(). That does three things in a sequence:


  1. recursive(0) This call returns a generator object immediately.
  2. Task(_) The Task object is created, and its __init__ method stores a reference to the generator object created in the first step.
  3. _.step() A method on the task gets called. This is where the action really starts! Let's look at what happens inside the call:


    • fut = self._gen.send(value) Here we actually start the generator running, by sending it a value. Let's go deeper and see the generator code run:
      • yield pool.submit(time.sleep, 0.001) This schedules something to be done in another thread. We don't wait for it to happen, though. Instead, we get a Future that we can use to get notified when it's complete. We yield the future immediately back to the previous level of code.
    • fut.add_done_callback(self._wakeup) Here we ask for our _wakeup() method to be called when the future is ready. This always returns immediately!
    • The step method ends now. That's right, we're done (for the moment)! This is important for the second part of your question, which I'll discuss more later.
  4. The call we made ended, so control flow returns to the REPL if we're running interactively. If we're running as a script, the interpreter will instead reach the end of the script and start shutting down (I'll discuss this more below). However, the other threads controlled by the thread pool are still running, and at some point one of them is going to do some stuff we care about! Let's see what that is.


  5. When the scheduled function (time.sleep) has finished running, the thread it was running in will call the callback we set on the Future object. That is, it will call Task._wakeup() on the Task object we created earlier (which we don't have a reference to anymore at the top level, but the Future kept a reference, so it's still alive). Let's look at the method:


    • result = fut.result() Store the result of the deferred call. This is irrelevant in this case since we never look at the results (it's None anyway).
    • self.step(result) Step again! Now we're back to the code we care about. Let's see what it does this time:
      • fut = self._gen.send(value) Send to the generator again, so it takes over. It already yielded once, so this time we start just after the yield:
        • print("Tick :", n) This is pretty simple.
        • Task(recursive(n+1)).step() This is where things get interesting. This line is just like what we started with. So, like before, this is going to run the logic of steps 1-3 I listed above (including their substeps). But instead of returning to the REPL or ending the script (step 4), when the step() method returns, it comes back here.
        • The recursive() generator (the original one, not the new one we just created) has reached its end. So, like any generator that reaches the end of its code, it raises StopIteration.
      • The StopIteration is caught and ignored by the try/except block, and the step() method ends.
    • The _wakeup() method ends too, so the callback is done.
  6. Eventually the callback for the Task created in the earlier callback will be called as well. So we go back and repeat step 5, over and over, forever (if we're running interactively).
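The walkthrough relies on two properties of concurrent.futures callbacks: add_done_callback() returns immediately while the job is still pending, and the callback later runs in the worker thread that completes the future. The sketch below checks both; the gate event, the job() function and the "worker" thread-name prefix are illustrative choices, not part of the original code. It also shows the documented edge case that on an already-finished future the callback runs at once, in the calling thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()          # keeps the job from finishing until we say so
callback_ran = threading.Event()
seen = {}

def job():
    gate.wait()
    return "done"

def on_done(fut):
    # Runs in whichever thread completes the future: here, the pool worker
    seen["thread"] = threading.current_thread().name
    seen["result"] = fut.result()
    callback_ran.set()

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    fut = pool.submit(job)
    fut.add_done_callback(on_done)   # returns at once; job is still blocked
    was_pending = not fut.done()
    gate.set()                       # let the job (and then the callback) run
    callback_ran.wait(timeout=5)

# On an already-finished future, the callback runs immediately in this thread
late = []
fut.add_done_callback(lambda f: late.append(threading.current_thread().name))
print(was_pending, seen, late)
```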

The call stack above explains why the interactive case prints forever. The main thread returns to the REPL (and you can do other stuff with it if you can see past the output from the other threads). But in the pool, each thread schedules another job from the callback of its own job. When the next job finishes, its callback schedules another job and so on.
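To see why this chain never hits the recursion limit, here is a minimal, deterministic sketch. It swaps the thread pool for a hypothetical single-threaded InlineExecutor (not part of concurrent.futures) that hands back real concurrent.futures.Future objects (the docs reserve direct Future() construction for testing and executor implementations) but only runs queued jobs from a flat loop, which plays the role of a pool worker's loop. Each "tick" records its stack depth instead of printing. Assuming the callbacks never fire early, the depth should be the same for all 5000 ticks, well past CPython's default recursion limit of 1000, because every Task.step() call returns before the next job runs.

```python
import sys
from collections import deque
from concurrent.futures import Future


class Task:
    # Same Task class as in the question
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration:
            pass

    def _wakeup(self, fut):
        self.step(fut.result())


class InlineExecutor:
    """Hypothetical single-threaded stand-in for ThreadPoolExecutor.

    submit() only queues the work; run_jobs() executes it later from a
    flat loop, the way a pool worker thread pulls jobs off its queue.
    """

    def __init__(self):
        self._jobs = deque()

    def submit(self, fn, *args):
        fut = Future()
        self._jobs.append((fut, fn, args))
        return fut

    def run_jobs(self, limit):
        for _ in range(limit):
            if not self._jobs:
                return
            fut, fn, args = self._jobs.popleft()
            fut.set_result(fn(*args))  # fires the done-callbacks from here


def stack_depth():
    # Count the frames currently on this thread's stack
    depth, frame = 0, sys._getframe()
    while frame is not None:
        depth += 1
        frame = frame.f_back
    return depth


depths = []


def recursive(n):
    yield pool.submit(lambda: None)   # stands in for time.sleep(0.001)
    depths.append(stack_depth())      # stands in for print("Tick :", n)
    Task(recursive(n + 1)).step()


pool = InlineExecutor()
Task(recursive(0)).step()
pool.run_jobs(limit=5000)
print(len(depths), min(depths), max(depths))
```

With a real ThreadPoolExecutor the flat loop is each worker thread's job loop, which is why the interactive session keeps ticking with a constant stack level.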


So why do you get only 8 printouts when you run the code as a script? The answer is hinted at in step 4 above. When running non-interactively, the main thread runs off the end of the script after the first call to Task.step returns. This prompts the interpreter to try to shut down.


The concurrent.futures.thread module (where ThreadPoolExecutor is defined) has some fancy logic that tries to clean up nicely when the program shuts down while an executor is still active. It's supposed to stop any idle threads, and signal any that are still running to stop when their current job is complete.


The exact implementation of that cleanup logic interacts with our code in a very odd way (which may or may not be buggy). The effect is that the first thread keeps giving itself more jobs to do, while the additional worker threads that get spawned keep exiting immediately after they are spawned. The first worker finally quits when the executor has started as many threads as it wanted to use (8 in our case).


Here's the sequence of events, as I understand it.


  1. We import (indirectly) the concurrent.futures.thread module, which uses atexit to tell the interpreter to run a function named _python_exit just before the interpreter shuts down.
  2. We create a ThreadPoolExecutor with a maximum thread count of 8. It doesn't spawn its worker threads immediately, but will create one each time a job is scheduled until it has all 8.
  3. We schedule our first job (in the deeply nested part of step 3 from the previous list).
  4. The executor adds the job to its internal queue, then notices it doesn't have the maximum number of worker threads and starts a new one.
  5. The new thread pops the job off the queue and begins running it. However, the sleep call takes much longer than the rest of the steps, so the thread is going to be stuck here for a bit.
  6. The main thread finishes (it's reached step 4 in the previous list).
  7. The _python_exit function gets called by the interpreter, since the interpreter wants to shut down. The function sets a global _shutdown variable in the module and sends a None to the internal queue of the executor (it sends one None per thread, but only one thread has been created so far, so it sends just one None). It then blocks the main thread until the thread it knows about has quit. This delays the interpreter shutdown.
  8. The worker thread's call to time.sleep returns. It calls the callback function that is registered with its job's Future, which schedules another job.
  9. Like in step 4 of this list, the executor queues up the job and starts another thread, since it doesn't have the desired number yet.
  10. The new thread tries to grab a job off the internal queue, but gets the None value from step 7, which is a signal that it may be done. It sees that the _shutdown global is set, so it quits. Before it does, though, it adds another None to the queue.
  11. The first worker thread finishes its callback. It looks for a new job and finds the one it queued up itself in step 8. It starts running the job, which, like in step 5, takes a while.
  12. Nothing else happens, though, since the first worker is the only active thread at the moment (the main thread is blocked waiting on the first worker to die, and the other worker shut itself down).
  13. We now repeat steps 8-12 several times. The first worker thread queues up the third through 8th jobs, and the executor spawns a corresponding thread each time, since it doesn't have a full set. However, each new thread dies immediately, since it gets a None off the job queue instead of an actual job to complete. The first worker thread ends up doing all the actual work.
  14. Finally, after the 8th job, something works differently. This time, when the callback schedules another job, no additional thread is spawned, since the executor knows it has already started the requested 8 threads (it doesn't know that 7 of them have shut down).
  15. So this time, the None that's at the head of the internal job queue gets picked up by the first worker (instead of an actual job). That means it shuts down rather than doing more work.
  16. When the first worker shuts down, the main thread (which has been waiting for it to quit) can finally unblock, and the _python_exit function completes. This lets the interpreter shut down completely. We're done!

This explains the output we see! We get 8 outputs, all coming from the same worker thread (the first one spawned).
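The sequence above can be replayed as a deterministic, single-threaded model. This is a hypothetical simplification, not the real concurrent.futures.thread code: "threads" are just a counter, and the model bakes in the ordering assumed above (each newly spawned worker reads the queue before the first worker finishes its callback). Under those assumptions it produces exactly one tick per allowed worker.

```python
from collections import deque


def simulate_script_exit(max_workers=8):
    queue = deque()     # the executor's internal work queue
    started = 0         # threads the executor believes it has started
    shutdown = False    # stands in for the module-level _shutdown flag
    ticks = []

    def submit(n):
        nonlocal started
        queue.append(n)
        if started < max_workers:
            started += 1
            if started > 1:
                # Assumed ordering: the new thread reads the queue first and
                # finds the None sentinel sitting at the head.
                item = queue.popleft()
                assert item is None and shutdown
                queue.append(None)   # re-queue the sentinel, then "exit"

    submit(0)                # main thread schedules the first job
    job = queue.popleft()    # first worker picks it up
    shutdown = True          # main thread ends, so _python_exit runs and...
    queue.append(None)       # ...sends one None per known thread (just one)

    while job is not None:   # the first worker's loop
        ticks.append(job)    # the callback prints "Tick : n"...
        submit(job + 1)      # ...and schedules the next job
        job = queue.popleft()
    return ticks, started


ticks, started = simulate_script_exit()
print(ticks, started)
```

With max_workers=8 the model yields ticks 0 through 7, i.e. it stops ticking at 7 as in the question.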


I think there may be a race condition in that code, however. If step 11 happens before step 10, things might break. If the first worker got a None off the queue and the other newly spawned worker got the real job, they'd swap roles (the first worker would die, and the other one would do the rest of the work, barring more race conditions in the later versions of those steps). However, the main thread would be unblocked as soon as the first worker died. Since it doesn't know about the other threads (they didn't exist when it made its list of the threads to wait on), it will close the interpreter prematurely.


I'm not sure if this race is ever likely to happen. I'd guess it's pretty unlikely, since the length of the code path between the new thread starting and it grabbing a job from the queue is much shorter than the path for the existing thread to finish the callback (the part after it queued up the new job) and then look for another job in the queue.


I suspect that it's a bug that the ThreadPoolExecutor lets us exit cleanly when we run our code as a script. The logic for queuing up a new job should probably check the global _shutdown flag in addition to the executor's own self._shutdown attribute. If it did, trying to queue up another job after the main thread had finished would raise an exception.


You can replicate what I think would be saner behavior by creating the ThreadPoolExecutor in a with statement:


# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()

This will crash soon after the main thread returns from the step() call. It will look something like this:


exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
  File ".\task_coroutines.py", line 21, in _wakeup
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
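The RuntimeError at the bottom of that traceback comes from the executor refusing work after shutdown() (which leaving a with block calls). A minimal sketch that triggers the same error without the Task machinery:

```python
import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as pool:
    first = pool.submit(time.sleep, 0.01)   # accepted: the pool is still open

# Leaving the with block called pool.shutdown(wait=True), so the pool now
# refuses new work, which is what the callback in the traceback ran into.
error = None
try:
    pool.submit(time.sleep, 0.01)
except RuntimeError as exc:
    error = str(exc)

print(first.done(), error)
```

In the crashing version, the submit happens inside a done-callback on a worker thread, so the exception is reported as "exception calling callback" instead of propagating to the main thread.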



Let's start with what the number 7 is. That is the number of workers, as you have already mentioned, labeled from [0..7]. The Task class needs to be passed recursive in the form of a function identifier.



instead of



This is because the recursive function needs to be called inside the pool environment, while in the current code recursive is evaluated in the main thread itself. time.sleep is the only function in the current code that is evaluated in the task pool.


A key facet where the code has a major issue is the recursion. Each thread in the pool depends on the inner function, which puts an upper limit on execution equal to the number of workers available. The function is not able to finish, hence a new one cannot execute. Thus, it terminates long before the recursion limit is reached.




