event()——确保所有等待的线程在event.set()中唤醒

时间:2021-05-28 20:54:45

I have a number of threads which wait on an event, perform some action, then wait on the event again. Another thread will trigger the event when it's appropriate.

我有许多线程,它们等待事件,执行一些操作,然后再次等待事件。另一个线程将在适当的时候触发事件。

I can't figure out a way to ensure that each waiting thread triggers exactly once upon the event being set. I currently have the triggering thread set it, sleep for a bit, then clear it. Unfortunately, this leads to the waiting threads grabbing the set event many times, or none at all.

我想不出一种方法来确保每个等待的线程在设置事件时都触发一次。不幸的是,这会导致等待线程多次抓取set事件,或者根本没有。

I can't simply have the triggering thread spawn the response threads to run them once because they're responses to requests made from elsewhere.

我不能简单地让触发线程产生响应线程来运行它们,因为它们是对来自其他地方的请求的响应。

In short: In Python, how can I have a thread set an event and ensure each waiting thread acts on the event exactly once before it gets cleared?

简而言之:在Python中,如何让一个线程设置一个事件,并确保每个等待的线程在事件被清除之前对事件执行一次?

Update:

更新:

I've tried setting it up using a lock and a queue, but it doesn't work. Here's what I have:

我尝试过使用锁和队列设置它,但它不起作用。这就是我有:

# Globals - used to synch threads
waitingOnEvent = Queue.Queue
MainEvent = threading.Event()
MainEvent.clear()    # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()

def waitCall():
    mainLock.acquire()
    waitingOnEvent.put("waiting")
    mainLock.release()
    MainEvent.wait()
    waitingOnEvent.get(False)
    waitingOnEvent.task_done()
    #do stuff
    return

def triggerCall():
    mainLock.acquire()
    itemsinq = waitingOnEvent.qsize()
    MainEvent.set()
    waitingOnEvent.join()
    MainEvent.clear()
    mainLock.release()
    return

The first time, itemsinq properly reflects how many calls are waiting, but only the first waiting thread to make the call will make it through. From then on, itemsinq is always 1, and the waiting threads take turns; each time the trigger call happens, the next goes through.

第一次,itemsinq正确地反映了等待的调用数量,但是只有第一个等待发出调用的线程才能通过。从那时起,itemsinq始终为1,等待的线程轮流执行;每次触发调用发生时,下一个触发。

Update 2 It appears as though only one of the event.wait() threads is waking up , and yet the queue.join() is working. This suggests to me that one waiting thread wakes up, grabs from the queue and calls task_done(), and that single get()/task_done() somehow empties the queue and allows the join(). The trigger thread then completes the join(), clears the event, and thus prevents the other waiting threads from going through. Why would the queue register as empty/finished after only one get/task_done call, though?

Update 2看起来好像只有一个event.wait()线程正在醒来,而queue.join()仍然在工作。这对我来说意味着一个正在等待的线程醒来,从队列中获取并调用task_done(),而这个单一的get()/task_done()以某种方式清空队列并允许join()。触发线程然后完成join(),清除事件,从而防止其他等待线程通过。但是,为什么在只调用一个get/task_done调用之后,队列就会变成空的/结束了呢?

Only one seems to be waking up, even if I comment out the queue.get() and queue.task_done() and hang the trigger so it can't clear the event.

即使我注释了queue.get()和queue.task_done(),并挂起触发器,使其无法清除事件,但似乎只有一个人醒了过来。

4 个解决方案

#1


9  

You don't need an Event, and you don't need both a Lock and a Queue. All you need is a Queue.

您不需要事件,也不需要锁和队列。你所需要的只是一个队列。

Call queue.put to drop a message in without waiting for it to be delivered or processed.

调用队列。在不等待消息交付或处理的情况下放置消息。

Call queue.get in the worker thread to wait for a message to arrive.

调用队列。进入工作线程等待消息到达。

import threading
import Queue

active_queues = []

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()
            if data == 'shutdown':
                print self, 'shutting down'
                return
            print self, 'received a message:', data

    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()

The messages don't have to be strings; they can be any Python object.

消息不必是字符串;它们可以是任何Python对象。

#2


3  

If you want discrete, atomic events that can be processed sequentially by each thread, then do as krs1 & bot403 suggested and use a queue. The Python Queue class is synchronized - you do not have to worry about locking to use it.

