OpenStack Source Code Analysis: The Ml2Plugin start_rpc_listeners Flow

Date: 2022-03-24 22:06:11

The previous article analyzed how the RPC service starts: a child process calls the plugin's start_rpc_listeners method to consume messages from specific MQ queues and convert them into local method calls. This article examines that flow in detail.


First, here is a diagram of the main classes involved; we will analyze the code alongside it:


[Class diagram: main classes involved in the Ml2Plugin start_rpc_listeners flow]


With the class diagram above as a reference, let's analyze the Ml2Plugin core plugin's "start_rpc_listeners" function:

@log_helpers.log_method_call
def start_rpc_listeners(self):
    """Start the RPC loop to let the plugin communicate with agents."""
    self._setup_rpc()
    self.topic = topics.PLUGIN
    self.conn = n_rpc.create_connection()
    self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
    self.conn.create_consumer(
        topics.SERVER_RESOURCE_VERSIONS,
        [resources_rpc.ResourcesPushToServerRpcCallback()],
        fanout=True)
    # process state reports despite dedicated rpc workers
    self.conn.create_consumer(topics.REPORTS,
                              [agents_db.AgentExtRpcCallback()],
                              fanout=False)
    return self.conn.consume_in_threads()

Let's go through it piece by piece, combining the code with the class diagram:

 self._setup_rpc()

def _setup_rpc(self):
    """Initialize components to support agent communication."""
    self.endpoints = [
        rpc.RpcCallbacks(self.notifier, self.type_manager),
        securitygroups_rpc.SecurityGroupServerRpcCallback(),
        dvr_rpc.DVRServerRpcCallback(),
        dhcp_rpc.DhcpRpcCallback(),
        agents_db.AgentExtRpcCallback(),
        metadata_rpc.MetadataRpcCallback(),
        resources_rpc.ResourcesPullRpcCallback()
    ]

    The _setup_rpc function sets up the plugin's endpoints. These endpoints are the destinations for RPC message dispatch; every RPC call is resolved against them. As the class diagram shows, the endpoints are used by the RPCDispatcher object, which invokes the appropriate endpoint method when dispatching a message.


self.topic = topics.PLUGIN

The topic Ml2Plugin cares about comes from neutron/common/topics.py. This topic is what the message queue layer needs; more on that later.

PLUGIN = 'q-plugin'


self.conn = n_rpc.create_connection()

@removals.removed_kwarg('new')
def create_connection(new=True):
    # NOTE(salv-orlando): This is a clever interpretation of the factory design
    # patter aimed at preventing plugins from initializing RPC servers upon
    # initialization when they are running in the REST over HTTP API server.
    # The educated reader will perfectly be able that this a fairly dirty hack
    # to avoid having to change the initialization process of every plugin.
    if RPC_DISABLED:
        return VoidConnection()
    return Connection()

    Next, the create_connection function in neutron/common/rpc.py creates a Connection object. As the class diagram shows, the Connection object's two main methods are create_consumer and consume_in_threads. It also holds a servers list that stores every rpc_server created by create_consumer. Each rpc_server is the "oslo_messaging.server::MessageHandlingServer" in the class diagram: it dispatches messages through a Dispatcher object and listens on the specified message queue through a Transport object, as the diagram also shows.
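To make this structure concrete, here is a minimal self-contained sketch (hypothetical stand-ins, not the real neutron code) of a Connection that records one server per create_consumer call and starts them all in consume_in_threads:

```python
# FakeServer is a hypothetical stand-in for MessageHandlingServer;
# it only records its parameters and whether start() was called.
class FakeServer(object):
    def __init__(self, topic, endpoints, fanout):
        self.topic = topic
        self.endpoints = endpoints
        self.fanout = fanout
        self.started = False

    def start(self):
        self.started = True


class Connection(object):
    def __init__(self):
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        # The real code builds an oslo_messaging Target and a
        # MessageHandlingServer here; we only record the parameters.
        self.servers.append(FakeServer(topic, endpoints, fanout))

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers


conn = Connection()
conn.create_consumer('q-plugin', ['endpoint'], fanout=False)
conn.create_consumer('q-reports-plugin', ['endpoint'], fanout=False)
servers = conn.consume_in_threads()
```
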


self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(
    topics.SERVER_RESOURCE_VERSIONS,
    [resources_rpc.ResourcesPushToServerRpcCallback()],
    fanout=True)
# process state reports despite dedicated rpc workers
self.conn.create_consumer(topics.REPORTS,
                          [agents_db.AgentExtRpcCallback()],
                          fanout=False)
    Next, three calls to the Connection object's create_consumer method create three rpc_servers, with these topics:

PLUGIN = 'q-plugin'

SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions'

REPORTS = 'q-reports-plugin'

neutron/common/rpc.py:

class Connection(object):

    def __init__(self):
        super(Connection, self).__init__()
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        server = get_server(target, endpoints)
        self.servers.append(server)

    First, a Target object is created. To a consumer, the Target represents the source of messages; to a producer, it represents the destination.

    A consumer must specify a server address and a topic when creating it; a producer only needs to specify a topic.

    As the class diagram shows, the Target object is mainly used by the Dispatcher, which reads some of its attributes when dispatching messages.
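As an illustration of these two roles, the following sketch (a simplified stand-in, not the real oslo_messaging.Target) shows the fields a consumer-side target and a producer-side target would carry:

```python
# Simplified stand-in for oslo_messaging.Target; field names mirror
# the real class, but the class itself is hypothetical.
class Target(object):
    def __init__(self, topic=None, server=None, fanout=None):
        self.topic = topic
        self.server = server
        self.fanout = fanout


# Consumer side: the server name is what makes the host-specific
# queue (e.g. 'q-plugin.ubuntu') possible.
consumer_target = Target(topic='q-plugin', server='ubuntu', fanout=False)

# Producer side: only the topic is needed to route the message.
producer_target = Target(topic='q-plugin')
```
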


    Then get_server is called to create the rpc_server, which is appended to the servers list. As the class diagram shows, the rpc_server also implements the "oslo_service.service::ServiceBase" interface discussed in the previous article, so it can likewise be started via start.


def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer)


def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

    First a serializer is created; it handles serialization, converting the generic RPC context into Neutron's Context.

Then get_rpc_server actually creates the rpc_server: an RPCDispatcher is created for message dispatch and passed to the constructor of MessageHandlingServer, which is the actual rpc_server.
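As a rough illustration of the serializer's job, here is a hypothetical sketch (the real RequestContextSerializer wraps Neutron's Context class; LocalContext here is a made-up stand-in) that converts between a local context object and the plain dict carried over RPC:

```python
# Hypothetical local context class standing in for neutron's Context.
class LocalContext(object):
    def __init__(self, user_id, tenant_id):
        self.user_id = user_id
        self.tenant_id = tenant_id


class RequestContextSerializer(object):
    """Sketch of the serializer idea: dict on the wire, object locally."""

    def serialize_context(self, context):
        # Outgoing: flatten the local context into a plain dict.
        return {'user_id': context.user_id, 'tenant_id': context.tenant_id}

    def deserialize_context(self, context_dict):
        # Incoming: rebuild a local context from the RPC dict.
        return LocalContext(context_dict.get('user_id'),
                            context_dict.get('tenant_id'))


serializer = RequestContextSerializer()
ctxt = serializer.deserialize_context({'user_id': 'u1', 'tenant_id': 't1'})
```
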


There is also a TRANSPORT object here, which encapsulates the actual RPC channel.

TRANSPORT is configured when RPC is initialized:

oslo_messaging/transport.py:

try:
    mgr = driver.DriverManager('oslo.messaging.drivers',
                               url.transport.split('+')[0],
                               invoke_on_load=True,
                               invoke_args=[conf, url],
                               invoke_kwds=kwargs)
except RuntimeError as ex:
    raise DriverLoadFailure(url.transport, ex)

return Transport(mgr.driver)

    The message driver URL actually in use is read from the configuration file. The default is RabbitMQ, so the driver actually used is RabbitDriver. As the class diagram above shows, Transport delegates to the RabbitDriver, which implements the AMQP protocol; the actual listening, sending, and other operations are all handed off to this driver.
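The driver selection can be illustrated with a small sketch: the scheme of the transport URL, with anything after a '+' stripped, names the entry point looked up in the 'oslo.messaging.drivers' namespace (driver_name_from_url is a hypothetical helper, not real oslo code):

```python
from urllib.parse import urlparse


def driver_name_from_url(transport_url):
    """Return the driver entry-point name for a transport URL."""
    scheme = urlparse(transport_url).scheme
    # 'rabbit+variant' style schemes keep only the part before '+'.
    return scheme.split('+')[0]


name = driver_name_from_url('rabbit://guest:guest@127.0.0.1:5672/')
```
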


return self.conn.consume_in_threads()
After the three rpc_servers have been created, consume_in_threads is finally called to start them:

neutron/common/rpc.py:

def consume_in_threads(self):
    for server in self.servers:
        server.start()
    return self.servers
This calls the start method of each of the three rpc_servers in turn. Each server is the MessageHandlingServer created above; let's look at its start method:


oslo_messaging/server.py:

@ordered(reset_after='stop')
def start(self, override_pool_size=None):
    if self._started:
        LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
                        'racy. It is deprecated, and will become a noop '
                        'in a future release of oslo.messaging. If you '
                        'need to restart MessageHandlingServer you should '
                        'instantiate a new object.'))
    self._started = True

    try:
        self.listener = self.dispatcher._listen(self.transport)
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

    executor_opts = {}

    if self.executor_type == "threading":
        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )
    elif self.executor_type == "eventlet":
        eventletutils.warn_eventlet_not_patched(
            expected_patched_modules=['thread'],
            what="the 'oslo.messaging eventlet executor'")
        executor_opts["max_workers"] = (
            override_pool_size or self.conf.executor_thread_pool_size
        )

    self._work_executor = self._executor_cls(**executor_opts)
    self._poll_executor = self._executor_cls(**executor_opts)

    return lambda: self._poll_executor.submit(self._runner)

self.listener = self.dispatcher._listen(self.transport)
First, the dispatcher's _listen method is called to start listening for messages. This ultimately calls the listen method of the configured message driver; since RabbitMQ is in use, the call ends up here:

oslo_messaging/_drivers/amqpdriver.py:

def listen(self, target):
    conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

    listener = AMQPListener(self, conn)

    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic=target.topic,
                                callback=listener)
    conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                topic='%s.%s' % (target.topic,
                                                 target.server),
                                callback=listener)
    conn.declare_fanout_consumer(target.topic, listener)

    return listener

As we can see, for each of the three topics above, three consumers are created.


For PLUGIN = 'q-plugin':

Topic consumers:

exchange_name: neutron

routing-key: 'q-plugin'

callback: AMQPListener

queue: 'q-plugin'


exchange_name: neutron

routing-key: 'q-plugin.ubuntu'

callback: AMQPListener

queue: 'q-plugin.ubuntu'


Fanout consumer:

exchange_name: q-plugin_fanout

routing-key: 'q-plugin'

callback: AMQPListener

queue: 'q-plugin_fanout_28c93eb354c741959d5a9c362905f05f' (the trailing string is a randomly generated unique ID)


For REPORTS = 'q-reports-plugin':

Topic consumers:

exchange_name: neutron

routing-key: 'q-reports-plugin'

callback: AMQPListener

queue: 'q-reports-plugin'


exchange_name: neutron

routing-key: 'q-reports-plugin.ubuntu'

callback: AMQPListener

queue: 'q-reports-plugin.ubuntu'


Fanout consumer:

exchange_name: q-reports-plugin_fanout

routing-key: 'q-reports-plugin'

callback: AMQPListener

queue: 'q-reports-plugin_fanout_71cfb512d9b54decab848e5d69cd17a3' (the trailing string is a randomly generated unique ID)


For SERVER_RESOURCE_VERSIONS = 'q-server-resource-versions':

Topic consumers:

exchange_name: neutron

routing-key: 'q-server-resource-versions'

callback: AMQPListener

queue: 'q-server-resource-versions'


exchange_name: neutron

routing-key: 'q-server-resource-versions.ubuntu'

callback: AMQPListener

queue: 'q-server-resource-versions.ubuntu'


Fanout consumer:

exchange_name: q-server-resource-versions_fanout

routing-key: 'q-server-resource-versions'

callback: AMQPListener

queue: 'q-server-resource-versions_fanout_9d9ff9e336274b1a8293c0816ab8930b' (the trailing string is a randomly generated unique ID)


As you can see, for topic consumers the queue name equals the routing key, while for fanout consumers the queue name is the exchange name plus a unique random ID.
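This naming convention can be sketched as follows (consumer_queues is a hypothetical helper, not neutron code):

```python
import uuid


def consumer_queues(topic, server):
    """Return the three queue names declared for one topic."""
    return {
        # Topic queues reuse the routing key.
        'topic': topic,                               # e.g. 'q-plugin'
        'topic_server': '%s.%s' % (topic, server),    # e.g. 'q-plugin.ubuntu'
        # The fanout queue gets a random suffix so every server
        # receives its own copy of each fanout message.
        'fanout': '%s_fanout_%s' % (topic, uuid.uuid4().hex),
    }


queues = consumer_queues('q-plugin', 'ubuntu')
```
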


Having covered how the consumer queues are created, we know that RPC calls are published to the neutron exchange and routed to these queue consumers. Now let's see how a consumer handles the RPC messages.

return lambda: self._poll_executor.submit(self._runner)

After MessageHandlingServer has created all the consumer queues via listen, it runs the _runner method. Here _poll_executor and _work_executor are instances of:

futurist._futures.GreenThreadPoolExecutor, which can be understood as a thread pool used to increase the RPC service's throughput. Its implementation deserves a separate article.
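The executor pattern used here can be illustrated with the standard library's ThreadPoolExecutor standing in for futurist's GreenThreadPoolExecutor: each piece of work is submitted to the pool, and a done-callback is attached to the returned future (mirroring the _submit_work method shown below):

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def handle_message(msg):
    # Stand-in for dispatching one RPC message.
    results.append(msg)
    return msg


executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(handle_message, i) for i in range(5)]
for fut in futures:
    # Runs after each piece of work completes, like callback.done().
    fut.add_done_callback(lambda f: results.append('done'))
executor.shutdown(wait=True)
```
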

@excutils.forever_retry_uncaught_exceptions
def _runner(self):
    while self._started:
        incoming = self.listener.poll(
            timeout=self.dispatcher.batch_timeout,
            prefetch_size=self.dispatcher.batch_size)

        if incoming:
            self._submit_work(self.dispatcher(incoming))

    # listener is stopped but we need to process all already consumed
    # messages
    while True:
        incoming = self.listener.poll(
            timeout=self.dispatcher.batch_timeout,
            prefetch_size=self.dispatcher.batch_size)

        if incoming:
            self._submit_work(self.dispatcher(incoming))
        else:
            return
The listener's poll method is called to wait for messages to arrive, which are then handed to the dispatcher. This listener is the AMQPListener object returned by the AMQP driver in start, and the dispatcher is the RPCDispatcher object.
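The poll-and-dispatch loop can be sketched with stand-ins for the AMQPListener and the dispatcher: poll returns batches of messages until the queue is drained, then None ends the loop:

```python
# FakeListener is a hypothetical stand-in for AMQPListener.
class FakeListener(object):
    def __init__(self, batches):
        self._batches = list(batches)

    def poll(self, timeout=None, prefetch_size=1):
        # Return the next batch, or None when nothing is left.
        return self._batches.pop(0) if self._batches else None


handled = []


def dispatch(incoming):
    # Stand-in for self.dispatcher(incoming) plus _submit_work.
    handled.extend(incoming)


listener = FakeListener([['msg1', 'msg2'], ['msg3']])
while True:
    incoming = listener.poll(timeout=0.1, prefetch_size=2)
    if incoming:
        dispatch(incoming)
    else:
        break
```
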


def _submit_work(self, callback):
    fut = self._work_executor.submit(callback.run)
    fut.add_done_callback(lambda f: callback.done())
The _submit_work method submits the run method of the callback argument to _work_executor and attaches a completion callback, callback.done(), to the returned future. So we only need to look at the run and done methods of the callback object returned by self.dispatcher(incoming).


self.dispatcher(incoming) invokes the RPCDispatcher object's __call__ method:

def __call__(self, incoming):
    incoming[0].acknowledge()
    return dispatcher.DispatcherExecutorContext(
        incoming[0], self._dispatch_and_reply)
The message is acknowledged first, so the RabbitMQ server can safely delete it. Then a "dispatcher.DispatcherExecutorContext" object is returned:

oslo_messaging/dispatcher.py:

class DispatcherExecutorContext(object):

    def __init__(self, incoming, dispatch, post=None):
        self._result = None
        self._incoming = incoming
        self._dispatch = dispatch
        self._post = post

    def run(self):
        try:
            self._result = self._dispatch(self._incoming)
        except Exception:
            msg = _('The dispatcher method must catches all exceptions')
            LOG.exception(msg)
            raise RuntimeError(msg)

    def done(self):
        if self._post is not None:
            self._post(self._incoming, self._result)

We can see that the run method calls the "_dispatch_and_reply" method passed in at construction time; this is the message-handling method that was just submitted to _work_executor.
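A usage sketch of this run/done split (ExecutorContext is a simplified stand-in for DispatcherExecutorContext): run invokes the dispatch callable and stores the result, and done hands the message and result to the optional post hook:

```python
class ExecutorContext(object):
    """Simplified stand-in for DispatcherExecutorContext."""

    def __init__(self, incoming, dispatch, post=None):
        self._incoming = incoming
        self._dispatch = dispatch
        self._post = post
        self._result = None

    def run(self):
        # Invoked inside the work executor.
        self._result = self._dispatch(self._incoming)

    def done(self):
        # Invoked by the future's done-callback.
        if self._post is not None:
            self._post(self._incoming, self._result)


calls = []
ctx = ExecutorContext('msg',
                      dispatch=lambda m: m.upper(),
                      post=lambda m, r: calls.append((m, r)))
ctx.run()
ctx.done()
```
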

It ultimately calls RPCDispatcher's "_dispatch" method:

def _dispatch(self, ctxt, message):
    """Dispatch an RPC message to the appropriate endpoint method.

    :param ctxt: the request context
    :type ctxt: dict
    :param message: the message payload
    :type message: dict
    :raises: NoSuchMethod, UnsupportedVersion
    """
    method = message.get('method')
    args = message.get('args', {})
    namespace = message.get('namespace')
    version = message.get('version', '1.0')

    found_compatible = False
    for endpoint in self.endpoints:
        target = getattr(endpoint, 'target', None)
        if not target:
            target = self._default_target

        if not (self._is_namespace(target, namespace) and
                self._is_compatible(target, version)):
            continue

        if hasattr(endpoint, method):
            localcontext._set_local_context(ctxt)
            try:
                return self._do_dispatch(endpoint, method, ctxt, args)
            finally:
                localcontext._clear_local_context()

        found_compatible = True

    if found_compatible:
        raise NoSuchMethod(method)
    else:
        raise UnsupportedVersion(version, method=method)
First, the following attributes are read from the message:

method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')
Then the method named in the message is looked up on the endpoints and invoked. As explained earlier, the endpoints are the final destination of message handling; for Ml2Plugin they are:

def _setup_rpc(self):
    """Initialize components to support agent communication."""
    self.endpoints = [
        rpc.RpcCallbacks(self.notifier, self.type_manager),
        securitygroups_rpc.SecurityGroupServerRpcCallback(),
        dvr_rpc.DVRServerRpcCallback(),
        dhcp_rpc.DhcpRpcCallback(),
        agents_db.AgentExtRpcCallback(),
        metadata_rpc.MetadataRpcCallback(),
        resources_rpc.ResourcesPullRpcCallback()
    ]
In this way, each message is handed to the named method on one of these objects for execution.
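The lookup can be sketched as follows; the two endpoint classes and their methods are hypothetical stand-ins for the real callback classes above:

```python
# Hypothetical endpoint classes standing in for the real callbacks.
class DhcpCallback(object):
    def get_active_networks(self, ctxt, host=None):
        return ['net-1', 'net-2']


class AgentCallback(object):
    def report_state(self, ctxt, agent_state=None):
        return 'ok'


def dispatch(endpoints, ctxt, message):
    """Find the first endpoint exposing the method and call it."""
    method = message.get('method')
    args = message.get('args', {})
    for endpoint in endpoints:
        if hasattr(endpoint, method):
            return getattr(endpoint, method)(ctxt, **args)
    raise AttributeError('no endpoint implements %s' % method)


endpoints = [DhcpCallback(), AgentCallback()]
result = dispatch(endpoints, {}, {'method': 'get_active_networks',
                                  'args': {'host': 'ubuntu'}})
```
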

The done method calls the post argument passed at construction time; since it is None here, it does nothing.


    This completes the analysis of the start_rpc_listeners flow. The plugin consumes RPC messages by creating message-queue consumers and converts each RPC message into a local method call handled by the endpoints. That is the entire server-side RPC handling flow.