python3之rabbitMQ

时间:2022-11-05 20:54:50

1、RabbitMQ介绍

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

消息队列技术是分布式应用间交换信息的一种技术;消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走;通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

MQ主要作用是接受和转发消息。你可以想想在生活中的一种场景:当你把信件的投进邮筒,邮递员肯定最终会将信件送给收件人。我们可以把MQ比作 邮局和邮递员。

MQ和邮局的主要区别是,它不处理消息,但是,它会接受数据、存储消息数据、转发消息。

2、安装RabbitMQ

linux上安装:

安装配置epel源

   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 
安装erlang
   $ yum -y install erlang
安装RabbitMQ
   $ yum -y install rabbitmq-server

windows上安装:

(1)首先,您需要安装支持的 Windows 版Erlang。运行Erlang Windows安装程序。Erlang将出现在开始菜单中,设置erlang的环境变量(C:\erl9.2\bin;),测试erlang是否安装正确:cmd-输入:erl,能看到eshell版本号,说明安装成功!

(2)下载rabbitMQ:(安装下一步完成即可)

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-windows-3.7.3.zip

(3)安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及交换的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:C:\ software \ rabbitmq \ rabbitmq_server-3.6.5 \ sbin),输入:rabbitmq-plugins enable rabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件。

插件安装完之后,在浏览器输入的http://本地主机:15672进行验证

如果不能打开页面解决方法:首先在命令行输入:rabbitmq-service stop,接着输入rabbitmq-service remove,再接着输入rabbitmq-service install,接着输入rabbitmq-service start,最后重新输入rabbitmq-plugins enable rabbitmq_management试试,我是这样解决的。

创建用户名,密码,绑定角色:

RabbitMQ报错解决方法:

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat status
Status of node rabbit@DESKTOP-6JT7D2H ...
Error: unable to perform an operation on node 'rabbit@DESKTOP-6JT7D2H'. Please see diagnostics information and suggestions below. Most common reasons for this are: * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
* CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
* Target node is not running In addition to the diagnostics info below: * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
* Consult server logs on node rabbit@DESKTOP-6JT7D2H DIAGNOSTICS
=========== attempted to contact: ['rabbit@DESKTOP-6JT7D2H'] rabbit@DESKTOP-6JT7D2H:
* connected to epmd (port ) on DESKTOP-6JT7D2H
* epmd reports node 'rabbit' uses port for inter-node and CLI tool traffic
* TCP connection succeeded but Erlang distribution failed * Authentication failed (rejected by the remote node), please check the Erlang cookie Current node details:
* node name: 'rabbitmqcli57@DESKTOP-6JT7D2H'
* effective user's home directory: C:\Users\Administrator.DESKTOP-6JT7D2H
* Erlang cookie hash: RmzKErjVZUcsMU8wSgBGbA== 解决方法:
将C:\Users\tracyclock\.erlang.cookie 文件拷贝到C:\Windows\System32\config\systemprofile替换掉.erlang.cookie文件
重启rabbitMQ服务:net stop RabbitMQ && net start RabbitMQ

查看用户及用户角色:rabbitmqctl.bat list_users

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat list_users
Listing users ...
guest [administrator]

新增用户:rabbitmqctl.bat add_user username password

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat add_user admin
Adding user "admin" ... C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat list_users
Listing users ...
admin []
guest [administrator]

用户角色:

rabbitmq用户角色可分为五类:超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

(1) 超级管理员(administrator)

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

(2) 监控者(monitoring)

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

(3) 策略制定者(policymaker)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。

(4) 普通管理者(management)

仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

(5) 其他的

无法登陆管理控制台,通常就是普通的生产者和消费者。

授权用户为超级管理员:rabbitmqctl.bat set_user_tags admin administrator

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ... C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat list_users
Listing users ...
admin [administrator]
guest [administrator]

用户可以设置多个角色:rabbitmqctl.bat set_user_tags username tag1 tag2 ...

修改用户密码:rabbitmqctl change_password userName newPassword

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat change_password admin
Changing password for user "admin" ...

删掉用户:rabbitmqctl.bat delete_user username

C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat delete_user guest
Deleting user "guest" ... C:\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmqctl.bat list_users
Listing users ...
admin [administrator]

权限相关命令为:

(1) 设置用户权限

rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

(2) 查看(指定hostpath)所有用户的权限信息

rabbitmqctl  list_permissions  [-p  VHostPath]

(3) 查看指定用户的权限信息

rabbitmqctl  list_user_permissions  User

(4)  清除用户的权限信息

rabbitmqctl  clear_permissions  [-p VHostPath]  User

3、python3使用pika python客户端

发出消息(生产者):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/22 15:51
# @Author : Py.qi
# @File : rabbitMQ_send_1.py
# @Software: PyCharm import pika,sys
print('send....start....')
while True:
inputso=input('soinsideto:')
if inputso == 'quit':
break
#与RabbitMQ服务器建立链接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立隧道
channel = connection.channel()
#创建队列名称zhang
channel.queue_declare(queue='zhang')
#发送信息:exchange指定交换,routing_key指定队列名,body指定消息内容
  channel.basic_publish(exchange='',routing_key='zhang',body=inputso)
#关闭链接
connection.close()

