Skip to content

Commit

Permalink
Merge pull request #71 from huangpeng5/suyan_806
Browse files Browse the repository at this point in the history
Driver return share hot and cold capacity as long as manila deliver hot_data_size or cold_data_size when create share
  • Loading branch information
huangpeng5 authored Aug 6, 2024
2 parents 03a7261 + 0c79775 commit a23c0ab
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import netaddr
from oslo_log import log
from manila import context as admin_context
from manila import exception
from manila.share import api
from manila.share import share_types
from manila.share import utils as share_utils
Expand Down Expand Up @@ -83,6 +84,70 @@ def is_ipv4_address(ip_address):
except Exception:
return False

@staticmethod
def _check_share_tier_capacity_param(tier_info, total_size):
if (tier_info.get('hot_data_size') is None or
tier_info.get('cold_data_size') is None):
return

hot_data_size = int(tier_info.get('hot_data_size'))
cold_data_size = int(tier_info.get('cold_data_size'))

if hot_data_size > total_size or cold_data_size > total_size:
error_msg = ("Check share tier param failed, hot_data_size:%s or "
"cold_data_size:%s can not bigger than share total size: %s" %
(hot_data_size, cold_data_size, total_size))
LOG.error(error_msg)
raise exception.InvalidInput(error_msg)

if hot_data_size + cold_data_size != total_size:
error_msg = ("Check share tier param failed, hot_data_size:%s plus "
"cold_data_size:%s must equal to the share total size: %s" %
(hot_data_size, cold_data_size, total_size))
LOG.error(error_msg)
raise exception.InvalidInput(error_msg)

return

@staticmethod
def _check_share_tier_policy_param(tier_info):
"""
Check share tier param is valid or not
"""
hot_data_size = int(tier_info.get('hot_data_size', 0))
cold_data_size = int(tier_info.get('cold_data_size', 0))
tier_place = tier_info.get('tier_place')

if tier_place and tier_place not in constants.SUPPORT_TIER_PLACE:
error_msg = ("The configured tier_place:%s not in support tier place:%s, "
"Please Check" % (tier_place, constants.SUPPORT_TIER_PLACE))
LOG.error(error_msg)
raise exception.InvalidInput(error_msg)

if hot_data_size and cold_data_size and not tier_place:
error_msg = ("Tier place:%s must be set when hot_data_size:%s and "
"cold_data_size:%s all not equal to 0" %
(tier_place, hot_data_size, cold_data_size))
LOG.error(error_msg)
raise exception.InvalidInput(error_msg)

@staticmethod
def _set_tier_data_size(tier_info, total_size):
"""
set hot_data_size if cold data size configured but
hot data size not configured
"""
hot_data_size = tier_info.get('hot_data_size')
cold_data_size = tier_info.get('cold_data_size')
if hot_data_size is None and cold_data_size is None:
return tier_info
elif hot_data_size is None:
tier_info['hot_data_size'] = total_size - int(cold_data_size)
elif cold_data_size is None:
tier_info['cold_data_size'] = total_size - int(hot_data_size)

return tier_info

