gokeibi 2020-01-02
比较忙,很长时间没空看OpenStack源码,抽时间看了一下Cinder的源码,贴下学习心得。本文简单介绍一下Cinder三个服务的启动流程:cinder-api、cinder-scheduler以及cinder-volume,三者的启动流程大同小异。
1、cinder-api
入口文件为/usr/bin/cinder-api,由其内容可知,真正的入口为cinder.cmd.api模块(cinder/cmd/api.py)中的main函数
#!/usr/bin/python2
# PBR Generated from u'console_scripts'
# Wrapper installed as /usr/bin/cinder-api: it only imports
# cinder.cmd.api.main() and uses its return value as the exit status.
import sys
from cinder.cmd.api import main
if __name__ == "__main__":
    sys.exit(main())
main函数如下,主要关注14-16行即可
def main():
objects.register_all()
gmr_opts.set_defaults(CONF)
CONF(sys.argv[1:], project=‘cinder‘,
version=version.version_string())
config.set_middleware_defaults()
logging.setup(CONF, "cinder")
python_logging.captureWarnings(True)
utils.monkey_patch()
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
rpc.init(CONF)
launcher = service.process_launcher()
server = service.WSGIService(‘osapi_volume‘)
launcher.launch_service(server, workers=server.workers)
launcher.wait()14行创建一个 ProcessLauncher 对象,以便后续对app进行launch
15行(server = service.WSGIService('osapi_volume'))创建 WSGIService 对象,名称为 osapi_volume
16行(launcher.launch_service(server, workers=server.workers))由launcher对象调用launch_service方法对server进行处理。查看源码可知,它最终调用了ProcessLauncher对象的_start_child方法来启动服务:
def launch_service(self, service, workers=1):
"""Launch a service with a given number of workers.
:param service: a service to launch, must be an instance of
:class:`oslo_service.service.ServiceBase`
:param workers: a number of processes in which a service
will be running
"""
_check_service_base(service) #对象类型校验
wrap = ServiceWrapper(service, workers) #对象包装
# Hide existing objects from the garbage collector, so that most
# existing pages will remain in shared memory rather than being
# duplicated between subprocesses in the GC mark-and-sweep. (Requires
# Python 3.7 or later.)
if hasattr(gc, ‘freeze‘):
gc.freeze()
LOG.info(‘Starting %d workers‘, wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)_start_child方法很简单,调用了os.fork()创建了一个子进程,如果创建子进程成功,再次调用_child_process方法
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don‘t fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(‘Forking too fast, sleeping‘)
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
self.launcher.restart()
os._exit(status)
LOG.debug(‘Started child %d‘, pid)
wrap.children.add(pid)
self.children[pid] = wrap
return piddef _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don‘t share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher(self.conf, restart_method=self.restart_method)
launcher.launch_service(service)
return launcher_child_process方法中调用了eventlet,获取hub,并且创建一个线程,对进程进行观察,同时创建一个Launcher对象,对服务进行lanch,launch_service
def launch_service(self, service, workers=1):
    """Load and start the given service.
    :param service: The service you would like to start, must be an
                    instance of :class:`oslo_service.service.ServiceBase`
    :param workers: This param makes this method compatible with
                    ProcessLauncher.launch_service. It must be None, 1 or
                    omitted.
    :raises ValueError: if workers is anything other than None or 1.
    :returns: None
    """
    if workers is not None and workers != 1:
        raise ValueError(_("Launcher asked to start multiple workers"))
    _check_service_base(service)
    # Hand this launcher's backdoor_port to the service before it runs.
    service.backdoor_port = self.backdoor_port
    # Key step: add() records the service and starts a thread that runs it.
    self.services.add(service)
