bapinggaitianli 2020-04-16
上一篇文章写了cinder服务的启动,下面讲一下openstack是如何通过openstack创建一个卷
通过查看cinder的api-paste.ini文件,并且现在是v3版本的API,可以得知目前API的router文件是cinder/api/v3/router.py文件
通过查看router.py文件,可以得知,对于volume的操作都会通过mapper重定向到cinder/api/v3/volume.py文件中进行处理
看一下创建volume的源码
def create(self, req, body): ........ ........ new_volume = self.volume_api.create(context, size, volume.get(‘display_name‘), volume.get(‘display_description‘), **kwargs) retval = self._view_builder.detail(req, new_volume) return retval
此处调用了self.volume_api.create()去创建卷,self.volume_api 这个变量是VolumeController从V2的api继承过来来的,在初始话的时候被初始化为cinder.volume.api.API(),所以其create方法为cinder/volume/api.py中API类下的create方法
def create(self, context, size, name, description, snapshot=None, image_id=None, volume_type=None, metadata=None, availability_zone=None, source_volume=None, scheduler_hints=None, source_replica=None, consistencygroup=None, cgsnapshot=None, multiattach=False, source_cg=None, group=None, group_snapshot=None, source_group=None, backup=None): ......... ......... create_what = { ‘context‘: context, ‘raw_size‘: size, ‘name‘: name, ‘description‘: description, ‘snapshot‘: snapshot, ‘image_id‘: image_id, ‘raw_volume_type‘: volume_type, ‘metadata‘: metadata or {}, ‘raw_availability_zone‘: availability_zone, ‘source_volume‘: source_volume, ‘scheduler_hints‘: scheduler_hints, ‘key_manager‘: self.key_manager, ‘optional_args‘: {‘is_quota_committed‘: False}, ‘consistencygroup‘: consistencygroup, ‘cgsnapshot‘: cgsnapshot, ‘raw_multiattach‘: multiattach, ‘group‘: group, ‘group_snapshot‘: group_snapshot, ‘source_group‘: source_group, ‘backup‘: backup, } try: sched_rpcapi = (self.scheduler_rpcapi if ( not cgsnapshot and not source_cg and not group_snapshot and not source_group) else None) volume_rpcapi = (self.volume_rpcapi if ( not cgsnapshot and not source_cg and not group_snapshot and not source_group) else None) flow_engine = create_volume.get_flow(self.db, self.image_service, availability_zones, create_what, sched_rpcapi, volume_rpcapi) except Exception: msg = _(‘Failed to create api volume flow.‘) LOG.exception(msg) raise exception.CinderException(msg)
此处调用了create_flow中的get_flow方法,进行传参和并创建,get_flow采用了taskflow,使用了taskflow中的线性流程,依次添加了ExtractVolumeRequestTesk(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask() 以及VolumeCastTask()五个步骤
def get_flow(db_api, image_service_api, availability_zones, create_what, scheduler_rpcapi=None, volume_rpcapi=None): """Constructs and returns the api entrypoint flow. This flow will do the following: 1. Inject keys & values for dependent tasks. 2. Extracts and validates the input keys & values. 3. Reserves the quota (reverts quota on any failures). 4. Creates the database entry. 5. Commits the quota. 6. Casts to volume manager or scheduler for further processing. """ flow_name = ACTION.replace(":", "_") + "_api" api_flow = linear_flow.Flow(flow_name) api_flow.add(ExtractVolumeRequestTask( image_service_api, availability_zones, rebind={‘size‘: ‘raw_size‘, ‘availability_zone‘: ‘raw_availability_zone‘, ‘volume_type‘: ‘raw_volume_type‘, ‘multiattach‘: ‘raw_multiattach‘})) api_flow.add(QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask()) if scheduler_rpcapi and volume_rpcapi: # This will cast it out to either the scheduler or volume manager via # the rpc apis provided. api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api)) # Now load (but do not run) the flow using the provided initial data. return taskflow.engines.load(api_flow, store=create_what)
taskflow会调用添加的每个步骤类的execute方法,taskflow是openstack中的一个重要组建,用于构建逻辑需要精准步骤的业务,涉及的东西比较多,暂时不在这里记录
ExcuactVolumeRequestTask类主要对传过来的参数进行校验,提取各类参数,并根据参数进行zone、镜像等选取的操作,并为QuotaReserveTask 类传递参数
QuotaReserveTask类进行配额检查以及占用
EntryCreateTask类主要是是调用cinder.objects.volume.Volume.create()方法在database中创建记录
QuotaCommitTask类在数据库中进行配额的确认
VolumeCastTask类通过rpc对任务进行投递投递的对象为schduler_rpcapi
scheduler_rpcapi在调用get_flow时已经指定
此时cinder-scheduler接收到cinder-api传过来的请求,发送请求的代码部分为 cinder/scheduler/rpcapi.py
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None, backup_id=None): volume.create_worker() cctxt = self._get_cctxt() msg_args = {‘snapshot_id‘: snapshot_id, ‘image_id‘: image_id, ‘request_spec‘: request_spec, ‘filter_properties‘: filter_properties, ‘volume‘: volume, ‘backup_id‘: backup_id} if not self.client.can_send_version(‘3.10‘): msg_args.pop(‘backup_id‘) return cctxt.cast(ctxt, ‘create_volume‘, **msg_args)
此处同样,cinder-scheduler接收为cinder-cherduler的cinder/scheduler/manager.SchedulerManager
@objects.Volume.set_workers @append_operation_type() def create_volume(self, context, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None, backup_id=None): self._wait_for_scheduler() try: flow_engine = create_volume.get_flow(context, self.driver, request_spec, filter_properties, volume, snapshot_id, image_id, backup_id) except Exception: msg = _("Failed to create scheduler manager volume flow") LOG.exception(msg) raise exception.CinderException(msg) with flow_utils.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run()
此处cinder-scheduler同样使用了taskflow模型对磁盘进行创建,看一下这个get_flow中包含的几个task类
def get_flow(context, driver_api, request_spec=None, filter_properties=None, volume=None, snapshot_id=None, image_id=None, backup_id=None): create_what = { ‘context‘: context, ‘raw_request_spec‘: request_spec, ‘filter_properties‘: filter_properties, ‘volume‘: volume, ‘snapshot_id‘: snapshot_id, ‘image_id‘: image_id, ‘backup_id‘: backup_id, } flow_name = ACTION.replace(":", "_") + "_scheduler" scheduler_flow = linear_flow.Flow(flow_name) # This will extract and clean the spec from the starting values. scheduler_flow.add(ExtractSchedulerSpecTask( rebind={‘request_spec‘: ‘raw_request_spec‘})) # This will activate the desired scheduler driver (and handle any # driver related failures appropriately). scheduler_flow.add(ScheduleCreateVolumeTask(driver_api)) # Now load (but do not run) the flow using the provided initial data. return taskflow.engines.load(scheduler_flow, store=create_what)
ExtractSchedulerSpecTask 同样为对请求参数进行提取加工以供后续调用
ScheduleCreateVolumeTask中execute中有两个操作,1调用drvier_api进行volume的创建,2.如果创建过程中出现失败,则通过message将消息发送给scheduler
def execute(self, context, request_spec, filter_properties, volume): try: self.driver_api.schedule_create_volume(context, request_spec, filter_properties) except Exception as e: self.message_api.create( context, message_field.Action.SCHEDULE_ALLOCATE_VOLUME, resource_uuid=request_spec[‘volume_id‘], exception=e) with excutils.save_and_reraise_exception( reraise=not isinstance(e, exception.NoValidBackend)): try: self._handle_failure(context, request_spec, e) finally: common.error_out(volume, reason=e)
此时driver_api为SchedulerManager初始化时的scheduler_driver
可知driver_api为cinder.cheduler.filter_scheduler.FilterScheduler
def schedule_create_volume(self, context, request_spec, filter_properties): backend = self._schedule(context, request_spec, filter_properties) if not backend: raise exception.NoValidBackend(reason=_("No weighed backends " "available")) backend = backend.obj volume_id = request_spec[‘volume_id‘] updated_volume = driver.volume_update_db( context, volume_id, backend.host, backend.cluster_name, availability_zone=backend.service[‘availability_zone‘]) self._post_select_populate_filter_properties(filter_properties, backend) # context is not serializable filter_properties.pop(‘context‘, None) self.volume_rpcapi.create_volume(context, updated_volume, request_spec, filter_properties, allow_reschedule=True)
self._schedule 通过传入的参数对后端的进行选择(多个后端的情况下)
最后调用self.volume_rpcapi.create_volume进行volume的创建,volume_api为volume_rpcapi.VolumeAPI()
def create_volume(self, ctxt, volume, request_spec, filter_properties, allow_reschedule=True): cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, ‘create_volume‘, request_spec=request_spec, filter_properties=filter_properties, allow_reschedule=allow_reschedule, volume=volume)
此时cinder-schedule通过rpc对cinder-volume发送创建volume的消息,接收消息的是cinder-volume的VolumeManager
@objects.Volume.set_workers def create_volume(self, context, volume, request_spec=None, filter_properties=None, allow_reschedule=True): """Creates the volume.""" utils.log_unsupported_driver_warning(self.driver) self._set_resource_host(volume) self._update_allocated_capacity(volume) # We lose the host value if we reschedule, so keep it here original_host = volume.host context_elevated = context.elevated() if filter_properties is None: filter_properties = {} if request_spec is None: request_spec = objects.RequestSpec() try: # NOTE(flaper87): Driver initialization is # verified by the task itself. flow_engine = create_volume.get_flow( context_elevated, self, self.db, self.driver, self.scheduler_rpcapi, self.host, volume, allow_reschedule, context, request_spec, filter_properties, image_volume_cache=self.image_volume_cache, ) except Exception: msg = _("Create manager volume flow failed.") LOG.exception(msg, resource={‘type‘: ‘volume‘, ‘id‘: volume.id}) raise exception.CinderException(msg) snapshot_id = request_spec.get(‘snapshot_id‘) source_volid = request_spec.get(‘source_volid‘) if snapshot_id is not None: # Make sure the snapshot is not deleted until we are done with it. locked_action = "%s-%s" % (snapshot_id, ‘delete_snapshot‘) elif source_volid is not None: # Make sure the volume is not deleted until we are done with it. locked_action = "%s-%s" % (source_volid, ‘delete_volume‘) else: locked_action = None def _run_flow(): # This code executes create volume flow. If something goes wrong, # flow reverts all job that was done and reraises an exception. # Otherwise, all data that was generated by flow becomes available # in flow engine‘s storage. with flow_utils.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to # decide if allocated_capacity should be incremented. rescheduled = False try: if locked_action is None: _run_flow() else: with coordination.COORDINATOR.get_lock(locked_action): _run_flow() finally: try: flow_engine.storage.fetch(‘refreshed‘) except tfe.NotFound: # If there‘s no vol_ref, then flow is reverted. Lets check out # if rescheduling occurred. try: rescheduled = flow_engine.storage.get_revert_result( create_volume.OnFailureRescheduleTask.make_name( [create_volume.ACTION])) except tfe.NotFound: pass if rescheduled: # NOTE(geguileo): Volume was rescheduled so we need to update # volume stats because the volume wasn‘t created here. # Volume.host is None now, so we pass the original host value. self._update_allocated_capacity(volume, decrement=True, host=original_host) # Shared targets is only relevant for iSCSI connections. # We default to True to be on the safe side. volume.shared_targets = ( self.driver.capabilities.get(‘storage_protocol‘) == ‘iSCSI‘ and self.driver.capabilities.get(‘shared_targets‘, True)) # TODO(geguileo): service_uuid won‘t be enough on Active/Active # deployments. There can be 2 services handling volumes from the same # backend. volume.service_uuid = self.service_uuid volume.save() LOG.info("Created volume successfully.", resource=volume) return volume.id
上述代码中,同样使用了taskflow,
ExtractVolumeRefTask为提取数据库中volume的具体信息
OnFailureRescheduleTask中execute并无操作,但是revert中有操作,是为了以后的步骤出现错误进行回滚进行部分操作。
ExtractVolumeSpecTask 提取spec信息
NotifyVolumeActionTask 广播volume开始创建的消息
def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume, allow_reschedule, reschedule_context, request_spec, filter_properties, image_volume_cache=None): flow_name = ACTION.replace(":", "_") + "_manager" volume_flow = linear_flow.Flow(flow_name) # This injects the initial starting flow values into the workflow so that # the dependency order of the tasks provides/requires can be correctly # determined. create_what = { ‘context‘: context, ‘filter_properties‘: filter_properties, ‘request_spec‘: request_spec, ‘volume‘: volume, } volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False)) retry = filter_properties.get(‘retry‘, None) # Always add OnFailureRescheduleTask and we handle the change of volume‘s # status when reverting the flow. Meanwhile, no need to revert process of # ExtractVolumeRefTask. do_reschedule = allow_reschedule and request_spec and retry volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, driver, scheduler_rpcapi, do_reschedule)) LOG.debug("Volume reschedule parameters: %(allow)s " "retry: %(retry)s", {‘allow‘: allow_reschedule, ‘retry‘: retry}) volume_flow.add(ExtractVolumeSpecTask(db), NotifyVolumeActionTask(db, "create.start"), CreateVolumeFromSpecTask(manager, db, driver, image_volume_cache), CreateVolumeOnFinishTask(db, "create.end")) # Now load (but do not run) the flow using the provided initial data. return taskflow.engines.load(volume_flow, store=create_what)
CreateVolumeFromSpecTask此处通过传入的create_type不同,调用不同的接口进行卷的创建,以裸磁盘为例(create_type为raw)
CreateVolumeOnFinishTask广播创建磁盘完成
if create_type == ‘raw‘: model_update = self._create_raw_volume(volume, **volume_spec) elif create_type == ‘snap‘: model_update = self._create_from_snapshot(context, volume, **volume_spec) elif create_type == ‘source_vol‘: model_update = self._create_from_source_volume( context, volume, **volume_spec) elif create_type == ‘image‘: model_update = self._create_from_image(context, volume, **volume_spec) elif create_type == ‘backup‘: model_update, need_update_volume = self._create_from_backup( context, volume, **volume_spec) volume_spec.update({‘need_update_volume‘: need_update_volume}) else: raise exception.VolumeTypeNotFound(volume_type_id=create_type)
def _create_raw_volume(self, volume, **kwargs): try: ret = self.driver.create_volume(volume) finally: self._cleanup_cg_in_volume(volume) return ret
此处self.driver为VolumeManager初始化时进行初始化的,可以看出driver是从配置文件中读取的
self.configuration = config.Configuration(volume_backend_opts, config_group=service_name) self._set_tpool_size( self.configuration.backend_native_threads_pool_size) self.stats = {} self.service_uuid = None if not volume_driver: # Get from configuration, which will get the default # if its not using the multi backend volume_driver = self.configuration.volume_driver if volume_driver in MAPPING: LOG.warning("Driver path %s is deprecated, update your " "configuration to the new path.", volume_driver) volume_driver = MAPPING[volume_driver]
配置文件中有写
def create_volume(self, volume): """Creates a logical volume.""" mirror_count = 0 if self.configuration.lvm_mirrors: mirror_count = self.configuration.lvm_mirrors self._create_volume(volume[‘name‘], self._sizestr(volume[‘size‘]), self.configuration.lvm_type, mirror_count)
def _create_volume(self, name, size, lvm_type, mirror_count, vg=None): vg_ref = self.vg if vg is not None: vg_ref = vg vg_ref.create_volume(name, size, lvm_type, mirror_count)
此处self.vg是什么?全局查找一下,具体初始化时间,可以查看上一篇,cinder服务启动中的cinder-volume启动部分
此时创建卷调用的底层代码就可以得知,调用的是lvcreate对卷进行创建。
def create_volume(self, name, size_str, lv_type=‘default‘, mirror_count=0): """Creates a logical volume on the object‘s VG. :param name: Name to use when creating Logical Volume :param size_str: Size to use when creating Logical Volume :param lv_type: Type of Volume (default or thin) :param mirror_count: Use LVM mirroring with specified count """ if lv_type == ‘thin‘: pool_path = ‘%s/%s‘ % (self.vg_name, self.vg_thin_pool) cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-T‘, ‘-V‘, size_str, ‘-n‘, name, pool_path] else: cmd = LVM.LVM_CMD_PREFIX + [‘lvcreate‘, ‘-n‘, name, self.vg_name, ‘-L‘, size_str] if mirror_count > 0: cmd.extend([‘--type=mirror‘, ‘-m‘, mirror_count, ‘--nosync‘, ‘--mirrorlog‘, ‘mirrored‘]) terras = int(size_str[:-1]) / 1024.0 if terras >= 1.5: rsize = int(2 ** math.ceil(math.log(terras) / math.log(2))) # NOTE(vish): Next power of two for region size. See: # http://red.ht/U2BPOD cmd.extend([‘-R‘, str(rsize)]) try: self._execute(*cmd, root_helper=self._root_helper, run_as_root=True) except putils.ProcessExecutionError as err: LOG.exception(‘Error creating Volume‘) LOG.error(‘Cmd :%s‘, err.cmd) LOG.error(‘StdOut :%s‘, err.stdout) LOG.error(‘StdErr :%s‘, err.stderr) LOG.error(‘Current state: %s‘, self.get_all_volume_groups(self._root_helper)) raise
后续就是一系列回调和通知啦