前提条件:RabbitMQ 服务已启动,oslo 相关模块(oslo.messaging、oslo.config、oslo.service)已安装,/etc/test/test.conf 文件存在且配置了 RabbitMQ 相关的配置项。
主要配置项如下:
rabbit_host=127.0.0.1
rabbit_userid=test
rabbit_password=123456
RPC server 端代码如下:
import logging

import oslo_messaging as messaging
from oslo_config import cfg
from oslo_service import service
CONF = cfg.CONF
TRANSPORT = None
CONF(None, project='test')
TRANSPORT = messaging.get_transport(CONF)class WorkerManager(object): target = messaging.Target(version='1.0') def __init__(self, host=None): self.host = hostdef test_get(self, context, get_id):
print "test_get method is called"class Service(service.Service):def __init__(self, host, binary, topic, manager=None, *args, **kwargs):super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
self.manager = WorkerManager(host=self.host)self.rpcserver = None
def start(self):target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
self.rpcserver = messaging.get_rpc_server(TRANSPORT, target, endpoints) self.rpcserver.start() def __getattr__(self, key): manager = self.__dict__.get('manager', None) return getattr(manager, key) def kill(self): self.stop() def stop(self, graceful=False): try: self.rpcserver.stop() self.rpcserver.wait() except Exception: pass super(Service, self).stop() def reset(self): """Reset a service in case it received a SIGHUP.""" passserver=Service(host='testHostname', binary='test-bin', topic='test')_launcher = service.launch(CONF, server, workers=1)_launcher.wait()rpc client端代码如下:
import oslo_messaging as messaging
from oslo_config import cfg
CONF = cfg.CONF

# Load the configuration for project 'test' (None -> use the process
# command line), then build the messaging transport from it.
CONF(None, project='test')
TRANSPORT = messaging.get_transport(CONF)
class RpcApi(object):
    """Client-side wrapper for the RPC methods published on topic 'test'."""

    def __init__(self):
        """Create an RPC client bound to topic 'test', version 1.0."""
        target = messaging.Target(topic='test', version='1.0')
        self.client = messaging.RPCClient(TRANSPORT, target)

    def test_get(self, ctxt, get_id):
        """Send a ``test_get`` cast to the server.

        :param ctxt: request context dict forwarded with the message.
        :param get_id: value passed as the remote method's ``get_id``.
        :returns: whatever ``cast`` returns.
        """
        cctxt = self.client.prepare(version='1.0')
        return cctxt.cast(ctxt, 'test_get', get_id=get_id)
# Build the client and send one test_get request with an empty context.
rpc_api = RpcApi()
rpc_api.test_get({}, get_id='aaaa')