上个项目中用到了activemq,只是简单应用,安装完成后直接使用就可以了。由于新项目中一些硬件的限制,需要把消息队列换成rabbitmq。
rabbitmq中的几种模式和机制比activemq多多了,根据业务需要,使用rpc实现功能,其中踩过的一些坑,有必要记录一下了。
上代码,目录结构分为 c_server、c_client、c_handler:
c_server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time
import json
import io
import yaml
# Ask the operator which exchange and request queue this server should use.
# input() already returns text on Python 3, so the former .decode('utf-8')
# (which raises AttributeError on str) is dropped.
s_exchange = input("请输入交换机名称->>").strip()
s_queue = input("输入消息队列名称->>").strip()

# Broker login and blocking connection used by the RPC server.
credentials = pika.PlainCredentials('system', 'manager')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='xxx.xxx.xxx.xxx', credentials=credentials))

# 定义 -> declare the topology: a direct exchange, an exclusive request queue,
# and the binding between the two.
channel = connection.channel()
channel.exchange_declare(exchange=s_exchange, exchange_type='direct')
channel.queue_declare(queue=s_queue, exclusive=True)
channel.queue_bind(queue=s_queue, exchange=s_exchange)
def s_manage(content):
    """Build the JSON reply for one RPC request body.

    Args:
        content: The raw request body. It is decoded with ``json.loads`` and
            the result is then parsed with ``yaml.safe_load``; the parsed
            value must support ``['cmd']`` lookup.

    Returns:
        A JSON string with the keys ``errorid`` (always 0), ``resp`` (the
        request's ``cmd`` value) and ``errorcont``.

    Raises:
        KeyError: If the parsed request has no ``cmd`` entry.
    """
    # The inner parse goes through yaml.safe_load to work around a unicode
    # conversion problem (the author's noted alternative was
    # json.JSONDecoder().decode(content)).
    # json.loads is called without ``encoding``: that keyword was deprecated
    # and ignored since Python 3.1 and removed in 3.9, where it raises TypeError.
    str_content = yaml.safe_load(json.loads(content))
    str_res = {
        "errorid": 0,
        "resp": str_content['cmd'],
        "errorcont": "成功",
    }
    return json.dumps(str_res)
def on_request(ch, method, props, body):
    """Handle one RPC request: compute the reply, publish it, then ack.

    Args:
        ch: Channel the request was delivered on; also used to publish the reply.
        method: Delivery metadata; its ``delivery_tag`` is acknowledged.
        props: Request properties; ``reply_to`` names the queue the reply is
            routed to and ``correlation_id`` is echoed back unchanged.
        body: Raw request payload, passed to ``s_manage``.
    """
    response = s_manage(body)
    # Echo the correlation id so the caller can pair this reply with its request.
    reply_properties = pika.BasicProperties(correlation_id=props.correlation_id)
    # The empty exchange name publishes through the default exchange, using
    # the reply queue's name as the routing key.
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=reply_properties,
                     body=response)
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Limit this consumer to one unacknowledged message at a time.
channel.basic_qos(prefetch_count = 1 )
# Register on_request as the callback for messages arriving on s_queue.
# NOTE(review): the callback-first positional form matches pika < 1.0;
# pika >= 1.0 expects basic_consume(queue, on_message_callback) -- confirm
# against the installed pika version.
channel.basic_consume(on_request, queue = s_queue)
print ( " [x] awaiting rpc requests" )
# Enter the consume loop; incoming requests are dispatched to on_request.
channel.start_consuming()
|
c_client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import json
import io
class rpcclient(object):
    """Blocking RabbitMQ RPC client: publishes commands and collects replies.

    Replies are requested on the fixed durable queue ``q_manager`` and are
    paired with their request through the correlation id.
    """

    # Name of the durable queue the server is asked to reply to.
    CALLBACK_QUEUE = 'q_manager'

    def __init__(self):
        """Open a blocking connection and a channel with the guest account."""
        self.credentials = pika.PlainCredentials('guest', 'guest')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='xxx.xxx.xxx.xxx',
                                      credentials=self.credentials))
        self.channel = self.connection.channel()

    def on_response(self, ch, method, props, body):
        """Consumer callback: keep the reply whose correlation id is awaited.

        Only the awaited reply is stored and acknowledged. Any other reply is
        left unacknowledged so that ``get_response`` can hand it back to the
        queue for the task it belongs to.
        """
        # NOTE(review): the published listing lost its indentation; acking
        # only on a match is assumed here because acking every delivery would
        # discard other tasks' replies on the shared queue.
        if self.callback_id == props.correlation_id:
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def get_response(self, callback_queue, callback_id):
        """Wait for and return the reply identified by ``callback_id``.

        Args:
            callback_queue: Name of the queue to consume replies from.
            callback_id: Correlation id of the awaited reply.

        Returns:
            The body of the matching reply.
        """
        self.callback_id = callback_id
        self.response = None
        self.channel.queue_declare(self.CALLBACK_QUEUE, durable=True)
        # on_response runs for every message delivered from callback_queue.
        # NOTE(review): callback-first positional form matches pika < 1.0;
        # pika >= 1.0 expects basic_consume(queue, on_message_callback).
        consumer_tag = self.channel.basic_consume(self.on_response,
                                                  queue=callback_queue)
        while self.response is None:
            # Non-blocking counterpart of start_consuming.
            self.connection.process_data_events()
        # Stop consuming and return every reply that was delivered but not
        # acknowledged, so later lookups for other task ids can still see them.
        self.channel.basic_cancel(consumer_tag)
        self.channel.basic_recover(requeue=True)
        return self.response

    def call(self, queue_name, command, exchange, rout_key):
        """Publish ``command`` as an RPC request.

        Args:
            queue_name: Routing key for the request (the server's queue name).
            command: Message body to send.
            exchange: Exchange to publish to.
            rout_key: Accepted for interface compatibility; not used.

        Returns:
            A ``(callback_queue, correlation_id)`` tuple to pass to
            ``get_response`` later.
        """
        # A fixed, named reply queue is used instead of a server-generated one
        # so that a different client instance can collect the reply later.
        self.callback_queue = self.CALLBACK_QUEUE
        self.corr_id = str(uuid.uuid4())
        # Make sure the reply queue exists before the request goes out;
        # a reply routed to a queue that does not exist yet would be dropped.
        self.channel.queue_declare(self.CALLBACK_QUEUE, durable=True)
        request_properties = pika.BasicProperties(
            reply_to=self.callback_queue,   # queue the server should reply to
            correlation_id=self.corr_id,    # token used to match the reply
        )
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=queue_name,
                                   properties=request_properties,
                                   body=command)
        return self.callback_queue, self.corr_id
