http://blog.csdn.net/sj13426074890/article/details/7915327
下面就是nova系统架构图:
                                    /- ( LDAP )
                [ Auth Manager ] ---
                       |            \- ( DB )
                       |
                       |       [ scheduler ] - [ volume ]  - ( ATAoE/iSCSI )
                       |                /
[ Web Dashboard ] -> [ api ] -- < AMQP > ------ [ network ] - ( Flat/Vlan )
                       |                \
                    < HTTP >   [ scheduler ] - [ compute ] - ( libvirt/xen )
                       |                       |
                [ objectstore ]  < - retrieves images
- DB:数据库,被所有的组件共享
- Web Dashboard: web页面,调用所有的api接口
- api: api组件接收http请求,转换命令通过AMQP和其他组件进行通信,或者直接通过http请求访问objectstore组件
- Auth Manager: 组件负责用户,工程和角色,可以以数据库或者LDAP做后端
- objectstore: 存储和检索镜像
- scheduler: 根据所指定的scheduler类,选择最合适的host
- volume: 卷的管理
- network: 管理 ip forwarding, bridges, and vlans
- compute: 管理与hypervisor和虚拟机的连接
- python启动脚本 bin/nova-compute
- utils.default_flagfile() #设置flag文件路径.
- flags.FLAGS(sys.argv) #把flag文件中的参数放到args中。
- logging.setup() #设置日志
- utils.monkey_patch()
- server = service.Service.create(binary='nova-compute') #创建服务
- service.serve(server) #启动服务
- service.wait() #等待请求
下面我将按6大步进行讲述:
第一步:utils.default_flagfile()
- if args is None:
- args = sys.argv
- for arg in args:
- if arg.find('flagfile') != -1:
- break
- else:
- if not os.path.isabs(filename):
- # turn relative filename into an absolute path
- script_dir = os.path.dirname(inspect.stack()[-1][1])
- filename = os.path.abspath(os.path.join(script_dir, filename))
- if not os.path.exists(filename):
- filename = "./nova.conf"
- if not os.path.exists(filename):
- filename = '/etc/nova/nova.conf'
- flagfile = '--flagfile=%s' % filename
- args.insert(1, flagfile)
- 其实所完成的功能也就是把--flagfile=/etc/nova/nova.conf加入到参数列表中。
第二步:flags.FLAGS(sys.argv)
flags.py中有一句
FLAGS = FlagValues(),那么直接查看FlagValues()这个类,这个类是继承于gflags.FlagValues.
第三步:logging.setup()
这个是对这个服务开启日志功能。
第四步:server = service.Service.create(binary='nova-compute')
这个函数位于/nova/service.py: 类Service中
- class Service(object):
- """Service object for binaries running on hosts.
- A service takes a manager and enables rpc by listening to queues based
- on topic. It also periodically runs tasks on the manager and reports
- it state to the database services table."""
- 函数定义如下:
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_interval=None):
    """Instantiate the class and pass back the application object.

    :param host: defaults to FLAGS.host
    :param binary: defaults to basename of executable
    :param topic: defaults to bin_name - 'nova-' part
    :param manager: defaults to FLAGS.<topic>_manager
    :param report_interval: defaults to FLAGS.report_interval
    :param periodic_interval: defaults to FLAGS.periodic_interval
    :returns: a new instance of ``cls`` built from the resolved values
    """
    if not host:
        host = FLAGS.host
    if not binary:
        # The outermost stack frame belongs to the script that was
        # launched, so its file name (e.g. "nova-compute") is the binary.
        binary = os.path.basename(inspect.stack()[-1][1])
    if not topic:
        # Drop the 'nova-' prefix: "nova-compute" -> "compute".
        topic = binary.rpartition('nova-')[2]
    if not manager:
        # For the "compute" topic this reads FLAGS.compute_manager.
        manager = FLAGS.get('%s_manager' % topic, None)
    # Compare against None rather than truthiness: start() treats a falsy
    # interval as "do not start this timer", so an explicit 0 passed by the
    # caller must not be replaced by the flag default.
    if report_interval is None:
        report_interval = FLAGS.report_interval
    if periodic_interval is None:
        periodic_interval = FLAGS.periodic_interval
    return cls(host, binary, topic, manager,
               report_interval, periodic_interval)
