bapinggaitianli 2020-04-16
上一篇文章写了cinder服务的启动,下面讲一下openstack是如何通过cinder创建一个卷的
通过查看cinder的api-paste.ini文件,并且现在是v3版本的API,可以得知目前API的router文件是cinder/api/v3/router.py文件

通过查看router.py文件,可以得知,对于volume的操作都会通过mapper重定向到cinder/api/v3/volumes.py文件中进行处理

看一下创建volume的源码
def create(self, req, body):
........
........
new_volume = self.volume_api.create(context,
size,
volume.get(‘display_name‘),
volume.get(‘display_description‘),
**kwargs)
retval = self._view_builder.detail(req, new_volume)
return retval
此处调用了self.volume_api.create()去创建卷。self.volume_api这个变量是VolumeController从V2的api继承过来的,在初始化的时候被赋值为cinder.volume.api.API(),所以其create方法为cinder/volume/api.py中API类下的create方法
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None,
source_replica=None, consistencygroup=None,
cgsnapshot=None, multiattach=False, source_cg=None,
group=None, group_snapshot=None, source_group=None,
backup=None):
.........
.........
create_what = {
‘context‘: context,
‘raw_size‘: size,
‘name‘: name,
‘description‘: description,
‘snapshot‘: snapshot,
‘image_id‘: image_id,
‘raw_volume_type‘: volume_type,
‘metadata‘: metadata or {},
‘raw_availability_zone‘: availability_zone,
‘source_volume‘: source_volume,
‘scheduler_hints‘: scheduler_hints,
‘key_manager‘: self.key_manager,
‘optional_args‘: {‘is_quota_committed‘: False},
‘consistencygroup‘: consistencygroup,
‘cgsnapshot‘: cgsnapshot,
‘raw_multiattach‘: multiattach,
‘group‘: group,
‘group_snapshot‘: group_snapshot,
‘source_group‘: source_group,
‘backup‘: backup,
}
try:
sched_rpcapi = (self.scheduler_rpcapi if (
not cgsnapshot and not source_cg and
not group_snapshot and not source_group)
else None)
volume_rpcapi = (self.volume_rpcapi if (
not cgsnapshot and not source_cg and
not group_snapshot and not source_group)
else None)
flow_engine = create_volume.get_flow(self.db,
self.image_service,
availability_zones,
create_what,
sched_rpcapi,
volume_rpcapi)
except Exception:
msg = _(‘Failed to create api volume flow.‘)
LOG.exception(msg)
raise exception.CinderException(msg)
此处调用了create_volume中的get_flow方法,传入参数并创建flow。get_flow采用了taskflow,使用了taskflow中的线性流程(linear_flow),依次添加了ExtractVolumeRequestTask(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask() 以及VolumeCastTask()五个步骤
def get_flow(db_api, image_service_api, availability_zones, create_what,
scheduler_rpcapi=None, volume_rpcapi=None):
"""Constructs and returns the api entrypoint flow.
This flow will do the following:
1. Inject keys & values for dependent tasks.
2. Extracts and validates the input keys & values.
3. Reserves the quota (reverts quota on any failures).
4. Creates the database entry.
5. Commits the quota.
6. Casts to volume manager or scheduler for further processing.
"""
flow_name = ACTION.replace(":", "_") + "_api"
api_flow = linear_flow.Flow(flow_name)
api_flow.add(ExtractVolumeRequestTask(
image_service_api,
availability_zones,
rebind={‘size‘: ‘raw_size‘,
‘availability_zone‘: ‘raw_availability_zone‘,
‘volume_type‘: ‘raw_volume_type‘,
‘multiattach‘: ‘raw_multiattach‘}))
api_flow.add(QuotaReserveTask(),
EntryCreateTask(),
QuotaCommitTask())
if scheduler_rpcapi and volume_rpcapi:
# This will cast it out to either the scheduler or volume manager via
# the rpc apis provided.
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(api_flow, store=create_what)
taskflow会调用添加的每个步骤类的execute方法。taskflow是openstack中的一个重要组件,用于构建需要严格按步骤执行的业务逻辑,涉及的东西比较多,暂时不在这里记录
ExtractVolumeRequestTask类主要对传过来的参数进行校验,提取各类参数,并根据参数进行zone、镜像等选取的操作,并为QuotaReserveTask类传递参数
QuotaReserveTask类进行配额检查以及占用
EntryCreateTask类主要是调用cinder.objects.volume.Volume.create()方法在database中创建记录
QuotaCommitTask类在数据库中进行配额的确认
VolumeCastTask类通过rpc对任务进行投递,投递的对象为scheduler_rpcapi

scheduler_rpcapi在调用get_flow时已经指定

此时cinder-scheduler接收到cinder-api传过来的请求,发送请求的代码部分为 cinder/scheduler/rpcapi.py
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None,
backup_id=None):
volume.create_worker()
cctxt = self._get_cctxt()
msg_args = {‘snapshot_id‘: snapshot_id, ‘image_id‘: image_id,
‘request_spec‘: request_spec,
‘filter_properties‘: filter_properties,
‘volume‘: volume, ‘backup_id‘: backup_id}
if not self.client.can_send_version(‘3.10‘):
msg_args.pop(‘backup_id‘)
return cctxt.cast(ctxt, ‘create_volume‘, **msg_args)
同样,接收该消息的是cinder-scheduler服务中cinder/scheduler/manager.py里的SchedulerManager
@objects.Volume.set_workers
@append_operation_type()
def create_volume(self, context, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None,
backup_id=None):
self._wait_for_scheduler()
try:
flow_engine = create_volume.get_flow(context,
self.driver,
request_spec,
filter_properties,
volume,
snapshot_id,
image_id,
backup_id)
except Exception:
msg = _("Failed to create scheduler manager volume flow")
LOG.exception(msg)
raise exception.CinderException(msg)
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
此处cinder-scheduler同样使用了taskflow模型对磁盘进行创建,看一下这个get_flow中包含的几个task类
def get_flow(context, driver_api, request_spec=None,
filter_properties=None,
volume=None, snapshot_id=None, image_id=None, backup_id=None):
create_what = {
‘context‘: context,
‘raw_request_spec‘: request_spec,
‘filter_properties‘: filter_properties,
‘volume‘: volume,
‘snapshot_id‘: snapshot_id,
‘image_id‘: image_id,
‘backup_id‘: backup_id,
}
flow_name = ACTION.replace(":", "_") + "_scheduler"
scheduler_flow = linear_flow.Flow(flow_name)
# This will extract and clean the spec from the starting values.
scheduler_flow.add(ExtractSchedulerSpecTask(
rebind={‘request_spec‘: ‘raw_request_spec‘}))
# This will activate the desired scheduler driver (and handle any
# driver related failures appropriately).
scheduler_flow.add(ScheduleCreateVolumeTask(driver_api))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(scheduler_flow, store=create_what)
ExtractSchedulerSpecTask同样是对请求参数进行提取加工,以供后续调用
ScheduleCreateVolumeTask的execute中有两个操作:1.调用driver_api进行volume的调度创建;2.如果过程中出现失败,则通过message_api记录失败消息并进行失败处理
def execute(self, context, request_spec, filter_properties, volume):
try:
self.driver_api.schedule_create_volume(context, request_spec,
filter_properties)
except Exception as e:
self.message_api.create(
context,
message_field.Action.SCHEDULE_ALLOCATE_VOLUME,
resource_uuid=request_spec[‘volume_id‘],
exception=e)
with excutils.save_and_reraise_exception(
reraise=not isinstance(e, exception.NoValidBackend)):
try:
self._handle_failure(context, request_spec, e)
finally:
common.error_out(volume, reason=e)
此时driver_api为SchedulerManager初始化时的scheduler_driver


