需要一个线程安全的异步消息队列

时间:2021-06-02 21:04:29

I'm looking for a Python class (preferably part of the standard language, rather than a 3rd party library) to manage asynchronous 'broadcast style' messaging.

我正在寻找一个Python类(最好是标准语言的一部分,而不是第三方库)来管理异步的“广播风格”消息。

I will have one thread which puts messages on the queue (the 'putMessageOnQueue' method must not block) and then multiple other threads which will all be waiting for messages, having presumably called some blocking 'waitForMessage' function. When a message is placed on the queue I want each of the waiting threads to get its own copy of the message.

我将有一个线程将消息放到队列中(“putMessageOnQueue”方法不能阻塞),然后有多个线程等待消息,这些线程可能调用了一些阻塞的“waitForMessage”函数。当消息被放置到队列上时,我希望每个等待的线程都获得消息的自己副本。

I've looked at the built-in Queue class, but I don't think this is suitable because consuming messages seems to involve removing them from the queue, so only 1 client thread would see each one.

我已经查看了内置的队列类,但是我不认为这是合适的,因为使用消息似乎需要从队列中删除它们,因此只有一个客户机线程会看到每个消息。

This seems like it should be a common use-case, can anyone recommend a solution?

这似乎是一个常见的用例,有人能推荐一个解决方案吗?

2 个解决方案

#1


7  

I think the typical approach to this is to use a separate message queue for each thread, and push the message onto every queue which has previously registered an interest in receiving such messages.

我认为这种方法的典型做法是为每个线程使用一个单独的消息队列,并将消息推送到每个队列上,这些队列之前注册了接收此类消息的兴趣。

Something like this ought to work, but it's untested code...

类似这样的东西应该可以工作,但它是未经测试的代码……

from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()

#2


2  

I think this is a more straight forward example (taken from the Queue example in Python Lib )

我认为这是一个更直接的示例(取自Python库中的队列示例)

from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

#1


7  

I think the typical approach to this is to use a separate message queue for each thread, and push the message onto every queue which has previously registered an interest in receiving such messages.

我认为这种方法的典型做法是为每个线程使用一个单独的消息队列,并将消息推送到每个队列上,这些队列之前注册了接收此类消息的兴趣。

Something like this ought to work, but it's untested code...

类似这样的东西应该可以工作,但它是未经测试的代码……

from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()

#2


2  

I think this is a more straight forward example (taken from the Queue example in Python Lib )

我认为这是一个更直接的示例(取自Python库中的队列示例)

from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done