I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to improper use of threading.Lock, which I eventually fixed. But it made me wonder whether it's possible to implement the producer/consumer pattern in a lockless manner.
Requirements in my case were simple:
- One producer thread.
- One consumer thread.
- The queue has room for only one item.
- The producer can produce the next item before the current one is consumed. The current item is therefore lost, but that's OK for me.
- The consumer can consume the current item before the next one is produced. The current item is therefore consumed twice (or more), but that's OK for me.
So I wrote this:
QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)
My question is: Is this code thread-safe?
Immediate comment: this code isn't really lockless - I use CPython and it has the GIL.
I tested the code a little and it seems to work. It translates to a few LOAD and STORE ops, which are atomic because of the GIL. But I also know that the del x operation isn't atomic when x implements a __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break. Or not?
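(To illustrate what I mean by LOAD and STORE ops, here is a quick sketch using the dis module; producer_step is just a stand-in I made up for the assignment inside producer, and the exact opcodes vary between CPython versions.)

import dis

QUEUE_ITEM = None

def producer_step(i):
    global QUEUE_ITEM
    QUEUE_ITEM = i  # compiles to LOAD_FAST i followed by STORE_GLOBAL QUEUE_ITEM

dis.dis(producer_step)
# Each individual bytecode executes while the thread holds the GIL,
# which is what makes the bare assignment atomic in CPython.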
Another question is: what kind of restrictions (for example, on the produced items' types) do I have to impose to make the above code work fine?
My questions are only about the theoretical possibility of exploiting CPython's and the GIL's quirks in order to come up with a lockless solution (i.e. no explicit locks like threading.Lock in the code).
6 Answers
#1
Yes this will work in the way that you described:
- That the producer may produce a skippable element.
- That the consumer may consume the same element.
But I also know that the del x operation isn't atomic when x implements a __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break.
I don't see a "del" here. If a del happens in consume_item then the del may occur in the producer thread. I don't think this would be a "problem".
Don't bother using this though. You will end up using up CPU on pointless polling cycles, and it is not any faster than using a queue with locks since Python already has a global lock.
#2
Trickery will bite you. Just use Queue to communicate between threads.
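For example, something like this (a minimal sketch using the standard library's queue module - Queue in Python 2; produce_item and consume_item are the same placeholders as in the question, and maxsize=1 mimics the one-slot requirement, though put/get block instead of overwriting or re-reading):

import queue
import threading

q = queue.Queue(maxsize=1)

def producer():
    while True:
        q.put(produce_item())    # blocks while the single slot is full

def consumer():
    while True:
        consume_item(q.get())    # blocks while the slot is empty

threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()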
#3
This is not really thread safe, because the producer could overwrite QUEUE_ITEM before the consumer has consumed it, and the consumer could consume QUEUE_ITEM twice. As you mentioned, you're OK with that, but most people aren't.
Someone with more knowledge of CPython internals will have to answer your more theoretical questions.
#4
I think it's possible that a thread is interrupted while producing/consuming, especially if the items are big objects. Edit: this is just a wild guess. I'm no expert.
Also the threads may produce/consume any number of items before the other one starts running.
#5
You can use a list as the queue as long as you stick to append/pop since both are atomic.
QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue
        consume_item(i)
In a class scope like below, you can even clear the queue.
class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue
            consume_item(i)

    # There's the possibility the producer is still working on its current item.
    def clear_queue(self):
        self.queue = []
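Using the class might look like this (a sketch only; produce_item and consume_item are placeholders as in the question):

import threading

a = Atomic()
threading.Thread(target=a.producer, daemon=True).start()
threading.Thread(target=a.consumer, daemon=True).start()

# clear_queue() can be called from any thread; in the worst case an item that
# the producer is appending concurrently lands in the discarded list and is lost.
a.clear_queue()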
You'll have to find out which list operations are atomic by looking at the bytecode generated.
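For instance (a sketch; dis only shows that each call is a single bytecode-level invocation of a built-in method, which is the usual argument for why it cannot be interleaved under the GIL, and the exact opcodes depend on the CPython version):

import dis

def append_item(q, i):
    q.append(i)

def pop_item(q):
    return q.pop(0)

dis.dis(append_item)  # the append happens inside one CALL of the C-level list.append
dis.dis(pop_item)     # same for pop; neither releases the GIL mid-operation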
#6
The __del__ could be a problem, as you said. It could be avoided if only there were a way to prevent the garbage collector from invoking the __del__ method on the old object before we finish assigning the new one to QUEUE_ITEM. We would need something like:
increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object
I'm afraid I don't know whether that is possible, though.
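The closest pure-Python approximation of those three steps I can think of is to hold the old object in a local name across the assignment, so that the reassignment itself never performs the final decref (a sketch only, not a claim that it removes the race entirely; produce_item is the placeholder from the question):

def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        old = QUEUE_ITEM   # extra reference keeps the old object alive
        QUEUE_ITEM = i     # plain STORE_GLOBAL; no __del__ can fire here
        del old            # if this was the last reference, __del__ runs here,
                           # at a known point in the producer thread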