Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifying logic to provide metrics per MINIO endpoint #83

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions dss_metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
from prometheus_client import start_http_server, Summary, REGISTRY, Metric
import argparse
import json
import logging
from logging.handlers import RotatingFileHandler
import multiprocessing
import os
import time
import utils

Expand All @@ -51,6 +54,7 @@ def __init__(self, configs):
self.filter = self.configs['filter']
self.whitelist_patterns = utils.get_whitelist_keys()
self.exit_flag = multiprocessing.Event()
self.logger = logging.getLogger('root')
nsarras marked this conversation as resolved.
Show resolved Hide resolved

def collect(self):
metrics = self.get_metrics()
Expand All @@ -66,14 +70,14 @@ def create_collector_proc(self, name, obj, metrics_data_buffer, exit_flag):
)
return proc
except Exception as error:
print(f"Error launching collector {error}")
logger.error(f"Error launching collector {error}")

def get_metrics(self):
manager = multiprocessing.Manager()
metrics_data_buffer = manager.list()
metrics = []

print("collecting metrics from DSS cluster..")
logger.info("collecting metrics from DSS cluster..")

num_seconds = self.configs['polling_interval_secs']
num_iterations = 1
Expand Down Expand Up @@ -109,7 +113,7 @@ def get_metrics(self):
try:
proc.start()
except Exception as error:
print(f"Failed to start proccess {proc.name}: {str(error)}")
logger.error(f"Failed to start {proc.name}:{str(error)}")

# run collectors for specified duration
time.sleep(self.configs["metrics_agent_runtime_per_interval"])
Expand All @@ -122,10 +126,10 @@ def get_metrics(self):
try:
proc.join(COLLECTOR_TIMEOUT)
if proc.is_alive():
print(f"process {proc.name} is hanging, terminating..")
logger.warning(f"process {proc.name} hanging, terminating")
proc.terminate()
except Exception as error:
print(f"Failed to terminate process {proc.name}: {str(error)}")
logger.error(f"Failed to terminate {proc.name}: {str(error)}")

# populate Prometheus metric objects
for m in metrics_data_buffer:
Expand All @@ -141,6 +145,7 @@ def get_metrics(self):


if __name__ == '__main__':

