diff --git a/dss_metrics/config/config.json b/dss_metrics/config/config.json index 8dabb21..15da174 100644 --- a/dss_metrics/config/config.json +++ b/dss_metrics/config/config.json @@ -8,7 +8,7 @@ "prometheus_port_num": 8000, "polling_interval_secs": 1, "cluster_id": "c01", - "logging_level": "info", + "logging_level": "INFO", "logging_file": "/var/log/dss/metrics_agent.log", "metric_categories": ["minio", "target"] } \ No newline at end of file diff --git a/dss_metrics/metrics.py b/dss_metrics/metrics.py index 91983cf..857b6a4 100644 --- a/dss_metrics/metrics.py +++ b/dss_metrics/metrics.py @@ -29,116 +29,32 @@ OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -from collections import namedtuple -from prometheus_client import start_http_server, Summary, REGISTRY, Metric + import argparse import json -import multiprocessing +import logging +import os import time import utils +from logging.handlers import RotatingFileHandler +from pathlib import Path +from prometheus_client import start_http_server, REGISTRY import minio_ustat_collector import minio_rest_collector import nvmftarget_ustat_collector -MetricInfo = namedtuple("MetricInfo", "key, name, value, tags, timestamp") -COLLECTOR_TIMEOUT = 120 - - -class MetricsCollector(object): - def __init__(self, configs): - self.configs = configs - self.filter = self.configs['filter'] - self.whitelist_patterns = utils.get_whitelist_keys() - self.exit_flag = multiprocessing.Event() - - def collect(self): - metrics = self.get_metrics() - for metric in metrics: - yield metric - - def create_collector_proc(self, name, obj, metrics_data_buffer, exit_flag): - try: - proc = multiprocessing.Process( - name=name, - target=obj.poll_statistics, - args=[metrics_data_buffer, exit_flag] - ) - return proc - except Exception as error: - print(f"Error launching collector {error}") - - def get_metrics(self): - manager = multiprocessing.Manager() - metrics_data_buffer = manager.list() - metrics = [] - - print("collecting metrics from DSS cluster..") - - num_seconds = self.configs['polling_interval_secs'] - num_iterations = 1 - - collector_procs = [] - - # initialize collectors - target_ustat_obj = nvmftarget_ustat_collector.NVMFTargetUSTATCollector( - self.configs, num_seconds, num_iterations, self.whitelist_patterns, - filter=self.filter - ) - minio_ustat_obj = minio_ustat_collector.MinioUSTATCollector( - self.configs, num_seconds, num_iterations, self.whitelist_patterns, - filter=self.filter - ) - minio_rest_obj = minio_rest_collector.MinioRESTCollector( - self.configs, self.whitelist_patterns, filter=self.filter - ) - # initialize collector processes - collector_procs.append(self.create_collector_proc( - "target_ustat", target_ustat_obj, metrics_data_buffer, - self.exit_flag)) - collector_procs.append(self.create_collector_proc( - "minio_ustat", minio_ustat_obj, metrics_data_buffer, - self.exit_flag)) - collector_procs.append(self.create_collector_proc( - "minio_rest", minio_rest_obj, metrics_data_buffer, - self.exit_flag)) - - # start collectors - for proc in collector_procs: - try: - proc.start() - except Exception as error: - print(f"Failed to start proccess {proc.name}: {str(error)}") - - # run collectors for specified duration - time.sleep(self.configs["metrics_agent_runtime_per_interval"]) - - # send exit flag to stop collectors - self.exit_flag.set() - - # wait for collectors to finish and kill any remaining collectors - for proc in collector_procs: - try: - proc.join(COLLECTOR_TIMEOUT) - if proc.is_alive(): - print(f"process {proc.name} is hanging, terminating..") - proc.terminate() - except Exception as error: - print(f"Failed to terminate process {proc.name}: {str(error)}") - - # populate Prometheus metric objects - for m in metrics_data_buffer: - metric = Metric(m.name, m.key, 'gauge') - metric.add_sample( - m.name, - value=m.value, - labels=m.tags, - timestamp=m.timestamp - ) - metrics.append(metric) - return metrics +APP_NAME = "DSS Metrics Agent" +LOGGING_LEVEL = { + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, + "WARNING": logging.WARN, + "ERROR": logging.ERROR, + "EXCEPTION": logging.ERROR, + "FATAL": logging.FATAL +} if __name__ == '__main__': # load CLI args @@ -161,20 +77,78 @@ def get_metrics(self): # load config file args configs = {} + with open(cli_args['config'], "rb") as cfg: configs = json.loads(cfg.read().decode('UTF-8', "ignore")) # merge configs configs.update(cli_args) + # get other required metadata + filter = configs['filter'] + whitelist_patterns = utils.get_whitelist_keys() + polling_interval_secs = configs['polling_interval_secs'] + num_iterations = 1 + metrics_scopes = {} + # load metrics scope settings + with open("metrics_scope.json", "rb") as scopes: + metrics_scopes = json.loads(scopes.read().decode('UTF-8', "ignore")) + + # need to deserialize escape characters from json + metrics_scopes = {string.encode().decode('unicode_escape'): scope + for string, scope in metrics_scopes.items()} + + # create logger + logfile_path = configs["logging_file"] + if not logfile_path: + raise ValueError("Missing logging_file parameter in config file") + + os.makedirs(os.path.dirname(logfile_path), exist_ok=True) + logfile = Path(logfile_path) + logfile.touch(exist_ok=True) + + logger = logging.getLogger(APP_NAME) + logger.setLevel(LOGGING_LEVEL[configs['logging_level']]) + + log_format = ( + logging.Formatter('%(asctime)s %(levelname)s:%(name)s %(message)s') + ) + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_format) + logger.addHandler(console_handler) + + # max size of log file set to 100MB + file_handler = RotatingFileHandler(logfile_path, mode='a', + maxBytes=100*1024*1024, backupCount=1, + encoding=None, delay=0) + file_handler.setFormatter(log_format) + logger.addHandler(file_handler) + + logger.info("Successfully created logger!") + # expose metrics on promotheus endpoint - print("\n\n starting http server.... \n\n") + logger.info("\n\n starting server.... \n\n") try: start_http_server(8000) + REGISTRY.register( - MetricsCollector(configs) + minio_rest_collector.MinioRESTCollector( + configs, metrics_scopes, whitelist_patterns, filter) ) + REGISTRY.register( + nvmftarget_ustat_collector.NVMFTargetUSTATCollector( + configs, metrics_scopes, polling_interval_secs, num_iterations, + whitelist_patterns, filter + ) + ) + REGISTRY.register( + minio_ustat_collector.MinioUSTATCollector( + configs, metrics_scopes, polling_interval_secs, num_iterations, + whitelist_patterns, filter + ) + ) + while True: time.sleep(1) except Exception as error: - print(f"Failed to start Metrics http server: {str(error)}") + logger.info(f"Failed to start Metrics http server: {str(error)}") diff --git a/dss_metrics/metrics_scope.json b/dss_metrics/metrics_scope.json new file mode 100644 index 0000000..0485469 --- /dev/null +++ b/dss_metrics/metrics_scope.json @@ -0,0 +1,6 @@ +{ + "target.subsystem\\d+.kvio.(puts|gets|dels|putBandwidth|getBandwidth)": "target", + "minio_upstream.outstanding.s3_get_req": "minio cluster", + "minio_disk_storage_used_bytes": "minio cluster", + "minio_disk_storage_total_capacity_bytes": "minio cluster" +} \ No newline at end of file diff --git a/dss_metrics/minio_rest_collector.py b/dss_metrics/minio_rest_collector.py index b1be4d2..29ad244 100644 --- a/dss_metrics/minio_rest_collector.py +++ b/dss_metrics/minio_rest_collector.py @@ -30,18 +30,24 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -import json -import metrics +import logging import re import requests import socket -import subprocess import time +import utils + +import metrics +from prometheus_client import Metric +from prometheus_client.registry import Collector -class MinioRESTCollector(object): - def __init__(self, configs, whitelist_patterns, filter=False): + +class MinioRESTCollector(Collector): + def __init__(self, configs, metrics_scopes, whitelist_patterns, + filter=False): self.configs = configs + self.metrics_scopes = metrics_scopes self.whitelist_patterns = whitelist_patterns self.filter = filter self.minio_metrics = {'minio_disk_storage_used_bytes', @@ -52,36 +58,74 @@ def __init__(self, configs, whitelist_patterns, filter=False): self.mc = configs['mc_binary_path'] self.conf_json_bucket_suffix = configs['conf_json_bucket_suffix'] self.cluster_id = self.configs['cluster_id'] - self.TYPE = 'minio' + self.TYPE = 'minio_rest' + self.logger = logging.getLogger(metrics.APP_NAME) + + def collect(self): + self.logger.debug("--- MINIO REST COLLECTOR ---") - def poll_statistics(self, metrics_data_buffer, exit_flag): - cluster_endpoint_map = self.get_miniocluster_endpoint_map() + cluster_endpoint_map = utils.get_miniocluster_endpoint_map(self.mc, + self.logger) + cluster_endpoint_map_items = cluster_endpoint_map.items() - while True: - if exit_flag.is_set(): - break - for _, endpts in cluster_endpoint_map.items(): - if not endpts or len(endpts) == 0: - continue + if (not cluster_endpoint_map_items or + len(cluster_endpoint_map_items) == 0): + self.logger.error("No MINIO endpoints found") + raise ValueError("No MINIO endpoints found") - minio_endpoint = endpts.pop() - miniocluster_id = self.get_minio_cluster_uuid(minio_endpoint) + all_minio_endpoints = [] + for _, endpts in cluster_endpoint_map_items: + all_minio_endpoints.extend(list(endpts)) + + try: + for minio_endpoint in all_minio_endpoints: + miniocluster_id = utils.get_minio_cluster_uuid( + minio_endpoint, self.url_prefix, + self.cluster_id_url_suffix) minio_metrics = self.get_minio_metrics_from_endpoint( minio_endpoint) + if not miniocluster_id or not minio_metrics: + self.logger.error("Failed to retrieve MINIO metadata") + raise ValueError("Failed to retrieve MINIO metadata") + tags = {} tags['cluster_id'] = self.cluster_id tags['target_id'] = socket.gethostname() tags['minio_id'] = miniocluster_id + tags['minio_endpoint'] = minio_endpoint tags['type'] = self.TYPE - for metric in minio_metrics: - if self.filter and not self.check_whitelist_key(metric[0]): + for key, value in minio_metrics: + if (self.filter and not + self.check_whitelist_key(key)): continue - metrics_data_buffer.append( - metrics.MetricInfo( - metric[0], metric[0], metric[1], tags, time.time()) + + # check if scope applies to metric + scope = self.get_metric_scope(key) + if scope: + tags['scope'] = scope + + name = key + metric = Metric(name, key, 'gauge') + metric.add_sample( + name, + value=value, + labels=tags, + timestamp=time.time() ) + self.logger.debug(f"collected: {metric}") + yield metric + + except Exception as error: + self.logger.error( + f"Error: {str(error)} during MINIO REST collection") + + def get_metric_scope(self, key): + for regex in self.metrics_scopes.keys(): + if re.match(regex, key): + return self.metrics_scopes[regex] + return None def check_whitelist_key(self, key): for regex in self.whitelist_patterns: @@ -89,53 +133,6 @@ def check_whitelist_key(self, key): return True return False - def get_miniocluster_endpoint_map(self): - proc = subprocess.Popen( - [self.mc, 'config', 'host', 'list'], - stdout=subprocess.PIPE - ) - local_minio_host = None - miniocluster_endpoint_map = dict() # { : {} } - try: - for line in proc.stdout.readlines(): - # find local minio host or endpoint - decoded_line = line.decode('utf-8').strip("\n") - if decoded_line.startswith('local_'): - local_minio_host = decoded_line.strip() - break - except Exception as error: - print(f"Unable to read host list: {str(error)}") - return {} - - if local_minio_host: - conf_json_path = local_minio_host + self.conf_json_bucket_suffix - try: - proc = subprocess.Popen([self.mc, 'cat', conf_json_path], - stdout=subprocess.PIPE) - dss_conf_dict = json.loads( - proc.communicate()[0].decode('utf-8')) - - for cluster in dss_conf_dict["clusters"]: - minio_cluster_id = cluster["id"] - minio_endpoints = set() - for endpoint_info in cluster["endpoints"]: - minio_endpoints.add( - endpoint_info["ipv4"] + ":" - + str(endpoint_info["port"]) - ) - miniocluster_endpoint_map[minio_cluster_id] = ( - minio_endpoints) - except KeyError as error: - print( - f"conf.json missing cluster information: {str(error)}" - ) - except Exception as error: - print(f"Error when processing conf.json: {str(error)}") - else: - raise ValueError("Unable to find local minio host/endpoint") - - return miniocluster_endpoint_map - def get_minio_metrics_from_endpoint(self, endpoint): url = self.url_prefix + endpoint + self.metrics_url_suffix r = requests.get(url) @@ -145,10 +142,3 @@ def get_minio_metrics_from_endpoint(self, endpoint): key, val = line.split(" ") metrics_data.append((key, float(val))) return metrics_data - - def get_minio_cluster_uuid(self, endpoint): - url = (self.url_prefix + endpoint - + self.cluster_id_url_suffix) - r = requests.get(url) - minio_uuid = dict(r.json())["UUID"] - return minio_uuid diff --git a/dss_metrics/minio_ustat_collector.py b/dss_metrics/minio_ustat_collector.py index 37025f5..43c8b3b 100644 --- a/dss_metrics/minio_ustat_collector.py +++ b/dss_metrics/minio_ustat_collector.py @@ -31,19 +31,23 @@ """ import json -import metrics -import os +import logging import re import socket import subprocess import time import utils -import uuid +import metrics + +from prometheus_client import Metric +from prometheus_client.registry import Collector -class MinioUSTATCollector(object): - def __init__(self, configs, seconds, num_iterations, + +class MinioUSTATCollector(Collector): + def __init__(self, configs, metrics_scopes, seconds, num_iterations, whitelist_patterns, filter=False): + self.metrics_scopes = metrics_scopes self.configs = configs self.ustat_path = self.configs['ustat_binary_path'] self.cluster_id = self.configs['cluster_id'] @@ -51,15 +55,21 @@ def __init__(self, configs, seconds, num_iterations, self.num_iterations = num_iterations self.whitelist_patterns = whitelist_patterns self.filter = filter - self.TYPE = 'minio' + self.TYPE = 'minio_ustat' + self.url_prefix = "http://" + self.cluster_id_url_suffix = "/minio/cluster_id" + self.logger = logging.getLogger(metrics.APP_NAME) + + def collect(self): + self.logger.debug("--- MINIO USTAT COLLECTOR ---") - def poll_statistics(self, metrics_data_buffer, exit_flag): minio_uuid = None stats_output = {} minio_proc_map = {} # { uuid: proc } # spawn a collector and get uuid for each minio instance pid_list = self.get_minio_instances() + collection_time = time.time() for pid in pid_list: try: @@ -68,26 +78,24 @@ def poll_statistics(self, metrics_data_buffer, exit_flag): + str(pid) + ' ' + str(self.seconds) + ' ' + str(self.num_iterations) ) - proc_file = os.path.join('/proc', str(pid), 'cmdline') - proc_cmd = 'minio' - with open(proc_file) as f: - proc_cmd = f.readline() - minio_uuid = uuid.uuid3(uuid.NAMESPACE_DNS, proc_cmd) + + minio_endpoint = utils.get_minio_endpoint_from_process( + pid, self.logger) + minio_uuid = utils.get_minio_cluster_uuid( + minio_endpoint, self.url_prefix, + self.cluster_id_url_suffix) + proc = subprocess.Popen(cmd.split(' '), stdout=subprocess.PIPE) stats_output['time'] = collection_time minio_proc_map[minio_uuid] = proc except Exception as error: - print(f'Caught exception while running USTAT {str(error)}') + self.logger.error(f'Error while running USTAT {str(error)}') device_subsystem_map = utils.get_device_subsystem_map() for minio_uuid, proc in minio_proc_map.items(): - while True: - if exit_flag.is_set(): - # shutdown ustat collector - self.shutdown_ustat_collector(proc) - break - - line = proc.stdout.readline().decode('utf-8') + lines = proc.stdout.readlines() + for line in lines: + line = line.decode('utf-8') if not line or len(line) <= 2 or ('=' not in line): continue @@ -125,25 +133,40 @@ def poll_statistics(self, metrics_data_buffer, exit_flag): elif fields[0] == 'minio_upstream': tags['subsystem_id'] = 'minio_upstream' - """ - XOR operation - check if filter, then whitelist match should be True - if not filter, than whitelist match should be False - """ - if valid_value_flag and self.filter == whitelist_match: - metrics_data_buffer.append( - metrics.MetricInfo(full_key, metric_name, - value, tags, time.time()) - ) + if valid_value_flag: + if (self.filter == whitelist_match) or ( + not self.filter): + + # check if scope applies to metric + scope = self.get_metric_scope(full_key) + if scope: + tags['scope'] = scope + + name = full_key.replace(".", "") + metric = Metric(name, full_key, 'gauge') + metric.add_sample( + name, + value=value, + labels=tags, + timestamp=time.time() + ) + self.logger.debug(f"collected: {metric}") + yield metric except Exception as error: - print('Failed to handle line %s, Error: %s', line, - str(error)) + self.logger.error('Error handling line %s, Error: %s', + line, str(error)) + + def get_metric_scope(self, key): + for regex in self.metrics_scopes.keys(): + if re.match(regex, key): + return self.metrics_scopes[regex] + return None def shutdown_ustat_collector(self, proc): try: proc.terminate() except Exception: - print('ustat process termination exception ', exc_info=True) + self.logger.warning('failed to terminate ustat', exc_info=True) proc.kill() def get_minio_instances(self): @@ -153,7 +176,7 @@ def get_minio_instances(self): try: pid_list = utils.find_process_pid(proc_name, cmds) except Exception as error: - print( + self.logger.error( f'Error: unable to get MINIO PID list {str(error)}' ) return pid_list diff --git a/dss_metrics/nvmftarget_ustat_collector.py b/dss_metrics/nvmftarget_ustat_collector.py index 4d5f70a..dada962 100644 --- a/dss_metrics/nvmftarget_ustat_collector.py +++ b/dss_metrics/nvmftarget_ustat_collector.py @@ -30,18 +30,24 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -import metrics +import logging import re import socket import subprocess import time import utils +import metrics + +from prometheus_client import Metric +from prometheus_client.registry import Collector -class NVMFTargetUSTATCollector(object): - def __init__(self, configs, seconds, num_iterations, + +class NVMFTargetUSTATCollector(Collector): + def __init__(self, configs, metrics_scopes, seconds, num_iterations, whitelist_patterns, filter=False): self.configs = configs + self.metrics_scopes = metrics_scopes self.ustat_path = self.configs['ustat_binary_path'] self.cluster_id = self.configs['cluster_id'] self.nvmf_pid, status = utils.check_spdk_running() @@ -49,9 +55,12 @@ def __init__(self, configs, seconds, num_iterations, self.num_iterations = num_iterations self.whitelist_patterns = whitelist_patterns self.filter = filter - self.TYPE = 'target' + self.TYPE = 'target_ustat' + self.logger = logging.getLogger(metrics.APP_NAME) + + def collect(self): + self.logger.debug("--- NVMF Target USTAT COLLECTOR ---") - def poll_statistics(self, metrics_data_buffer, exit_flag): try: cmd = ( self.ustat_path + ' -p ' @@ -61,7 +70,7 @@ def poll_statistics(self, metrics_data_buffer, exit_flag): ) proc = subprocess.Popen(cmd.split(' '), stdout=subprocess.PIPE) except Exception as e: - print(f'Caught exception while running USTAT {str(e)}') + self.logger.error(f'Error while running USTAT {str(e)}') return subsystem_num_to_nqn_map = {} @@ -69,19 +78,16 @@ def poll_statistics(self, metrics_data_buffer, exit_flag): raw_data_queue = [] while True: - if exit_flag.is_set(): - # shutdown USTAT collector - self.shutdown_ustat_collector(proc) - # store captured metrics - self.store_metrics(raw_data_queue, - subsystem_num_to_nqn_map, - metrics_data_buffer) - break - - line = proc.stdout.readline().decode('utf-8') - - if not line or len(line) <= 2 or ('=' not in line): - continue + # loop until no more output left in USTAT + try: + line = proc.stdout.readline() + if not line: + break + line = line.decode('utf-8') + if len(line) <= 2 or ('=' not in line): + continue + except Exception as e: + self.logger.error(f"error processing Target USTAT: {str(e)}") # process nvmf target USTAT line try: @@ -133,19 +139,13 @@ def poll_statistics(self, metrics_data_buffer, exit_flag): "time": time.time() } raw_data_queue.append(data) + except Exception as e: + self.logger.error(f'Failed to handle {line}, Error: {str(e)}') - except Exception as error: - print(f'Failed to handle line {line}, Error: {str(error)}') - - def shutdown_ustat_collector(self, proc): - try: - proc.terminate() - except Exception: - print('ustat process termination exception ', exc_info=True) - proc.kill() + # shutdown USTAT collector subprocess + self.shutdown_ustat_collector(proc) - def store_metrics(self, raw_data_queue, subsystem_num_to_nqn_map, - metrics_data_buffer): + # All of USTAT output processed, now Metric objects can be populated for data in raw_data_queue: if ( data['valid_value_flag'] and @@ -158,8 +158,31 @@ def store_metrics(self, raw_data_queue, subsystem_num_to_nqn_map, data['tags']['subsystem_id'] = ( subsystem_num_to_nqn_map[data['subsystem_num']] ) - metrics_data_buffer.append( - metrics.MetricInfo(data['full_key'], data['metric_name'], - data['value'], data['tags'], - data['time']) + + # check if scope applies to metric + scope = self.get_metric_scope(data['full_key']) + if scope: + data['tags']['scope'] = scope + + metric = Metric(data['metric_name'], data['full_key'], 'gauge') + metric.add_sample( + data['metric_name'], + value=data['value'], + labels=data['tags'], + timestamp=data['time'] ) + self.logger.debug(f"collected: {metric}") + yield metric + + def shutdown_ustat_collector(self, proc): + try: + proc.terminate() + except Exception: + self.logger.warning('failed to terminate ustat', exc_info=True) + proc.kill() + + def get_metric_scope(self, key): + for regex in self.metrics_scopes.keys(): + if re.match(regex, key): + return self.metrics_scopes[regex] + return None diff --git a/dss_metrics/utils.py b/dss_metrics/utils.py index 3e4a532..339f860 100644 --- a/dss_metrics/utils.py +++ b/dss_metrics/utils.py @@ -30,11 +30,13 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ - +import json import os import psutil import re +import requests import socket +import subprocess import time import ast @@ -273,3 +275,78 @@ def get_whitelist_keys(): with open(file_path) as f: whitelist = f.read().splitlines() return whitelist + + +def get_minio_endpoint_from_process(pid, logger): + proc_filepath = f"/proc/{pid}/cmdline" + + proc = subprocess.Popen( + ['cat', proc_filepath], + stdout=subprocess.PIPE + ) + minio_cmd = proc.stdout.read().decode("utf-8") + endpts_found = re.findall("--address(.*?)/", minio_cmd) + if endpts_found: + minio_endpoint = endpts_found[0].rstrip('\x00').strip('\x00') + else: + logger.error("Unable to find MINIO Endpt from MINIO process") + raise ValueError("Unable to find MINIO Endpt from MINIO process") + return minio_endpoint + + +def get_minio_cluster_uuid(endpoint, url_prefix, cluster_id_url_suffix): + url = str(url_prefix + endpoint + + cluster_id_url_suffix) + r = requests.get(url) + minio_uuid = dict(r.json())["UUID"] + return minio_uuid + + +def get_miniocluster_endpoint_map(mc_binary_path, logger): + proc = subprocess.Popen( + [mc_binary_path, 'config', 'host', 'list'], + stdout=subprocess.PIPE + ) + local_minio_host = None + miniocluster_endpoint_map = dict() # { : {} } + try: + for line in proc.stdout.readlines(): + # find local minio host or endpoint + decoded_line = line.decode('utf-8').strip("\n") + if decoded_line.startswith('local_'): + local_minio_host = decoded_line.strip() + break + except Exception as error: + logger.error(f"Unable to read host list: {str(error)}") + return {} + + if local_minio_host: + conf_json_path = local_minio_host + "/dss/conf.json" + try: + proc = subprocess.Popen([mc_binary_path, 'cat', conf_json_path], + stdout=subprocess.PIPE) + dss_conf_dict = json.loads( + proc.communicate()[0].decode('utf-8')) + + for cluster in dss_conf_dict["clusters"]: + minio_cluster_id = cluster["id"] + minio_endpoints = set() + for endpoint_info in cluster["endpoints"]: + minio_endpoints.add( + endpoint_info["ipv4"] + ":" + + str(endpoint_info["port"]) + ) + miniocluster_endpoint_map[minio_cluster_id] = ( + minio_endpoints) + except KeyError as error: + logger.error( + f"conf.json missing cluster information: {str(error)}" + ) + except Exception as error: + logger.error( + f"Error when processing conf.json: {str(error)}") + else: + logger.error("Unable to find local minio host/endpoint") + raise ValueError("Unable to find local minio host/endpoint") + + return miniocluster_endpoint_map