Launcher 对象的关键在于这个add方法,它将所有调用其进行launch的服务添加到Service()的service列表中,最终调用了添加的service的start()方法
def add(self, service):
    """Add a service to a list and create a thread to run it.
    :param service: service to run
    """
    # Keep a reference to the service, then run it via run_service in a
    # thread created by self.tg; self.done is passed so run_service can
    # wait on it once service.start() has returned.
    self.services.append(service)
    self.tg.add_thread(self.run_service, service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
try:
service.start()
except Exception:
LOG.exception(‘Error starting thread.‘)
raise SystemExit(1)
else:
done.wait()这个service即最初提到的WSGIService 对象,查看一下其start方法
def start(self):
    """Start serving this service using loaded configuration.
    Also, retrieve updated port number in case '0' was passed in, which
    indicates a random port should be used.
    :returns: None
    """
    # init_host() runs only when a manager is configured for this service.
    if self.manager:
        self.manager.init_host()
    self.server.start()
    # Read the port back from the server (relevant when '0' was configured).
    self.port = self.server.port
此时self.manager为None,关键执行步骤为self.server.start(),这个server为WSGIService 进行init的时候构造的对象
class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""
    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.
        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None
        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        self.app = self.loader.load_app(name)
        # Listen address, port, SSL flag and worker count are read from CONF
        # options prefixed with the service name (e.g. osapi_volume_listen).
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        # Fall back to processutils.get_worker_count() when the option is
        # missing or falsy.
        self.workers = (getattr(CONF, '%s_workers' % name, None) or
                        processutils.get_worker_count())
        if self.workers and self.workers < 1:
            worker_name = '%s_workers' % name
            msg = (_("%(worker_name)s value of %(workers)d is invalid, "
                     "must be greater than 0.") %
                   {'worker_name': worker_name,
                    'workers': self.workers})
            raise exception.InvalidConfigurationValue(msg)
        setup_profiler(name, self.host)
        # This wsgi.Server is what WSGIService.start() starts via
        # self.server.start(); that start() method is shown next.
        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)
# wsgi.Server.start(), reached through self.server.start():
def start(self):
    """Start serving a WSGI application.
    :returns: None
    """
    # The server socket object will be closed after server exits,
    # but the underlying file descriptor will remain open, and will
    # give bad file descriptor error. So duplicating the socket object,
    # to keep file descriptor usable.
    self.dup_socket = self.socket.dup()
    if self._use_ssl:
        self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)
    wsgi_kwargs = {
        'func': eventlet.wsgi.server,
        'sock': self.dup_socket,
        'site': self.app,
        'protocol': self._protocol,
        'custom_pool': self._pool,
        'log': self._logger,
        'log_format': self.conf.wsgi_log_format,
        'debug': False,
        'keepalive': self.conf.wsgi_keep_alive,
        'socket_timeout': self.client_socket_timeout
    }
    if self._max_url_len:
        wsgi_kwargs['url_length_limit'] = self._max_url_len
    # eventlet.wsgi.server runs in a green thread created by eventlet.spawn.
    self._server = eventlet.spawn(**wsgi_kwargs)
至此,WSGI服务器已在eventlet.spawn创建的绿色线程中运行,cinder-api顺利启动
2、cinder-scheduler
入口文件为/usr/bin/cinder-scheduler,实际调用的是cinder/cmd/scheduler.py中的main函数
#!/usr/bin/python2
# PBR Generated from u‘console_scripts‘
import sys
from cinder.cmd.scheduler import main
if __name__ == "__main__":
sys.exit(main())def main():
objects.register_all()
gmr_opts.set_defaults(CONF)
CONF(sys.argv[1:], project=‘cinder‘,
version=version.version_string())
logging.setup(CONF, "cinder")
python_logging.captureWarnings(True)
utils.monkey_patch()
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
server = service.Service.create(binary=‘cinder-scheduler‘)
service.serve(server)
service.wait()实际启动服务的只有10,11,12行,通过Service对象的类方法create创建一个名server的service,然后用serve方法(实际调用launch对service进行处理),launch方法通过判断serve传进的worker参数来判断,传入的对象是process还是service,但是不管是service还是process,都是调用了launch_service这个接口,此处,同上述api所述,Launcher 对象的关键在于这个add方法,它将所有调用其进行launch的服务添加到Service()的service列表中,最终调用了添加的Service的start()方法。
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_interval=None,
           periodic_fuzzy_delay=None, service_name=None,
           coordination=False, cluster=None, **kwargs):
    """Instantiate ``cls``, filling unset arguments from CONF.
    host falls back to CONF.host, binary to the basename of the file of the
    outermost stack frame, topic to binary, and manager to the CONF option
    '<subtopic>_manager', where subtopic is the part of topic after the last
    'cinder-'. The interval arguments fall back to the CONF options of the
    same name when they are None.
    :returns: the new service instance
    """
    if not host:
        host = CONF.host
    if not binary:
        binary = os.path.basename(inspect.stack()[-1][1])
    if not topic:
        topic = binary
    if not manager:
        # e.g. 'cinder-scheduler' -> 'scheduler' -> CONF 'scheduler_manager'
        subtopic = topic.rpartition('cinder-')[2]
        manager = CONF.get('%s_manager' % subtopic, None)
    if report_interval is None:
        report_interval = CONF.report_interval
    if periodic_interval is None:
        periodic_interval = CONF.periodic_interval
    if periodic_fuzzy_delay is None:
        periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
    service_obj = cls(host, binary, topic, manager,
                      report_interval=report_interval,
                      periodic_interval=periodic_interval,
                      periodic_fuzzy_delay=periodic_fuzzy_delay,
                      service_name=service_name,
                      coordination=coordination,
                      cluster=cluster, **kwargs)
    return service_obj
