线程等待超时时的任意睡眠

时间:2021-09-11 20:42:48

Before I start describing my question, it might worth mentioning that I'm using Python 2.7. I haven't checked, but this might be irrelevant for Python 3.x.

在我开始描述我的问题之前,有必要指出我正在使用Python 2.7。我还没有检查过,但是这可能与Python 3.x无关。

While working with Python's Queues, I've discovered something strange. Usually, when getting an object from the Queue, I allow long but finite timeout (such as a few seconds), to allow debugging and error reporting in case no object was found, when one was expected. What I've found out is that sometimes there's a strange gap between the time when an object was inserted into a previously empty Queue, and the time the get method of the very same Queue has returned that object, even though the method was called before the put was called for that object.

在处理Python的队列时,我发现了一些奇怪的东西。通常,当从队列中获取对象时,我允许长时间但有限的超时(比如几秒),以便在没有找到对象时(当需要对象时)允许调试和错误报告。有时我发现有一个奇怪的差距一个对象的时候前面插入一个空的队列,和时间同一队列的get方法返回对象,即使把之前的方法被称为是呼吁该对象。

Digging a little bit I've discovered that the gap was filled by sleeping. In the Queue module, if the timeout argument that is being passed to the get method is not None, and is positive, the non_empty Condition's wait method is called with a positive argument (that is not 100% precise; in fact, the Queue's "_qsize" method, which returns the length of the underlying deque is first verified to return 0, but as long as the queue was empty in the first place, the next thing is the condition's wait).

稍微挖掘一下,我发现空隙被睡觉填满了。在队列模块中,如果传递给get方法的超时参数不是None,并且是正的,则调用non_empty条件的等待方法为正参数(这不是100%精确的;实际上,队列的“_qsize”方法(返回底层deque的长度)首先被验证为返回0,但只要队列首先是空的,接下来的事情就是条件的等待)。

The Conditions's wait method acts differently if it gets a timeout or not. If it does not get any timeout, it simply calls waiter.acquire. This is defined in C and is beyond what I understand, but it seems like it works properly. However, if timeout is given, a bizarre sequence of sleeps occur instead, when the sleep times start at some arbitrary size (1 milisecond), and gets longer over time. Here's the exact code which runs:

如果条件的等待方法获得超时或没有超时,则其行为会有所不同。如果没有超时,它只调用waiter.acquire。这是在C语言中定义的,超出了我的理解范围,但它似乎运行得很正常。然而,如果给定超时,当睡眠时间以任意大小(1毫秒)开始并随时间变长时,就会出现奇怪的睡眠序列。下面是具体的代码:

# Balancing act:  We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive.  The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
    gotit = waiter.acquire(0)
    if gotit:
        break
    remaining = endtime - _time()
    if remaining <= 0:
        break
    delay = min(delay * 2, remaining, .05)
    _sleep(delay)

This is clearly the reason for the gap I've found between the time the new object was put into the previously-empty Queue, and the time that the already-called get method has returned that object. As the delay time grows exponentially until blocked by a huge (from my perspective) size of 0.05 seconds, it creates surprising and unwanted significant sleeps in my application's life.

这显然是我发现新对象被放入之前空队列的时间和已经调用的get方法返回该对象的时间之间存在差距的原因。随着延迟时间呈指数级增长,直到被0.05秒的巨大(从我的角度来看)大小阻塞,它在我的应用程序生命中创建了令人惊讶和不希望的重要休眠。

Can you explain what's the purpose of this? Are Python developers assume no Python user will care about such time lengths? Is there a quick workaround or a proper fix? Do you recommend me to overload the threading module?

你能解释一下这样做的目的吗?Python开发人员是否认为没有Python用户会关心这样的时间长度?是否有一个快速的解决方案或者一个合适的解决方案?你建议我重载线程模块吗?

2 个解决方案

#1


5  

I recently got hit by the same problem, and I also tracked it down to this exact block of code in the threading module.

最近我遇到了同样的问题,我也在线程模块中找到了这段代码。

It sucks.

它糟透了。


