From 2f65d2aee079e756961531f411160d7883339cb8 Mon Sep 17 00:00:00 2001 From: doubletao318 Date: Sat, 30 Jan 2021 01:36:54 -0800 Subject: [PATCH] Release 2.2.RC3 --- Cinder/Victoria/__init__.py | 0 Cinder/Victoria/constants.py | 75 ++++ Cinder/Victoria/dsware.py | 700 ++++++++++++++++++++++++++++++++ Cinder/Victoria/fs_client.py | 714 +++++++++++++++++++++++++++++++++ Cinder/Victoria/fs_conf.py | 128 ++++++ Cinder/Victoria/fs_flow.py | 409 +++++++++++++++++++ Cinder/Victoria/fs_qos.py | 63 +++ Cinder/Victoria/fs_utils.py | 754 +++++++++++++++++++++++++++++++++++ 8 files changed, 2843 insertions(+) create mode 100644 Cinder/Victoria/__init__.py create mode 100644 Cinder/Victoria/constants.py create mode 100644 Cinder/Victoria/dsware.py create mode 100644 Cinder/Victoria/fs_client.py create mode 100644 Cinder/Victoria/fs_conf.py create mode 100644 Cinder/Victoria/fs_flow.py create mode 100644 Cinder/Victoria/fs_qos.py create mode 100644 Cinder/Victoria/fs_utils.py diff --git a/Cinder/Victoria/__init__.py b/Cinder/Victoria/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Cinder/Victoria/constants.py b/Cinder/Victoria/constants.py new file mode 100644 index 0000000..712db1e --- /dev/null +++ b/Cinder/Victoria/constants.py @@ -0,0 +1,75 @@ +# Copyright (c) 2016 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +DEFAULT_TIMEOUT = 50 +LOGIN_SOCKET_TIMEOUT = 32 +GET_VOLUME_PAGE_NUM = 1 +GET_VOLUME_PAGE_SIZE = 1000 +GET_SNAPSHOT_PAGE_NUM = 1 +GET_SNAPSHOT_PAGE_SIZE = 1000 +GET_QOS_PAGE_NUM = 1 +GET_QOS_PAGE_SIZE = 100 + +CONNECT_ERROR = 403 +ERROR_UNAUTHORIZED = 10000003 +ERROR_USER_OFFLINE = '1077949069' +VOLUME_NOT_EXIST = (31000000, 50150005) +SNAPSHOT_NOT_EXIST = (50150006,) + +BASIC_URI = '/dsware/service/' +CONF_PATH = "/etc/cinder/cinder.conf" +HOST_GROUP_PREFIX = "OpenStack_" + +CONF_ADDRESS = "dsware_rest_url" +CONF_MANAGER_IP = "manager_ips" +CONF_POOLS = "dsware_storage_pools" +CONF_PWD = "san_password" +CONF_USER = "san_login" + +QOS_MUST_SET = ["maxIOPS", "maxMBPS"] +QOS_KEYS = ["maxIOPS", "maxMBPS", "total_iops_sec", "total_bytes_sec"] +QOS_SCHEDULER_KEYS = ["scheduleType", "startDate", "startTime", + "durationTime", "dayOfWeek"] +QOS_PREFIX = "OpenStack_" +QOS_SCHEDULER_DEFAULT_TYPE = "0" +QOS_SCHEDULER_WEEK_TYPE = "3" +QOS_SUPPORT_SCHEDULE_VERSION = "8.0" +SECONDS_OF_DAY = 24 * 60 * 60 +SECONDS_OF_HOUR = 60 * 60 +SNAPSHOT_HEALTH_STATUS = ( + SNAPSHOT_HEALTH_STATS_NORMAL, + SNAPSHOT_HEALTH_STATS_FAULT) = (1, 2) +SNAPSHOT_RUNNING_STATUS = ( + SNAPSHOT_RUNNING_STATUS_ONLINE, + SNAPSHOT_RUNNING_STATUS_OFFLINE, + SNAPSHOT_RUNNING_STATUS_ROLLBACKING) = (27, 28, 44) +SNAPSHOT_ROLLBACK_PROGRESS_FINISH = 100 +SNAPSHOT_ROLLBACK_TIMEOUT = 60 * 60 * 24 +WAIT_INTERVAL = 10 +WEEK_DAYS = ["Sun", "Mon", "Tue", "Wed", "Thur", "Fri", "Sat"] +TIMEZONE = {"Asia/Beijing": "Asia/Shanghai"} +MAX_NAME_LENGTH = 31 +MAX_IOPS_VALUE = 999999999 +MAX_MBPS_VALUE = 999999 +HOST_FLAG = 0 +URL_NOT_FOUND = "Not Found for url" +HOST_ISCSI_RELATION_EXIST = 540157748 +DSWARE_MULTI_ERROR = 1 +HOST_ALREADY_EXIST = 50157019 +HOST_MAPPING_EXIST = 50157027 +HOST_MAPPING_GROUP_EXIST = 50157058 +HOSTGROUP_ALREADY_EXIST = 50157044 +INITIATOR_ALREADY_EXIST = 50155102 +INITIATOR_IN_HOST = 50157021 diff --git a/Cinder/Victoria/dsware.py b/Cinder/Victoria/dsware.py new file mode 100644 index 
0000000..8984033 --- /dev/null +++ b/Cinder/Victoria/dsware.py @@ -0,0 +1,700 @@ +# Copyright (c) 2018 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +from multiprocessing import Lock +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import units + +from cinder import coordination +from cinder import exception +from cinder.i18n import _ +from cinder import interface +from cinder.volume import driver +from cinder.volume.drivers.fusionstorage import constants +from cinder.volume.drivers.fusionstorage import fs_client +from cinder.volume.drivers.fusionstorage import fs_conf +from cinder.volume.drivers.fusionstorage import fs_flow +from cinder.volume.drivers.fusionstorage import fs_qos +from cinder.volume.drivers.fusionstorage import fs_utils +from cinder.volume.drivers.san import san +from cinder.volume import volume_utils + +LOG = logging.getLogger(__name__) + +volume_opts = [ + cfg.DictOpt('manager_ips', + default={}, + help='This option is to support the FSA to mount across the ' + 'different nodes. The parameters takes the standard dict ' + 'config form, manager_ips = host1:ip1, host2:ip2...'), + cfg.StrOpt('dsware_rest_url', + default='', + help='The address of FusionStorage array. 
For example, ' + '"dsware_rest_url=xxx"'), + cfg.StrOpt('dsware_storage_pools', + default="", + help='The list of pools on the FusionStorage array, the ' + 'semicolon(;) was used to split the storage pools, ' + '"dsware_storage_pools = xxx1; xxx2; xxx3"'), + cfg.ListOpt('target_ips', + default=[], + help='The ips of FSA node were used to find the target ' + 'initiator and target ips in ISCSI initialize connection.' + ' For example: "target_ips = ip1, ip2"'), + cfg.IntOpt('scan_device_timeout', + default=3, + help='scan_device_timeout indicates the waiting time for ' + 'scanning device disks on the host. It only takes effect' + ' on SCSI. Default value is 3, the type is Int, the unit ' + 'is seconds. For example: "scan_device_timeout = 5"'), + cfg.ListOpt('iscsi_manager_groups', + default=[], + help='The ip groups of FSA node were used to find the target ' + 'initiator and target ips in ISCSI in order to balance ' + 'business network. For example: ' + '"iscsi_manager_groups = ip1;ip2;ip3, ip4;ip5;ip6"'), + cfg.BoolOpt('use_ipv6', + default=False, + help='Whether to return target_portal and target_iqn in ' + 'IPV6 format') +] + +CONF = cfg.CONF +CONF.register_opts(volume_opts) + + +@interface.volumedriver +class DSWAREBaseDriver(driver.VolumeDriver): + VERSION = '2.2.RC3' + CI_WIKI_NAME = 'Huawei_FusionStorage_CI' + + def __init__(self, *args, **kwargs): + super(DSWAREBaseDriver, self).__init__(*args, **kwargs) + + if not self.configuration: + msg = _('Configuration is not found.') + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + self.configuration.append_config_values(volume_opts) + self.configuration.append_config_values(san.san_opts) + self.conf = fs_conf.FusionStorageConf(self.configuration, self.host) + self.client = None + self.fs_qos = None + self.manager_groups = self.configuration.iscsi_manager_groups + self.lock = Lock() + + @staticmethod + def get_driver_options(): + return volume_opts + + def do_setup(self, context): + 
self.conf.update_config_value() + url_str = self.configuration.san_address + url_user = self.configuration.san_user + url_password = self.configuration.san_password + + self.client = fs_client.RestCommon( + fs_address=url_str, fs_user=url_user, + fs_password=url_password) + self.client.login() + self.fs_qos = fs_qos.FusionStorageQoS(self.client) + + def check_for_setup_error(self): + all_pools = self.client.query_pool_info() + all_pools_name = [p['poolName'] for p in all_pools + if p.get('poolName')] + + for pool in self.configuration.pools_name: + if pool not in all_pools_name: + msg = _('Storage pool %(pool)s does not exist ' + 'in the FusionStorage.') % {'pool': pool} + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + def _update_pool_stats(self): + backend_name = self.configuration.safe_get( + 'volume_backend_name') or self.__class__.__name__ + data = {"volume_backend_name": backend_name, + "driver_version": "2.2.RC3", + "thin_provisioning_support": False, + "pools": [], + "vendor_name": "Huawei" + } + all_pools = self.client.query_pool_info() + + for pool in all_pools: + if pool['poolName'] in self.configuration.pools_name: + single_pool_info = self._update_single_pool_info_status(pool) + data['pools'].append(single_pool_info) + return data + + def _get_capacity(self, pool_info): + pool_capacity = {} + + total = float(pool_info['totalCapacity']) / units.Ki + free = (float(pool_info['totalCapacity']) - + float(pool_info['usedCapacity'])) / units.Ki + provisioned = float(pool_info['usedCapacity']) / units.Ki + pool_capacity['total_capacity_gb'] = total + pool_capacity['free_capacity_gb'] = free + pool_capacity['provisioned_capacity_gb'] = provisioned + + return pool_capacity + + def _update_single_pool_info_status(self, pool_info): + status = {} + capacity = self._get_capacity(pool_info=pool_info) + status.update({ + "pool_name": pool_info['poolName'], + "total_capacity_gb": capacity['total_capacity_gb'], + "free_capacity_gb": 
capacity['free_capacity_gb'], + "provisioned_capacity_gb": capacity['provisioned_capacity_gb'], + "QoS_support": True, + 'multiattach': True, + }) + return status + + def get_volume_stats(self, refresh=False): + self.client.keep_alive() + stats = self._update_pool_stats() + return stats + + def _check_volume_exist(self, volume): + vol_name = self._get_vol_name(volume) + result = self.client.query_volume_by_name(vol_name=vol_name) + if result: + return result + + def _raise_exception(self, msg): + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + + def _get_pool_id(self, volume): + pool_id = None + pool_name = volume_utils.extract_host(volume.host, level='pool') + all_pools = self.client.query_pool_info() + for pool in all_pools: + if pool_name == pool['poolName']: + pool_id = pool['poolId'] + + if pool_id is None: + msg = _('Storage pool %(pool)s does not exist on the array. ' + 'Please check.') % {"pool": pool_id} + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + return pool_id + + def _get_vol_name(self, volume): + provider_location = volume.get("provider_location", None) + if provider_location: + vol_name = json.loads(provider_location).get("name") + else: + vol_name = volume.name + return vol_name + + def _add_qos_to_volume(self, volume, vol_name): + try: + opts = fs_utils.get_volume_params(volume, self.client) + if opts.get("qos"): + self.fs_qos.add(opts["qos"], vol_name) + except Exception: + self.client.delete_volume(vol_name=vol_name) + raise + + def create_volume(self, volume): + pool_id = self._get_pool_id(volume) + vol_name = volume.name + vol_size = volume.size + vol_size *= units.Ki + self.client.create_volume( + pool_id=pool_id, vol_name=vol_name, vol_size=vol_size) + + self._add_qos_to_volume(volume, vol_name) + + def delete_volume(self, volume): + vol_name = self._get_vol_name(volume) + if self._check_volume_exist(volume): + self.fs_qos.remove(vol_name) + self.client.delete_volume(vol_name=vol_name) + + def 
extend_volume(self, volume, new_size): + vol_name = self._get_vol_name(volume) + if not self._check_volume_exist(volume): + msg = _("Volume: %(vol_name)s does not exist!" + ) % {"vol_name": vol_name} + self._raise_exception(msg) + else: + new_size *= units.Ki + self.client.expand_volume(vol_name, new_size) + + def _check_snapshot_exist(self, volume, snapshot): + pool_id = self._get_pool_id(volume) + snapshot_name = self._get_snapshot_name(snapshot) + result = self.client.query_snapshot_by_name( + pool_id=pool_id, snapshot_name=snapshot_name) + return result if result else None + + def _get_snapshot_name(self, snapshot): + provider_location = snapshot.get("provider_location", None) + if provider_location: + snapshot_name = json.loads(provider_location).get("name") + else: + snapshot_name = snapshot.name + return snapshot_name + + def _expand_volume_when_create(self, vol_name, vol_size): + try: + vol_info = self.client.query_volume_by_name(vol_name) + current_size = vol_info.get('volSize') + if current_size < vol_size: + self.client.expand_volume(vol_name, vol_size) + except Exception: + self.client.delete_volume(vol_name=vol_name) + raise + + def create_volume_from_snapshot(self, volume, snapshot): + vol_name = self._get_vol_name(volume) + snapshot_name = self._get_snapshot_name(snapshot) + vol_size = volume.size + + if not self._check_snapshot_exist(snapshot.volume, snapshot): + msg = _("Snapshot: %(name)s does not exist!" + ) % {"name": snapshot_name} + self._raise_exception(msg) + elif self._check_volume_exist(volume): + msg = _("Volume: %(vol_name)s already exists!" 
+ ) % {'vol_name': vol_name} + self._raise_exception(msg) + else: + vol_size *= units.Ki + self.client.create_volume_from_snapshot( + snapshot_name=snapshot_name, vol_name=vol_name, + vol_size=vol_size) + self._add_qos_to_volume(volume, vol_name) + self._expand_volume_when_create(vol_name, vol_size) + + def create_cloned_volume(self, volume, src_volume): + vol_name = self._get_vol_name(volume) + src_vol_name = self._get_vol_name(src_volume) + + vol_size = volume.size + vol_size *= units.Ki + + if not self._check_volume_exist(src_volume): + msg = _("Volume: %(vol_name)s does not exist!" + ) % {"vol_name": src_vol_name} + self._raise_exception(msg) + else: + self.client.create_volume_from_volume( + vol_name=vol_name, vol_size=vol_size, + src_vol_name=src_vol_name) + self._add_qos_to_volume(volume, vol_name) + self._expand_volume_when_create(vol_name, vol_size) + + def create_snapshot(self, snapshot): + snapshot_name = self._get_snapshot_name(snapshot) + vol_name = self._get_vol_name(snapshot.volume) + + self.client.create_snapshot( + snapshot_name=snapshot_name, vol_name=vol_name) + + def delete_snapshot(self, snapshot): + snapshot_name = self._get_snapshot_name(snapshot) + + if self._check_snapshot_exist(snapshot.volume, snapshot): + self.client.delete_snapshot(snapshot_name=snapshot_name) + + def _get_vol_info(self, pool_id, vol_name, vol_id): + if vol_name: + return self.client.query_volume_by_name(vol_name) + + elif vol_id: + try: + return self.client.query_volume_by_id(vol_id) + except Exception: + LOG.warning("Query volume info by id failed!") + return self.client.get_volume_by_id(pool_id, vol_id) + + def _get_volume_info(self, pool_id, existing_ref): + vol_name = existing_ref.get('source-name') + vol_id = existing_ref.get('source-id') + + if not (vol_name or vol_id): + msg = _('Must specify source-name or source-id.') + raise exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + + vol_info = self._get_vol_info(pool_id, vol_name, 
vol_id) + + if not vol_info: + msg = _("Can't find volume on the array, please check the " + "source-name or source-id.") + raise exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + return vol_info + + def _check_need_changes_for_manage(self, volume, vol_name): + old_qos = {} + new_qos = {} + new_opts = fs_utils.get_volume_params(volume, self.client) + old_opts = fs_utils.get_volume_specs(self.client, vol_name) + + # Not support from existence to absence or change + if old_opts.get("qos"): + if old_opts.get("qos") != new_opts.get("qos"): + msg = (_("The current volume qos is: %(old_qos)s, the manage " + "volume qos is: %(new_qos)s") + % {"old_qos": old_opts.get("qos"), + "new_qos": new_opts.get("qos")}) + self._raise_exception(msg) + elif new_opts.get("qos"): + new_qos["qos"] = new_opts.get("qos") + old_qos["qos"] = {} + + change_opts = {"old_opts": old_qos, + "new_opts": new_qos} + + return change_opts + + def _change_qos_remove(self, vol_name, new_opts, old_opts): + if old_opts.get("qos") and not new_opts.get("qos"): + self.fs_qos.remove(vol_name) + + def _change_qos_add(self, vol_name, new_opts, old_opts): + if not old_opts.get("qos") and new_opts.get("qos"): + self.fs_qos.add(new_opts["qos"], vol_name) + + def _change_qos_update(self, vol_name, new_opts, old_opts): + if old_opts.get("qos") and new_opts.get("qos"): + self.fs_qos.update(new_opts["qos"], vol_name) + + def _change_lun(self, vol_name, new_opts, old_opts): + def _change_qos(): + self._change_qos_remove(vol_name, new_opts, old_opts) + self._change_qos_add(vol_name, new_opts, old_opts) + self._change_qos_update(vol_name, new_opts, old_opts) + + _change_qos() + + def manage_existing(self, volume, existing_ref): + pool = self._get_pool_id(volume) + vol_info = self._get_volume_info(pool, existing_ref) + vol_pool_id = vol_info.get('poolId') + vol_name = vol_info.get('volName') + + if pool != vol_pool_id: + msg = (_("The specified LUN does not belong to the given " + "pool: 
%s.") % pool) + raise exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + + change_opts = self._check_need_changes_for_manage(volume, vol_name) + self._change_lun(vol_name, change_opts.get("new_opts"), + change_opts.get("old_opts")) + + provider_location = {"name": vol_name} + return {'provider_location': json.dumps(provider_location)} + + def manage_existing_get_size(self, volume, existing_ref): + pool = self._get_pool_id(volume) + vol_info = self._get_volume_info(pool, existing_ref) + remainder = float(vol_info.get("volSize")) % units.Ki + + if remainder != 0: + msg = _("The volume size must be an integer multiple of 1 GB.") + self._raise_exception(msg) + + size = float(vol_info.get("volSize")) / units.Ki + return int(size) + + def unmanage(self, volume): + return + + def _get_snapshot_info(self, volume, existing_ref): + snapshot_name = existing_ref.get('source-name') + if not snapshot_name: + msg = _("Can't find volume on the array, please check the " + "source-name.") + raise exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + + pool_id = self._get_pool_id(volume) + snapshot_info = self.client.query_snapshot_by_name( + pool_id, snapshot_name=snapshot_name) + if not snapshot_info: + msg = _("Can't find snapshot on the array.") + raise exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + + return snapshot_info + + def _check_snapshot_match_volume(self, vol_name, snapshot_name): + snapshot_info = self.client.query_snapshots_of_volume( + vol_name, snapshot_name) + return snapshot_info + + def manage_existing_snapshot(self, snapshot, existing_ref): + volume = snapshot.volume + snapshot_info = self._get_snapshot_info(volume, existing_ref) + vol_name = self._get_vol_name(volume) + if not self._check_snapshot_match_volume( + vol_name, snapshot_info.get("snapName")): + msg = (_("The specified snapshot does not belong to the given " + "volume: %s.") % vol_name) + raise 
exception.ManageExistingInvalidReference( + existing_ref=existing_ref, reason=msg) + + provider_location = {"name": snapshot_info.get('snapName')} + return {'provider_location': json.dumps(provider_location)} + + def manage_existing_snapshot_get_size(self, snapshot, existing_ref): + snapshot_info = self._get_snapshot_info(snapshot.volume, existing_ref) + remainder = float(snapshot_info.get("snapSize")) % units.Ki + + if remainder != 0: + msg = _("The snapshot size must be an integer multiple of 1 GB.") + self._raise_exception(msg) + size = float(snapshot_info.get("snapSize")) / units.Ki + return int(size) + + def unmanage_snapshot(self, snapshot): + return + + def _check_need_changes_for_retype(self, volume, new_type, host, vol_name): + before_change = {} + after_change = {} + if volume.host != host["host"]: + msg = (_("Do not support retype between different host. Volume's " + "host: %(vol_host)s, host's host: %(host)s") + % {"vol_host": volume.host, "host": host['host']}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + old_opts = fs_utils.get_volume_specs(self.client, vol_name) + new_opts = fs_utils.get_volume_type_params(new_type, self.client) + if old_opts.get('qos') != new_opts.get('qos'): + before_change["qos"] = old_opts.get("qos") + after_change["qos"] = new_opts.get("qos") + + change_opts = {"old_opts": before_change, + "new_opts": after_change} + return change_opts + + def retype(self, context, volume, new_type, diff, host): + LOG.info("Retype volume: %(vol)s, new_type: %(new_type)s, " + "diff: %(diff)s, host: %(host)s", + {"vol": volume.id, + "new_type": new_type, + "diff": diff, + "host": host}) + + vol_name = self._get_vol_name(volume) + change_opts = self._check_need_changes_for_retype( + volume, new_type, host, vol_name) + self._change_lun(vol_name, change_opts.get("new_opts"), + change_opts.get("old_opts")) + + return True, None + + def _rollback_snapshot(self, vol_name, snap_name): + def _snapshot_rollback_finish(): + snapshot_info = 
self.client.get_snapshot_info_by_name(snap_name) + if not snapshot_info: + msg = (_("Failed to get rollback info with snapshot %s.") + % snap_name) + self._raise_exception(msg) + + if snapshot_info.get('health_status') not in ( + constants.SNAPSHOT_HEALTH_STATS_NORMAL,): + msg = _("The snapshot %s is abnormal.") % snap_name + self._raise_exception(msg) + + if (snapshot_info.get('rollback_progress') == + constants.SNAPSHOT_ROLLBACK_PROGRESS_FINISH or + snapshot_info.get('rollback_endtime')): + LOG.info("Snapshot %s rollback successful.", snap_name) + return True + return False + + if fs_utils.is_snapshot_rollback_available(self.client, snap_name): + self.client.rollback_snapshot(vol_name, snap_name) + + try: + fs_utils.wait_for_condition( + _snapshot_rollback_finish, constants.WAIT_INTERVAL, + constants.SNAPSHOT_ROLLBACK_TIMEOUT) + except exception.VolumeBackendAPIException: + self.client.cancel_rollback_snapshot(snap_name) + raise + + def revert_to_snapshot(self, context, volume, snapshot): + vol_name = self._get_vol_name(volume) + snap_name = self._get_snapshot_name(snapshot) + if not self._check_snapshot_exist(snapshot.volume, snapshot): + msg = _("Snapshot: %(name)s does not exist!" + ) % {"name": snap_name} + self._raise_exception(msg) + + if not self._check_volume_exist(volume): + msg = _("Volume: %(vol_name)s does not exist!" 
+ ) % {'vol_name': vol_name} + self._raise_exception(msg) + + self._rollback_snapshot(vol_name, snap_name) + + def create_export(self, context, volume, connector): + pass + + def ensure_export(self, context, volume): + pass + + def remove_export(self, context, volume): + pass + + +class DSWAREDriver(DSWAREBaseDriver): + def get_volume_stats(self, refresh=False): + stats = DSWAREBaseDriver.get_volume_stats(self, refresh) + stats['storage_protocol'] = 'SCSI' + return stats + + def _get_manager_ip(self, context): + if self.configuration.manager_ips.get(context['host']): + return self.configuration.manager_ips.get(context['host']) + else: + msg = _("The required host: %(host)s and its manager ip are not " + "included in the configuration file." + ) % {"host": context['host']} + self._raise_exception(msg) + + def _attach_volume(self, context, volume, properties, remote=False): + vol_name = self._get_vol_name(volume) + if not self._check_volume_exist(volume): + msg = _("Volume: %(vol_name)s does not exist!" 
+ ) % {"vol_name": vol_name} + self._raise_exception(msg) + manager_ip = self._get_manager_ip(properties) + result = self.client.attach_volume(vol_name, manager_ip) + attach_path = result[vol_name][0]['devName'].encode('unicode-escape') + attach_info = dict() + attach_info['device'] = dict() + attach_info['device']['path'] = attach_path + if attach_path == '': + msg = _("Host attach volume failed!") + self._raise_exception(msg) + return attach_info, volume + + def _detach_volume(self, context, attach_info, volume, properties, + force=False, remote=False, ignore_errors=False): + vol_name = self._get_vol_name(volume) + if self._check_volume_exist(volume): + manager_ip = self._get_manager_ip(properties) + self.client.detach_volume(vol_name, manager_ip) + + def initialize_connection(self, volume, connector): + vol_name = self._get_vol_name(volume) + manager_ip = self._get_manager_ip(connector) + if not self._check_volume_exist(volume): + msg = _("Volume: %(vol_name)s does not exist!" + ) % {"vol_name": vol_name} + self._raise_exception(msg) + self.client.attach_volume(vol_name, manager_ip) + volume_info = self.client.query_volume_by_name(vol_name=vol_name) + vol_wwn = volume_info.get('wwn') + by_id_path = "/dev/disk/by-id/" + "wwn-0x%s" % vol_wwn + properties = {'device_path': by_id_path} + import time + LOG.info("Wait %(t)s second(s) for scanning the target device %(dev)s." 
+ % {"t": self.configuration.scan_device_timeout, + "dev": by_id_path}) + time.sleep(self.configuration.scan_device_timeout) + return {'driver_volume_type': 'local', + 'data': properties} + + def terminate_connection(self, volume, connector, **kwargs): + attachments = volume.volume_attachment + if volume.multiattach and len(attachments) > 1 and sum( + 1 for a in attachments if a.connector == connector) > 1: + LOG.info("Volume is multi-attach and attached to the same host" + " multiple times") + return + + if self._check_volume_exist(volume): + manager_ip = self._get_manager_ip(connector) + vol_name = self._get_vol_name(volume) + self.client.detach_volume(vol_name, manager_ip) + LOG.info("Terminate iscsi connection successful.") + + +class DSWAREISCSIDriver(DSWAREBaseDriver): + def check_for_setup_error(self): + super(DSWAREISCSIDriver, self).check_for_setup_error() + fs_utils.check_iscsi_group_valid( + self.client, self.manager_groups, self.configuration.use_ipv6) + + def get_volume_stats(self, refresh=False): + stats = DSWAREBaseDriver.get_volume_stats(self, refresh) + stats['storage_protocol'] = 'iSCSI' + return stats + + @coordination.synchronized('huawei-mapping-{connector[host]}') + def initialize_connection(self, volume, connector): + LOG.info("Start to initialize iscsi connection, volume: %(vol)s, " + "connector: %(con)s", {"vol": volume, "con": connector}) + if not self._check_volume_exist(volume): + msg = _('The volume: %(vol)s is not on the ' + 'array') % {'vol': volume} + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + vol_name = self._get_vol_name(volume) + properties = fs_flow.initialize_iscsi_connection( + self.client, vol_name, connector, self.configuration, + self.manager_groups, self.lock) + + LOG.info("Finish initialize iscsi connection, return: %s, the " + "remaining manager groups are %s", + properties, self.manager_groups) + return {'driver_volume_type': 'iscsi', 'data': properties} + + def terminate_connection(self, volume, 
connector, **kwargs): + host = connector['host'] if 'host' in connector else "" + + @coordination.synchronized('huawei-mapping-{host}') + def _terminate_connection_locked(host): + LOG.info("Start to terminate iscsi connection, volume: %(vol)s, " + "connector: %(con)s", {"vol": volume, "con": connector}) + attachments = volume.volume_attachment + if volume.multiattach and len(attachments) > 1 and sum( + 1 for a in attachments if a.connector == connector) > 1: + LOG.info("Volume is multi-attach and attached to the same host" + " multiple times") + return + + if not self._check_volume_exist(volume): + LOG.info("Terminate_connection, volume %(vol)s is not exist " + "on the array ", {"vol": volume}) + return + + vol_name = self._get_vol_name(volume) + fs_flow.terminate_iscsi_connection( + self.client, vol_name, connector) + + LOG.info("Terminate iscsi connection successful.") + return _terminate_connection_locked(host) diff --git a/Cinder/Victoria/fs_client.py b/Cinder/Victoria/fs_client.py new file mode 100644 index 0000000..7067349 --- /dev/null +++ b/Cinder/Victoria/fs_client.py @@ -0,0 +1,714 @@ +# Copyright (c) 2018 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import json +import requests +import six +import time + +from cinder import exception +from cinder.i18n import _ +from cinder.volume.drivers.fusionstorage import constants + +from oslo_log import log as logging +LOG = logging.getLogger(__name__) + + +class RestCommon(object): + def __init__(self, fs_address, fs_user, fs_password): + self.address = fs_address + self.user = fs_user + self.password = fs_password + + self.session = None + self.token = None + self.version = None + + self.init_http_head() + + LOG.warning("Suppressing requests library SSL Warnings") + requests.packages.urllib3.disable_warnings( + requests.packages.urllib3.exceptions.InsecureRequestWarning) + requests.packages.urllib3.disable_warnings( + requests.packages.urllib3.exceptions.InsecurePlatformWarning) + + def init_http_head(self): + self.session = requests.Session() + self.session.headers.update({ + "Content-Type": "application/json;charset=UTF-8", + }) + self.session.verify = False + + def _construct_url(self, url, get_version, get_system_time): + if get_system_time: + return self.address + url + elif get_version: + return self.address + constants.BASIC_URI + url + else: + return self.address + constants.BASIC_URI + "v1.2" + url + + @staticmethod + def _deal_call_result(result, filter_flag, json_flag, req_dict): + if not filter_flag: + LOG.info(''' + Request URL: %(url)s, + Call Method: %(method)s, + Request Data: %(data)s, + Response Data: %(res)s, + Result Data: %(res_json)s''', {'url': req_dict.get("url"), + 'method': req_dict.get("method"), + 'data': req_dict.get("data"), + 'res': result, + 'res_json': result.json()}) + + return result.json() if json_flag else result + + def call(self, url, method, data=None, + call_timeout=constants.DEFAULT_TIMEOUT, **input_kwargs): + filter_flag = input_kwargs.get("filter_flag") + json_flag = input_kwargs.get("json_flag", True) + get_version = input_kwargs.get("get_version") + get_system_time = input_kwargs.get("get_system_time") + + kwargs = 
{'timeout': call_timeout} + if data is not None: + kwargs['data'] = json.dumps(data) + + call_url = self._construct_url(url, get_version, get_system_time) + func = getattr(self.session, method.lower()) + + try: + result = func(call_url, **kwargs) + except Exception as err: + LOG.error('Bad response from server: %(url)s. ' + 'Error: %(err)s'), {'url': call_url, 'err': err} + return {"error": { + "code": constants.CONNECT_ERROR, + "description": "Connect to server error."}} + + try: + result.raise_for_status() + except requests.HTTPError as exc: + return {"error": {"code": exc.response.status_code, + "description": six.text_type(exc)}} + + req_dict = {"url": call_url, "method": method, "data": data} + return self._deal_call_result(result, filter_flag, json_flag, req_dict) + + @staticmethod + def _assert_rest_result(result, err_str): + if isinstance(result.get('result'), dict): + if result['result'].get("code") != 0: + msg = (_('%(err)s\nresult: %(res)s.') % {'err': err_str, + 'res': result}) + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + elif result.get('result') != 0: + msg = (_('%(err)s\nresult: %(res)s.') % {'err': err_str, + 'res': result}) + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + + def get_version(self): + url = 'rest/version' + self.session.headers.update({ + "Referer": self.address + constants.BASIC_URI + }) + result = self.call(url=url, method='GET', get_version=True) + self._assert_rest_result(result, _('Get version session error.')) + if result.get("currentVersion"): + self.version = result["currentVersion"] + + def login(self): + self.get_version() + url = '/sec/login' + data = {"userName": self.user, "password": self.password} + result = self.call(url, 'POST', data=data, + call_timeout=constants.LOGIN_SOCKET_TIMEOUT, + filter_flag=True, json_flag=False) + self._assert_rest_result(result.json(), _('Login session error.')) + self.token = result.headers['X-Auth-Token'] + + 
self.session.headers.update({
+            "x-auth-token": self.token
+        })
+
+    def logout(self):
+        url = '/sec/logout'
+        if self.address:
+            result = self.call(url, 'POST')
+            self._assert_rest_result(result, _('Logout session error.'))
+
+    def keep_alive(self):
+        url = '/sec/keepAlive'
+        result = self.call(url, 'POST', filter_flag=True)
+
+        if (result.get('result') == constants.ERROR_UNAUTHORIZED or
+                result.get("errorCode") == constants.ERROR_USER_OFFLINE):
+            try:
+                self.login()
+            except Exception:
+                LOG.error('The FusionStorage may have been powered off. '
+                          'Power on the FusionStorage and then log in.')
+                raise
+        else:
+            self._assert_rest_result(result, _('Keep alive session error.'))
+
+    def query_pool_info(self, pool_id=None):
+        pool_id = str(pool_id)
+        if pool_id != 'None':
+            url = '/storagePool' + '?poolId=' + pool_id
+        else:
+            url = '/storagePool'
+        result = self.call(url, 'GET', filter_flag=True)
+        self._assert_rest_result(result, _("Query pool session error."))
+        return result['storagePools']
+
+    def _get_volume_num_by_pool(self, pool_id):
+        pool_info = self.query_pool_info(pool_id)
+        return pool_info[0].get('volumeNum', 0)
+
+    def _query_volumes_by_batch(self, pool_id, page_num, page_size=1000):
+        url = '/volume/list'
+        params = {'poolId': pool_id,
+                  'pageNum': page_num, 'pageSize': page_size}
+
+        result = self.call(url, 'POST', params)
+        if result.get('errorCode') in constants.VOLUME_NOT_EXIST:
+            return None
+        self._assert_rest_result(
+            result, "Query all volume session error")
+        return result.get('volumeList')
+
+    def get_volume_by_id(self, pool_id, vol_id):
+        vol_cnt = self._get_volume_num_by_pool(pool_id)
+        page_num = constants.GET_VOLUME_PAGE_NUM
+        page_size = constants.GET_VOLUME_PAGE_SIZE
+        while vol_cnt > 0:
+            vol_list = self._query_volumes_by_batch(pool_id, page_num,
+                                                    page_size)
+            for vol_info in vol_list or []:
+                if int(vol_info.get('volId')) == int(vol_id):
+                    return vol_info
+
+            vol_cnt -= page_size
+            page_num += 1
+        return None
+
+    def 
_query_snapshot_of_volume_batch(self, vol_name, snapshot_name, + batch_num=1, batch_limit=1000): + url = '/volume/snapshot/list' + params = {"volName": vol_name, "batchLimit": batch_limit, + "batchNum": batch_num, + "filters": {"volumeName": snapshot_name}} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, 'Query snapshots of volume session error.') + return result + + @staticmethod + def _get_snapshot_from_result(batch_result, snapshot_key, snapshot_name): + for res in batch_result.get('snapshotList', []): + if res.get(snapshot_key) == snapshot_name: + return res + + def query_snapshots_of_volume(self, vol_name, snapshot_name): + batch_num = constants.GET_SNAPSHOT_PAGE_NUM + batch_size = constants.GET_SNAPSHOT_PAGE_SIZE + while True: + batch_result = self._query_snapshot_of_volume_batch( + vol_name, snapshot_name, batch_num, batch_size) + snapshot_info = self._get_snapshot_from_result( + batch_result, 'snapshotName', snapshot_name) + if snapshot_info: + return snapshot_info + if batch_result.get('totalNum') < batch_size: + break + batch_num += 1 + return None + + def query_volume_by_name(self, vol_name): + url = '/volume/queryByName?volName=' + vol_name + result = self.call(url, 'GET') + if result.get('errorCode') in constants.VOLUME_NOT_EXIST: + return None + self._assert_rest_result( + result, _("Query volume by name session error")) + return result.get('lunDetailInfo') + + def query_volume_by_id(self, vol_id): + url = '/volume/queryById?volId=' + vol_id + result = self.call(url, 'GET') + if result.get('errorCode') in constants.VOLUME_NOT_EXIST: + return None + self._assert_rest_result( + result, _("Query volume by ID session error")) + return result.get('lunDetailInfo') + + def create_volume(self, vol_name, vol_size, pool_id): + url = '/volume/create' + params = {"volName": vol_name, "volSize": vol_size, "poolId": pool_id} + result = self.call(url, "POST", params) + self._assert_rest_result(result, _('Create volume session 
error.')) + + def delete_volume(self, vol_name): + url = '/volume/delete' + params = {"volNames": [vol_name]} + result = self.call(url, "POST", params) + if result.get('errorCode') in constants.VOLUME_NOT_EXIST: + return None + self._assert_rest_result(result, _('Delete volume session error.')) + + def attach_volume(self, vol_name, manage_ip): + url = '/volume/attach' + params = {"volName": [vol_name], "ipList": [manage_ip]} + result = self.call(url, "POST", params) + self._assert_rest_result(result, _('Attach volume session error.')) + + if int(result[vol_name][0]['errorCode']) != 0: + msg = _("Host attach volume failed!") + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + return result + + def detach_volume(self, vol_name, manage_ip): + url = '/volume/detach/' + params = {"volName": [vol_name], "ipList": [manage_ip]} + result = self.call(url, "POST", params) + self._assert_rest_result(result, _('Detach volume session error.')) + + def expand_volume(self, vol_name, new_vol_size): + url = '/volume/expand' + params = {"volName": vol_name, "newVolSize": new_vol_size} + result = self.call(url, "POST", params) + self._assert_rest_result(result, _('Expand volume session error.')) + + def _query_snapshot_by_name_batch(self, pool_id, snapshot_name, + batch_num=1, batch_size=1000): + url = '/snapshot/list' + params = {"poolId": pool_id, "pageNum": batch_num, + "pageSize": batch_size, + "filters": {"volumeName": snapshot_name}} + + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _('query snapshot list session error.')) + return result + + def query_snapshot_by_name(self, pool_id, snapshot_name): + batch_num = constants.GET_SNAPSHOT_PAGE_NUM + batch_size = constants.GET_SNAPSHOT_PAGE_SIZE + while True: + batch_result = self._query_snapshot_by_name_batch( + pool_id, snapshot_name, batch_num, batch_size) + snapshot_info = self._get_snapshot_from_result( + batch_result, 'snapName', snapshot_name) + if snapshot_info: + return 
snapshot_info + if batch_result.get('totalNum') < batch_size: + break + batch_num += 1 + return None + + def create_snapshot(self, snapshot_name, vol_name): + url = '/snapshot/create/' + params = {"volName": vol_name, "snapshotName": snapshot_name} + result = self.call(url, "POST", params) + self._assert_rest_result(result, _('Create snapshot error.')) + + def delete_snapshot(self, snapshot_name): + url = '/snapshot/delete/' + params = {"snapshotName": snapshot_name} + result = self.call(url, "POST", params) + if result.get('errorCode') in constants.SNAPSHOT_NOT_EXIST: + return None + self._assert_rest_result(result, _('Delete snapshot session error.')) + + def create_volume_from_snapshot(self, snapshot_name, vol_name, vol_size): + url = '/snapshot/volume/create/' + params = {"src": snapshot_name, "volName": vol_name, + "volSize": vol_size} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _('Create volume from snapshot session error.')) + + def create_volume_from_volume(self, vol_name, vol_size, src_vol_name): + temp_snapshot_name = "temp" + src_vol_name + "clone" + vol_name + + self.create_snapshot(vol_name=src_vol_name, + snapshot_name=temp_snapshot_name) + + self.create_volume_from_snapshot(snapshot_name=temp_snapshot_name, + vol_name=vol_name, vol_size=vol_size) + + self.delete_snapshot(snapshot_name=temp_snapshot_name) + + @staticmethod + def _is_detail_error(result, detail_error_code): + if result.get("result", "") == constants.DSWARE_MULTI_ERROR: + for err in result.get("detail", []): + if err.get("errorCode") == detail_error_code: + return True + return False + return True + + def create_host(self, host_name): + url = '/host/create' + params = {"hostName": host_name} + result = self.call(url, "POST", params) + if self._is_detail_error(result, constants.HOST_ALREADY_EXIST): + return None + + self._assert_rest_result(result, _('Create host session error.')) + + def delete_host(self, host_name): + url = '/host/delete' + params = 
{"hostName": host_name} + result = self.call(url, "POST", params) + if self._is_detail_error(result, constants.HOST_MAPPING_EXIST): + return None + + self._assert_rest_result(result, _('Delete host session error.')) + + def get_all_host(self): + url = '/host/list' + result = self.call(url, "GET") + self._assert_rest_result(result, _('Get all host session error')) + return result.get("hostList", []) + + def get_host_by_volume(self, vol_name): + url = '/lun/host/list' + params = {"lunName": vol_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get host by volume name session error")) + return result.get("hostList", []) + + def map_volume_to_host(self, host_name, vol_name): + url = '/host/lun/add' + params = {"hostName": host_name, "lunNames": [vol_name]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Map volumes to host session error")) + + def unmap_volume_from_host(self, host_name, vol_name): + url = '/host/lun/delete' + params = {"hostName": host_name, "lunNames": [vol_name]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Unmap volumes from host session error")) + + def get_host_lun(self, host_name): + url = '/host/lun/list' + params = {"hostName": host_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, "Get host mapped lun info session error") + return result.get("hostLunList", []) + + def get_associate_initiator_by_host_name(self, host_name): + url = '/port/host/list' + params = {"hostName": host_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, "Get associate initiator by host name session error") + return result.get("portList", []) + + def create_hostgroup(self, host_group_name): + url = '/hostGroup/add' + params = {"hostGroupName": host_group_name} + result = self.call(url, "POST", params) + if self._is_detail_error(result, constants.HOSTGROUP_ALREADY_EXIST): + return None + 
self._assert_rest_result( + result, _("Create HostGroup session error")) + + def delete_hostgroup(self, host_group_name): + url = '/hostGroup/delete' + params = {"hostGroupName": host_group_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Delete HostGroup session error")) + + def get_all_hostgroup(self): + url = '/hostGroup/list' + result = self.call(url, "GET") + self._assert_rest_result(result, _("Get HostGroup session error")) + return result.get("groupList", []) + + def add_host_to_hostgroup(self, host_group_name, host_name): + url = '/hostGroup/host/add' + params = {"hostGroupName": host_group_name, "hostList": [host_name]} + result = self.call(url, "POST", params) + if self._is_detail_error(result, constants.HOST_MAPPING_GROUP_EXIST): + return None + + self._assert_rest_result( + result, _("Add host to HostGroup session error")) + + def remove_host_from_hostgroup(self, host_group_name, host_name): + url = '/hostGroup/host/delete' + params = {"hostGroupName": host_group_name, "hostList": [host_name]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Delete host from HostGroup session error")) + + def get_host_in_hostgroup(self, host_group_name): + url = '/hostGroup/host/list' + params = {"hostGroupName": host_group_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get host in HostGroup session error")) + return result.get("hostList", []) + + def get_all_initiator_on_array(self): + url = '/port/list' + params = {} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get all initiator on array session error")) + return result.get("portList", []) + + def add_initiator_to_array(self, initiator_name): + url = 'iscsi/createPort' + params = {"portName": initiator_name} + result = self.call(url, "POST", params, get_version=True) + if self._is_detail_error(result, constants.INITIATOR_ALREADY_EXIST): + return None + 
self._assert_rest_result( + result, _("Add initiator to array session error")) + + def remove_initiator_from_array(self, initiator_name): + url = 'iscsi/deletePort' + params = {"portName": initiator_name} + result = self.call(url, "POST", params, get_version=True) + self._assert_rest_result( + result, _("Remove initiator from array session error")) + + def add_initiator_to_host(self, host_name, initiator): + url = '/host/port/add' + params = {"hostName": host_name, "portNames": [initiator]} + result = self.call(url, "POST", params) + if self._is_detail_error(result, constants.INITIATOR_IN_HOST): + associate_host_ini = self.get_associate_initiator_by_host_name( + host_name) + if initiator in associate_host_ini: + return None + self._assert_rest_result( + result, _("Add initiator to host session error")) + + def delete_initiator_from_host(self, host_name, initiator): + url = '/host/port/delete' + params = {"hostName": host_name, "portNames": [initiator]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Delete initiator from host session error")) + + def get_host_associate_initiator(self, initiator): + url = '/host/port/list' + params = {"portName": [initiator]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get host by initiator session error")) + return result['portHostMap'].get(initiator, []) + + def get_target_port(self, target_ip): + url = "/iscsi/port/list" + params = {"nodeMgrIps": [target_ip]} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get iscsi port info session error")) + return result.get("nodeResultList", []) + + def create_qos(self, qos_name, qos_params): + url = "/qos/create" + params = {"qosName": qos_name, "qosSpecInfo": qos_params} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Create QoS session error")) + + def delete_qos(self, qos_name): + url = "/qos/delete" + params = {"qosNames": [qos_name]} + result = 
self.call(url, "POST", params) + self._assert_rest_result( + result, _("Delete QoS session error")) + + def modify_qos(self, qos_name, qos_params): + url = "/qos/modify" + params = {"qosName": qos_name, "qosSpecInfo": qos_params} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Modify QoS session error")) + + def associate_qos_with_volume(self, vol_name, qos_name): + url = "/qos/volume/associate" + params = {"keyNames": [vol_name], "qosName": qos_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Associate QoS with volume session error")) + + def disassociate_qos_with_volume(self, vol_name, qos_name): + url = "/qos/volume/disassociate" + params = {"keyNames": [vol_name], "qosName": qos_name} + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Disassociate QoS with volume session error")) + + def get_qos_by_vol_name(self, vol_name): + url = "/volume/qos?volName=%s" % vol_name + result = self.call(url, "GET") + self._assert_rest_result( + result, _("Get QoS by volume name session error")) + + return result + + def get_qos_volume_info(self, pool_id, qos_name, + batch_num=1, batch_size=5): + url = "/qos/volume/list?type=associated" + params = {"pageNum": batch_num, + "pageSize": batch_size, + "queryType": "volume", + "qosName": qos_name, + "poolId": pool_id} + + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Get QoS info session error")) + return result.get("volumes", []) + + def get_fsm_version(self): + url = "/version" + result = self.call(url, "GET") + self._assert_rest_result( + result, _("Get FSM version session error.")) + return result.get("version") + + def get_system_time_zone(self): + url = "/time/querytimezone" + result = self.call(url, "GET") + self._assert_rest_result( + result, _("Get system time zone session error.")) + + return result.get("timeZone") + + def get_time_config(self): + url = "/api/v2/common/time_config" + 
result = self.call(url, "GET", get_system_time=True) + if result.get('result', {}).get("code") != 0: + msg = (_('Get system time config session error. result: %(res)s.') + % {'res': result}) + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + if result.get("data"): + return result.get("data")[0] + return {} + + def get_snapshot_info_by_name(self, snapshot_name): + url = "/api/v2/block_service/snapshots" + params = {"name": snapshot_name} + result = self.call(url, "GET", params, get_system_time=True) + self._assert_rest_result( + result, _("Get snapshot info session error.")) + return result.get("data", {}) + + def rollback_snapshot(self, vol_name, snapshot_name): + url = "/snapshot/rollback" + params = {"snapshotName": snapshot_name, + "volumeName": vol_name + } + result = self.call(url, "POST", params) + self._assert_rest_result( + result, _("Rollback snapshot session error.")) + + def cancel_rollback_snapshot(self, snapshot_name): + url = "/api/v2/block_service/snapshots" + params = {"name": snapshot_name, + "action": "cancel_rollback" + } + result = self.call(url, "POST", params, get_system_time=True) + self._assert_rest_result( + result, _("Cancel rollback snapshot session error.")) + + def get_iscsi_portal(self): + url = "/dsware/service/cluster/dswareclient/queryIscsiPortal" + result = self.call(url, "POST", data={}, get_system_time=True) + self._assert_rest_result( + result, _("Get ISCSI portal session error.")) + return result.get("nodeResultList", []) + + def get_host_iscsi_service(self, host_name): + url = "/api/v2/block_service/iscsi_sessions" + params = {"host_name": host_name} + result = self.call(url, "GET", params, get_system_time=True) + self._assert_rest_result( + result, _("Get host iscsi service session error.")) + return result.get("data", []) + + def add_iscsi_host_relation(self, host_name, ip_list): + if not ip_list: + return + url = "/dsware/service/iscsi/addIscsiHostRelation" + ip_strings = ";".join(ip_list) + params = 
[{"content": ip_strings, "key": host_name, "flag": 0}] + try: + result = self.call(url, "POST", params, get_system_time=True) + if result.get("errorCode") == constants.HOST_ISCSI_RELATION_EXIST: + result = self.get_iscsi_host_relation(host_name) + if result: + return None + self._assert_rest_result( + result, _("Add iscsi host relation session error.")) + except Exception as err: + if constants.URL_NOT_FOUND in six.text_type(err): + return None + else: + raise + + def get_iscsi_host_relation(self, host_name): + iscsi_ips = [] + url = "/dsware/service/iscsi/queryIscsiHostRelation" + params = [{"key": host_name, "flag": 0}] + try: + result = self.call(url, "POST", params, get_system_time=True) + self._assert_rest_result( + result, _("Get iscsi host relation session error.")) + + for iscsi in result.get("hostList", []): + if int(iscsi.get("flag")) == constants.HOST_FLAG: + iscsi_ips = iscsi.get("content", "").split(";") + return iscsi_ips + except Exception as err: + if constants.URL_NOT_FOUND in six.text_type(err): + return iscsi_ips + else: + raise + + def delete_iscsi_host_relation(self, host_name, ip_list): + if not ip_list: + return + + url = "/dsware/service/iscsi/deleteIscsiHostRelation" + ip_strings = ";".join(ip_list) + params = [{"content": ip_strings, "key": host_name, "flag": 0}] + try: + result = self.call(url, "POST", params, get_system_time=True) + self._assert_rest_result( + result, _("Delete iscsi host relation session error.")) + except Exception as err: + if constants.URL_NOT_FOUND in six.text_type(err): + return None + else: + raise diff --git a/Cinder/Victoria/fs_conf.py b/Cinder/Victoria/fs_conf.py new file mode 100644 index 0000000..7f8b8d9 --- /dev/null +++ b/Cinder/Victoria/fs_conf.py @@ -0,0 +1,128 @@ +# Copyright (c) 2018 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import os +import six + +from oslo_log import log as logging +from six.moves import configparser + +from cinder import exception +from cinder.i18n import _ +from cinder import utils +from cinder.volume.drivers.fusionstorage import constants + + +LOG = logging.getLogger(__name__) + + +class FusionStorageConf(object): + def __init__(self, configuration, host): + self.configuration = configuration + self._check_host(host) + + def _check_host(self, host): + if host and len(host.split('@')) > 1: + self.host = host.split('@')[1] + else: + msg = _("The host %s is not reliable. 
Please check cinder-volume " + "backend.") % host + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + def update_config_value(self): + self._encode_authentication() + self._pools_name() + self._san_address() + self._san_user() + self._san_password() + + @staticmethod + def _encode_param(encode_param): + need_encode = False + if encode_param is not None and not encode_param.startswith('!&&&'): + encoded = base64.b64encode(six.b(encode_param)).decode() + encode_param = '!&&&' + encoded + need_encode = True + return need_encode, encode_param + + def _encode_authentication(self): + name_node = self.configuration.safe_get(constants.CONF_USER) + pwd_node = self.configuration.safe_get(constants.CONF_PWD) + + name_encode, name_node = self._encode_param(name_node) + pwd_encode, pwd_node = self._encode_param(pwd_node) + + if name_encode or pwd_encode: + self._rewrite_conf(name_node, pwd_node) + + def _rewrite_conf(self, name_node, pwd_node): + if os.path.exists(constants.CONF_PATH): + utils.execute("chmod", "666", constants.CONF_PATH, + run_as_root=True) + conf = configparser.ConfigParser() + conf.read(constants.CONF_PATH) + if name_node: + conf.set(self.host, constants.CONF_USER, name_node) + if pwd_node: + conf.set(self.host, constants.CONF_PWD, pwd_node) + fh = open(constants.CONF_PATH, 'w') + conf.write(fh) + fh.close() + utils.execute("chmod", "644", constants.CONF_PATH, + run_as_root=True) + + def _assert_text_result(self, text, mess): + if not text: + msg = _("%s is not configured.") % mess + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + def _san_address(self): + address = self.configuration.safe_get(constants.CONF_ADDRESS) + self._assert_text_result(address, mess=constants.CONF_ADDRESS) + setattr(self.configuration, 'san_address', address) + + def _decode_text(self, text): + return (base64.b64decode(six.b(text[4:])).decode() if + text.startswith('!&&&') else text) + + def _san_user(self): + user_text = 
self.configuration.safe_get(constants.CONF_USER) + self._assert_text_result(user_text, mess=constants.CONF_USER) + user = self._decode_text(user_text) + setattr(self.configuration, 'san_user', user) + + def _san_password(self): + pwd_text = self.configuration.safe_get(constants.CONF_PWD) + self._assert_text_result(pwd_text, mess=constants.CONF_PWD) + pwd = self._decode_text(pwd_text) + setattr(self.configuration, 'san_password', pwd) + + def _pools_name(self): + pools_name = self.configuration.safe_get(constants.CONF_POOLS) + self._assert_text_result(pools_name, mess=constants.CONF_POOLS) + pools = set(x.strip() for x in pools_name.split(';') if x.strip()) + if not pools: + msg = _('No valid storage pool configured.') + LOG.error(msg) + raise exception.InvalidInput(msg) + setattr(self.configuration, 'pools_name', list(pools)) + + def _manager_ip(self): + manager_ips = self.configuration.safe_get(constants.CONF_MANAGER_IP) + self._assert_text_result(manager_ips, mess=constants.CONF_MANAGER_IP) + setattr(self.configuration, 'manager_ips', manager_ips) diff --git a/Cinder/Victoria/fs_flow.py b/Cinder/Victoria/fs_flow.py new file mode 100644 index 0000000..f756060 --- /dev/null +++ b/Cinder/Victoria/fs_flow.py @@ -0,0 +1,409 @@ +# Copyright (c) 2019 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from oslo_log import log as logging + +import taskflow.engines +from taskflow.patterns import linear_flow +from taskflow import task +from taskflow.types import failure + +from cinder import exception +from cinder.i18n import _ +from cinder.volume.drivers.fusionstorage import fs_utils + + +LOG = logging.getLogger(__name__) + + +class CheckLunInHostTask(task.Task): + default_provides = 'is_lun_in_host' + + def __init__(self, client, *args, **kwargs): + super(CheckLunInHostTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name): + is_lun_in_host = fs_utils.is_lun_in_host(self.client, host_name) + if is_lun_in_host: + LOG.info("The host %s is attached by other lun.", host_name) + return is_lun_in_host + + +class CreateHostCheckTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(CreateHostCheckTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name): + if not fs_utils.is_host_add_to_array(self.client, host_name): + LOG.info("Create a new host: %s on the array", host_name) + self.client.create_host(host_name) + else: + LOG.info("The host: %s is already on the array", host_name) + + +class DeleteHostWithCheck(task.Task): + def __init__(self, client, *args, **kwargs): + super(DeleteHostWithCheck, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name, is_host_in_group): + if not is_host_in_group and fs_utils.is_host_add_to_array( + self.client, host_name): + LOG.info("Delete host: %s from the array", host_name) + host_iscsi = self.client.get_iscsi_host_relation(host_name) + if host_iscsi: + self.client.delete_iscsi_host_relation(host_name, host_iscsi) + self.client.delete_host(host_name) + + +class CreateHostGroupWithCheckTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(CreateHostGroupWithCheckTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_group_name): + if not 
fs_utils.is_hostgroup_add_to_array( + self.client, host_group_name): + LOG.info("Create a HostGroup: %s on the array", host_group_name) + self.client.create_hostgroup(host_group_name) + else: + LOG.info("The HostGroup: %s is already on the array", + host_group_name) + + +class DeleteHostGroupWithCheck(task.Task): + def __init__(self, client, *args, **kwargs): + super(DeleteHostGroupWithCheck, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_group_name, is_host_in_group): + if not is_host_in_group and fs_utils.is_host_group_empty( + self.client, host_group_name): + LOG.info("Delete HostGroup: %s from the array", host_group_name) + self.client.delete_hostgroup(host_group_name) + + +class AddHostToHostGroupTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(AddHostToHostGroupTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name, host_group_name): + if not fs_utils.is_host_in_host_group(self.client, host_name, + host_group_name): + LOG.info("Add host: %(host)s to HostGroup: %(HostGroup)s", + {"host": host_name, "HostGroup": host_group_name}) + self.client.add_host_to_hostgroup(host_group_name, host_name) + else: + LOG.info("The host: %(host)s is already in HostGroup: " + "%(HostGroup)s", {"host": host_name, + "HostGroup": host_group_name}) + + +class RemoveHostFromHostGroupWithCheck(task.Task): + default_provides = 'is_host_in_group' + + def __init__(self, client, *args, **kwargs): + super(RemoveHostFromHostGroupWithCheck, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_group_name, host_name, is_initiator_in_host): + is_host_in_group = True + if not is_initiator_in_host and fs_utils.is_host_in_host_group( + self.client, host_name, host_group_name): + LOG.info("Remove host: %(host)s from HostGroup: %(HostGroup)s", + {"host": host_name, "HostGroup": host_group_name}) + self.client.remove_host_from_hostgroup(host_group_name, host_name) + 
is_host_in_group = False + return is_host_in_group + + +class AddInitiatorWithCheckTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(AddInitiatorWithCheckTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, initiator_name): + if not fs_utils.is_initiator_add_to_array(self.client, initiator_name): + LOG.info("Create a new initiator: %s on the array", initiator_name) + self.client.add_initiator_to_array(initiator_name) + else: + LOG.info("The initiator: %s is already on the array", + initiator_name) + + +class RemoveInitiatorWithCheck(task.Task): + def __init__(self, client, *args, **kwargs): + super(RemoveInitiatorWithCheck, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, initiator_list, is_initiator_in_host): + if not is_initiator_in_host: + for initiator in initiator_list: + host_list = self.client.get_host_associate_initiator(initiator) + if not host_list: + LOG.info("Remove initiator: %s from the array", initiator) + self.client.remove_initiator_from_array(initiator) + + +class AssociateInitiatorToHostTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(AssociateInitiatorToHostTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, initiator_name, host_name): + if not fs_utils.is_initiator_associate_to_host( + self.client, host_name, initiator_name): + LOG.info("Associate initiator: %(initiator)s to host: %(host)s.", + {"initiator": initiator_name, "host": host_name}) + self.client.add_initiator_to_host(host_name, initiator_name) + else: + LOG.info("The initiator: %(initiator)s is already associate to " + "host: %(host)s.", {"initiator": initiator_name, + "host": host_name}) + + +class DeleteInitiatorFromHostWithCheck(task.Task): + default_provides = ('is_initiator_in_host', 'initiator_list') + + def __init__(self, client, *args, **kwargs): + super(DeleteInitiatorFromHostWithCheck, self).__init__(*args, **kwargs) + self.client = client + 
+ def execute(self, host_name, is_lun_in_host): + is_initiator_in_host = True + initiator_list = [] + if not is_lun_in_host: + initiator_list = self.client.get_associate_initiator_by_host_name( + host_name) + for initiator in initiator_list: + LOG.info("Dissociate initiator: %(init)s with host: %(host)s.", + {"init": initiator, "host": host_name}) + self.client.delete_initiator_from_host(host_name, + initiator) + is_initiator_in_host = False + return is_initiator_in_host, initiator_list + + +class MapLunToHostTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(MapLunToHostTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name, vol_name): + LOG.info("Map lun: %(lun)s to host %(host)s.", + {"lun": vol_name, "host": host_name}) + self.client.map_volume_to_host(host_name, vol_name) + + def revert(self, result, host_name, vol_name, **kwargs): + LOG.warning("Revert map lun to host task.") + if isinstance(result, failure.Failure): + return + self.client.unmap_volume_from_host(host_name, vol_name) + + +class UnMapLunFromHostTask(task.Task): + def __init__(self, client, *args, **kwargs): + super(UnMapLunFromHostTask, self).__init__(*args, **kwargs) + self.client = client + + def execute(self, host_name, vol_name): + LOG.info("Unmap lun: %(lun)s with host %(host)s.", + {"lun": vol_name, "host": host_name}) + self.client.unmap_volume_from_host(host_name, vol_name) + + +class GetISCSIProperties(task.Task): + default_provides = 'properties' + + def __init__(self, client, configuration, manager_groups, thread_lock, + *args, **kwargs): + super(GetISCSIProperties, self).__init__(*args, **kwargs) + self.client = client + self.configuration = configuration + self.manager_groups = manager_groups + self.thread_lock = thread_lock + + @staticmethod + def _construct_properties(multipath, target_lun, target_ips, target_iqns): + properties = {} + if multipath: + properties.update({ + "target_luns": [target_lun] * len(target_ips), + 
"target_iqns": target_iqns, + "target_portals": target_ips, + }) + else: + properties.update({ + "target_lun": target_lun, + "target_iqn": target_iqns[0], + "target_portal": target_ips[0], + }) + return properties + + def _find_target_ips(self): + config_target_ips = self.configuration.target_ips + target_ips, target_iqns = [], [] + for tgt_ip in config_target_ips: + target_ip, target_iqn = fs_utils.get_target_portal( + self.client, tgt_ip, self.configuration.use_ipv6) + if not target_ip: + continue + + target_portal, __ = fs_utils.format_target_portal(target_ip) + target_ips.append(target_portal) + target_iqns.append(target_iqn) + + if not target_ips: + msg = _("There is no valid target ip in %s.") % config_target_ips + LOG.warning(msg) + raise exception.InvalidInput(msg) + + return target_ips, target_iqns + + def _find_iscsi_ips(self, host_name): + valid_iscsi_ips, valid_node_ips = fs_utils.get_valid_iscsi_info( + self.client) + target_ips, target_iqns = fs_utils.get_iscsi_info_from_host( + self.client, host_name, valid_iscsi_ips) + + iscsi_manager_groups = self.configuration.iscsi_manager_groups + if not target_ips: + (node_ips, target_ips, target_iqns + ) = fs_utils.get_iscsi_info_from_conf( + self.manager_groups, iscsi_manager_groups, + self.configuration.use_ipv6, + valid_iscsi_ips, valid_node_ips, self.thread_lock) + if target_ips: + self.client.add_iscsi_host_relation(host_name, node_ips) + + if not target_ips: + msg = (_( + "Can not find a valid target ip in iscsi_manager_groups %s") + % iscsi_manager_groups) + LOG.warning(msg) + raise exception.InvalidInput(msg) + + LOG.info("Get iscsi target info, target ips: %s, target iqns: %s" + % (target_ips, target_iqns)) + return target_ips, target_iqns + + def execute(self, host_name, vol_name, multipath): + LOG.info("Get ISCSI initialize connection properties.") + target_lun = fs_utils.get_target_lun(self.client, host_name, vol_name) + + if self.configuration.iscsi_manager_groups: + target_ips, target_iqns = 
self._find_iscsi_ips(host_name) + else: + target_ips, target_iqns = self._find_target_ips() + + return self._construct_properties(multipath, target_lun, + target_ips, target_iqns) + + +def get_iscsi_required_params(vol_name, connector, client=None): + if "host" in connector: + host_name = fs_utils.encode_host_name(connector['host']) + host_group_name = fs_utils.encode_host_group_name(host_name) + initiator_name = connector['initiator'] + multipath = connector.get("multipath") + else: + host_list = client.get_host_by_volume(vol_name) + if len(host_list) > 1: + msg = ('Terminate_connection: multiple mapping of volume %(vol)s ' + 'found, no host specified, host_list: ' + '%(host)s') % {'vol': vol_name, 'host': host_list} + LOG.warning(msg) + return None, None, None, None, None + elif len(host_list) == 1: + host_name = host_list[0]['hostName'] + host_group_name = fs_utils.encode_host_group_name(host_name) + initiator_name = None + multipath = None + else: + LOG.info("Terminate_connection: the volume %(vol)s does not map " + "to any host", {"vol": vol_name}) + return None, None, None, None, None + + LOG.info("Get iscsi required params. 
volume: %(vol)s, host: %(host)s," + " host_group: %(host_group)s, initiator: %(initiator)s, " + "multipath: %(multipath)s", + {"vol": vol_name, "host": host_name, + "host_group": host_group_name, "initiator": initiator_name, + "multipath": multipath}) + return vol_name, host_name, host_group_name, initiator_name, multipath + + +def initialize_iscsi_connection(client, vol_name, connector, configuration, + manager_groups, thread_lock): + (vol_name, host_name, host_group_name, initiator_name, + multipath) = get_iscsi_required_params(vol_name, connector) + + store_spec = {'vol_name': vol_name, + 'host_name': host_name, + 'host_group_name': host_group_name, + 'initiator_name': initiator_name, + 'multipath': multipath, + 'connector_host_name': connector.get("host")} + work_flow = linear_flow.Flow('initialize_iscsi_connection') + + if fs_utils.is_volume_associate_to_host(client, vol_name, host_name): + LOG.info("Volume: %(vol)s has associated to the host: %(host)s", + {"vol": vol_name, "host": host_name}) + else: + work_flow.add( + CreateHostCheckTask(client), + CreateHostGroupWithCheckTask(client), + AddHostToHostGroupTask(client), + AddInitiatorWithCheckTask(client), + AssociateInitiatorToHostTask(client), + MapLunToHostTask(client) + ) + + work_flow.add( + GetISCSIProperties(client, configuration, manager_groups, thread_lock) + ) + + engine = taskflow.engines.load(work_flow, store=store_spec) + engine.run() + return engine.storage.fetch('properties') + + +def terminate_iscsi_connection(client, vol_name, connector): + (vol_name, host_name, host_group_name, + _, _) = get_iscsi_required_params(vol_name, connector, client) + + store_spec = {'vol_name': vol_name, + 'host_name': host_name, + 'host_group_name': host_group_name} + work_flow = linear_flow.Flow('terminate_iscsi_connection') + if host_name and fs_utils.is_host_add_to_array(client, host_name): + if fs_utils.is_volume_associate_to_host(client, vol_name, host_name): + work_flow.add( + UnMapLunFromHostTask(client) + 
) + work_flow.add( + CheckLunInHostTask(client), + DeleteInitiatorFromHostWithCheck(client), + RemoveInitiatorWithCheck(client), + RemoveHostFromHostGroupWithCheck(client), + DeleteHostWithCheck(client), + DeleteHostGroupWithCheck(client) + ) + + engine = taskflow.engines.load(work_flow, store=store_spec) + engine.run() diff --git a/Cinder/Victoria/fs_qos.py b/Cinder/Victoria/fs_qos.py new file mode 100644 index 0000000..731c45e --- /dev/null +++ b/Cinder/Victoria/fs_qos.py @@ -0,0 +1,63 @@ +# Copyright (c) 2019 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import time + +from oslo_log import log as logging + +from cinder import exception +from cinder.volume.drivers.fusionstorage import constants + +LOG = logging.getLogger(__name__) + + +class FusionStorageQoS(object): + def __init__(self, client): + self.client = client + + def add(self, qos, vol_name): + localtime = time.strftime('%Y%m%d%H%M%S', time.localtime()) + qos_name = constants.QOS_PREFIX + localtime + self.client.create_qos(qos_name, qos) + try: + self.client.associate_qos_with_volume(vol_name, qos_name) + except exception.VolumeBackendAPIException: + self.remove(vol_name) + raise + + def _is_qos_associate_to_volume(self, qos_name): + all_pools = self.client.query_pool_info() + volumes = None + for pool in all_pools: + volumes = self.client.get_qos_volume_info( + pool.get('poolId'), qos_name) + if volumes: + break + return volumes + + def remove(self, vol_name): + vol_qos = self.client.get_qos_by_vol_name(vol_name) + qos_name = vol_qos.get("qosName") + if qos_name: + self.client.disassociate_qos_with_volume(vol_name, qos_name) + + if not self._is_qos_associate_to_volume(qos_name): + self.client.delete_qos(qos_name) + + def update(self, qos, vol_name): + vol_qos = self.client.get_qos_by_vol_name(vol_name) + qos_name = vol_qos.get("qosName") + if qos_name: + self.client.modify_qos(qos_name, qos) diff --git a/Cinder/Victoria/fs_utils.py b/Cinder/Victoria/fs_utils.py new file mode 100644 index 0000000..a850b14 --- /dev/null +++ b/Cinder/Victoria/fs_utils.py @@ -0,0 +1,754 @@ +# Copyright (c) 2019 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import hashlib +import ipaddress +import pytz +import random +import six +import time + +from oslo_log import log as logging +from oslo_service import loopingcall +from oslo_utils import units + +from cinder import context +from cinder import exception +from cinder.i18n import _ +from cinder import objects +from cinder.volume.drivers.fusionstorage import constants +from cinder.volume import qos_specs +from cinder.volume import volume_types + + +LOG = logging.getLogger(__name__) + + +def is_volume_associate_to_host(client, vol_name, host_name): + lun_host_list = client.get_host_by_volume(vol_name) + for host in lun_host_list: + if host.get('hostName') == host_name: + return host.get("lunId") + + +def is_initiator_add_to_array(client, initiator_name): + initiator_list = client.get_all_initiator_on_array() + for initiator in initiator_list: + if initiator.get('portName') == initiator_name: + return initiator.get('portName') + + +def is_initiator_associate_to_host(client, host_name, initiator_name): + initiator_list = client.get_associate_initiator_by_host_name(host_name) + return initiator_name in initiator_list + + +def get_target_lun(client, host_name, vol_name): + hostlun_list = client.get_host_lun(host_name) + for hostlun in hostlun_list: + if hostlun.get("lunName") == vol_name: + return hostlun.get("lunId") + + +def _get_target_portal(port_list, use_ipv6): + for port in port_list: + if port.get("iscsiStatus") == "active": + iscsi_portal = ":".join(port.get("iscsiPortal").split(":")[:-1]) + ip_addr = 
ipaddress.ip_address(six.text_type(iscsi_portal)) + if use_ipv6 ^ ip_addr.version == 6: + continue + + return port.get("iscsiPortal"), port.get('targetName') + return None, None + + +def get_target_portal(client, target_ip, use_ipv6): + tgt_portal = client.get_target_port(target_ip) + for node_portal in tgt_portal: + if node_portal.get("nodeMgrIp") == target_ip: + port_list = node_portal.get("iscsiPortalList", []) + return _get_target_portal(port_list, use_ipv6) + + +def is_lun_in_host(client, host_name): + hostlun_list = client.get_host_lun(host_name) + return len(hostlun_list) + + +def is_host_add_to_array(client, host_name): + all_hosts = client.get_all_host() + for host in all_hosts: + if host.get("hostName") == host_name: + return host.get("hostName") + + +def is_hostgroup_add_to_array(client, host_group_name): + all_host_groups = client.get_all_hostgroup() + for host_group in all_host_groups: + if host_group.get("hostGroupName") == host_group_name: + return host_group.get("hostGroupName") + + +def is_host_group_empty(client, host_group_name): + all_host = client.get_host_in_hostgroup(host_group_name) + return not all_host + + +def is_host_in_host_group(client, host_name, host_group_name): + all_host = client.get_host_in_hostgroup(host_group_name) + return host_name in all_host + + +def get_volume_params(volume, client): + volume_type = _get_volume_type(volume) + return get_volume_type_params(volume_type, client) + + +def _get_volume_type(volume): + if volume.volume_type: + return volume.volume_type + if volume.volume_type_id: + return volume_types.get_volume_type(None, volume.volume_type_id) + + +def get_volume_type_params(volume_type, client): + vol_params = {} + + if isinstance(volume_type, dict) and volume_type.get('qos_specs_id'): + vol_params['qos'] = _get_qos_specs(volume_type['qos_specs_id'], client) + elif isinstance(volume_type, objects.VolumeType + ) and volume_type.qos_specs_id: + vol_params['qos'] = _get_qos_specs(volume_type.qos_specs_id, client) 
+ + LOG.info('volume opts %s.', vol_params) + return vol_params + + +def _get_trigger_qos(qos, client): + if qos.get(constants.QOS_SCHEDULER_KEYS[0]): + if client.get_fsm_version() >= constants.QOS_SUPPORT_SCHEDULE_VERSION: + qos = _check_and_convert_qos(qos, client) + else: + msg = _('FusionStorage Version is not suitable for QoS: %s') % qos + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + return qos + + +def _is_qos_specs_valid(specs): + if specs is None: + return False + + if specs.get('consumer') == 'front-end': + return False + return True + + +def _raise_qos_not_set(qos): + if not set(constants.QOS_MUST_SET).intersection(set(qos.keys())): + msg = _('One of %s must be set for QoS: %s') % ( + constants.QOS_KEYS, qos) + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + +def _raise_qos_is_invalid(qos_key): + if qos_key not in constants.QOS_KEYS + constants.QOS_SCHEDULER_KEYS: + msg = _('QoS key %s is not valid.') % qos_key + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + +def _set_qos(qos, qos_key, qos_value): + if qos_key in constants.QOS_KEYS: + if int(qos_value) <= 0: + msg = _('QoS value for %s must > 0.') % qos_key + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + # the maxIOPS priority is greater than total_iops_sec and + # the maxMBPS priority is greater than total_bytes_sec + if qos_key == "maxIOPS": + qos['maxIOPS'] = int(qos_value) + elif qos_key == "total_iops_sec" and qos.get("maxIOPS") is None: + qos['maxIOPS'] = int(qos_value) + elif qos_key == "maxMBPS": + qos['maxMBPS'] = int(qos_value) + elif qos_key == "total_bytes_sec" and qos.get("maxMBPS") is None: + qos_value = int(qos_value) + if 0 < qos_value < units.Mi: + qos_value = units.Mi + qos['maxMBPS'] = int(qos_value / units.Mi) + elif qos_key in constants.QOS_SCHEDULER_KEYS: + qos[qos_key] = qos_value.strip() + return qos + + +def _set_default_qos(qos): + if not qos.get('maxIOPS'): + qos["maxIOPS"] = constants.MAX_IOPS_VALUE + if not 
qos.get('maxMBPS'): + qos["maxMBPS"] = constants.MAX_MBPS_VALUE + if "total_iops_sec" in qos: + qos.pop("total_iops_sec") + if "total_bytes_sec" in qos: + qos.pop("total_bytes_sec") + + +def _get_qos_specs(qos_specs_id, client): + ctxt = context.get_admin_context() + specs = qos_specs.get_qos_specs(ctxt, qos_specs_id) + if not _is_qos_specs_valid(specs): + return {} + + kvs = specs.get('specs', {}) + LOG.info('The QoS specs is: %s.', kvs) + + qos = dict() + for k, v in kvs.items(): + _raise_qos_is_invalid(k) + qos = _set_qos(qos, k, v) + + _raise_qos_not_set(qos) + _set_default_qos(qos) + qos = _get_trigger_qos(qos, client) + + return qos + + +def _deal_date_increase_or_decrease(is_date_decrease, is_date_increase, qos): + if is_date_decrease: + config_date_sec = qos[constants.QOS_SCHEDULER_KEYS[1]] + qos[constants.QOS_SCHEDULER_KEYS[1]] = (config_date_sec - + constants.SECONDS_OF_DAY) + + if is_date_increase: + config_date_sec = qos[constants.QOS_SCHEDULER_KEYS[1]] + qos[constants.QOS_SCHEDULER_KEYS[1]] = (config_date_sec + + constants.SECONDS_OF_DAY) + return qos + + +def _check_default_scheduler(qos, is_default_scheduler, configed_none_default): + if is_default_scheduler and configed_none_default: + msg = (_("The default scheduler: %(type)s is not allowed to config " + "other scheduler policy") + % {"type": qos[constants.QOS_SCHEDULER_KEYS[0]]}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + +def _check_week_scheduler(qos, configed_week_scheduler, configed_none_default): + if configed_week_scheduler and ( + configed_none_default != len(constants.QOS_SCHEDULER_KEYS) - 1): + msg = (_("The week scheduler type %(type)s params number are " + "incorrect.") + % {"type": qos[constants.QOS_SCHEDULER_KEYS[0]]}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + +def _check_scheduler_count(qos, is_default_scheduler, configed_week_scheduler, + configed_none_default): + if (not is_default_scheduler and not configed_week_scheduler and + configed_none_default != 
len(constants.QOS_SCHEDULER_KEYS) - 2): + msg = (_("The scheduler type %(type)s params number are incorrect.") + % {"type": qos[constants.QOS_SCHEDULER_KEYS[0]]}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + +def _check_and_convert_qos(qos, client): + configed_none_default = 0 + sys_loc_time = _get_sys_time(client) + sys_loc_time = time.strptime(datetime.datetime.now(sys_loc_time).strftime( + "%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S") + + (qos, is_default_scheduler, + configed_week_scheduler) = _convert_schedule_type(qos) + + qos, configed_none_default = _convert_start_date( + qos, sys_loc_time, configed_none_default) + + (qos, configed_none_default, + is_date_decrease, is_date_increase) = _convert_start_time( + qos, client, sys_loc_time, configed_none_default) + + qos, configed_none_default = _convert_duration_time( + qos, configed_none_default) + + qos, configed_none_default = _convert_day_of_week( + qos, configed_none_default) + + _check_default_scheduler(qos, is_default_scheduler, configed_none_default) + _check_week_scheduler(qos, configed_week_scheduler, configed_none_default) + _check_scheduler_count(qos, is_default_scheduler, configed_week_scheduler, + configed_none_default) + + return _deal_date_increase_or_decrease( + is_date_decrease, is_date_increase, qos) + + +def _get_sys_time(client): + time_zone = client.get_system_time_zone() + try: + sys_loc_time = pytz.timezone(time_zone) + except Exception as err: + LOG.warning("Time zone %(zone)s does not exist in the operating " + "system, reason: %(err)s" + % {"zone": time_zone, "err": err}) + sys_loc_time = pytz.timezone(constants.TIMEZONE[time_zone]) + return sys_loc_time + + +def _deal_dst_time(time_config, cur_time): + LOG.info("Current system time is %(cur)s.", {"cur": cur_time}) + use_dst = int(time_config.get("use_dst", 0)) + # Current time is or not dst time + cur_is_in_dst = False + if use_dst: + start_time = time_config["dst_begin_date"] + end_time = time_config["dst_end_date"] + if 
(end_time >= cur_time >= start_time or + cur_time <= end_time < start_time or + end_time < start_time <= cur_time): + cur_is_in_dst = True + + LOG.info("Current date in DST: %(cur)s.", {"cur": cur_is_in_dst}) + return cur_is_in_dst + + +def _get_qos_time_params_east_zone(time_zone, config_sec, + cur_date_in_dst_time): + is_date_decrease = False + if cur_date_in_dst_time: + time_zone += constants.SECONDS_OF_HOUR + + if config_sec >= time_zone: + qos_time_params = int(config_sec - time_zone) + else: + qos_time_params = int(config_sec + (constants.SECONDS_OF_DAY + - time_zone)) + is_date_decrease = True + return qos_time_params, is_date_decrease + + +def _get_qos_time_params_west_zone(time_zone, config_sec, + cur_date_in_dst_time): + is_date_increase = False + if cur_date_in_dst_time: + time_zone -= constants.SECONDS_OF_HOUR + if config_sec + time_zone < constants.SECONDS_OF_DAY: + qos_time_params = int(config_sec + time_zone) + else: + qos_time_params = int(config_sec + time_zone - + constants.SECONDS_OF_DAY) + is_date_increase = True + return qos_time_params, is_date_increase + + +def _get_qos_time_params(zone_flag, time_zone, config_sec, + cur_date_in_dst_time): + LOG.info("time_zone is: %(time_zone)s, zone flag is: %(zone)s " + "config time_seconds is: %(config)s", + {"time_zone": time_zone, "zone": zone_flag, + "config": config_sec}) + is_date_increase = False + is_date_decrease = False + if zone_flag: + qos_time_params, is_date_decrease = _get_qos_time_params_east_zone( + time_zone, config_sec, cur_date_in_dst_time) + else: + qos_time_params, is_date_increase = _get_qos_time_params_west_zone( + time_zone, config_sec, cur_date_in_dst_time) + LOG.info("qos time is: %(time)s, is_date_decrease is %(decrease)s, " + "is_date_increase is %(crease)s" % + {"time": qos_time_params, + "decrease": is_date_decrease, + "crease": is_date_increase}) + return qos_time_params, is_date_decrease, is_date_increase + + +def _convert_schedule_type(qos): + is_default_scheduler = True + 
configed_week_scheduler = False + schedule_type = constants.QOS_SCHEDULER_KEYS[0] + if qos.get(schedule_type): + # Distinguish type + if qos[schedule_type] != constants.QOS_SCHEDULER_DEFAULT_TYPE: + is_default_scheduler = False + if qos[schedule_type] == constants.QOS_SCHEDULER_WEEK_TYPE: + configed_week_scheduler = True + qos[schedule_type] = int(qos[schedule_type]) + + return qos, is_default_scheduler, configed_week_scheduler + + +def _get_diff_time(time_config): + time_zone = time_config.get("time_zone") + if not time_zone: + msg = _("The time zone info %s is invalid.") % time_zone + LOG.info(msg) + raise exception.InvalidInput(msg) + + zone_flag, time_zone = ((False, time_zone.split("-")[1]) + if "-" in time_zone + else (True, time_zone.split("+")[1])) + time_zone = time.strptime(time_zone, '%H:%M') + diff_time = datetime.timedelta(hours=time_zone.tm_hour, + minutes=time_zone.tm_min).seconds + return zone_flag, diff_time + + +def _convert_start_date(qos, sys_loc_time, configed_none_default): + start_date = constants.QOS_SCHEDULER_KEYS[1] + sys_date_time = time.strftime("%Y-%m-%d", sys_loc_time) + diff_utc_time = time.altzone if time.daylight else time.timezone + if qos.get(start_date): + # Convert the config date to timestamp + cur_date = time.mktime(time.strptime( + sys_date_time, '%Y-%m-%d')) - diff_utc_time + try: + config_date = time.mktime(time.strptime( + qos[start_date], '%Y-%m-%d')) - diff_utc_time + except Exception as err: + msg = (_("The start date %(date)s is illegal. 
Reason: %(err)s") + % {"date": qos[start_date], "err": err}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + if config_date < cur_date: + msg = (_("The start date %(date)s is earlier than current " + "time") % {"date": qos[start_date]}) + LOG.error(msg) + raise exception.InvalidInput(msg) + qos[start_date] = int(config_date) + configed_none_default += 1 + return qos, configed_none_default + + +def _convert_start_time(qos, client, sys_loc_time, configed_none_default): + start_date = constants.QOS_SCHEDULER_KEYS[1] + start_time = constants.QOS_SCHEDULER_KEYS[2] + is_date_increase = False + is_date_decrease = False + sys_dst_time = time.strftime("%m-%d %H:%M:%S", sys_loc_time) + if qos.get(start_time): + if qos.get(start_date) is None: + msg = (_("The start date %(date)s is not config.") + % {"date": qos.get(start_date)}) + LOG.error(msg) + raise exception.InvalidInput(msg) + # Convert the config time to green time + try: + config_time = time.strptime(qos[start_time], "%H:%M") + except Exception as err: + msg = (_("The start time %(time)s is illegal. 
Reason: %(err)s") + % {"time": qos[start_time], "err": err}) + LOG.error(msg) + raise exception.InvalidInput(msg) + config_sec = datetime.timedelta( + hours=config_time.tm_hour, minutes=config_time.tm_min).seconds + + time_config = client.get_time_config() + + cur_date_in_dst_time = _deal_dst_time( + time_config, sys_dst_time) + + LOG.info("System time is: %s", sys_loc_time) + zone_flag, time_zone = _get_diff_time(time_config) + + (qos_time_params, is_date_decrease, + is_date_increase) = _get_qos_time_params( + zone_flag, time_zone, config_sec, + cur_date_in_dst_time) + + qos[start_time] = qos_time_params + configed_none_default += 1 + return qos, configed_none_default, is_date_decrease, is_date_increase + + +def _convert_duration_time(qos, configed_none_default): + duration_time = constants.QOS_SCHEDULER_KEYS[3] + if qos.get(duration_time): + # Convert the config duration time to seconds + if qos[duration_time] == "24:00": + config_duration_sec = constants.SECONDS_OF_DAY + else: + try: + config_duration_time = time.strptime( + qos[duration_time], "%H:%M") + except Exception as err: + msg = (_("The duration time %(time)s is illegal. 
" + "Reason: %(err)s") + % {"time": qos[duration_time], "err": err}) + LOG.error(msg) + raise exception.InvalidInput(msg) + + config_duration_sec = datetime.timedelta( + hours=config_duration_time.tm_hour, + minutes=config_duration_time.tm_min).seconds + qos[duration_time] = int(config_duration_sec) + configed_none_default += 1 + return qos, configed_none_default + + +def _is_config_weekday_valid(config_days_list, config_days): + for config in config_days_list: + if config not in constants.WEEK_DAYS: + msg = (_("The week day %s is illegal.") % config_days) + LOG.error(msg) + raise exception.InvalidInput(msg) + + +def _convert_day_of_week(qos, configed_none_default): + day_of_week = constants.QOS_SCHEDULER_KEYS[4] + if qos.get(day_of_week): + # Convert the week days + config_days = 0 + config_days_list = qos[day_of_week].split() + _is_config_weekday_valid(config_days_list, qos[day_of_week]) + + for index in range(len(constants.WEEK_DAYS)): + if constants.WEEK_DAYS[index] in config_days_list: + config_days += pow(2, index) + qos[day_of_week] = int(config_days) + configed_none_default += 1 + return qos, configed_none_default + + +def get_volume_specs(client, vol_name): + vol_info = {} + qos_info = {} + vol_qos = client.get_qos_by_vol_name(vol_name) + for key, value in vol_qos.get("qosSpecInfo", {}).items(): + if (key in (constants.QOS_KEYS + constants.QOS_SCHEDULER_KEYS) and + int(value)): + qos_info[key] = int(value) + vol_info['qos'] = qos_info + return vol_info + + +def is_snapshot_rollback_available(client, snap_name): + snapshot_info = client.get_snapshot_info_by_name(snap_name) + + running_status = snapshot_info.get("running_status") + health_status = snapshot_info.get("health_status") + + if running_status not in ( + constants.SNAPSHOT_RUNNING_STATUS_ONLINE, + constants.SNAPSHOT_RUNNING_STATUS_ROLLBACKING): + err_msg = (_("The running status %(status)s of snapshot %(name)s.") + % {"status": running_status, "name": snap_name}) + LOG.error(err_msg) + raise 
exception.InvalidSnapshot(reason=err_msg) + + if health_status not in (constants.SNAPSHOT_HEALTH_STATS_NORMAL, ): + err_msg = (_("The health status %(status)s of snapshot %(name)s.") + % {"status": running_status, "name": snap_name}) + LOG.error(err_msg) + raise exception.InvalidSnapshot(reason=err_msg) + + if constants.SNAPSHOT_RUNNING_STATUS_ONLINE == snapshot_info.get( + 'running_status'): + return True + + return False + + +def wait_for_condition(func, interval, timeout): + start_time = time.time() + + def _inner(): + result = func() + + if result: + raise loopingcall.LoopingCallDone() + + if int(time.time()) - start_time > timeout: + msg = (_('wait_for_condition: %s timed out.') + % func.__name__) + LOG.error(msg) + raise exception.VolumeBackendAPIException(data=msg) + + timer = loopingcall.FixedIntervalLoopingCall(_inner) + timer.start(interval=interval).wait() + + +def encode_host_name(host_name): + if host_name and len(host_name) > constants.MAX_NAME_LENGTH: + encoded_name = hashlib.md5(host_name.encode('utf-8')).hexdigest() + return encoded_name[:constants.MAX_NAME_LENGTH] + else: + return host_name + + +def encode_host_group_name(host_name): + host_group_name = constants.HOST_GROUP_PREFIX + host_name + if host_group_name and len(host_group_name) > constants.MAX_NAME_LENGTH: + return host_name + else: + return host_group_name + + +def get_valid_iscsi_info(client): + valid_iscsi_ips = {} + valid_node_ips = {} + all_iscsi_portal = client.get_iscsi_portal() + for iscsi_info in all_iscsi_portal: + if iscsi_info['status'] != 'successful': + continue + iscsi_portal_list = iscsi_info["iscsiPortalList"] + iscsi_ips = [] + for portal in iscsi_portal_list: + if portal["iscsiStatus"] == "active": + target_portal, iscsi_ip = format_target_portal( + portal["iscsiPortal"]) + + iscsi_ips.append(iscsi_ip) + valid_iscsi_ips[iscsi_ip] = { + "iscsi_portal": target_portal, + "iscsi_target_iqn": portal["targetName"]} + valid_node_ips[iscsi_info["nodeMgrIp"]] = iscsi_ips + + 
LOG.info("valid iscsi ips info is: %s, valid node ips is %s", + valid_iscsi_ips, valid_node_ips) + return valid_iscsi_ips, valid_node_ips + + +def _check_iscsi_ip_valid(manager_ip, valid_node_ips, use_ipv6): + if manager_ip not in valid_node_ips: + msg = _('The config manager ip %s is not valid node.') % manager_ip + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + target_ips = valid_node_ips[manager_ip] + is_ipv4 = False + is_ipv6 = False + for target_ip in target_ips: + ip_addr = ipaddress.ip_address(six.text_type(target_ip)) + if ip_addr.version == 6: + is_ipv6 = True + else: + is_ipv4 = True + + if not (is_ipv6 and is_ipv4) and use_ipv6 != is_ipv6: + config_ip_format = "ipv6" if use_ipv6 else "ipv4" + current_ip_format = "ipv6" if is_ipv6 else "ipv4" + msg = (_('The config ip %(iscsi_ip)s format is %(config)s, actually ' + 'the ip format is %(current)s') + % {"iscsi_ip": manager_ip, + "config": config_ip_format, + "current": current_ip_format}) + LOG.error(msg) + raise exception.InvalidInput(reason=msg) + + +def check_iscsi_group_valid(client, manager_groups, use_ipv6): + if not manager_groups: + return + + _, valid_node_ips = get_valid_iscsi_info(client) + for manager_group in manager_groups: + manager_ips = manager_group.strip().split(";") + for manager_ip in manager_ips: + _check_iscsi_ip_valid(manager_ip.strip(), valid_node_ips, use_ipv6) + + +def format_target_portal(portal): + _target_ip = portal.split(":") + iscsi_ip = ":".join(_target_ip[:-1]) + if ipaddress.ip_address(six.text_type(iscsi_ip)).version == 6: + target_portal = '[' + iscsi_ip + ']' + ":" + _target_ip[-1] + else: + target_portal = portal + + return target_portal, iscsi_ip + + +def _get_manager_ips(manager_groups): + index = random.randint(0, len(manager_groups) - 1) + manager_group = manager_groups.pop(index) + + manager_ips = manager_group.strip().split(";") + LOG.info("Get iscsi ips %s.", manager_ips) + return [manager_ip.strip() for manager_ip in manager_ips + if 
manager_ip.strip()] + + +def get_iscsi_info_from_host(client, host_name, valid_iscsi_ips): + iscsi_ips, target_ips, target_iqns = [], [], [] + host_session_iscsi = client.get_host_iscsi_service(host_name) + for iscsi in host_session_iscsi: + iscsi_ips.append(iscsi["iscsi_service_ip"]) + + host_db_iscsi = client.get_iscsi_host_relation(host_name) + if iscsi_ips and not host_db_iscsi: + client.add_iscsi_host_relation(host_name, iscsi_ips) + host_db_iscsi = iscsi_ips + + if not iscsi_ips or not host_db_iscsi: + iscsi_ips = list(set(iscsi_ips) | set(host_db_iscsi)) + else: + iscsi_ips = host_db_iscsi + + for iscsi_ip in iscsi_ips: + if iscsi_ip in valid_iscsi_ips: + target_ips.append(valid_iscsi_ips[iscsi_ip]["iscsi_portal"]) + target_iqns.append(valid_iscsi_ips[iscsi_ip]["iscsi_target_iqn"]) + + if not target_ips: + client.delete_iscsi_host_relation(host_name, host_db_iscsi) + return target_ips, target_iqns + + +def _get_target_info(manager_ips, use_ipv6, valid_iscsi_ips, valid_node_ips): + node_ips, target_ips, target_iqns = [], [], [] + for manager_ip in manager_ips: + for node_ip in valid_node_ips.get(manager_ip, []): + ip_version = ipaddress.ip_address(six.text_type(node_ip)).version + if use_ipv6 ^ ip_version == 6: + continue + node_ips.append(node_ip) + target_ips.append(valid_iscsi_ips[node_ip]["iscsi_portal"]) + target_iqns.append( + valid_iscsi_ips[node_ip]["iscsi_target_iqn"]) + + return node_ips, target_ips, target_iqns + + +def get_iscsi_info_from_conf(manager_groups, iscsi_manager_groups, use_ipv6, + valid_iscsi_ips, valid_node_ips, thread_lock): + node_ips, target_ips, target_iqns = [], [], [] + manager_group_len = len(manager_groups + iscsi_manager_groups) + + for _ in range(manager_group_len): + thread_lock.acquire() + try: + manager_ips = _get_manager_ips(manager_groups) + if not manager_groups: + manager_groups.extend(iscsi_manager_groups) + except Exception: + raise + finally: + thread_lock.release() + + node_ips, target_ips, target_iqns = 
_get_target_info( + manager_ips, use_ipv6, valid_iscsi_ips, valid_node_ips) + if target_ips: + break + + return node_ips, target_ips, target_iqns