OpenStack Liberty Source Code Analysis: The Instance Boot Process, Part 2

Date: 2022-01-04 05:01:30

Continued from the previous post: OpenStack Liberty Source Code Analysis: The Instance Boot Process, Part 1

nova-conductor

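After nova-api sends the boot request over RPC, nova-conductor receives it. Based on the route mapping, the request is handed to
nova/conductor/manager.py.ComputeTaskManager.build_instances, shown below (see the comments for explanations):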

    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True):
        # TODO(ndipanov): Remove block_device_mapping and
        #                 legacy_bdm in version 2.0 of the RPC API.

        # Build the request parameters for nova-scheduler
        request_spec = scheduler_utils.build_request_spec(context, image,
                                                          instances)
        # TODO(danms): Remove this in version 2.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])
        # TODO(melwitt): Remove this in version 2.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated
            # since it is receiving an object, so lookup the flavor to
            # ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties,
                                     instance_type=flavor)

        try:
            # If group information was specified and the corresponding
            # filter is supported, add the group filter information to
            # filter_properties
            scheduler_utils.setup_instance_group(context, request_spec,
                                                 filter_properties)
            # check retry policy. Rather ugly use of instances[0]...
            # but if we've exceeded max retries... then we really only
            # have a single instance.
            scheduler_utils.populate_retry(filter_properties,
                                           instances[0].uuid)
            # Send a synchronous message to nova-scheduler to pick the
            # hosts on which the instances will be created. The call
            # chain is:
            # SchedulerClient -> SchedulerQueryClient -> SchedulerAPI
            # The `filter_properties` argument used for host filtering
            # is a dict with the following content:
            #
            # {
            #     u'instance_type':
            #         Flavor(created_at=None,deleted=False,deleted_at=None,
            #                disabled=False,ephemeral_gb=0,extra_specs={},
            #                flavorid='2',id=5,is_public=True,
            #                memory_mb=2048,name='m1.small',projects=[],
            #                root_gb=20,rxtx_factor=1.0,swap=0,
            #                updated_at=None,vcpu_weight=0,vcpus=1),
            #     'retry': {'num_attempts': 1, 'hosts': []},
            #     u'scheduler_hints': {}
            # }
            #
            # The boot command did not pass `--hints`, so
            # `scheduler_hints` is empty; `instance_type` is a Flavor
            # instance that corresponds to `--flavor 2` on the command
            # line.
            hosts = self.scheduler_client.select_destinations(context,
                    request_spec, filter_properties)
        except Exception as exc:
            updates = {'vm_state': vm_states.ERROR, 'task_state': None}
            for instance in instances:
                self._set_vm_state_and_notify(
                    context, instance.uuid, 'build_instances', updates,
                    exc, request_spec)
            return

        # Send an RPC request to each selected host in turn; nova-compute
        # on that host boots the instance
        for (instance, host) in itertools.izip(instances, hosts):
            try:
                instance.refresh()
            except (exception.InstanceNotFound,
                    exception.InstanceInfoCacheNotFound):
                LOG.debug('Instance deleted during build',
                          instance=instance)
                continue

            local_filter_props = copy.deepcopy(filter_properties)
            scheduler_utils.populate_filter_properties(local_filter_props,
                                                       host)
            # The block_device_mapping passed from the api doesn't contain
            # instance specific information
            bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                    context, instance.uuid)

            self.compute_rpcapi.build_and_run_instance(context,
                    instance=instance, host=host['host'],
                    image=image,
                    request_spec=request_spec,
                    filter_properties=local_filter_props,
                    admin_password=admin_password,
                    injected_files=injected_files,
                    requested_networks=requested_networks,
                    security_groups=security_groups,
                    block_device_mapping=bdms,
                    node=host['nodename'],
                    limits=host['limits'])

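As the source above shows, build_instances mainly assembles the filter parameters, then sends an RPC request through the client API to the scheduler to select hosts, and finally sends an RPC request to each selected host, where nova-compute boots the instance. Next, let's see how nova-scheduler selects the hosts.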
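
The three steps of build_instances (ask the scheduler for hosts, pair each instance with a host, dispatch to that host) can be sketched as a toy model in plain Python 3. This is only an illustration under stated assumptions: `build_instances_sketch`, `select_destinations`, and `build_and_run_instance` here are hypothetical stand-ins for the conductor method and the two RPC clients, not Nova APIs, and copying `limits` into the per-instance properties is a simplification of what `populate_filter_properties` does.

```python
import copy


def build_instances_sketch(instances, filter_properties,
                           select_destinations, build_and_run_instance):
    """Toy model of the conductor flow: select hosts, then dispatch.

    Both callables are hypothetical stand-ins for the scheduler and
    compute RPC clients used in the real method.
    """
    try:
        # Synchronous call: wait for the scheduler's answer.
        hosts = select_destinations(filter_properties)
    except Exception:
        # Mirror the error path: every instance ends up in ERROR.
        return [(instance, "ERROR") for instance in instances]

    results = []
    # Pair the i-th instance with the i-th selected host.
    for instance, host in zip(instances, hosts):
        # Each instance gets its own copy, so per-host data never
        # leaks into the shared filter_properties.
        local_props = copy.deepcopy(filter_properties)
        local_props["limits"] = host["limits"]  # simplified stand-in
        build_and_run_instance(instance, host["host"], host["nodename"],
                               local_props)
        results.append((instance, host["host"]))
    return results
```

The deep copy is the notable design choice: because the shared dict is never mutated inside the loop, data added for one instance cannot affect the next.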

nova-scheduler

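When the scheduler receives the select_destinations request from the client, the route mapping dispatches it to
nova/scheduler/manager.py.SchedulerManager.select_destinations. That method does essentially no processing of its own and directly calls
nova/scheduler/filter_scheduler.py.FilterScheduler.select_destinations. Let's look at the code (see the comments for explanations):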

    def select_destinations(self, context, request_spec,
                            filter_properties):
        """Selects a filtered set of hosts and nodes."""
        self.notifier.info(context,
                           'scheduler.select_destinations.start',
                           dict(request_spec=request_spec))

        num_instances = request_spec['num_instances']
        # Pick suitable hosts according to the filter conditions;
        # returns a list of hosts
        selected_hosts = self._schedule(context, request_spec,
                                        filter_properties)

        # Couldn't fulfill the request_spec
        if len(selected_hosts) < num_instances:
            # NOTE(Rui Chen): If multiple creates failed, set
            # the updated time of selected HostState to None so
            # that these HostStates are refreshed according to
            # database in next schedule, and release the resource
            # consumed by instance in the process of selecting
            # host.
            for host in selected_hosts:
                host.obj.updated = None

            # Log the details but don't put those into the
            # reason since we don't want to give away too much
            # information about our actual environment.
            LOG.debug('There are %(hosts)d hosts available but '
                      '%(num_instances)d instances requested to build.',
                      {'hosts': len(selected_hosts),
                       'num_instances': num_instances})

            reason = _('There are not enough hosts available.')
            raise exception.NoValidHost(reason=reason)

        # Build the list of host dicts
        dests = [dict(host=host.obj.host,
                      nodename=host.obj.nodename,
                      limits=host.obj.limits) for host in
                 selected_hosts]

        self.notifier.info(context,
                           'scheduler.select_destinations.end',
                           dict(request_spec=request_spec))
        # The host list returned to the caller, here `nova-conductor`
        return dests

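The function is simple: it calls _schedule to select hosts. If fewer hosts are selected than instances were requested, it raises an exception; otherwise it returns a list of host dicts to nova-conductor. Next, let's look at the implementation of _schedule, which selects hosts according to the filter conditions. The process has roughly three parts: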
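
The "enough hosts or raise" check and the conversion to plain dicts can be sketched on their own. The snippet below is a minimal Python 3 illustration, not Nova code: `HostState`, `NoValidHost`, and `to_destinations` are hypothetical stand-ins defined locally so the example runs by itself.

```python
from collections import namedtuple

# Hypothetical stand-in for the scheduler's host state objects.
HostState = namedtuple("HostState", ["host", "nodename", "limits"])


class NoValidHost(Exception):
    """Local stand-in for nova's exception.NoValidHost."""


def to_destinations(selected_hosts, num_instances):
    """Return one dict per selected host, or raise if there are too few."""
    if len(selected_hosts) < num_instances:
        # Keep the reason generic, as the original code does, so that
        # details of the environment are not exposed to the caller.
        raise NoValidHost("There are not enough hosts available.")
    return [dict(host=h.host, nodename=h.nodename, limits=h.limits)
            for h in selected_hosts]
```

For example, calling `to_destinations([HostState("c1", "n1", {})], 2)` raises `NoValidHost`, because one host cannot satisfy a request for two instances.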

  • Update the filter parameters
    def _schedule(self, context, request_spec, filter_properties):
        """Returns a list of hosts that meet the required
        specs, ordered by their fitness.
        """
        elevated = context.elevated()
        instance_properties = request_spec['instance_properties']

        # NOTE(danms): Instance here is still a dict, which is converted
        # from an object. The pci_requests are a dict as well. Convert
        # this when we get an object all the way to this path.
        # TODO(sbauza): Will be fixed later by the RequestSpec object
        pci_requests = instance_properties.get('pci_requests')
        if pci_requests:
            pci_requests = (
                objects.InstancePCIRequests.from_request_spec_instance_props(
                    pci_requests))
            instance_properties['pci_requests'] = pci_requests

        instance_type = request_spec.get("instance_type", None)

        update_group_hosts = filter_properties.get('group_updated', False)
        # Load the filter options specified in nova.conf. Users can set
        # scheduler_json_config_location in nova.conf to point to a
        # JSON file that contains filter parameters
        config_options = self._get_configuration_options()

        # Update the filter parameters
        filter_properties.update({'context': context,
                                  'request_spec': request_spec,
                                  'config_options': config_options,
                                  'instance_type': instance_type})
  • Get all active hosts
    def _schedule(self, context, request_spec, filter_properties):
        # ... (continued from the previous excerpt)
        hosts = self._get_all_host_states(elevated)

_get_all_host_states is a wrapper; it calls
nova/scheduler/host_manager.py.HostManager.get_all_host_states, which returns an iterator over the available hosts. The code is as follows:

    def get_all_host_states(self, context):
        """Returns a list of HostStates that represents all the hosts
        the HostManager knows about. Also, each of the consumable
        resources in HostState are pre-populated and adjusted based on
        data in the db.
        """
        # Fetch the list of `nova-compute` services from the database.
        # There is only one in my debugging environment.
        service_refs = {service.host: service
                        for service in objects.ServiceList.get_by_binary(
                            context, 'nova-compute')}
        # Get resource usage across the available compute nodes:
        # fetch all compute nodes and their resource usage from the
        # database. There is only one in my debugging environment.
        compute_nodes = objects.ComputeNodeList.get_all(context)
        seen_nodes = set()
        # Update each node's resources: memory, disk, CPU, and so on
        for compute in compute_nodes:
            service = service_refs.get(compute.host)

            if not service:
                LOG.warning(_LW(
                    "No compute service record found for host %(host)s"),
                    {'host': compute.host})
                continue
            host = compute.host
            node = compute.hypervisor_hostname
            state_key = (host, node)
            host_state = self.host_state_map.get(state_key)
            if host_state:
                host_state.update_from_compute_node(compute)
            else:
                host_state = self.host_state_cls(host, node,
                                                 compute=compute)
                self.host_state_map[state_key] = host_state
            # We force to update the aggregates info each time a new
            # request comes in, because some changes on the aggregates
            # could have been happening after setting this field for the
            # first time
            host_state.aggregates = [self.aggs_by_id[agg_id]
                                     for agg_id in
                                     self.host_aggregates_map[
                                         host_state.host]]
            host_state.update_service(dict(service))
            self._add_instance_info(context, compute, host_state)
            seen_nodes.add(state_key)

        # remove compute nodes from host_state_map if they are not active
        dead_nodes = set(self.host_state_map.keys()) - seen_nodes
        for state_key in dead_nodes:
            host, node = state_key
            LOG.info(_LI("Removing dead compute node %(host)s:%(node)s "
                         "from scheduler"), {'host': host,
                                             'node': node})
            del self.host_state_map[state_key]

        # Return an iterator over the host states
        return six.itervalues(self.host_state_map)
  • Select hosts according to the filter conditions
    def _schedule(self, context, request_spec, filter_properties):
        '''The code is straightforward: using the hosts obtained earlier
        and the filter conditions, pick the hosts that meet the
        requirements. If there are several candidates, one is chosen at
        random from the top `scheduler_host_subset_size` of them.
        '''

        selected_hosts = []
        num_instances = request_spec.get('num_instances', 1)
        for num in range(num_instances):
            # Filter local hosts based on requirements ...
            # Returns the hosts that satisfy the filter conditions. The
            # filters in use can be set with the
            # scheduler_default_filters option in nova.conf; the filter
            # code lives in nova/scheduler/filters
            hosts = self.host_manager.get_filtered_hosts(hosts,
                    filter_properties, index=num)
            if not hosts:
                # Can't get any more locally.
                break

            LOG.debug("Filtered %(hosts)s", {'hosts': hosts})
            # Rank the hosts with the weighers; returns a list of hosts
            # sorted by weight in descending order. The weighers can be
            # set with the scheduler_weight_classes option in nova.conf;
            # the weigher code lives in nova/scheduler/weights
            weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
                    filter_properties)

            LOG.debug("Weighed %(hosts)s", {'hosts': weighed_hosts})
            # Set the range for random host selection; by default the
            # first host is chosen
            scheduler_host_subset_size = CONF.scheduler_host_subset_size
            if scheduler_host_subset_size > len(weighed_hosts):
                scheduler_host_subset_size = len(weighed_hosts)
            if scheduler_host_subset_size < 1:
                scheduler_host_subset_size = 1

            chosen_host = random.choice(
                weighed_hosts[0:scheduler_host_subset_size])
            LOG.debug("Selected host: %(host)s", {'host': chosen_host})
            selected_hosts.append(chosen_host)

            # Now consume the resources so the filter/weights
            # will change for the next instance.
            chosen_host.obj.consume_from_instance(
                instance_properties)
            if update_group_hosts is True:
                # NOTE(sbauza): Group details are serialized into a list
                # now that they are populated by the conductor, we need
                # to deserialize them
                if isinstance(filter_properties['group_hosts'], list):
                    filter_properties['group_hosts'] = set(
                        filter_properties['group_hosts'])
                filter_properties['group_hosts'].add(
                    chosen_host.obj.host)
        return selected_hosts
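
The subset-size clamping near the end of the loop above is easy to misread, so here is that step in isolation as a small Python 3 sketch. `choose_host` is a hypothetical helper name, and the `rng` parameter is an addition for testability; neither exists in Nova.

```python
import random


def choose_host(weighed_hosts, subset_size, rng=random):
    """Pick one host at random from the best `subset_size` candidates.

    `weighed_hosts` must be non-empty and already sorted best-first.
    """
    # Clamp the subset size into the range [1, len(weighed_hosts)].
    if subset_size > len(weighed_hosts):
        subset_size = len(weighed_hosts)
    if subset_size < 1:
        subset_size = 1
    # With subset_size == 1 the slice holds only the best host, so
    # the "random" choice is deterministic.
    return rng.choice(weighed_hosts[0:subset_size])
```

With a subset size of 1 the best-ranked host is always returned. A larger value spreads instances across the top few candidates, which trades a little placement quality for less contention when several schedulers run at once.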

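_schedule uses get_filtered_hosts to obtain the set of hosts that satisfy all filter conditions, then uses get_weighed_hosts to rank that set by weight (weighed_hosts). With the default settings, the first host in the ranked list is chosen as the target. Let's first look at the implementation of get_filtered_hosts (see the comments for explanations):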

    def get_filtered_hosts(self, hosts, filter_properties,
            filter_class_names=None, index=0):
        """Filter hosts and return only ones passing all filters."""

        # If filter_properties specifies ignore_hosts, exclude those
        # hosts
        def _strip_ignore_hosts(host_map, hosts_to_ignore):
            ignored_hosts = []
            for host in hosts_to_ignore:
                for (hostname, nodename) in list(host_map.keys()):
                    if host == hostname:
                        del host_map[(hostname, nodename)]
                        ignored_hosts.append(host)
            ignored_hosts_str = ', '.join(ignored_hosts)
            LOG.info(_LI('Host filter ignoring hosts: %s'),
                     ignored_hosts_str)

        # If filter_properties specifies force_hosts, exclude the hosts
        # that are not in force_hosts
        def _match_forced_hosts(host_map, hosts_to_force):
            forced_hosts = []
            for (hostname, nodename) in list(host_map.keys()):
                if hostname not in hosts_to_force:
                    del host_map[(hostname, nodename)]
                else:
                    forced_hosts.append(hostname)
            if host_map:
                forced_hosts_str = ', '.join(forced_hosts)
                msg = _LI('Host filter forcing available hosts to %s')
            else:
                forced_hosts_str = ', '.join(hosts_to_force)
                msg = _LI("No hosts matched due to not matching "
                          "'force_hosts' value of '%s'")
            LOG.info(msg % forced_hosts_str)

        # If filter_properties specifies force_nodes, exclude the nodes
        # that are not in force_nodes
        def _match_forced_nodes(host_map, nodes_to_force):
            forced_nodes = []
            for (hostname, nodename) in list(host_map.keys()):
                if nodename not in nodes_to_force:
                    del host_map[(hostname, nodename)]
                else:
                    forced_nodes.append(nodename)
            if host_map:
                forced_nodes_str = ', '.join(forced_nodes)
                msg = _LI('Host filter forcing available nodes to %s')
            else:
                forced_nodes_str = ', '.join(nodes_to_force)
                msg = _LI("No nodes matched due to not matching "
                          "'force_nodes' value of '%s'")
            LOG.info(msg % forced_nodes_str)

        # If filter_class_names is not given, use the default filters,
        # which are set with the scheduler_default_filters option in
        # nova.conf
        if filter_class_names is None:
            filters = self.default_filters
        else:
            filters = self._choose_host_filters(filter_class_names)
        ignore_hosts = filter_properties.get('ignore_hosts', [])
        force_hosts = filter_properties.get('force_hosts', [])
        force_nodes = filter_properties.get('force_nodes', [])

        if ignore_hosts or force_hosts or force_nodes:
            # NOTE(deva): we can't assume "host" is unique because
            # one host may have many nodes.
            name_to_cls_map = {(x.host, x.nodename): x for x in hosts}
            # Exclude the hosts listed in ignore_hosts
            if ignore_hosts:
                _strip_ignore_hosts(name_to_cls_map, ignore_hosts)
                if not name_to_cls_map:
                    return []
            # NOTE(deva): allow force_hosts and force_nodes independently
            # Exclude the hosts that are not in force_hosts
            if force_hosts:
                _match_forced_hosts(name_to_cls_map, force_hosts)
            # Exclude the nodes that are not in force_nodes
            if force_nodes:
                _match_forced_nodes(name_to_cls_map, force_nodes)
            if force_hosts or force_nodes:
                # NOTE(deva): Skip filters when forcing host or node
                # If any hosts remain after the steps above, return
                # them directly
                if name_to_cls_map:
                    return name_to_cls_map.values()
            hosts = six.itervalues(name_to_cls_map)

        # Run each filter in turn over the hosts and return the hosts
        # that satisfy every condition
        return self.filter_handler.get_filtered_objects(filters,
                hosts, filter_properties, index)
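
The interplay of ignore_hosts, force_hosts, and force_nodes in the code above can be condensed into a small Python 3 sketch. `apply_host_overrides` is a hypothetical helper written for this post; it reports via a `skip_filters` flag what the original expresses with an early return.

```python
def apply_host_overrides(host_map, ignore_hosts=(), force_hosts=(),
                         force_nodes=()):
    """Apply the three override lists to a {(host, node): state} dict.

    Returns (remaining, skip_filters). skip_filters is True when a
    force_* option was given and at least one entry survived, which is
    the case where the original code bypasses the regular filters.
    """
    remaining = dict(host_map)  # work on a copy, leave the input intact
    if ignore_hosts:
        remaining = {key: state for key, state in remaining.items()
                     if key[0] not in ignore_hosts}
    if force_hosts:
        remaining = {key: state for key, state in remaining.items()
                     if key[0] in force_hosts}
    if force_nodes:
        remaining = {key: state for key, state in remaining.items()
                     if key[1] in force_nodes}
    skip_filters = bool((force_hosts or force_nodes) and remaining)
    return remaining, skip_filters
```

Keying the map by `(host, node)` matters because one host may expose several nodes, so ignoring or forcing a host name can remove or keep more than one entry.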

Next, the get_weighed_hosts method. It directly calls
nova/weights.py.BaseWeightHandler.get_weighed_objects, which re-sorts the input hosts according to the weighers passed in. The code is as follows:

    def get_weighed_objects(self, weighers, obj_list,
                            weighing_properties):
        """Return a sorted (descending), normalized list of WeighedObjects."""
        # object_class is the WeighedObject class
        weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]

        # With at most one host, return immediately
        if len(weighed_objs) <= 1:
            return weighed_objs
        # Call each weigher
        for weigher in weighers:
            weights = weigher.weigh_objects(weighed_objs,
                                            weighing_properties)

            # Normalize the weights
            weights = normalize(weights,
                                minval=weigher.minval,
                                maxval=weigher.maxval)
            # Accumulate the weighted values
            for i, weight in enumerate(weights):
                obj = weighed_objs[i]
                obj.weight += weigher.weight_multiplier() * weight
        # Sort by weight in descending order and return the ordered list
        return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)
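
To make the "normalize, multiply, accumulate, sort" sequence concrete, here is a self-contained Python 3 sketch. It assumes min-max normalization to the range [0, 1]; the `normalize` below is my own simplified helper, not a copy of the one in nova/weights.py, and `rank_hosts` with its `(multiplier, function)` weigher pairs is a hypothetical structure for illustration.

```python
def normalize(weights, minval=None, maxval=None):
    """Min-max scale the weights into [0, 1] (simplified helper)."""
    weights = list(weights)
    if not weights:
        return []
    lo = float(min(weights) if minval is None else minval)
    hi = float(max(weights) if maxval is None else maxval)
    if lo == hi:
        # All values are equal: no host is preferred by this weigher.
        return [0.0] * len(weights)
    return [(w - lo) / (hi - lo) for w in weights]


def rank_hosts(hosts, weighers):
    """Return (name, total_weight) pairs, best host first.

    hosts:    list of dicts, each with a 'name' key
    weighers: list of (multiplier, function) pairs (hypothetical form)
    """
    totals = [0.0] * len(hosts)
    for multiplier, weigh in weighers:
        raw = [weigh(host) for host in hosts]
        # Normalizing first keeps weighers with large raw values
        # (for example RAM in MB) from drowning out the others.
        for i, weight in enumerate(normalize(raw)):
            totals[i] += multiplier * weight
    order = sorted(range(len(hosts)), key=lambda i: totals[i],
                   reverse=True)
    return [(hosts[i]["name"], totals[i]) for i in order]
```

Because every weigher's output is scaled to the same range before the multiplier is applied, the multiplier alone decides how much each weigher counts in the final ranking.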

To summarize, the simple table below shows the interaction between nova-conductor and nova-scheduler:

`nova-conductor`                                        `nova-scheduler`
`build_instances`
`select_destinations`     -- rpc request -->            `select_destinations` (SchedulerManager)
                                                        `select_destinations` (FilterScheduler)
                                                        `_schedule`
                                                        `get_filtered_hosts`
                                                        `get_weighed_hosts`
                          <-- rpc response --

`build_and_run_instance`  -- rpc request to nova-compute -->

This concludes the analysis of how nova-conductor and nova-scheduler handle the instance boot process. The next post will analyze the processing in nova-compute. Stay tuned.