接下来我们看看Service.__init__()
def __init__(self, host, binary, topic, manager, report_interval=None,
             periodic_interval=None, *args, **kwargs):
    """Record the service identity and build its manager instance.

    :param host: stored as ``self.host`` and passed to the manager
    :param binary: stored as ``self.binary``
    :param topic: stored as ``self.topic``
    :param manager: import path of the manager class to instantiate
    :param report_interval: stored as ``self.report_interval``
    :param periodic_interval: stored as ``self.periodic_interval``

    Extra positional/keyword arguments are forwarded to the manager
    constructor and to the parent initializer, and kept for later use.
    """
    self.host, self.binary, self.topic = host, binary, topic

    # The manager is named by a string, imported at runtime, then built.
    self.manager_class_name = manager
    manager_cls = utils.import_class(self.manager_class_name)
    self.manager = manager_cls(host=self.host, *args, **kwargs)

    self.report_interval = report_interval
    self.periodic_interval = periodic_interval

    super(Service, self).__init__(*args, **kwargs)
    self.saved_args = args
    self.saved_kwargs = kwargs
    self.timers = []
1、指定host、binary、topic。这个相对简单。
self.host = host
self.binary = binary
self.topic = topic
2、动态指定manager类,并动态生成实例 。
self.manager_class_name = manager #在create函数中指定。
manager_class = utils.import_class(self.manager_class_name) #动态地import此类。
self.manager = manager_class(host=self.host, *args, **kwargs) #动态地生成这个类的实例 。
那么这时会调用ComputeManager::__init__()函数
我们进入ComputeManager::__init__()函数
def __init__(self, compute_driver=None, *args, **kwargs):
    """Load configuration options and connect to the hypervisor.

    :param compute_driver: import path of the virtualization driver;
                           FLAGS.compute_driver is used when it is falsy
    """
    # TODO(vish): sync driver creation logic with the rest of the system
    #             and re-document the module docstring
    # The walkthrough quotes the option declaration as:
    #   cfg.StrOpt('compute_driver',
    #              default='nova.virt.connection.get_connection',
    #              help='Driver to use for controlling virtualization')
    driver_path = compute_driver or FLAGS.compute_driver
    try:
        # Import the driver object by name and verify its type.
        loaded_driver = utils.import_object(driver_path)
        self.driver = utils.check_isinstance(loaded_driver,
                                             driver.ComputeDriver)
    except ImportError as e:
        LOG.error(_("Unable to load the virtualization driver: %s") % (e))
        sys.exit(1)

    self.network_api = network.API()
    self.volume_api = volume.API()
    self.network_manager = utils.import_object(FLAGS.network_manager)

    self._last_host_check = 0
    self._last_bw_usage_poll = 0
    self._last_info_cache_heal = 0

    super(ComputeManager, self).__init__(service_name="compute",
                                         *args, **kwargs)
这里顺便看一下ComputeManager的类视图。
3、设置参数:应该是服务间隔时间之类的。
self.report_interval = report_interval
self.periodic_interval = periodic_interval
4、 设置多出来的一些参数。
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
第五步:service.serve(server)开启服务
def serve(*servers):
    """Launch every given server through the module-wide launcher.

    The launcher is created on first use and kept in the global
    ``_launcher`` so later calls reuse it.
    """
    global _launcher
    _launcher = _launcher or Launcher()
    for srv in servers:
        _launcher.launch_server(srv)
类Launcher 主要作用是为每个服务开启一个绿色线程(eventlet greenthread)
def launch_server(self, server):
    """Load and start the given server.

    :param server: The server you would like to start.
    :returns: None
    """
    # eventlet.spawn runs run_server(server) concurrently; the returned
    # handle is remembered in self._services.
    self._services.append(eventlet.spawn(self.run_server, server))
@staticmethod
def run_server(server):
    """Start a server and wait for it to finish.

    :param server: Server to run and wait for.
    :returns: None
    """
    server.start()
    server.wait()