def concurrent_exec_waiting_tasks(self, task_id_list):
# Enable Concurrent Tasks and wait until all tasks complete
threading_task_list = []
Expand Down Expand Up @@ -159,8 +224,8 @@ def _get_share_tier_policy(self, tier_info, tier_param):
"""
metadata_tier_value = self.share_metadata.get(tier_param)
share_tier_value = self.share.get('share_tier_strategy', {}).get(tier_param)
tier_value = metadata_tier_value or share_tier_value
if tier_value:
tier_value = share_tier_value if metadata_tier_value is None else metadata_tier_value
if tier_value is not None:
tier_info[tier_param] = tier_value

def _get_all_share_tier_policy(self):
Expand All @@ -171,10 +236,13 @@ def _get_all_share_tier_policy(self):
tier_info = {}
# get hot data size
self._get_share_tier_policy(tier_info, 'hot_data_size')
# get cold data size
self._get_share_tier_policy(tier_info, 'cold_data_size')
# get tier_grade
self._get_share_tier_policy(tier_info, 'tier_place')
# get tier_migrate_expiration
self._get_share_tier_policy(tier_info, 'tier_migrate_expiration')

return tier_info

def _get_forbidden_dpc_param(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def _check_and_get_share_capacity(share_data):
ssd_used_space = share_data.get("ssd_space_used")
hdd_hard_limit = share_data.get("hdd_hard_quota")
hdd_used_space = share_data.get("hdd_space_used")

share_capacity.update({
'ssd_hard_limit': str(ssd_hard_limit),
'ssd_used_space': str(ssd_used_space),
Expand All @@ -81,6 +82,8 @@ def _check_and_get_share_capacity(share_data):
'hdd_avail_space': str(hdd_hard_limit - hdd_used_space)
})

LOG.info("Get share usage:%s of share:%s from share usages successfully",
share_capacity, share_data.get('name'))
return share_capacity

def create_share(self):
Expand Down Expand Up @@ -147,6 +150,7 @@ def parse_cmcc_qos_options(self):
"total_bytes_sec": 0,
"total_iops_sec": 0
}
LOG.info("Get the qos value before qos-freeze, the value is %s", share_qos_info)
return share_qos_info

def _check_domain(self):
Expand Down Expand Up @@ -233,6 +237,7 @@ def _get_or_create_account(self):

def _set_namespace_param(self):
self.namespace_name = 'share-' + self.share.get('share_id')
total_size = self.share.get('size')
param_dict = {
'name': self.namespace_name,
'forbidden_dpc': self._get_forbidden_dpc_param(),
Expand All @@ -242,15 +247,17 @@ def _set_namespace_param(self):
'case_sensitive': constants.CASE_INSENSITIVE
}
self.tier_info = self._get_all_share_tier_policy()
self._set_tier_data_size(self.tier_info, self.share.get('size'))
# check tier capacity param is valid or not
self._check_share_tier_capacity_param(self.tier_info, total_size)
# check tier policy param is valid or not
self._check_share_tier_policy_param(self.tier_info)
hot_data_size = self.tier_info.get('hot_data_size')
total_size = self.share.get('size')

if hot_data_size is None:
return param_dict
hot_data_size = int(hot_data_size)
if hot_data_size > total_size:
LOG.warning("the configured hot data size %s is bigger than total size, "
"set it to total siz %s", hot_data_size, total_size)
hot_data_size = total_size

param_dict.update({
'tier_hot_cap_limit': driver_utils.capacity_unit_up_conversion(
hot_data_size, constants.BASE_VALUE, constants.POWER_BETWEEN_KB_AND_GB),
Expand Down Expand Up @@ -353,6 +360,9 @@ def _check_and_update_tier_size(self, namespace_info, new_size):
:return: None
"""
self.tier_info = self._get_all_share_tier_policy()
self._set_tier_data_size(self.tier_info, new_size)
# check tier capacity param is valid or not
self._check_share_tier_capacity_param(self.tier_info, new_size)
current_hot_data_size = namespace_info.get('tier_hot_cap_limit', 0)
new_hot_data_size = self.tier_info.get('hot_data_size')

Expand Down Expand Up @@ -481,6 +491,7 @@ def _get_location(self):
if 'HDFS' in self.share_proto:
location.append('HDFS:/' + self.namespace_name)

LOG.info("Create share successfully, the location of this share is %s", location)
return location

def _get_quota_info(self, namespace_info, action, parent_id, new_size, parent_type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,15 @@ def change_share(self, new_size, action):
task_id_key = 'task_id'
if not self.share_parent_id:
# gfs场景
new_hot_size = self._get_all_share_tier_policy().get('hot_data_size')
tier_info = self._get_all_share_tier_policy()
self._set_tier_data_size(tier_info, new_size)
# check tier capacity param is valid or not
self._check_share_tier_capacity_param(tier_info, new_size)
gfs_name = constants.SHARE_PREFIX + self.share.get('share_id')
name_locator = '@'.join([gfs_name, cluster_name])
# 修改GFS分级容量
gfs_tier_cap_modify_result = self._check_and_update_gfs_tier_size(
name_locator, new_size, new_hot_size)
name_locator, new_size, tier_info.get('hot_data_size'))
if gfs_tier_cap_modify_result:
self.client.wait_task_until_complete(gfs_tier_cap_modify_result.get(task_id_key))
# 修改GFS配额容量
Expand Down Expand Up @@ -449,6 +452,7 @@ def _get_gfs_location(self):
dpc_path = '/' + self.namespace_name
location.append('DPC:' + dpc_path)

LOG.info("Create share successfully, the location of this share is %s", location)
return location

