之前看网上都是清一色pika包的例子,就用的pika包,最大问题是非多线程安全,改为使用rabbitpy。大幅改善了pika多线程需要加锁,和外网推送延迟又不能开多线程导致推送慢的问题。
rabbitpy有个适配器,可以把rabbitpy包的channel适配成与pika包的channel的相同公有方法,减少了难度。
高层次封装,使用参数来控制使用什么包来操作rabbitmq。
# -*- coding: utf-8 -*-
# @Author : ydf
from collections import Callable
import time
from threading import Lock
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from app.utils_ydf import LogManager
from app.utils_ydf.mixins import LoggerMixin
from app.utils_ydf import decorators
from app.utils_ydf import BoundedThreadPoolExecutor
from app import config as app_config LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
LogManager('rabbitpy').get_logger_and_add_handlers(2)
LogManager('rabbitpy.base').get_logger_and_add_handlers(2) class RabbitmqClientRabbitPy:
"""
使用rabbitpy包。
""" # noinspection PyUnusedLocal
def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}'
self.connection = rabbitpy.Connection(rabbit_url) def creat_a_channel(self) -> rabbitpy.AMQP:
return rabbitpy.AMQP(self.connection.channel()) # 使用适配器,使rabbitpy包的公有方法几乎接近pika包的channel的方法。 class RabbitmqClientPika:
"""
使用pika包,多线程不安全的包。
""" def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
credentials = pika.PlainCredentials(username, password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host, port, virtual_host, credentials, heartbeat=heartbeat)) def creat_a_channel(self) -> BlockingChannel:
return self.connection.channel() class RabbitMqFactory:
def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1):
"""
:param username:
:param password:
:param port:
:param virtual_host:
:param heartbeat:
:param is_use_rabbitpy: 为0使用pika,多线程不安全。为1使用rabbitpy,多线程安全的包。
"""
if is_use_rabbitpy:
self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
else:
self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat) def get_rabbit_cleint(self):
return self.rabbit_client class RabbitmqPublisher(LoggerMixin):
def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
"""
:param queue_name:
:param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika。
:param log_level_int:
"""
self._queue_name = queue_name
self._is_use_rabbitpy = is_use_rabbitpy
self.logger.setLevel(log_level_int)
self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
self.channel = self.rabbit_client.creat_a_channel()
self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
self._lock_for_pika = Lock()
self._lock_for_count = Lock()
self._current_time = None
self.count_per_minute = None
self._init_count()
self.logger.info(f'{self.__class__} 被实例化了') def _init_count(self):
with self._lock_for_count:
self._current_time = time.time()
self.count_per_minute = 0 def publish(self, msg: str):
if self._is_use_rabbitpy:
self._publish_rabbitpy(msg)
else:
self._publish_pika(msg)
self.logger.debug(f'向{self._queue_name} 队列,推送消息 {msg}')
with self._lock_for_count:
self.count_per_minute += 1
if time.time() - self._current_time > 60:
self._init_count()
self.logger.info(f'一分钟内推送了 {self.count_per_minute} 条消息到 {self.channel.connection} 中') @decorators.tomorrow_threads(100)
def _publish_rabbitpy(self, msg: str):
# noinspection PyTypeChecker
self.channel.basic_publish(
exchange='',
routing_key=self._queue_name,
body=msg,
properties={'delivery_mode': 2},
) def _publish_pika(self, msg: str):
with self._lock_for_pika: # 亲测pika多线程publish会出错。
self.channel.basic_publish(exchange='',
routing_key=self._queue_name,
body=msg,
properties=BasicProperties(
delivery_mode=2, # make message persistent
)
) def clear(self):
self.channel.queue_purge(self._queue_name) def get_message_count(self):
if self._is_use_rabbitpy:
return self._get_message_count_rabbitpy()
else:
return self._get_message_count_pika() def _get_message_count_pika(self):
queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
return queue.method.message_count def _get_message_count_rabbitpy(self):
ch = self.rabbit_client.connection.channel()
q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
q.durable = True
msg_count = q.declare(passive=True)[0]
ch.close()
return msg_count class RabbitmqConsumer(LoggerMixin):
def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, is_use_rabbitpy=1):
"""
:param queue_name:
:param consuming_function: 处理消息的函数,函数有且只能有一个参数,参数表示消息。是为了简单,放弃策略和模板来强制参数。
:param threads_num:
:param max_retry_times:
:param log_level:
:param is_print_detail_exception:
:param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika.
"""
self._queue_name = queue_name
self.consuming_function = consuming_function
self._threads_num = threads_num
self.threadpool = BoundedThreadPoolExecutor(threads_num)
self._max_retry_times = max_retry_times
self.logger.setLevel(log_level)
self.logger.info(f'{self.__class__} 被实例化')
self._is_print_detail_exception = is_print_detail_exception
self._is_use_rabbitpy = is_use_rabbitpy def start_consuming_message(self):
if self._is_use_rabbitpy:
self.start_consuming_message_rabbitpy()
else:
self.start_consuming_message_pika() @decorators.keep_circulating(1) # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
def start_consuming_message_rabbitpy(self):
# noinspection PyArgumentEqualDefault
channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel() # type: rabbitpy.AMQP # 此处先固定使用pika.
channel.queue_declare(queue=self._queue_name, durable=True)
channel.basic_qos(prefetch_count=self._threads_num)
for message in channel.basic_consume(self._queue_name):
body = message.body.decode()
self.logger.debug(f'从rabbitmq取出的消息是: {body}')
self.threadpool.submit(self.__consuming_function_rabbitpy, message) def __consuming_function_rabbitpy(self, message, current_retry_times=0):
if current_retry_times < self._max_retry_times:
# noinspection PyBroadException
try:
self.consuming_function(message.body.decode())
message.ack()
except Exception as e:
self.logger.error(f'函数 {self.consuming_function} 第{current_retry_times+1}次发生错误,\n 原因是{e}', exc_info=self._is_print_detail_exception)
self.__consuming_function_rabbitpy(message, current_retry_times + 1)
else:
self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败') # 错得超过指定的次数了,就确认消费了。
message.ack() @decorators.keep_circulating(1) # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
def start_consuming_message_pika(self):
channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() # 此处先固定使用pika.
channel.queue_declare(queue=self._queue_name, durable=True)
channel.basic_qos(prefetch_count=self._threads_num) def callback(ch, method, properties, body):
body = body.decode()
self.logger.debug(f'从rabbitmq取出的消息是: {body}')
self.threadpool.submit(self.__consuming_function_pika, ch, method, properties, body) channel.basic_consume(callback,
queue=self._queue_name,
# no_ack=True
)
channel.start_consuming() @staticmethod
def ack_message_pika(channelx, delivery_tagx):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channelx.is_open:
channelx.basic_ack(delivery_tagx)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass def __consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
if current_retry_times < self._max_retry_times:
# noinspection PyBroadException
try:
self.consuming_function(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
except Exception as e:
self.logger.error(f'函数 {self.consuming_function} 第{current_retry_times+1}次发生错误,\n 原因是{e}', exc_info=self._is_print_detail_exception)
self.__consuming_function_pika(ch, method, properties, body, current_retry_times + 1)
else:
self.logger.critical(f'达到最大重试次数 {self._max_retry_times} 后,仍然失败') # 错得超过指定的次数了,就确认消费了。
ch.basic_ack(delivery_tag=method.delivery_tag)
# self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag)) if __name__ == '__main__':
with decorators.TimerContextManager():
# noinspection PyArgumentEqualDefault
rabbitmq_publisher = RabbitmqPublisher('queue_test', is_use_rabbitpy=1)
# print(rabbitmq_publisher.get_message_count()) # def pub(msg):
# # print(msg)
# rabbitmq_publisher.publish(msg)
#
#
# [pub(str(i)) for i in range(200000)]
# time.sleep(10)
# def f(body):
# print('.... ', body)
# time.sleep(10) # 模拟做某事需要阻塞10秒种,必须用并发。
#
#
# rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, is_use_rabbitpy=0)
# rabbitmq_consumer.start_consuming_message()
python rabbitmq的库,rabbitpy代替pika的更多相关文章
-
Python之路-python(rabbitmq、redis)
一.RabbitMQ队列 安装python rabbitMQ module pip install pika or easy_install pika or 源码 https://pypi.pytho ...
-
python RabbitMQ队列/redis
RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或 ...
-
python RabbitMQ队列使用(入门篇)
---恢复内容开始--- python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种que ...
-
Python RabbitMQ消息队列
python内的队列queue 线程 queue:不同线程交互,不能夸进程 进程 queue:只能用于父进程与子进程,或者同一父进程下的多个子进程,进行交互 注:不同的两个独立进程是不能交互的. ...
-
python RabbitMQ队列使用
python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下 ...
-
python Rabbitmq编程(一)
python Rabbitmq编程(一) 实现最简单的队列通信 send端 #!/usr/bin/env python import pika credentials = pika.PlainCred ...
-
Python底层socket库
Python底层socket库将Unix关于网络通信的系统调用对象化处理,是底层函数的高级封装,socket()函数返回一个套接字,它的方法实现了各种套接字系统调用.read与write与Python ...
-
【C++实现python字符串函数库】strip、lstrip、rstrip方法
[C++实现python字符串函数库]strip.lstrip.rstrip方法 这三个方法用于删除字符串首尾处指定的字符,默认删除空白符(包括'\n', '\r', '\t', ' '). s.st ...
-
【C++实现python字符串函数库】二:字符串匹配函数startswith与endswith
[C++实现python字符串函数库]字符串匹配函数startswith与endswith 这两个函数用于匹配字符串的开头或末尾,判断是否包含另一个字符串,它们返回bool值.startswith() ...
随机推荐
-
JavaScript实现通过的集合类
集合是一种数据结构,用以表示非重复值的无序集合.集合的基础方法包括添加值.检测值是否在集合中,这种集合需要一种通用的实现,以保证操作效率. JavaScript的对象是属性名以及与之对应的值的基本集合 ...
-
OGRE 2.1 Windows 编译
版权所有,转载请注明链接 OGRE 2.1 Windows 编译 环境: Windows 7 64Bit Visual Studio 2012 OGRE 2.1 CMake 2.8.12.1 OGRE ...
-
jdk jre jvm 关系
很多朋友可能跟我一样,已经使用JAVA开发很久了,可是对JDK,JRE,JVM这三者的联系与区别,一直都是模模糊糊的. 今天特写此文,来整理下三者的关系. JDK : Java Development ...
-
HDU 5924 Mr. Frog’s Problem 【模拟】 (2016CCPC东北地区大学生程序设计竞赛)
Mr. Frog's Problem Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 65536/65536 K (Java/Other ...
-
PHP MYSQL 搜索周边坐标,并计算两个点之间的距离
搜索附近地点,例如,坐标(39.91, 116.37)附近500米内的人,首先算出“给定坐标附近500米”这个范围的坐标范围. 虽然它是个圆,但我们可以先求出该圆的外接正方形,然后拿正方形的经纬度范围 ...
-
Python base64编码,转图片
我在做火车票抢票器的时候遇到一个问题,就是验证码提取的:一般验证码都是一些http请求的url,但是火车票网站遇到了我没有见过的以data:image/jpg;base64开头的字符串.现在我们就用P ...
-
C语言实现循环队列的初始化&;进队&;出队&;读取队头元素&;判空-2
/*顺序表实现队列的一系列操作(设置flag标志不损失数组空间)*/ #include<stdio.h> #include<stdlib.h> #define Queue_Si ...
-
java构造函数和初始化
1.如果构造函数的第一条语句是一条普通的语句.也就是说,不是对this()或super()的调用,那么java会插入一个隐式的super()调用,从而调用超类的默认构造函数.从该调用返回时,Java将 ...
-
JavaScript实现元素拖动性能优化
前言:前几天没事干写了个小网站,打算用原生的javascript实现元素的拖动,但是事情并没有想象的那么顺利,首先是实现了拖动的元素卡的不能再卡,简直不能够,上图~~ 看见没?这就是效果,简直让人欲哭 ...
-
【BZOJ】3123: [Sdoi2013]森林
题解 ------------------ 我莫不是一个智障吧 我把testdata的编号 当成数据组数读进来 我简直有毒 以为哪里写错了自闭了好久 实际上这题很简单,只要愉悦地开个启发式合并,然后每 ...