def serve(server, workers=None):
    """Launch ``server`` with service.launch() and keep the launcher.
    The launcher is stored in the module-level ``_launcher``.
    :raises RuntimeError: if ``_launcher`` is already set (serve() was
        called before).
    """
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))
    _launcher = service.launch(CONF, server, workers=workers)
def launch(conf, service, workers=1, restart_method='reload'):
    """Launch a service with a given number of workers.
    :param conf: an instance of ConfigOpts
    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service will be running
    :param restart_method: Passed to the constructed launcher. If 'reload', the
        launcher will call reload_config_files on SIGHUP. If 'mutate', it will
        call mutate_config_files on SIGHUP. Other values produce a ValueError.
    :returns: instance of a launcher that was used to launch the service
    """
    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))
    # None or 1 worker -> ServiceLauncher; more -> ProcessLauncher (forks).
    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)
    return launcher
至此,cinder-scheduler启动完成
3、cinder-volume
cinder-volume的入口文件为/usr/bin/cinder-volume,由此可知真正的入口函数为cinder/cmd/volume.py中的main函数
#!/usr/bin/python2
# PBR Generated from u'console_scripts'
import sys
from cinder.cmd.volume import main
if __name__ == "__main__":
    sys.exit(main())
# cinder/cmd/volume.py
def _launch_services_win32():
    """Start the volume service(s) on Windows.
    A single requested backend is launched in this process via
    _launch_service(); otherwise one process per non-empty enabled backend
    is added to a WindowsProcessLauncher, using the original command line
    plus --backend_name=<backend>.
    :raises exception.InvalidInput: if CONF.backend_name is set but is not
        in CONF.enabled_backends.
    """
    if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends:
        msg = _('The explicitly passed backend name "%(backend_name)s" is not '
                'among the enabled backends: %(enabled_backends)s.')
        raise exception.InvalidInput(
            reason=msg % dict(backend_name=CONF.backend_name,
                              enabled_backends=CONF.enabled_backends))
    # We'll avoid spawning a subprocess if a single backend is requested.
    single_backend_name = (CONF.enabled_backends[0]
                           if len(CONF.enabled_backends) == 1
                           else CONF.backend_name)
    if single_backend_name:
        launcher = service.get_launcher()
        _launch_service(launcher, single_backend_name)
    elif CONF.enabled_backends:
        # We're using the 'backend_name' argument, requesting a certain backend
        # and constructing the service object within the child process.
        launcher = service.WindowsProcessLauncher()
        py_script_re = re.compile(r'.*\.py\w?$')
        for backend in filter(None, CONF.enabled_backends):
            cmd = sys.argv + ['--backend_name=%s' % backend]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd
            launcher.add_process(cmd)
            _notify_service_started()
    _ensure_service_started()
    launcher.wait()
def _launch_services_posix():
    """Call _launch_service() for every non-empty entry in CONF.enabled_backends,
    all sharing one launcher, then wait on that launcher."""
    launcher = service.get_launcher()
    for backend in filter(None, CONF.enabled_backends):
        _launch_service(launcher, backend)
    # Called only after every backend has been handed to _launch_service().
    _ensure_service_started()
    launcher.wait()
def main():
objects.register_all()
gmr_opts.set_defaults(CONF)
CONF(sys.argv[1:], project=‘cinder‘,
version=version.version_string())
logging.setup(CONF, "cinder")
python_logging.captureWarnings(True)
priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
utils.monkey_patch()
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
global LOG
LOG = logging.getLogger(__name__)
if not CONF.enabled_backends:
LOG.error(‘Configuration for cinder-volume does not specify ‘
‘"enabled_backends". Using DEFAULT section to configure ‘
‘drivers is not supported since Ocata.‘)
sys.exit(1)
if os.name == ‘nt‘:
# We cannot use oslo.service to spawn multiple services on Windows.
# It relies on forking, which is not available on Windows.
# Furthermore, service objects are unmarshallable objects that are
# passed to subprocesses.
_launch_services_win32()
else:
_launch_services_posix()根据平台是windows还是linux,进行不同的调用,因为是在linux上部署,所有调用的函数为_launch_services_posix(),其中调用了_launch_service(launcher, backend)创建Service对象,同上述cinder-schedule的启动流程,后面不再累述了~