如果需要离散的原子事件,可以按每个线程按顺序处理,然后按照krs1和bot403的建议使用队列。Python队列类是同步的——使用它不需要担心锁的问题。

If however your needs are simpler (the event tells you that you have data available to read, etc), you can subscribe/register your threads as observers of an object responsible for triggering the events. This object would maintain the list of observer threading.Event objects. On a trigger, it can then call set() on all of the threading.Event objects in the list.

如果您的需求更简单(事件告诉您您可以读取数据,等等),您可以订阅/注册您的线程,作为触发事件的对象的观察者。该对象将维护观察者线程的列表。事件对象。在触发器上,它可以调用所有线程上的set()。列表中的事件对象。

#3


2  

One solution I've used in the past is the Queue class for interthread communication. It is threadsafe and can be used to easy communication between threads when using both the multiprocessing and threading libraries. You could have the child threads waiting for something to enter the queue and then process the new entry. The Queue class also has a get() method which takes a handy blocking argument.

我过去使用的一个解决方案是用于线程间通信的队列类。它是线程安全的,在使用多线程处理和线程库时,可以方便地用于线程之间的通信。您可以让子线程等待一些东西进入队列,然后处理新的条目。队列类还有一个get()方法,它使用了一个方便的阻塞参数。

#4


1  

I'm not a python programmer but if an event can only be processed once perhaps you need to switch to a message queue with the appropriate locking so that when one thread wakes up and receives the event message it will process it and remove it from the queue so its not there if other threads wake up and look in the queue.

我不是一个python程序员,但是如果一个事件只能加工一次也许你需要切换到一个消息队列与适当的锁定,这样当一个线程醒来和接收事件消息将处理它,并从队列中删除它所以它没有其他线程醒来,看看在队列中。

#1


9  

You don't need an Event, and you don't need both a Lock and a Queue. All you need is a Queue.

您不需要事件,也不需要锁和队列。你所需要的只是一个队列。

Call queue.put to drop a message in without waiting for it to be delivered or processed.

调用队列。在不等待消息交付或处理的情况下放置消息。

Call queue.get in the worker thread to wait for a message to arrive.

调用队列。进入工作线程等待消息到达。

import threading
import Queue

active_queues = []

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()
            if data == 'shutdown':
                print self, 'shutting down'
                return
            print self, 'received a message:', data

    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()

The messages don't have to be strings; they can be any Python object.

消息不必是字符串;它们可以是任何Python对象。

#2


3  

If you want discrete, atomic events that can be processed sequentially by each thread, then do as krs1 & bot403 suggested and use a queue. The Python Queue class is synchronized - you do not have to worry about locking to use it.

如果需要离散的原子事件,可以按每个线程按顺序处理,然后按照krs1和bot403的建议使用队列。Python队列类是同步的——使用它不需要担心锁的问题。

If however your needs are simpler (the event tells you that you have data available to read, etc), you can subscribe/register your threads as observers of an object responsible for triggering the events. This object would maintain the list of observer threading.Event objects. On a trigger, it can then call set() on all of the threading.Event objects in the list.

如果您的需求更简单(事件告诉您您可以读取数据,等等),您可以订阅/注册您的线程,作为触发事件的对象的观察者。该对象将维护观察者线程的列表。事件对象。在触发器上,它可以调用所有线程上的set()。列表中的事件对象。

#3


2  

One solution I've used in the past is the Queue class for interthread communication. It is threadsafe and can be used to easy communication between threads when using both the multiprocessing and threading libraries. You could have the child threads waiting for something to enter the queue and then process the new entry. The Queue class also has a get() method which takes a handy blocking argument.

我过去使用的一个解决方案是用于线程间通信的队列类。它是线程安全的,在使用多线程处理和线程库时,可以方便地用于线程之间的通信。您可以让子线程等待一些东西进入队列,然后处理新的条目。队列类还有一个get()方法,它使用了一个方便的阻塞参数。

#4


1  

I'm not a python programmer but if an event can only be processed once perhaps you need to switch to a message queue with the appropriate locking so that when one thread wakes up and receives the event message it will process it and remove it from the queue so its not there if other threads wake up and look in the queue.

我不是一个python程序员,但是如果一个事件只能加工一次也许你需要切换到一个消息队列与适当的锁定,这样当一个线程醒来和接收事件消息将处理它,并从队列中删除它所以它没有其他线程醒来,看看在队列中。