Can you explain what's the purpose of this? Are Python developers assume no Python user will care about such time lengths?

你能解释一下这样做的目的吗?Python开发人员是否认为没有Python用户会关心这样的时间长度?

Beats me...

难倒我了……


Do you recommend me to overload the threading module?

你建议我重载线程模块吗?

Either overload the threading module, or migrate to python3, where this part of the implementation has been fixed.

要么重载线程模块,要么迁移到python3,这部分实现已经被修复。

In my case, migrating to python3 would have been a huge effort, so I chose the former. What I did was:

在我的例子中,迁移到python3会是一个巨大的努力,所以我选择了前者。我所做的是:

  1. I created a quick .so file (using cython) with an interface to pthread. It includes python functions which invoke the corresponding pthread_mutex_* functions, and links against libpthread. Specifically, the function most relevant to the task we're interested in is pthread_mutex_timedlock.
  2. 我创建了一个带有pthread接口的快速。so文件(使用cython)。它包括调用相应pthread_mutex_*函数的python函数,以及针对libpthread的链接。具体来说,与我们感兴趣的任务最相关的函数是pthread_mutex_timedlock。
  3. I created a new threading2 module, (and replaced all import threading lines in my codebase with import threading2). In threading2, I re-defined all the relevant classes from threading (Lock, Condition, Event), and also ones from Queue which I use a lot (Queue and PriorityQueue). The Lock class was completely re-implemented using pthread_mutex_* functions, but the rest were much easier -- I simply subclassed the original (e.g. threading.Event), and overridden __init__ to create my new Lock type. The rest just worked.
  4. 我创建了一个新的threading2模块(并替换了在我的代码库中导入threading2的所有导入线程)。在threading2中,我从线程(锁、条件、事件)和我经常使用的队列(队列和PriorityQueue)中重新定义了所有相关的类。使用pthread_mutex_*函数完全重新实现了锁类,但其余的要简单得多——我只是对原始的(例如,thread . event)进行子类化,并重写__init__以创建新的锁类型。剩下的只是工作。

The implementation of the new Lock type was very similar to the original implementation in threading, but I based the new implemenation of acquire on the code I found in python3's threading module (which, naturally, is much simpler than the abovementioned "balancing act" block). This part was fairly easy.

新的锁类型的实现与线程中的原始实现非常相似,但是我基于在python3的线程模块中发现的代码(自然比上面提到的“平衡行为”块要简单得多)实现了acquire。这部分相当容易。

(Btw, the result in my case was 30% speedup of my massively-multithreaded process. Even more than I expected.)