其中server.start() #启动服务 我们来查看下 service.py中 Service类中start方法
def start(self):
    """Initialise the manager, register the service and start consuming.

    Steps, in order: log the version, initialise the manager's host,
    look up (or create) this service's database record, attach the RPC
    consumers, and start the report/periodic looping calls when their
    intervals are truthy.
    """
    vcs_string = version.version_string_with_vcs()
    LOG.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
              {'topic': self.topic, 'vcs_string': vcs_string})
    utils.cleanup_file_locks()
    self.manager.init_host()
    self.model_disconnected = False

    ctxt = context.get_admin_context()
    try:
        service_ref = db.service_get_by_args(ctxt, self.host, self.binary)
        self.service_id = service_ref['id']
    except exception.NotFound:
        # No record for this host/binary pair yet: create one.
        self._create_service_ref(ctxt)

    if self.binary == 'nova-compute':
        self.manager.update_available_resource(ctxt)

    self.conn = rpc.create_connection(new=True)
    LOG.debug(_("Creating Consumer connection for Service %s") %
              self.topic)

    # Share this same connection for these Consumers: the plain topic,
    # the per-host topic, and the fanout variant of the topic.
    consumers = (
        (self.topic, False),
        ('%s.%s' % (self.topic, self.host), False),
        (self.topic, True),
    )
    for queue_name, is_fanout in consumers:
        self.conn.create_consumer(queue_name, self, fanout=is_fanout)

    # Consume from all consumers in a thread
    self.conn.consume_in_thread()

    # A falsy interval means the corresponding timer is not started.
    schedule = (
        (self.report_interval, self.report_state),
        (self.periodic_interval, self.periodic_tasks),
    )
    for interval, callback in schedule:
        if interval:
            timer = utils.LoopingCall(callback)
            timer.start(interval=interval, now=False)
            self.timers.append(timer)
