python运维开发之第十一天(RabbitMQ,redis)

时间:2022-02-11 18:15:54

一、RabbitMQ

python的Queue与RabbitMQ之间的理解:

python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。

1、简单的实现生产者与消费者

  生产者

  (1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)

  消费者

  (1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/ import pika
#建立socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()
#声明一个队列
channel.queue_declare(queue='hello')
#通过管道发消息,routing_key 队列queue名字 ,body发送内容
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World! 1 2')
print("[x] send 'Hello World! 1 2 '")
connection.close()

producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/ import pika,time
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()
#声明队列,防止生产者(发送端)没开启,消费者端报错
channel.queue_declare(queue='hello')
#ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息
def callbak(ch,method,properties,body):
print("[x] Received %r " % body)
# time.sleep(30)
#消费消息
channel.basic_consume(callbak,
queue='hello',
no_ack=True #消息有没处理,都不给生产者发确认消息
)
print('[*] Waitting for messages TO exit press ctrl+c')
channel.start_consuming() #开始

consumer

2、消费者对生产者,可以1对多,而且默认是轮询机制

no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除

如下图注释了no_ack,加了一个时间,

python运维开发之第十一天(RabbitMQ,redis)

开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息

python运维开发之第十一天(RabbitMQ,redis)

3、队列查询

python运维开发之第十一天(RabbitMQ,redis)

清除队列消息

python运维开发之第十一天(RabbitMQ,redis)

4、消息持久化

(1)durable只是队列持久化

channel.queue_declare(queue='hello',durable=True)

生产者和消费者都需要添加durable=True

(2)要实现消息持久化,还需要

python运维开发之第十一天(RabbitMQ,redis)

5、消息(1对多)实现权重功能

消费者端添加在消费消息之前

channel.basic_qos(prefetch_count=1)

6、广播消息fanout(纯广播)订阅发布

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/ import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout')
#message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!2" channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message) connection.close()

fanout_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("random queuename",queue_name) channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

fanout_consumer

7、direct广播模式(有选择性的发送接收消息)

import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

direct_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(severities)
print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

direct_consumer

python运维开发之第十一天(RabbitMQ,redis)

8、更细致的消息判断 type = topic

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/ import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

topic_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/ import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

topic_consumer

python运维开发之第十一天(RabbitMQ,redis)


 
 

python运维开发之第十一天(RabbitMQ,redis)的更多相关文章

  1. Python运维开发基础10-函数基础【转】

    一,函数的非固定参数 1.1 默认参数 在定义形参的时候,提前给形参赋一个固定的值. #代码演示: def test(x,y=2): #形参里有一个默认参数 print (x) print (y) t ...

  2. Python运维开发基础09-函数基础【转】

    上节作业回顾 #!/usr/bin/env python3 # -*- coding:utf-8 -*- # author:Mr.chen # 实现简单的shell命令sed的替换功能 import ...

  3. Python运维开发基础08-文件基础【转】

    一,文件的其他打开模式 "+"表示可以同时读写某个文件: r+,可读写文件(可读:可写:可追加) w+,写读(不常用) a+,同a(不常用 "U"表示在读取时, ...

  4. Python运维开发基础07-文件基础【转】

    一,文件的基础操作 对文件操作的流程 [x] :打开文件,得到文件句柄并赋值给一个变量 [x] :通过句柄对文件进行操作 [x] :关闭文件 创建初始操作模板文件 [root@localhost sc ...

  5. Python运维开发基础06-语法基础【转】

    上节作业回顾 (讲解+温习120分钟) #!/usr/bin/env python3 # -*- coding:utf-8 -*- # author:Mr.chen # 添加商家入口和用户入口并实现物 ...

  6. Python运维开发基础05-语法基础【转】

    上节作业回顾(讲解+温习90分钟) #!/usr/bin/env python # -*- coding:utf-8 -*- # author:Mr.chen import os,time Tag = ...

  7. Python运维开发基础04-语法基础【转】

    上节作业回顾(讲解+温习90分钟) #!/usr/bin/env python3 # -*- coding:utf-8 -*- # author:Mr.chen # 仅用列表+循环实现“简单的购物车程 ...

  8. Python运维开发基础03-语法基础 【转】

    上节作业回顾(讲解+温习60分钟) #!/usr/bin/env python3 # -*- coding:utf-8 -*- # author:Mr.chen #只用变量和字符串+循环实现“用户登陆 ...

  9. Python运维开发基础02-语法基础【转】

    上节作业回顾(讲解+温习60分钟) #!/bin/bash #user login User="yunjisuan" Passwd="666666" User2 ...

随机推荐

  1. openswitch db files

    http://openvswitch.org/support/dist-docs/ovsdb-tool.1.html FILES The default db is /etc/openvswitch/ ...

  2. Qt WebEngine 网页交互

    环境:Qt5.7.0,VS2013 一.简单介绍 从 Qt5.4 开始已经去掉 Qt WebKit 模块了,使用的是 chrome 内核封装的 QtWebEngine,浏览器相关的类有以下几个: QW ...

  3. Android Studio中如何创建AIDL

    实现客户端添加Book,Service接收并打印出书籍信息 一.创建Book类 1.创建文件夹(本文命名为aidl) 2.创建Book类并继承Parcelable接口(原因:AIDL只能传送继承Par ...

  4. 卸载QTP

    卸载QTP11.5 1.首先需要卸载QTP11.5,点击exe文件,不点击重启 2.然后删除C:"Program Files"下的Mercury Interactive文件夹.(默 ...

  5. 51nod1057-N的阶乘(大数乘法巧解)

    这道大数乘法开始我是想套板子模拟的..然后就发现2/3的例子都wa了.(惊了).然后在思考后发现n2的板子的确过不了这么多的大数.(不看题的下场).所以,我在网上发现了分块求大数的方法.%%% 思路来 ...

  6. DevExpress v18.1新版亮点——CodeRush for VS篇(一)

    用户界面套包DevExpress v18.1日前正式发布,本站将以连载的形式为大家介绍各版本新增内容.本文将介绍了CodeRush for Visual Studio v18.1 的新功能,快来下载试 ...

  7. JavaWeb项目中web.xml有关servlet的基本配置

    JavaWeb项目中web.xml有关servlet的基本配置: 我们注意到,tomcat下的conf中也有一个web.xml文件,没错的,所有的JavaWeb项目中web.xml都继承自服务器下的w ...

  8. [日常] Go语言圣经--示例: 并发的Clock服务习题

    练习 8.1: 修改clock2来支持传入参数作为端口号,然后写一个clockwall的程序,这个程序可以同时与多个clock服务器通信,从多服务器中读取时间,并且在一个表格中一次显示所有服务传回的结 ...

  9. [Android Security] 如何把java代码转换成smali代码

    copy :https://www.cnblogs.com/gordon0918/p/5466514.html 1.概述 Smali是Android系统中Dalvik虚拟机指令语言,在apk逆向过程中 ...

  10. webservice(二)简单实例

    1.建立WSDL文件      建立WSDL的工具很多,eclipse.zendstudio.vs都可以,我个人建议自己写,熟悉结构,另外自动工具对xml schame类型支持在类型中可能会报错. 下 ...