可知driver_api为cinder.scheduler.filter_scheduler.FilterScheduler
def schedule_create_volume(self, context, request_spec, filter_properties):
backend = self._schedule(context, request_spec, filter_properties)
if not backend:
raise exception.NoValidBackend(reason=_("No weighed backends "
"available"))
backend = backend.obj
volume_id = request_spec[‘volume_id‘]
updated_volume = driver.volume_update_db(
context, volume_id,
backend.host,
backend.cluster_name,
availability_zone=backend.service[‘availability_zone‘])
self._post_select_populate_filter_properties(filter_properties,
backend)
# context is not serializable
filter_properties.pop(‘context‘, None)
self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
filter_properties,
allow_reschedule=True)
self._schedule通过传入的参数对后端进行选择(多个后端的情况下)
最后调用self.volume_rpcapi.create_volume进行volume的创建,self.volume_rpcapi为volume_rpcapi.VolumeAPI()

def create_volume(self, ctxt, volume, request_spec, filter_properties,
allow_reschedule=True):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, ‘create_volume‘,
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule,
volume=volume)
此时cinder-scheduler通过rpc对cinder-volume发送创建volume的消息,接收消息的是cinder-volume的VolumeManager
@objects.Volume.set_workers
def create_volume(self, context, volume, request_spec=None,
filter_properties=None, allow_reschedule=True):
"""Creates the volume."""
utils.log_unsupported_driver_warning(self.driver)
self._set_resource_host(volume)
self._update_allocated_capacity(volume)
# We lose the host value if we reschedule, so keep it here
original_host = volume.host
context_elevated = context.elevated()
if filter_properties is None:
filter_properties = {}
if request_spec is None:
request_spec = objects.RequestSpec()
try:
# NOTE(flaper87): Driver initialization is
# verified by the task itself.
flow_engine = create_volume.get_flow(
context_elevated,
self,
self.db,
self.driver,
self.scheduler_rpcapi,
self.host,
volume,
allow_reschedule,
context,
request_spec,
filter_properties,
image_volume_cache=self.image_volume_cache,
)
except Exception:
msg = _("Create manager volume flow failed.")
LOG.exception(msg, resource={‘type‘: ‘volume‘, ‘id‘: volume.id})
raise exception.CinderException(msg)
snapshot_id = request_spec.get(‘snapshot_id‘)
source_volid = request_spec.get(‘source_volid‘)
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
locked_action = "%s-%s" % (snapshot_id, ‘delete_snapshot‘)
elif source_volid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_volid, ‘delete_volume‘)
else:
locked_action = None
def _run_flow():
# This code executes create volume flow. If something goes wrong,
# flow reverts all job that was done and reraises an exception.
# Otherwise, all data that was generated by flow becomes available
# in flow engine‘s storage.
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
# NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
# decide if allocated_capacity should be incremented.
rescheduled = False
try:
if locked_action is None:
_run_flow()
else:
with coordination.COORDINATOR.get_lock(locked_action):
_run_flow()
finally:
try:
flow_engine.storage.fetch(‘refreshed‘)
except tfe.NotFound:
# If there‘s no vol_ref, then flow is reverted. Lets check out
# if rescheduling occurred.
try:
rescheduled = flow_engine.storage.get_revert_result(
create_volume.OnFailureRescheduleTask.make_name(
[create_volume.ACTION]))
except tfe.NotFound:
pass
if rescheduled:
# NOTE(geguileo): Volume was rescheduled so we need to update
# volume stats because the volume wasn‘t created here.
# Volume.host is None now, so we pass the original host value.
self._update_allocated_capacity(volume, decrement=True,
host=original_host)
# Shared targets is only relevant for iSCSI connections.
# We default to True to be on the safe side.
volume.shared_targets = (
self.driver.capabilities.get(‘storage_protocol‘) == ‘iSCSI‘ and
self.driver.capabilities.get(‘shared_targets‘, True))
# TODO(geguileo): service_uuid won‘t be enough on Active/Active
# deployments. There can be 2 services handling volumes from the same
# backend.
volume.service_uuid = self.service_uuid
volume.save()
LOG.info("Created volume successfully.", resource=volume)
return volume.id
上述代码中,同样使用了taskflow:
ExtractVolumeRefTask为提取数据库中volume的具体信息
OnFailureRescheduleTask的execute并无操作,但是revert中有操作,目的是在后续步骤出现错误需要回滚时执行部分处理。
ExtractVolumeSpecTask 提取spec信息
NotifyVolumeActionTask 广播volume开始创建的消息
def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume,
allow_reschedule, reschedule_context, request_spec,
filter_properties, image_volume_cache=None):
flow_name = ACTION.replace(":", "_") + "_manager"
volume_flow = linear_flow.Flow(flow_name)
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
create_what = {
‘context‘: context,
‘filter_properties‘: filter_properties,
‘request_spec‘: request_spec,
‘volume‘: volume,
}
volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False))
retry = filter_properties.get(‘retry‘, None)
# Always add OnFailureRescheduleTask and we handle the change of volume‘s
# status when reverting the flow. Meanwhile, no need to revert process of
# ExtractVolumeRefTask.
do_reschedule = allow_reschedule and request_spec and retry
volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, driver,
scheduler_rpcapi, do_reschedule))
LOG.debug("Volume reschedule parameters: %(allow)s "
"retry: %(retry)s", {‘allow‘: allow_reschedule, ‘retry‘: retry})
volume_flow.add(ExtractVolumeSpecTask(db),
NotifyVolumeActionTask(db, "create.start"),
CreateVolumeFromSpecTask(manager,
db,
driver,
image_volume_cache),
CreateVolumeOnFinishTask(db, "create.end"))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(volume_flow, store=create_what)
CreateVolumeFromSpecTask根据传入的create_type不同,调用不同的接口进行卷的创建,以裸磁盘为例(create_type为raw)
CreateVolumeOnFinishTask广播创建磁盘完成
if create_type == ‘raw‘:
model_update = self._create_raw_volume(volume, **volume_spec)
elif create_type == ‘snap‘:
model_update = self._create_from_snapshot(context, volume,
**volume_spec)
elif create_type == ‘source_vol‘:
model_update = self._create_from_source_volume(
context, volume, **volume_spec)
elif create_type == ‘image‘:
model_update = self._create_from_image(context,
volume,
**volume_spec)
elif create_type == ‘backup‘:
model_update, need_update_volume = self._create_from_backup(
context, volume, **volume_spec)
volume_spec.update({‘need_update_volume‘: need_update_volume})
else:
raise exception.VolumeTypeNotFound(volume_type_id=create_type)
def _create_raw_volume(self, volume, **kwargs):
try:
ret = self.driver.create_volume(volume)
finally:
self._cleanup_cg_in_volume(volume)
return ret
此处self.driver是在VolumeManager初始化时进行初始化的,可以看出driver是从配置文件中读取的
self.configuration = config.Configuration(volume_backend_opts,
config_group=service_name)
self._set_tpool_size(
self.configuration.backend_native_threads_pool_size)
self.stats = {}
self.service_uuid = None
if not volume_driver:
# Get from configuration, which will get the default
# if its not using the multi backend
volume_driver = self.configuration.volume_driver
if volume_driver in MAPPING:
LOG.warning("Driver path %s is deprecated, update your "
"configuration to the new path.", volume_driver)
volume_driver = MAPPING[volume_driver]
volume_driver在配置文件中有写