下面我们对start方法进行分析:
1、设置版本
vcs_string = version.version_string_with_vcs()
logging.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
{'topic': self.topic, 'vcs_string': vcs_string})
2、初始化init_host(self): nova.compute.ComputeManager
其中 self.driver.init_host(host=self.host)
这里需要查看一下在__init__函数中driver的设置。
if not compute_driver:
compute_driver = FLAGS.compute_driver
FLAGS.compute_driver的值也是在/nova/compute/manager.py中设置:
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for controlling virtualization')
关于get_connection下篇blog继续分析
3 . 接着从service.start()接着init_host()之后,
得到context.然后再更新当前机器上可用的资源。
- self.model_disconnected = False
- ctxt = context.get_admin_context()
- try:
- service_ref = db.service_get_by_args(ctxt,
- self.host,
- self.binary)
- self.service_id = service_ref['id']
- except exception.NotFound:
- self._create_service_ref(ctxt)
- if 'nova-compute' == self.binary:
- self.manager.update_available_resource(ctxt)
4. 更新RPC链接
- self.conn = rpc.create_connection(new=True)
- LOG.debug(_("Creating Consumer connection for Service %s") %
- self.topic)
- # Share this same connection for these Consumers
- self.conn.create_consumer(self.topic, self, fanout=False)
- node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, self, fanout=False)
- self.conn.create_consumer(self.topic, self, fanout=True)
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
- if self.report_interval:
- pulse = utils.LoopingCall(self.report_state) #循环调用report_state
- pulse.start(interval=self.report_interval, now=False)
- self.timers.append(pulse)
- if self.periodic_interval:
- periodic = utils.LoopingCall(self.periodic_tasks) #循环调用 periodic_tasks 下面详细说明
- periodic.start(interval=self.periodic_interval, now=False)
- self.timers.append(periodic)
Nova-Compute启动的主要工作是把环境配置好,让RabbitMQ的消息队列建立起来。最后服务的启动则主要是让RabbitMQ的consumer运行,并且进入wait状态。消息的响应则主要是找到相应的函数地址,并执行之。
第六步:service.wait() 等待
一般服务启动之后,都会有wait()。那么这里也需要看一下服务的wait()。从而可以知道是什么东西在后台真正地运行。
- /usr/bin/nova-compute
- service.wait()
- /nova/service.py:Class:Service::wait()
上篇blog对bin/nova-compute按步进行了分析,这篇主要对其中ComputeManager进行分析
class ComputeManager(manager.SchedulerDependentManager):
    """Manages the running instances from creation to destruction."""

    def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the hypervisor.

        :param compute_driver: import path of the virtualization driver;
                               FLAGS.compute_driver is used when falsy
        """
        # TODO(vish): sync driver creation logic with the rest of the
        #             system and re-document the module docstring
        driver_path = compute_driver or FLAGS.compute_driver
        try:
            # Import the driver object by name and verify its type.
            loaded_driver = utils.import_object(driver_path)
            self.driver = utils.check_isinstance(loaded_driver,
                                                 driver.ComputeDriver)
        except ImportError as e:
            LOG.error(_("Unable to load the virtualization driver: %s") % (e))
            sys.exit(1)

        self.network_api = network.API()
        self.volume_api = volume.API()
        self.network_manager = utils.import_object(FLAGS.network_manager)

        self._last_host_check = 0
        self._last_bw_usage_poll = 0
        self._last_info_cache_heal = 0

        super(ComputeManager, self).__init__(service_name="compute",
                                             *args, **kwargs)
首先设置 self.driver
FLAGS.compute_driver的值也是在/nova/compute/manager.py中设置:
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for controlling virtualization')
从而可知compute_driver = nova.virt.connection.get_connection
那么需要查看一下virt/connection.get_connection中的init_host
那么应该是调用了/nova/virt/connection.py:get_connection函数。
get_connection()获得与hypervisor的连接
首先来说,get_connection的入口点位于/nova/virt/connection.py:get_connection()中。
- t = FLAGS.connection_type
- if t == 'fake':
- conn = fake.get_connection(read_only)
- elif t == 'libvirt':
- conn = libvirt_conn.get_connection(read_only)
- elif t == 'xenapi':
- conn = xenapi_conn.get_connection(read_only)
- elif t == 'hyperv':
- conn = hyperv.get_connection(read_only)
- elif t == 'vmwareapi':
- conn = vmwareapi_conn.get_connection(read_only)
- else:
- raise Exception('Unknown connection type "%s"' % t)
- if conn is None:
- LOG.error(_('Failed to open connection to the hypervisor'))
- sys.exit(1)
- return utils.check_isinstance(conn, driver.ComputeDriver)
由于一般我们使用的是libvirt.所以这里特别关注一下libvirt.
/nova/virt/connection.py的开头,我们会发现这么一句:
from nova.virt.libvirt import connection as libvirt_conn
说明libvirt_conn指向/nova/virt/libvirt/connection.py
那么就直接查看./libvirt/connection.py中的:get_connection函数。
/nova/virt/libvirt/connection.py:get_connection()
def get_connection(read_only):
    """Return a LibvirtConnection, importing libvirt on first use.

    :param read_only: forwarded unchanged to LibvirtConnection
    """
    # These are loaded late so that there's no need to install these
    # libraries when not using libvirt.
    # Cheetah is separate because the unit tests want to load Cheetah,
    # but not libvirt.
    global libvirt
    if libvirt is None:
        libvirt = __import__('libvirt')
    _late_load_cheetah()

    return LibvirtConnection(read_only)
现在接下来看一下LibvirtConnection类。位于/nova/virt/libvirt/connection.py文件中
def __init__(self, read_only):
    """Set up connection state, firewall/VIF/volume drivers and disk naming.

    :param read_only: stored as ``self.read_only``
    """
    super(LibvirtConnection, self).__init__()

    self._host_state = None
    self._initiator = None
    self._wrapped_conn = None
    self.container = None
    self.read_only = read_only

    # Fall back to the first known firewall driver when the configured
    # one is not in the supported list.
    if FLAGS.firewall_driver not in firewall.drivers:
        FLAGS.set_default('firewall_driver', firewall.drivers[0])
    firewall_cls = utils.import_class(FLAGS.firewall_driver)
    # The bound method itself is handed over, not its result.
    self.firewall_driver = firewall_cls(get_connection=self._get_connection)

    self.vif_driver = utils.import_object(FLAGS.libvirt_vif_driver)

    # Each entry has the form "<type>=<import path of driver class>".
    self.volume_drivers = {}
    for driver_str in FLAGS.libvirt_volume_drivers:
        driver_type, _sep, driver_path = driver_str.partition('=')
        volume_driver_cls = utils.import_class(driver_path)
        self.volume_drivers[driver_type] = volume_driver_cls(self)
    self._host_state = None

    # An explicit prefix flag wins; otherwise derive it from the libvirt
    # type, defaulting to 'vd'.
    disk_prefix_map = {"lxc": "", "uml": "ubd", "xen": "sd"}
    self._disk_prefix = (FLAGS.libvirt_disk_prefix or
                         disk_prefix_map.get(FLAGS.libvirt_type, 'vd'))

    self.default_root_device = self._disk_prefix + 'a'
    self.default_second_device = self._disk_prefix + 'b'
    self.default_third_device = self._disk_prefix + 'c'

    self._disk_cachemode = None
    self.image_cache_manager = imagecache.ImageCacheManager()
1、父类初始化
super(LibvirtConnection, self).__init__()
2、设置firewallDriver
fw_class = utils.import_class(FLAGS.firewall_driver)
在此文件的上方有:这里还只是获得类名,并不是得到一个实例。
flags.DEFINE_string('firewall_driver',
'nova.virt.libvirt.firewall.IptablesFirewallDriver',
'Firewall driver (defaults to iptables)')
应该是引入防火墙规则之类的。
3、实例化防火墙类。
self.firewall_driver = fw_class(get_connection=self._get_connection)
那么这里应该会调用_get_connection()函数。
4、获得链接调用_get_connection()
def _get_connection(self):
    """Return the cached libvirt connection, (re)connecting when needed.

    A new connection is opened when none is cached or when
    ``self._test_connection()`` reports failure.
    """
    must_connect = not self._wrapped_conn or not self._test_connection()
    if must_connect:
        LOG.debug(_('Connecting to libvirt: %s'), self.uri)
        if FLAGS.libvirt_nonblocking:
            # Route the connect call through eventlet's tpool proxy.
            self._wrapped_conn = tpool.proxy_call(
                (libvirt.virDomain, libvirt.virConnect),
                self._connect, self.uri, self.read_only)
        else:
            self._wrapped_conn = self._connect(self.uri, self.read_only)

    return self._wrapped_conn

# Attribute-style access to the connection.
_conn = property(_get_connection)
5、再查看_connect()
@staticmethod
def _connect(uri, read_only):
    """Open a libvirt connection to ``uri``.

    :param uri: connection URI handed to libvirt
    :param read_only: when truthy use ``libvirt.openReadOnly``, otherwise
                      ``libvirt.openAuth`` with the credential list below
    :returns: whatever the chosen libvirt open call returns
    """
    credential_types = [libvirt.VIR_CRED_AUTHNAME,
                        libvirt.VIR_CRED_NOECHOPROMPT]
    auth = [credential_types, 'root', None]

    if not read_only:
        return libvirt.openAuth(uri, auth, 0)
    return libvirt.openReadOnly(uri)
这个函数是从底层libvirt库中拿到链接。
在nova/service.py中Service类中start方法中
- if self.periodic_interval:
- periodic = utils.LoopingCall(self.periodic_tasks) #循环调用 periodic_tasks 下面详细说明
- periodic.start(interval=self.periodic_interval, now=False)
- self.timers.append(periodic)
此处循环调用 self.periodic_tasks
def periodic_tasks(self, raise_on_error=False):
    """Tasks to be run at a periodic interval.

    Delegates to the manager's ``periodic_tasks`` with an admin context.

    :param raise_on_error: forwarded unchanged to the manager
    """
    admin_context = context.get_admin_context()
    self.manager.periodic_tasks(admin_context,
                                raise_on_error=raise_on_error)
调用self.manager. periodic_tasks 函数。
其中manager 就是nova/compute/manager.py 中 ComputeManager
ComputeManager的父类SchedulerDependentManager
SchedulerDependentManager父类Manager(nova/manager.py)(可以参考nova-compute启动分析-1中manager类图)
def periodic_tasks(self, context, raise_on_error=False):
    """Tasks to be run at a periodic interval.

    Walks ``self._periodic_tasks``; a task whose skip counter is still
    positive only has the counter decremented, otherwise the counter is
    reset from the task's ``_ticks_between_runs`` and the task is called.

    :param context: passed to every task as its second argument
    :param raise_on_error: re-raise a task's exception instead of only
                           logging it
    """
    # NOTE: the log calls format their messages from locals(), so the
    # names full_task_name, ticks_to_skip and e must stay as they are.
    for task_name, task in self._periodic_tasks:
        full_task_name = '%s.%s' % (self.__class__.__name__, task_name)

        ticks_to_skip = self._ticks_to_skip[task_name]
        if ticks_to_skip > 0:
            LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
                        " ticks left until next run"), locals())
            self._ticks_to_skip[task_name] -= 1
            continue

        self._ticks_to_skip[task_name] = task._ticks_between_runs
        LOG.debug(_("Running periodic task %(full_task_name)s"), locals())

        try:
            task(self, context)
        except Exception as e:
            if raise_on_error:
                raise
            LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
                          locals())
这个方法其实就是隔_ticks_to_skip次运行保存在_periodic_tasks中的方法
其中属性 _periodic_tasks 来自 __metaclass__ = ManagerMeta
这个时候来看下 ManagerMeta 类(nova/manager.py)
class ManagerMeta(type):
    """Metaclass that allows us to collect decorated periodic tasks.

    Every class created with it gets its own ``_periodic_tasks`` list of
    ``(name, function)`` pairs and its own ``_ticks_to_skip`` dict, both
    seeded from the parent class when one already defines them.
    """

    def __init__(cls, names, bases, dict_):
        super(ManagerMeta, cls).__init__(names, bases, dict_)

        # NOTE(sirp): a missing attribute means this is the base class, so
        # start empty. If it is present we are a subclass and must work on
        # a copy so the parent's collections are left untouched.
        if hasattr(cls, '_periodic_tasks'):
            cls._periodic_tasks = cls._periodic_tasks[:]
        else:
            cls._periodic_tasks = []

        if hasattr(cls, '_ticks_to_skip'):
            cls._ticks_to_skip = cls._ticks_to_skip.copy()
        else:
            cls._ticks_to_skip = {}

        # Only attributes defined directly on this class are inspected;
        # those flagged with _periodic_task are registered.
        flagged = [member for member in cls.__dict__.values()
                   if getattr(member, '_periodic_task', False)]
        for task in flagged:
            name = task.__name__
            cls._periodic_tasks.append((name, task))
            cls._ticks_to_skip[name] = task._ticks_between_runs
其实很简单, if getattr(value, '_periodic_task', False):
如果 cls.__dict__.values() 中的某个成员存在 _periodic_task 属性,
就加入到 cls._periodic_tasks 中 ,
其中这个属性的设置 是通过装饰函数进行完成的 def periodic_task(nova/manager.py)
def periodic_task(*args, **kwargs):
    """Decorator to indicate that a method is a periodic task.

    This decorator can be used in two ways:

        1. Without arguments '@periodic_task', this will be run on every
           tick of the periodic scheduler.

        2. With arguments, @periodic_task(ticks_between_runs=N), this will
           be run on every N ticks of the periodic scheduler.

    Empty parentheses, '@periodic_task()', behave like form 1.

    :returns: the decorated function itself (bare form), or a decorator
              to apply to it (call form)
    """
    def decorator(f):
        f._periodic_task = True
        # Read without popping: the returned decorator may be applied to
        # several functions, and each must see the same setting.
        f._ticks_between_runs = kwargs.get('ticks_between_runs', 0)
        return f

    # NOTE(sirp): The `if` is necessary to allow the decorator to be used
    # with and without parens.
    #
    # In the 'without-parens' case the original function is passed in as
    # the first (and only) argument, like:
    #
    #   periodic_task(f)
    #
    # In every other case (keyword arguments present, or no arguments at
    # all) this function must return a decorator, since the interpreter
    # will invoke it like:
    #
    #   periodic_task(*args, **kwargs)(f)
    if args and not kwargs:
        return decorator(args[0])
    return decorator