Python线程实际应用

时间:2022-12-23 17:02:48

多任务生产者消费者

"""
生产者 --> 消费者
生产者 
		1 生产者从中间件(kafka、mq、redis)中获取数据推送到queue中
		2 通过事件(Event)通知消费者线程开始消费
		3 每次循环 sleep 0.0001 秒(防止数据一直为空时占用大量资源)
		
消费者 
		1 接收到生产者的事件通知后开始消费
		2 消费者在等待 timeout 秒后仍接收不到消息时,调用 wait 等待生产者的通知再继续消费
"""

import _queue
import logging
import os
import time
from queue import Empty, Queue
from threading import Event, Thread

import django
from confluent_kafka import Consumer

# Point Django at the project settings (only if DJANGO_SETTINGS_MODULE is not
# already set in the environment) and initialise it. This runs before the
# gateway.models import below so the models can be loaded in this
# standalone script.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
django.setup()

from gateway.models import (
    Gateway,
    Sensor
)


# logger = logging.getLogger()


class Product(Thread):
    """Producer thread: pulls messages from Kafka and hands them to a queue.

    Each successfully received message payload is decoded as UTF-8 and put on
    ``queue``; ``event`` is then set so a consumer parked on it wakes up.

    Args:
        servers: Kafka ``bootstrap.servers`` value (e.g. ``"ip:port"``).
        group_id: Kafka consumer group id.
        topic: Topic to subscribe to.
        queue: Queue the decoded payloads are put on.
        event: Event set after every put to notify the consumer.
    """

    def __init__(self,
                 servers: str,
                 group_id: str,
                 topic: str,
                 queue,
                 event
                 ):
        super(Product, self).__init__()
        self.queue = queue
        self.event = event

        # Start from the earliest offset when the group has no committed one.
        self.c = Consumer(
            {
                'bootstrap.servers': servers,
                'group.id': group_id,
                'auto.offset.reset': 'earliest'
            }
        )
        self.c.subscribe([topic])

    def run(self) -> None:
        """Poll Kafka forever, forwarding message payloads to ``self.queue``.

        If the loop exits through an exception, the Kafka consumer is closed
        in ``finally`` instead of being left open.
        """
        try:
            while True:
                # poll() below already blocks for up to 1s while the topic is
                # idle; this tiny sleep only yields between back-to-back messages.
                time.sleep(0.0001)
                msg = self.c.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    print(msg.error())
                    continue
                payload = msg.value()
                if payload is None:
                    # Message without a value (e.g. a tombstone): nothing to
                    # decode, and calling .decode on None would kill the thread.
                    continue
                self.queue.put(payload.decode('utf-8'))
                # Setting an already-set Event is a no-op, so set it
                # unconditionally (replaces the deprecated isSet() check and
                # the redundant clear()-then-set() pair).
                self.event.set()
        finally:
            self.c.close()


class ConsumerKafka(Thread):
    """Consumer thread: takes payloads off a queue and applies ``func`` to each.

    When nothing arrives within ``timeout`` seconds the thread parks on
    ``event`` until the producer sets it again.

    Args:
        queue: Queue the producer puts payloads on.
        event: Event the producer sets after putting a payload.
        func: Callable invoked with each payload; an exception it raises is
            not caught here and ends the thread.
        timeout: Seconds to block on ``queue.get`` before parking on ``event``.
    """

    def __init__(self,
                 queue,
                 event,
                 func,
                 timeout: float = 10
                 ):
        super(ConsumerKafka, self).__init__()
        self.queue = queue
        self.event = event
        self.timeout = timeout
        self.func = func

    def run(self) -> None:
        """Consume forever, calling ``self.func`` on every payload."""
        while True:
            try:
                data = self.queue.get(timeout=self.timeout)
            except Empty:
                print('consumer is wait')
                self.event.clear()
                # Re-check after clearing: a payload put between the get()
                # timeout and clear() would otherwise sit in the queue while
                # this thread waits for some *later* message to set the event.
                if self.queue.empty():
                    self.event.wait()
                continue
            print('consumer', data)
            # Apply the update for this payload.
            self.func(data)


def data_update(msg):
    """Apply the update for one consumed message (placeholder; does nothing yet).

    ``ConsumerKafka.run`` calls this for every message without catching
    errors, so any exception raised here ends the consumer thread.
    """
    # TODO: handle exceptions here as the business requires; otherwise the
    # service stops at this point as soon as an exception is raised.
    return None


if __name__ == '__main__':
    # Placeholder Kafka settings: replace with the real broker, group and topic.
    conf = {
        'servers': 'ip:port',
        'group_id': 'group',
        'topic': 'topic',
    }

    # Hand-off channel and wake-up signal shared by the two threads.
    message_queue = Queue()
    data_ready = Event()

    product = Product(queue=message_queue, event=data_ready, **conf)
    consumer = ConsumerKafka(message_queue, data_ready, data_update)

    for worker in (product, consumer):
        worker.start()

    # Keep the main thread alive for as long as the producer runs.
    product.join()

线程的问题在于(受 GIL 限制)同一时刻仍然只有一个线程在执行 Python 代码。通过 queue 把获取数据和处理数据拆开,可以物尽其用,在对的时间做对的事情。也可以考虑拆成两个进程,在各进程内用协程实现;协程比线程性能更高、资源占用更少。