def create_volume(self, volume):
"""Creates a logical volume."""
mirror_count = 0
if self.configuration.lvm_mirrors:
mirror_count = self.configuration.lvm_mirrors
self._create_volume(volume[‘name‘],
self._sizestr(volume[‘size‘]),
self.configuration.lvm_type,
mirror_count)
def _create_volume(self, name, size, lvm_type, mirror_count, vg=None):
vg_ref = self.vg
if vg is not None:
vg_ref = vg
vg_ref.create_volume(name, size, lvm_type, mirror_count)
此处self.vg是什么?可以全局查找一下;其具体的初始化时机,可以查看上一篇cinder服务启动中的cinder-volume启动部分

至此可以得知创建卷时调用的底层代码:最终是调用lvcreate对卷进行创建。
def create_volume(self, name, size_str, lv_type=‘default‘, mirror_count=0):
"""Creates a logical volume on the object‘s VG.
:param name: Name to use when creating Logical Volume
:param size_str: Size to use when creating Logical Volume
:param lv_type: Type of Volume (default or thin)
:param mirror_count: Use LVM mirroring with specified count
"""
if lv_type == ‘thin‘:
pool_path = ‘%s/%s‘ % (self.vg_name, self.vg_thin_pool)
cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-T‘, ‘-V‘, size_str, ‘-n‘,
name, pool_path]
else:
cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-n‘, name, self.vg_name,
‘-L‘, size_str]
if mirror_count > 0:
cmd.extend([‘--type=mirror‘, ‘-m‘, mirror_count, ‘--nosync‘,
‘--mirrorlog‘, ‘mirrored‘])
terras = int(size_str[:-1]) / 1024.0
if terras >= 1.5:
rsize = int(2 ** math.ceil(math.log(terras) / math.log(2)))
# NOTE(vish): Next power of two for region size. See:
# http://red.ht/U2BPOD
cmd.extend([‘-R‘, str(rsize)])
try:
self._execute(*cmd,
root_helper=self._root_helper,
run_as_root=True)
except putils.ProcessExecutionError as err:
LOG.exception(‘Error creating Volume‘)
LOG.error(‘Cmd :%s‘, err.cmd)
LOG.error(‘StdOut :%s‘, err.stdout)
LOG.error(‘StdErr :%s‘, err.stderr)
LOG.error(‘Current state: %s‘,
self.get_all_volume_groups(self._root_helper))
raise
后续就是一系列回调和通知啦