(顺便说一句,在我的例子中,结果是大规模多线程进程的速度提高了30%。甚至比我预期的还要多。

I hope this helps.

我希望这可以帮助。

#2


0  

What you could do to be sure that Queue is not doing something weird is to use the method get_nowait and the Exception Empty. Take a look at these lines that I have in our production servers. (Of course modified to fit in this example).

您可以做的是确保队列没有做什么奇怪的事情,就是使用get_nowait方法和异常空。看看我在生产服务器上的这些代码行。(当然是为了适应这个例子而修改的)。

from Queue import Queue, Empty

while receiver.isAlive:
    try:
        rec = Record(queue.get_nowait())
    except Empty:
        # Set someTime with the value you want
        someTime = 0.1
        sleep(someTime)
    else:
        doSomething(rec)

Also, have in mind the following:

同时,请记住以下几点:

The time.sleep() function uses the underlying operating system’s sleep() function. Ultimately there are limitations of this function. For example on a standard Windows installation, the smallest interval you may sleep is 10 – 13 milliseconds. The Linux kernels tend to have a higher tick rate, where the intervals are generally closer to 1 millisecond.

函数的作用是:使用底层操作系统的sleep()函数。最终,这个函数是有局限性的。例如,在标准的Windows安装中,您可以睡眠的最小间隔是10 - 13毫秒。Linux内核通常有更高的滴答速率,在这里间隔通常更接近1毫秒。

#1


5  

I recently got hit by the same problem, and I also tracked it down to this exact block of code in the threading module.

最近我遇到了同样的问题,我也在线程模块中找到了这段代码。

It sucks.

它糟透了。


Can you explain what's the purpose of this? Are Python developers assume no Python user will care about such time lengths?

你能解释一下这样做的目的吗?Python开发人员是否认为没有Python用户会关心这样的时间长度?

Beats me...

难倒我了……


Do you recommend me to overload the threading module?

你建议我重载线程模块吗?

Either overload the threading module, or migrate to python3, where this part of the implementation has been fixed.

要么重载线程模块,要么迁移到python3,这部分实现已经被修复。

In my case, migrating to python3 would have been a huge effort, so I chose the former. What I did was:

在我的例子中,迁移到python3会是一个巨大的努力,所以我选择了前者。我所做的是:

  1. I created a quick .so file (using cython) with an interface to pthread. It includes python functions which invoke the corresponding pthread_mutex_* functions, and links against libpthread. Specifically, the function most relevant to the task we're interested in is pthread_mutex_timedlock.
  2. 我创建了一个带有pthread接口的快速。so文件(使用cython)。它包括调用相应pthread_mutex_*函数的python函数,以及针对libpthread的链接。具体来说,与我们感兴趣的任务最相关的函数是pthread_mutex_timedlock。
  3. I created a new threading2 module, (and replaced all import threading lines in my codebase with import threading2). In threading2, I re-defined all the relevant classes from threading (Lock, Condition, Event), and also ones from Queue which I use a lot (Queue and PriorityQueue). The Lock class was completely re-implemented using pthread_mutex_* functions, but the rest were much easier -- I simply subclassed the original (e.g. threading.Event), and overridden __init__ to create my new Lock type. The rest just worked.
  4. 我创建了一个新的threading2模块(并替换了在我的代码库中导入threading2的所有导入线程)。在threading2中,我从线程(锁、条件、事件)和我经常使用的队列(队列和PriorityQueue)中重新定义了所有相关的类。使用pthread_mutex_*函数完全重新实现了锁类,但其余的要简单得多——我只是对原始的(例如,thread . event)进行子类化,并重写__init__以创建新的锁类型。剩下的只是工作。

The implementation of the new Lock type was very similar to the original implementation in threading, but I based the new implemenation of acquire on the code I found in python3's threading module (which, naturally, is much simpler than the abovementioned "balancing act" block). This part was fairly easy.

新的锁类型的实现与线程中的原始实现非常相似,但是我基于在python3的线程模块中发现的代码(自然比上面提到的“平衡行为”块要简单得多)实现了acquire。这部分相当容易。

(Btw, the result in my case was 30% speedup of my massively-multithreaded process. Even more than I expected.)

(顺便说一句,在我的例子中,结果是大规模多线程进程的速度提高了30%。甚至比我预期的还要多。

I hope this helps.

我希望这可以帮助。

#2


0  

What you could do to be sure that Queue is not doing something weird is to use the method get_nowait and the Exception Empty. Take a look at these lines that I have in our production servers. (Of course modified to fit in this example).

您可以做的是确保队列没有做什么奇怪的事情,就是使用get_nowait方法和异常空。看看我在生产服务器上的这些代码行。(当然是为了适应这个例子而修改的)。

from Queue import Queue, Empty

while receiver.isAlive:
    try:
        rec = Record(queue.get_nowait())
    except Empty:
        # Set someTime with the value you want
        someTime = 0.1
        sleep(someTime)
    else:
        doSomething(rec)

Also, have in mind the following:

同时,请记住以下几点:

The time.sleep() function uses the underlying operating system’s sleep() function. Ultimately there are limitations of this function. For example on a standard Windows installation, the smallest interval you may sleep is 10 – 13 milliseconds. The Linux kernels tend to have a higher tick rate, where the intervals are generally closer to 1 millisecond.

函数的作用是:使用底层操作系统的sleep()函数。最终,这个函数是有局限性的。例如,在标准的Windows安装中,您可以睡眠的最小间隔是10 - 13毫秒。Linux内核通常有更高的滴答速率,在这里间隔通常更接近1毫秒。