client
|
c_handler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from c_client import *
import random, time
import threading
import json
import sys
class handler(object):
    """Interactive front end that sends RPC commands and looks up their replies."""

    def __init__(self):
        # Pending tasks: task_id -> [host, command, callback_queue, correlation_id]
        self.information = {}

    def check_all(self, *args):
        """Print the id, target queue and command of every pending task."""
        time.sleep(2)
        print('获取消息')
        for key, entry in self.information.items():
            print("cid【%s】\t 队列【%s】\t 命令【%s】" % (key, entry[0], entry[1]))

    def check_task(self, cmd):
        """Fetch and print the reply for the task whose id is ``cmd``.

        Args:
            cmd: Task id as typed by the user; must parse as an integer.

        An id that is not a number, or that is not pending, is reported on
        stdout instead of raising, so the prompt loop in ``start`` keeps
        running. A task is removed from the pending table once its reply has
        been printed.
        """
        try:
            task_id = int(cmd)
        except (TypeError, ValueError) as e:
            # Empty or non-numeric input previously escaped as ValueError.
            print("error: [%s]" % e)
            return
        time.sleep(2)
        print(task_id)
        try:
            entry = self.information[task_id]
            callback_queue, callback_id = entry[2], entry[3]
        except (KeyError, IndexError) as e:
            print("error: [%s]" % e)
            return
        client = rpcclient()
        response = client.get_response(callback_queue, callback_id)
        print(response)
        self.information.pop(task_id, None)

    # NOTE(review): the single-space defaults are kept for interface
    # stability; they look like a formatting artifact of '' -- confirm.
    def run(self, user_cmd, host, exchange=' ', rout_key=' ', que=' '):
        """Send ``user_cmd`` to ``host`` and register it as a pending task.

        Args:
            user_cmd: Command payload to publish.
            host: Name of the target queue (used as the routing key).
            exchange: Exchange to publish to.
            rout_key: Accepted for interface compatibility; not used.
            que: Forwarded to ``rpcclient.call`` as ``rout_key``.
        """
        try:
            time.sleep(2)
            # Pick an id that is not already pending, so an existing task is
            # never overwritten.
            task_id = random.randint(10000, 99999)
            while task_id in self.information:
                task_id = random.randint(10000, 99999)
            client = rpcclient()
            response = client.call(queue_name=host, command=user_cmd,
                                   exchange=exchange, rout_key=que)
            self.information[task_id] = [host, user_cmd, response[0], response[1]]
        except IndexError as e:
            print("[error]:%s" % e)

    def reflect(self, str, cmd, host, exchange, que):
        """Call the method named ``str`` with the remaining arguments.

        Nothing happens when this object has no attribute of that name.
        """
        if hasattr(self, str):
            getattr(self, str)(cmd, host, exchange, que)

    def start(self, m, cmd, host, exchange, que):
        """Run the interactive loop; it never returns.

        Each round asks for a task id, prints that task's reply, prints the
        pending table, then dispatches method ``m`` on a new thread.
        """
        while True:
            # input() already returns text on Python 3; no .decode() needed.
            user_resp = input("输入处理消息内容id->>").strip()
            self.check_task(user_resp)
            print(self.information)
            # Run the action in a new thread so the prompt is not blocked.
            worker = threading.Thread(target=self.reflect,
                                      args=(m, cmd, host, exchange, que))
            worker.start()
# Collect the target exchange, queue and command from the operator.
# input() already returns text on Python 3, so the former .decode('utf-8')
# (which raises AttributeError on str) is dropped.
s_exchange = input("请输入交换机名称->>").strip()
s_queue = input("输入消息队列名称->>").strip()
d_cmd_state = input("输入json命令->>").strip()
# The typed command text is wrapped as a JSON string before being sent.
s_cmd = json.dumps(d_cmd_state)
# NOTE: this rebinds the module-level name ``handler`` from the class to the instance.
handler = handler()
# The queue name is passed twice: once as the host and once as ``que``.
handler.start('run', s_cmd, s_queue, s_exchange, s_queue)
handler
|
注意要点:1、c_client 发布消息到rabbitmq 时,需要携带服务器返回结果所用的队列名称(reply_to),及corr_id(correlation_id)
2、c_handler 做了处理,每次发送的内容都会放到task列表中,等显示出id号后,输入该id号就可以查询返回的内容,调用如下:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/dugufei/p/9105581.html