In the previous article we analyzed how the RPC service is started: in a child process, the plugin's start_rpc_listeners method is called to consume messages from specific MQ queues and turn them into local method calls. This article analyzes that flow in detail.
First, here is a diagram of the main classes involved; we will walk through the code with the class diagram at hand:
With the class diagram above as a reference, let's look at the core plugin Ml2Plugin's "start_rpc_listeners" method:
@log_helpers.log_method_call
def start_rpc_listeners(self):
"""Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection()
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
topics.SERVER_RESOURCE_VERSIONS,
[resources_rpc.ResourcesPushToServerRpcCallback()],
fanout=True)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
return self.conn.consume_in_threads()
Let's go through the code piece by piece, together with the class diagram:
self._setup_rpc()

def _setup_rpc(self):
    """Initialize components to support agent communication."""
    self.endpoints = [
        rpc.RpcCallbacks(self.notifier, self.type_manager),
        securitygroups_rpc.SecurityGroupServerRpcCallback(),
        dvr_rpc.DVRServerRpcCallback(),
        dhcp_rpc.DhcpRpcCallback(),
        agents_db.AgentExtRpcCallback(),
        metadata_rpc.MetadataRpcCallback(),
        resources_rpc.ResourcesPullRpcCallback()
    ]
The _setup_rpc method sets up the plugin's endpoints. The endpoints are the destinations of RPC message dispatch: every incoming RPC call is ultimately resolved against them. As the class diagram shows, the endpoints are used by the RPCDispatcher object, which, when dispatching a message, invokes the named method on one of the endpoints.
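To make this concrete, here is a minimal sketch of what an endpoint looks like (an illustrative class, not code from Neutron): any object whose methods match RPC method names can serve as an endpoint, and an optional target attribute declares the namespace/version it supports.

import oslo_messaging

class DemoEndpoint(object):
    # Illustrative endpoint only; real callbacks such as agents_db.AgentExtRpcCallback
    # declare their own target with the RPC API version they implement.
    target = oslo_messaging.Target(version='1.0')

    def demo_method(self, context, **kwargs):
        # The RPCDispatcher calls this when a message with method='demo_method' arrives.
        return 'handled'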
self.topic = topics.PLUGIN
The topic of the queue that Ml2Plugin cares about comes from neutron/common/topics.py. This topic is what the message queue needs; it will be covered later.
PLUGIN = 'q-plugin'
self.conn = n_rpc.create_connection()
@removals.removed_kwarg('new')
def create_connection(new=True):
# NOTE(salv-orlando): This is a clever interpretation of the factory design
# patter aimed at preventing plugins from initializing RPC servers upon
# initialization when they are running in the REST over HTTP API server.
# The educated reader will perfectly be able that this a fairly dirty hack
# to avoid having to change the initialization process of every plugin.
if RPC_DISABLED:
return VoidConnection()
return Connection()
Then the create_connection function in neutron/common/rpc.py is called to create a Connection object. As the class diagram shows, the Connection object's two main methods are create_consumer and consume_in_threads. Internally it also keeps a servers list that stores every rpc_server created by create_consumer. Each rpc_server is the "oslo_messaging.server::MessageHandlingServer" in the class diagram: it dispatches messages through a Dispatcher object and listens on the specified message queue through a Transport object, which the diagram also shows.
Then the Connection object's create_consumer method is called three times to create three rpc_servers:

self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
    topics.SERVER_RESOURCE_VERSIONS,
    [resources_rpc.ResourcesPushToServerRpcCallback()],
    fanout=True)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
                          [agents_db.AgentExtRpcCallback()],
                          fanout=False)

Their topics are, respectively:

PLUGIN = 'q-plugin'
SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions'
REPORTS = 'q-reports-plugin'
neutron/common/rpc.py:
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
First a Target object is created. To a consumer, the Target represents the source of messages; to a producer, it represents the destination address of messages.
A consumer needs to specify the server name and the topic when creating it; a producer only needs to specify the topic.
As the class diagram shows, the Target object is mainly used by the Dispatcher, which reads some of its attributes when dispatching messages.
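As a small illustration (not Neutron code; the host name 'ubuntu' is just an example), the consumer-side and producer-side Targets look roughly like this:

import oslo_messaging

# Consumer side: topic plus the server name, matching the create_consumer call above.
server_target = oslo_messaging.Target(topic='q-plugin', server='ubuntu', fanout=False)

# Producer side: usually only the topic (and optionally a version) is specified.
client_target = oslo_messaging.Target(topic='q-plugin', version='1.0')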
Then get_server is called to create the rpc_server, which is appended to the servers list. As the class diagram shows, the rpc_server also implements the "oslo_service.service::ServiceBase" interface discussed in the previous article, so it can likewise be started with start().
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
'eventlet', serializer)
def get_rpc_server(transport, target, endpoints,
executor='blocking', serializer=None):
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
As we can see, a serializer is created first; it performs serialization work, converting the generic RPC context into a Neutron context.
Then get_rpc_server is called to actually create the rpc_server: an RPCDispatcher is created for message dispatch and is used to construct a MessageHandlingServer, which is the rpc_server proper.
There is also a TRANSPORT object here, which encapsulates the actual RPC channel.
TRANSPORT is configured when RPC is initialized:
oslo_messaging/transport.py:
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
The messaging driver URL actually used is read from the configuration file; by default it is RabbitMQ, so the driver in use is RabbitDriver. The class diagram above also shows that Transport uses the RabbitDriver, which implements the AMQP protocol; Transport therefore delegates its actual listen, send and other operations to the concrete driver.
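To make the driver selection concrete, here is a small sketch (the URL is a made-up example; real deployments set transport_url in the configuration file):

from oslo_config import cfg
import oslo_messaging

# Hypothetical URL for illustration; the real value comes from the config file.
url = oslo_messaging.TransportURL.parse(
    cfg.CONF, 'rabbit://openstack:secret@controller:5672/')
print(url.transport)  # 'rabbit' -> DriverManager loads the RabbitDriver entry point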
return self.conn.consume_in_threads()

After the three rpc_servers have been created, consume_in_threads is finally called to start them:
neutron/common/rpc.py:
def consume_in_threads(self):
    for server in self.servers:
        server.start()
    return self.servers

This calls start() on each of the three rpc_servers in turn. Each server here is the MessageHandlingServer created above; let's look at its start method:
oslo_messaging/server.py:
@ordered(reset_after='stop')
def start(self, override_pool_size=None):
if self._started:
LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
'racy. It is deprecated, and will become a noop '
'in a future release of oslo.messaging. If you '
'need to restart MessageHandlingServer you should '
'instantiate a new object.'))
self._started = True
try:
self.listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
executor_opts = {}
if self.executor_type == "threading":
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
elif self.executor_type == "eventlet":
eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'")
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
self._poll_executor = self._executor_cls(**executor_opts)
return lambda: self._poll_executor.submit(self._runner)
self.listener = self.dispatcher._listen(self.transport)

First, the dispatcher's _listen method is called to start listening for messages. It eventually calls the listen method of the underlying messaging driver; since RabbitMQ is used, the call lands here:
oslo_messaging/_drivers/amqpdriver.py:
def listen(self, target):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic,
target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
return listener
As we can see, for each of the three topics above, listen() creates three consumers (two topic consumers and one fanout consumer).
For PLUGIN = 'q-plugin':
topic consumers:
exchange_name: neutron
routing_key: 'q-plugin'
callback: AMQPListener
queue: 'q-plugin'

exchange_name: neutron
routing_key: 'q-plugin.ubuntu'
callback: AMQPListener
queue: 'q-plugin.ubuntu'

fanout consumer:
exchange_name: q-plugin_fanout
routing_key: 'q-plugin'
callback: AMQPListener
queue: 'q-plugin_fanout_28c93eb354c741959d5a9c362905f05f' (the trailing string is a randomly generated unique ID)
For REPORTS = 'q-reports-plugin':
topic consumers:
exchange_name: neutron
routing_key: 'q-reports-plugin'
callback: AMQPListener
queue: 'q-reports-plugin'

exchange_name: neutron
routing_key: 'q-reports-plugin.ubuntu'
callback: AMQPListener
queue: 'q-reports-plugin.ubuntu'

fanout consumer:
exchange_name: q-reports-plugin_fanout
routing_key: 'q-reports-plugin'
callback: AMQPListener
queue: 'q-reports-plugin_fanout_71cfb512d9b54decab848e5d69cd17a3' (the trailing string is a randomly generated unique ID)
For SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions':
topic consumers:
exchange_name: neutron
routing_key: 'q-server-resource-versions'
callback: AMQPListener
queue: 'q-server-resource-versions'

exchange_name: neutron
routing_key: 'q-server-resource-versions.ubuntu'
callback: AMQPListener
queue: 'q-server-resource-versions.ubuntu'

fanout consumer:
exchange_name: q-server-resource-versions_fanout
routing_key: 'q-server-resource-versions'
callback: AMQPListener
queue: 'q-server-resource-versions_fanout_9d9ff9e336274b1a8293c0816ab8930b' (the trailing string is a randomly generated unique ID)
As we can see, for topic consumers the queue name equals the routing key, while for fanout consumers the queue name is the exchange name followed by a unique random ID.
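The fanout queue name in the listings above follows roughly this pattern (a sketch for illustration; the exact construction lives inside the rabbit driver):

import uuid

topic = 'q-plugin'
# Topic queues reuse the routing key as the queue name; fanout queues append
# a randomly generated unique suffix, matching the listings above.
fanout_queue = '%s_fanout_%s' % (topic, uuid.uuid4().hex)
print(fanout_queue)  # e.g. q-plugin_fanout_28c93eb354c741959d5a9c362905f05f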
Having seen how the consumer queues are created, we know that RPC calls are ultimately published to the neutron exchange and routed to these queue consumers. Next, let's see how the consumers handle these RPC messages.
return lambda: self._poll_executor.submit(self._runner)

After MessageHandlingServer has created all the consumer queues through listen, it runs the _runner method. The _poll_executor and _work_executor here are instances of futurist._futures.GreenThreadPoolExecutor, which can be thought of as a thread pool used to improve the throughput of the RPC service. Its implementation will be covered in a separate article.
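As a rough illustration of how this executor is used (a standalone sketch, not the oslo.messaging code itself, assuming futurist and eventlet are installed):

import futurist

def _on_done(fut):
    # Completion callback, analogous to callback.done() being attached in _submit_work below.
    print(fut.result())

executor = futurist.GreenThreadPoolExecutor(max_workers=64)
future = executor.submit(lambda: 'message handled')
future.add_done_callback(_on_done)
executor.shutdown()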
The _runner method calls the listener's poll method to wait for incoming messages and then hands them to the dispatcher. The listener here is the AMQPListener object returned by the AMQP driver in start(), and self.dispatcher is the RPCDispatcher object:

@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while self._started:
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if incoming:
self._submit_work(self.dispatcher(incoming))
# listener is stopped but we need to process all already consumed
# messages
while True:
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if incoming:
self._submit_work(self.dispatcher(incoming))
else:
return
def _submit_work(self, callback):
    fut = self._work_executor.submit(callback.run)
    fut.add_done_callback(lambda f: callback.done())

The _submit_work method submits the run method of its callback argument to _work_executor for execution, and installs a completion callback on the returned future, namely the callback's done() method. So we only need to look at the run and done methods of the callback object returned by self.dispatcher(incoming).
self.dispatcher(incoming) invokes the __call__ method of the RPCDispatcher object:
def __call__(self, incoming):
    incoming[0].acknowledge()
    return dispatcher.DispatcherExecutorContext(
        incoming[0], self._dispatch_and_reply)

The message is acknowledged first, so the RabbitMQ server can safely delete it. Then a "dispatcher.DispatcherExecutorContext" object is returned:
oslo_messaging/dispatcher.py:
class DispatcherExecutorContext(object):
def __init__(self, incoming, dispatch, post=None):
self._result = None
self._incoming = incoming
self._dispatch = dispatch
self._post = post
def run(self):
try:
self._result = self._dispatch(self._incoming)
except Exception:
msg = _('The dispatcher method must catches all exceptions')
LOG.exception(msg)
raise RuntimeError(msg)
def done(self):
if self._post is not None:
self._post(self._incoming, self._result)
As we can see, the run method calls the "self._dispatch_and_reply" method that was passed in when the object was constructed; this is the message-handling method that was just submitted to _work_executor.
It eventually calls RPCDispatcher's "_dispatch" method, which first extracts the following attributes from the message:
def _dispatch(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context
:type ctxt: dict
:param message: the message payload
:type message: dict
:raises: NoSuchMethod, UnsupportedVersion
"""
method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')
found_compatible = False
for endpoint in self.endpoints:
target = getattr(endpoint, 'target', None)
if not target:
target = self._default_target
if not (self._is_namespace(target, namespace) and
self._is_compatible(target, version)):
continue
if hasattr(endpoint, method):
localcontext._set_local_context(ctxt)
try:
return self._do_dispatch(endpoint, method, ctxt, args)
finally:
localcontext._clear_local_context()
found_compatible = True
if found_compatible:
raise NoSuchMethod(method)
else:
raise UnsupportedVersion(version, method=method)
method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')

The method named in the message is then looked up on the endpoints and invoked. As noted earlier, the endpoints are the final destination of message handling; for Ml2Plugin they are:
def _setup_rpc(self):
"""Initialize components to support agent communication."""
self.endpoints = [
rpc.RpcCallbacks(self.notifier, self.type_manager),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback(),
metadata_rpc.MetadataRpcCallback(),
resources_rpc.ResourcesPullRpcCallback()
]

In this way, the message is handed over to the named method on one of these objects for execution.
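For example, a purely illustrative message payload and the way _dispatch resolves it might look like this; the method name, arguments and version are hypothetical:

# Field names follow the _dispatch() code above; the concrete values are made up.
message = {
    'method': 'get_device_details',
    'args': {'device': 'tap1234', 'agent_id': 'ovs-agent-host1'},
    'namespace': None,
    'version': '1.5',
}

method = message.get('method')
args = message.get('args', {})
# _dispatch() walks self.endpoints, finds the first compatible endpoint that
# defines this method, and calls it: getattr(endpoint, method)(ctxt, **args).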
The done method calls the post argument that was passed in at construction time; since it is None here, it does nothing.
This completes our analysis of the start_rpc_listeners flow: the plugin creates message queue consumers to receive RPC messages and turns each RPC message into a local method call handled by the endpoints. That is the whole server-side RPC processing flow.
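To close the loop, the producer side (for example an agent calling into the plugin) roughly looks like the following sketch; it is illustrative only, and assumes TRANSPORT has already been initialized by neutron.common.rpc.init(). The method name and arguments are hypothetical.

import oslo_messaging
from neutron.common import rpc as n_rpc

# Illustrative client for the 'q-plugin' topic analyzed above.
target = oslo_messaging.Target(topic='q-plugin', version='1.0')
client = oslo_messaging.RPCClient(n_rpc.TRANSPORT, target)

# A call() publishes to the 'neutron' exchange with routing key 'q-plugin'
# and blocks until one of the consumers analyzed above returns a result.
# result = client.call(context, 'get_device_details',
#                      device='tap1234', agent_id='ovs-agent-host1')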