接收者(消费者):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/22 15:57
# @Author : Py.qi
# @File : rabbitMQ_rescv_1.py
# @Software: PyCharm import pika
#创建链接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立隧道
channel = connection.channel()
#创建队列名,此处也可省略,在找不到队列时创建
channel.queue_declare(queue='zhang')
#使用回调函数callback来接收消息并打印消息
def callback(ch,method,properties,body):
print('recived:',body)
#指定队列接收消息,callback接收消息,queue指定队列,no_ack不给发送者发送确认消息
channel.basic_consume(callback,queue='zhang',no_ack=True)
print('waiting for message,to exit press ctrl+c')
#持续接收消息,阻塞
channel.start_consuming()

默认情况下,RabbitMQ会按顺序将每条消息发送到下一个使用者,每个消费者按顺序获得同样数量的消息,这种分配方式称为循环法。

(1)消息持久化:

当RabbitMQ退出或者崩溃时,它会忘记队列和消息,需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久化。

在创建队列时指定durable为True来标记队列持久化:

channel.queue_declare(queue='hehe',durable=True)

将消息标记为持久化,通过提供值为2的delivery_mode属性

channel.basic_publish(exchange='',
routing_key='hehe',
body=inputso,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))

(2)消息公平分发

如果RabbitMQ只管按顺序把消息发送到每个消费者上,不考虑消费者的负载,很可能出现一个机器配置不高的消费者那里堆积很多消息处理不完,同时配置高的机器却很轻松。为了解决这个问题,可以在每个消费者端,配置perfetch=1,意识就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再发新消息了。

python3之rabbitMQ

为了解决这个问题,我们可以使用basic.qos方法和 prefetch_count = 1设置。这告诉RabbitMQ一次不要向工作人员发送多于一条消息。或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。

channel.basic_qos(prefetch_count = 1)

完整代码:

生产者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/22 15:51
# @Author : Py.qi
# @File : rabbitMQ_send_1.py
# @Software: PyCharm import pika
print('send....start....')
while True:
inputso=input('soinsideto:')
if inputso == 'quit':
break
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = connection.channel()
channel.queue_declare(queue='hehe',durable=True)
channel.basic_publish(exchange='',
routing_key='hehe',
body=inputso,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
connection.close()

消费者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/22 15:57
# @Author : Py.qi
# @File : rabbitMQ_rescv_1.py
# @Software: PyCharm import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = connection.channel()
channel.queue_declare(queue='hehe',durable=True)
def callback(ch,method,properties,body):
print('recived:',body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) #确认发送消息个数
channel.basic_consume(callback,queue='hehe',no_ack=True)
print('waiting for message,to exit press ctrl+c')
channel.start_consuming()

(3)消息发布与订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

发布消息类似于广播效果,需要用到exchange,在定义exchange时指定类型来决定哪些queue符合条件,可以接收消息:

有几种可用的交换类型:direct, topic, headers 和fanout。我们将关注最后一个fanout。我们创建该类型的交换,并将其称为logs:

channel.exchange_declare(exchange='logs',
exchange_type='fanout')

fanout:所有bind到此exchange的queue都可以接收到消息

direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。

  表达式符号:#代表一个或多个字符,*代表任何字符

fanout交换,它只是将收到的所有消息广播到它所知道的所有队列中。

要列出服务器上的交换,可以使用命令rabbitmqctl:

sudo rabbitmqctl list_exchanges

广播消息,fanout实例:

生产者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/24 17:05
# @Author : Py.qi
# @File : rabbit_send_fanout.py
# @Software: PyCharm
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel=connection.channel()
#指定交换类型
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#message = ' '.join(sys.argv[1:]) or 'info:hello world!'
for i in range(10):
channel.basic_publish(exchange='logs',routing_key='',body=str(i))
print('[x]sent %r'%i)
connection.close()

消费者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/24 17:12
# @Author : Py.qi
# @File : rabbit_recv_fanout.py
# @Software: PyCharm import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.146.129'))
channel = connection.channel()
#指定交换类型
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
#让服务器随机生成队列名,一旦消费者链接关闭,队列将被删除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#将队列绑定到交换上
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

接收消息时指定队列, exchange type = direct

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判断应该将数据发送至指定队列。

生产者:发布消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/26 9:25
# @Author : Py.qi
# @File : rabbit_send_direct.py
# @Software: PyCharm import pika
import sys conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = conn.channel()
#绑定exchange类型为指定接收
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#判断参数个数
severity=sys.argv[1] if len(sys.argv) > 1 else 'info'
#发送消息
message = ' '.join(sys.argv[2:]) or 'hello world'
#消息发送到exchange交换队列
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print('[x]sent %r:%r'%(severity,message))
conn.close()

消费者:接收消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/26 9:54
# @Author : Py.qi
# @File : rabbit_recver_direct.py
# @Software: PyCharm import pika,sys
conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = conn.channel()
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#随机队列名,结束即删除
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#接收队列消息关键参数
severities=sys.argv[1:]
if not severities:
sys.stderr.write('Usage:%s[info][warning][error]\n'%sys.argv[0])
sys.exit(1)
#循环将消息发送到有关键字队列中
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print('[*]waiting for logs,To exit press CTRL+C')
def callback(ch,method,properites,body):
print('[x]%r:%r'%(method.routing_key,body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

模糊匹配:

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

#:表示可以匹配0个或多个单词

*:表示只能匹配一个单词

生产者:发送消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/26 10:42
# @Author : Py.qi
# @File : rabbit_send_topic.py
# @Software: PyCharm import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.146.129'))
channel = connection.channel()
#指定exchange为模糊匹配topic
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

消费者:接收消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/2/26 10:43
# @Author : Py.qi
# @File : rabbit_recve_topic.py
# @Software: PyCharm
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.146.129'))
channel = connection.channel()
#exchange模式topic
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

python3之rabbitMQ