# 多任务生产者消费者 — multi-task producer/consumer demo
"""
生产者 --> 消费者
生产者
1 生产者从中间件(kafka、mq、redis)中获取数据推送到queue中
2 事件通知并通知消费者线程开始消费
3 每次sleep 0.0001(防止 数据一直为空时占用大量资源)
消费者
1 事件消息接收到生产者消息开始消费
2 消费者在等待 timeout(s)后接受不到消息 wait 等待 生产者通知消费
"""
import logging
import os
import time
import _queue
from queue import Empty, Queue
from threading import Event, Thread

import django
from confluent_kafka import Consumer

# Django must be configured before any project models are imported.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
django.setup()

from gateway.models import (
Gateway,
Sensor
)
# logger = logging.getLogger()
class Product(Thread):
    """Producer thread: polls Kafka and pushes decoded messages onto a queue.

    After enqueuing a message it sets the shared event so that a consumer
    thread blocked in ``Event.wait()`` wakes up and starts draining the queue.
    """

    def __init__(self,
                 servers: str,
                 group_id: str,
                 topic: str,
                 queue,
                 event
                 ):
        """
        :param servers: Kafka bootstrap servers, e.g. ``"host:9092"``
        :param group_id: Kafka consumer group id
        :param topic: topic name to subscribe to
        :param queue: shared ``queue.Queue`` the consumer thread reads from
        :param event: ``threading.Event`` used to wake the consumer thread
        """
        super().__init__()
        self.queue = queue
        self.event = event
        self.c = Consumer(
            {
                'bootstrap.servers': servers,
                'group.id': group_id,
                'auto.offset.reset': 'earliest'
            }
        )
        self.c.subscribe([topic])

    def run(self) -> None:
        while True:
            # Tiny sleep so an idle topic does not busy-spin the CPU.
            time.sleep(0.0001)
            data = self.c.poll(1.0)
            if data is None:
                continue
            if data.error():
                print(data.error())
                continue
            self.queue.put(data.value().decode('utf-8'))
            # is_set() replaces the camelCase isSet() alias (deprecated in
            # Python 3.10, removed in 3.12).  A plain set() is sufficient —
            # the original clear() immediately before set() had no effect.
            if not self.event.is_set():
                self.event.set()
class ConsumerKafka(Thread):
    """Consumer thread: drains the shared queue and applies ``func`` per item.

    If the queue stays empty for ``timeout`` seconds the thread clears the
    event and blocks on it until the producer signals that new data arrived.
    """

    def __init__(self,
                 queue,
                 event,
                 func,
                 timeout: int = 10
                 ):
        """
        :param queue: shared ``queue.Queue`` fed by the producer thread
        :param event: ``threading.Event`` the producer sets on new data
        :param func: callable invoked with each dequeued message
        :param timeout: seconds to wait on the queue before going to sleep
        """
        super().__init__()
        self.queue = queue
        self.event = event
        self.timeout = timeout
        self.func = func

    def run(self) -> None:
        while True:
            try:
                data = self.queue.get(timeout=self.timeout)
                print('consumer', data)
            # Catch the public queue.Empty, not the exception from the
            # private _queue accelerator module.
            except Empty:
                print('consumer is wait')
                self.event.clear()
                self.event.wait()
            else:
                # Perform the update; ``func`` should handle its own errors
                # so one bad message does not kill the consumer thread.
                self.func(data)
def data_update(msg):
    """Apply one decoded Kafka message to the database (placeholder).

    NOTE(review): handle expected exceptions here — an uncaught error would
    otherwise stop the consumer thread at this point.  Decide per business
    requirements which failures to swallow and which to surface.
    """
if __name__ == '__main__':
    # Kafka connection settings (placeholders — substitute real values).
    conf = {
        'servers': 'ip:port',
        'group_id': 'group',
        'topic': 'topic'
    }

    # Shared channel and wake-up signal between the two threads.
    work_queue = Queue()
    wake_event = Event()

    producer_thread = Product(queue=work_queue, event=wake_event, **conf)
    consumer_thread = ConsumerKafka(work_queue, wake_event, data_update)

    producer_thread.start()
    consumer_thread.start()
    # The producer loops forever, so this join keeps the main thread alive.
    producer_thread.join()
# NOTE: because of the GIL these threads still execute one at a time; splitting
# the work through a queue just lets each thread do the right job at the right
# moment.  An alternative is two processes, each running coroutines —
# coroutines give higher throughput and lower resource usage than threads.