nsarras marked this conversation as resolved.
Show resolved Hide resolved
# load CLI args
parser = argparse.ArgumentParser(description='DSS Metrics Agent CLI')
parser.add_argument(
Expand All @@ -167,8 +172,34 @@ def get_metrics(self):
# merge configs
configs.update(cli_args)

# create logger
logfile_path = configs["logging_file"]
nsarras marked this conversation as resolved.
Show resolved Hide resolved
if not logfile_path:
raise ValueError("Missing logging_file parameter in config file")

os.makedirs(os.path.dirname(logfile_path), exist_ok=True)
if not os.path.exists(logfile_path):
os.mknod(logfile_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.mknod() is used to create /dev/ usually.
Instead of that you can do

with open(file_path, "a") as f:
      pass


log_format = (
logging.Formatter('%(asctime)s %(levelname)s:%(name)s %(message)s')
)
logger = logging.getLogger('root')
logger.setLevel(logging.INFO)

# max size of log file set to 100MB
file_handler = RotatingFileHandler(logfile_path, mode='a',
maxBytes=100*1024*1024, backupCount=1,
encoding=None, delay=0)
console_handler = logging.StreamHandler()
file_handler.setFormatter(log_format)
console_handler.setFormatter(log_format)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.info("Successfully created logger!")

# expose metrics on prometheus endpoint
print("\n\n starting http server.... \n\n")
logger.info("\n\n starting http server.... \n\n")
try:
start_http_server(8000)
REGISTRY.register(
Expand All @@ -177,4 +208,4 @@ def get_metrics(self):
while True:
time.sleep(1)
except Exception as error:
print(f"Failed to start Metrics http server: {str(error)}")
logger.info(f"Failed to start Metrics http server: {str(error)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can call logger.exception to catch the backtrace

72 changes: 47 additions & 25 deletions dss_metrics/minio_rest_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"""

import json
import logging
import metrics
import re
import requests
Expand All @@ -53,35 +54,56 @@ def __init__(self, configs, whitelist_patterns, filter=False):
self.conf_json_bucket_suffix = configs['conf_json_bucket_suffix']
self.cluster_id = self.configs['cluster_id']
self.TYPE = 'minio'
self.logger = logging.getLogger('root')

def poll_statistics(self, metrics_data_buffer, exit_flag):
cluster_endpoint_map = self.get_miniocluster_endpoint_map()
cluster_endpoint_map_items = cluster_endpoint_map.items()

while True:
if exit_flag.is_set():
break
for _, endpts in cluster_endpoint_map.items():
if not endpts or len(endpts) == 0:
continue

minio_endpoint = endpts.pop()
miniocluster_id = self.get_minio_cluster_uuid(minio_endpoint)
minio_metrics = self.get_minio_metrics_from_endpoint(
minio_endpoint)

tags = {}
tags['cluster_id'] = self.cluster_id
tags['target_id'] = socket.gethostname()
tags['minio_id'] = miniocluster_id
tags['type'] = self.TYPE

for metric in minio_metrics:
if self.filter and not self.check_whitelist_key(metric[0]):
continue
metrics_data_buffer.append(
metrics.MetricInfo(
metric[0], metric[0], metric[1], tags, time.time())
)
if (not cluster_endpoint_map_items or
len(cluster_endpoint_map_items) == 0):
self.logger.error("No MINIO endpoints found")
raise ValueError("No MINIO endpoints found")

all_minio_endpoints = []
for _, endpts in cluster_endpoint_map_items:
all_minio_endpoints.extend(list(endpts))

try:
while True:
if exit_flag.is_set():
break

for minio_endpoint in all_minio_endpoints:
miniocluster_id = self.get_minio_cluster_uuid(
minio_endpoint)
minio_metrics = self.get_minio_metrics_from_endpoint(
minio_endpoint)

if not miniocluster_id or not minio_metrics:
self.logger.error("Failed to retrieve MINIO metadata")
raise ValueError("Failed to retrieve MINIO metadata")

tags = {}
tags['cluster_id'] = self.cluster_id
tags['target_id'] = socket.gethostname()
nsarras marked this conversation as resolved.
Show resolved Hide resolved
tags['minio_id'] = miniocluster_id
tags['minio_endpoint'] = minio_endpoint
tags['type'] = self.TYPE

for metric in minio_metrics:
if (self.filter and not
self.check_whitelist_key(metric[0])):
continue
metrics_data_buffer.append(
metrics.MetricInfo(
metric[0], metric[0], metric[1], tags,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass metrics selectively?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is because we may not want to expose all metrics to the end user all of the time, hence why we filter using a whitelist if the filter flag is given

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, why not metrics.MetricInfo(metric, tags..) ? Let MetricInfo filter out.
metrics.MetricInfo(metric[0], metric[0], metric[1], tags, doen't really help readability, else its better to use named params.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See new commits on PR, I made a lot of changes and not using this approach anymore

time.time())
)
self.logger.debug(f"{__file__} captured {metric}")
except Exception as error:
self.logger.error(
f"Error: {str(error)} during MINIO REST collection")

nsarras marked this conversation as resolved.
Show resolved Hide resolved
def check_whitelist_key(self, key):
for regex in self.whitelist_patterns:
Expand Down
12 changes: 7 additions & 5 deletions dss_metrics/minio_ustat_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"""

import json
import logging
import metrics
import os
import re
Expand All @@ -52,6 +53,7 @@ def __init__(self, configs, seconds, num_iterations,
self.whitelist_patterns = whitelist_patterns
self.filter = filter
self.TYPE = 'minio'
self.logger = logging.getLogger('root')

def poll_statistics(self, metrics_data_buffer, exit_flag):
minio_uuid = None
Expand All @@ -77,7 +79,7 @@ def poll_statistics(self, metrics_data_buffer, exit_flag):
stats_output['time'] = collection_time
minio_proc_map[minio_uuid] = proc
except Exception as error:
print(f'Caught exception while running USTAT {str(error)}')
self.logger.error(f'Error while running USTAT {str(error)}')

device_subsystem_map = utils.get_device_subsystem_map()
for minio_uuid, proc in minio_proc_map.items():
Expand Down Expand Up @@ -136,14 +138,14 @@ def poll_statistics(self, metrics_data_buffer, exit_flag):
value, tags, time.time())
)
except Exception as error:
print('Failed to handle line %s, Error: %s', line,
str(error))
self.logger.error('Error handling line %s, Error: %s',
line, str(error))

def shutdown_ustat_collector(self, proc):
try:
proc.terminate()
except Exception:
print('ustat process termination exception ', exc_info=True)
self.logger.warning('failed to terminate ustat', exc_info=True)
proc.kill()

def get_minio_instances(self):
Expand All @@ -153,7 +155,7 @@ def get_minio_instances(self):
try:
pid_list = utils.find_process_pid(proc_name, cmds)
except Exception as error:
print(
self.logger.error(
f'Error: unable to get MINIO PID list {str(error)}'
)
return pid_list
Expand Down
10 changes: 6 additions & 4 deletions dss_metrics/nvmftarget_ustat_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""

import logging
import metrics
import re
import socket
Expand All @@ -50,6 +51,7 @@ def __init__(self, configs, seconds, num_iterations,
self.whitelist_patterns = whitelist_patterns
self.filter = filter
self.TYPE = 'target'
self.logger = logging.getLogger('root')

def poll_statistics(self, metrics_data_buffer, exit_flag):
try:
Expand All @@ -61,7 +63,7 @@ def poll_statistics(self, metrics_data_buffer, exit_flag):
)
proc = subprocess.Popen(cmd.split(' '), stdout=subprocess.PIPE)
except Exception as e:
print(f'Caught exception while running USTAT {str(e)}')
self.logger.error(f'Error while running USTAT {str(e)}')
return

subsystem_num_to_nqn_map = {}
Expand Down Expand Up @@ -134,14 +136,14 @@ def poll_statistics(self, metrics_data_buffer, exit_flag):
}
raw_data_queue.append(data)

except Exception as error:
print(f'Failed to handle line {line}, Error: {str(error)}')
except Exception as e:
self.logger.error(f'Failed to handle {line}, Error: {str(e)}')

def shutdown_ustat_collector(self, proc):
try:
proc.terminate()
except Exception:
print('ustat process termination exception ', exc_info=True)
self.logger.warning('failed to terminate ustat', exc_info=True)
proc.kill()

def store_metrics(self, raw_data_queue, subsystem_num_to_nqn_map,
Expand Down