def _set_gfs_create_param(self):
Expand Down Expand Up @@ -482,16 +486,17 @@ def _set_disk_pool_size_limit_param(self):
:return:
"""
disk_pool_size_limit_param = {}
total_size = self.share.get('size')
self.tier_info = self._get_all_share_tier_policy()
self._set_tier_data_size(self.tier_info, self.share.get('size'))
# check tier capacity param is valid or not
self._check_share_tier_capacity_param(self.tier_info, total_size)
# check tier policy param is valid or not
self._check_share_tier_policy_param(self.tier_info)
hot_data_size = self.tier_info.get('hot_data_size')
total_size = self.share.get('size')
if hot_data_size is None:
return disk_pool_size_limit_param
hot_data_size = int(hot_data_size)
if hot_data_size > total_size:
LOG.warning("the configured hot data size %s is bigger than total size, "
"set it to total siz %s", hot_data_size, total_size)
hot_data_size = total_size
disk_pool_size_limit_param.update({
'tier_hot_limit': str(driver_utils.capacity_unit_up_conversion(
hot_data_size, constants.BASE_VALUE, constants.POWER_BETWEEN_KB_AND_GB)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ def get_share_tier_status(self):
'name_locator': name_locator
})
if len(migrate_policy) <= 0:
LOG.info(_("migrate_policy {0} not found".format(name_locator)))
LOG.warning("migrate_policy %s not found, return {}", name_locator)
return {}
policy = migrate_policy[0]
return {
share_tier_status = {
"tier_status": self._dme_policy_status_to_enum_num(policy.get("policy_status")),
"tier_process": policy.get("migration_percent"),
"tier_type": self._dme_tier_grade_to_enum_suyan_str(policy.get("tier_grade")),
"tier_path": policy.get("file_name_filter", {}).get("filter")
}
LOG.debug("Get share tier status:%s successfully", share_tier_status)
return share_tier_status

def terminate_share_tier(self):
name_locator_info = self._combine_name_locator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@ def get_impl_type():
@staticmethod
def _check_and_set_tier_quota(namespace_info, all_share_usages, name_key):
namespace_name = namespace_info.get(name_key)
if not namespace_info.get('tier_hot_cap_limit'):
LOG.info("Namespace %s not set hot_data_size, don't return ssd and hhd "
"capacity", namespace_name)
used_key = 'used'
tier_hot_cap_limit = namespace_info.get('tier_hot_cap_limit')
tier_cold_cap_limit = namespace_info.get('tier_cold_cap_limit')
if tier_hot_cap_limit is None and tier_cold_cap_limit is None:
return all_share_usages

ssd_hard_quota = driver_utils.capacity_unit_up_conversion(
namespace_info.get('tier_hot_cap_limit'), constants.BASE_VALUE, 1)
tier_hot_cap_limit, constants.BASE_VALUE, 1)
hdd_hard_quota = driver_utils.capacity_unit_up_conversion(
namespace_info.get('tier_cold_cap_limit'), constants.BASE_VALUE, 1)
tier_cold_cap_limit, constants.BASE_VALUE, 1)
tier_perf_cap = json.loads(namespace_info.get('tier_perf_cap', '{}'))
ssd_space_used = tier_perf_cap.get('hot', {}).get('used', 0)
hdd_space_used = tier_perf_cap.get('cold', {}).get('used', 0)
ssd_space_used = tier_perf_cap.get('hot', {}).get(used_key)
hdd_space_used = tier_perf_cap.get('cold', {}).get(used_key)
all_share_usages.get(namespace_name).update(
{
'ssd_hard_quota': ssd_hard_quota,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def _get_dtree_location(self):
dpc_path = self._get_dpc_path('/' + share_path)
location.append('DPC:' + dpc_path)

LOG.info("Create share successfully, the location of this share is %s", location)
return location

def _get_dtree_namespace_info(self):
Expand Down Expand Up @@ -314,6 +315,7 @@ def _get_share_capacity(self, share_usages):
share_info = share_usages.get(self.namespace_name)

if not share_info:
LOG.info("Can not find share in share_usages. return {}")
return {}

return self._check_and_get_share_capacity(share_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,18 @@ def get_share_tier_status(self):

if not migrate_policy:
# 不存在分级策略报错
LOG.info(_("migrate_policy {0} for fs {1} not found"
.format(migrate_policy_name, namespace_name)))
LOG.warning("migrate_policy %s for fs %s not found, return {}",
migrate_policy_name, namespace_name)
return {}
else:
return {
"tier_status": migrate_policy.get("policy_status"),
"tier_process": migrate_policy.get("migration_percent"),
"tier_type": self._pacific_tier_grade_to_enum_suyan_str(migrate_policy.get("strategy")),
"tier_path": migrate_policy.get("path_name")
}
share_tier_status = {
"tier_status": migrate_policy.get("policy_status"),
"tier_process": migrate_policy.get("migration_percent"),
"tier_type": self._pacific_tier_grade_to_enum_suyan_str(
migrate_policy.get("strategy")),
"tier_path": migrate_policy.get("path_name")
}
LOG.debug("Get share tier status:%s successfully", share_tier_status)
return share_tier_status

def terminate_share_tier(self):
self._get_account_id()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
DISK_TYPE_SAS: SAS_USED_CAP_KEY,
DISK_TYPE_SATA: SATA_USED_CAP_KEY
}
SUPPORT_TIER_PLACE = ['hot', 'cold']
DISK_POOL_TIER_ENUM = {
'0': 'warm',
'1': 'hot',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def _nas_product(self, xml_root):

def _nas_storage_pools(self, xml_root):
text = xml_root.findtext('Filesystem/StoragePool')
self.check_config_exist(text, 'Storage/StoragePool')
self.check_config_exist(text, 'Filesystem/StoragePool')

if self.config.product == constants.PRODUCT_PACIFIC:
pool_is_digit_list = [pool_id.strip().isdigit() for pool_id in text.split(';')]
Expand Down

0 comments on commit a23c0ab

Please sign in to comment.