Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifying logic to provide metrics per MINIO endpoint #83

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dss_metrics/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"prometheus_port_num": 8000,
"polling_interval_secs": 1,
"cluster_id": "c01",
"logging_level": "info",
"logging_level": "INFO",
"logging_file": "/var/log/dss/metrics_agent.log",
"metric_categories": ["minio", "target"]
}
178 changes: 76 additions & 102 deletions dss_metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,116 +29,32 @@
OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
from collections import namedtuple
from prometheus_client import start_http_server, Summary, REGISTRY, Metric

import argparse
import json
import multiprocessing
import logging
import os
import time
import utils
from logging.handlers import RotatingFileHandler
from pathlib import Path
from prometheus_client import start_http_server, REGISTRY

import minio_ustat_collector
import minio_rest_collector
import nvmftarget_ustat_collector

# Shape of one metric sample exchanged between a collector subprocess and
# the Prometheus collector (via the shared manager list).
MetricInfo = namedtuple("MetricInfo", "key, name, value, tags, timestamp")
# Seconds to wait for a collector subprocess to join before terminating it.
COLLECTOR_TIMEOUT = 120


class MetricsCollector(object):
    """Prometheus custom collector aggregating metrics from DSS collectors.

    On every scrape, spawns one subprocess per collector (NVMF target USTAT,
    MinIO USTAT, MinIO REST), lets them run for the configured interval,
    then converts the gathered MetricInfo samples into Prometheus Metric
    objects.
    """

    def __init__(self, configs):
        self.configs = configs
        self.filter = self.configs['filter']
        self.whitelist_patterns = utils.get_whitelist_keys()
        # Shared event used to tell all collector subprocesses to stop.
        self.exit_flag = multiprocessing.Event()

    def collect(self):
        # Called by the prometheus_client registry on every scrape.
        yield from self.get_metrics()

    def create_collector_proc(self, name, obj, metrics_data_buffer, exit_flag):
        """Build (but do not start) a subprocess running obj.poll_statistics.

        Returns the multiprocessing.Process, or None if creation failed;
        callers must tolerate a None result.
        """
        try:
            return multiprocessing.Process(
                name=name,
                target=obj.poll_statistics,
                args=[metrics_data_buffer, exit_flag]
            )
        except Exception as error:
            print(f"Error launching collector {error}")
            return None

    def get_metrics(self):
        """Run all collectors for one interval and return Prometheus metrics."""
        manager = multiprocessing.Manager()
        metrics_data_buffer = manager.list()
        metrics = []

        print("collecting metrics from DSS cluster..")

        num_seconds = self.configs['polling_interval_secs']
        num_iterations = 1

        # BUGFIX: clear the event here -- it was set at the end of the
        # previous scrape and never reset, so collectors launched on any
        # subsequent scrape would see the flag already set and exit at once.
        self.exit_flag.clear()

        # initialize collectors
        target_ustat_obj = nvmftarget_ustat_collector.NVMFTargetUSTATCollector(
            self.configs, num_seconds, num_iterations, self.whitelist_patterns,
            filter=self.filter
        )
        minio_ustat_obj = minio_ustat_collector.MinioUSTATCollector(
            self.configs, num_seconds, num_iterations, self.whitelist_patterns,
            filter=self.filter
        )
        minio_rest_obj = minio_rest_collector.MinioRESTCollector(
            self.configs, self.whitelist_patterns, filter=self.filter
        )

        # initialize collector processes, dropping any that failed to build
        # (create_collector_proc returns None on failure; calling .start()
        # on None would raise AttributeError)
        collector_procs = [
            proc for proc in (
                self.create_collector_proc(
                    "target_ustat", target_ustat_obj, metrics_data_buffer,
                    self.exit_flag),
                self.create_collector_proc(
                    "minio_ustat", minio_ustat_obj, metrics_data_buffer,
                    self.exit_flag),
                self.create_collector_proc(
                    "minio_rest", minio_rest_obj, metrics_data_buffer,
                    self.exit_flag),
            )
            if proc is not None
        ]

        # start collectors
        for proc in collector_procs:
            try:
                proc.start()
            except Exception as error:
                print(f"Failed to start process {proc.name}: {str(error)}")

        # run collectors for specified duration
        time.sleep(self.configs["metrics_agent_runtime_per_interval"])

        # send exit flag to stop collectors
        self.exit_flag.set()

        # wait for collectors to finish and kill any remaining collectors
        for proc in collector_procs:
            try:
                proc.join(COLLECTOR_TIMEOUT)
                if proc.is_alive():
                    print(f"process {proc.name} is hanging, terminating..")
                    proc.terminate()
            except Exception as error:
                print(f"Failed to terminate process {proc.name}: {str(error)}")

        # populate Prometheus metric objects from the collected samples
        for m in metrics_data_buffer:
            metric = Metric(m.name, m.key, 'gauge')
            metric.add_sample(
                m.name,
                value=m.value,
                labels=m.tags,
                timestamp=m.timestamp
            )
            metrics.append(metric)
        return metrics
# Logger name for the metrics agent.
APP_NAME = "DSS Metrics Agent"

# Maps the config-file "logging_level" string to a stdlib logging level.
# Note: logging.WARN is a deprecated alias; logging.WARNING is the
# canonical name (identical value, so behavior is unchanged).
LOGGING_LEVEL = {
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "EXCEPTION": logging.ERROR,  # no EXCEPTION level exists; map to ERROR
    "FATAL": logging.FATAL
}

if __name__ == '__main__':
# load CLI args
Expand All @@ -161,20 +77,78 @@ def get_metrics(self):

# load config file args
configs = {}

with open(cli_args['config'], "rb") as cfg:
configs = json.loads(cfg.read().decode('UTF-8', "ignore"))

# merge configs
configs.update(cli_args)

# get other required metadata
filter = configs['filter']
whitelist_patterns = utils.get_whitelist_keys()
polling_interval_secs = configs['polling_interval_secs']
num_iterations = 1
metrics_scopes = {}
# load metrics scope settings
with open("metrics_scope.json", "rb") as scopes:
metrics_scopes = json.loads(scopes.read().decode('UTF-8', "ignore"))

# need to deserialize escape characters from json
metrics_scopes = {string.encode().decode('unicode_escape'): scope
for string, scope in metrics_scopes.items()}

# create logger
logfile_path = configs["logging_file"]
nsarras marked this conversation as resolved.
Show resolved Hide resolved
if not logfile_path:
raise ValueError("Missing logging_file parameter in config file")

os.makedirs(os.path.dirname(logfile_path), exist_ok=True)
logfile = Path(logfile_path)
logfile.touch(exist_ok=True)

logger = logging.getLogger(APP_NAME)
logger.setLevel(LOGGING_LEVEL[configs['logging_level']])

log_format = (
logging.Formatter('%(asctime)s %(levelname)s:%(name)s %(message)s')
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
logger.addHandler(console_handler)

# max size of log file set to 100MB
file_handler = RotatingFileHandler(logfile_path, mode='a',
maxBytes=100*1024*1024, backupCount=1,
encoding=None, delay=0)
file_handler.setFormatter(log_format)
logger.addHandler(file_handler)

logger.info("Successfully created logger!")

    # expose metrics on prometheus endpoint
print("\n\n starting http server.... \n\n")
logger.info("\n\n starting server.... \n\n")
try:
start_http_server(8000)

REGISTRY.register(
MetricsCollector(configs)
minio_rest_collector.MinioRESTCollector(
configs, metrics_scopes, whitelist_patterns, filter)
)
REGISTRY.register(
nvmftarget_ustat_collector.NVMFTargetUSTATCollector(
configs, metrics_scopes, polling_interval_secs, num_iterations,
whitelist_patterns, filter
)
)
REGISTRY.register(
minio_ustat_collector.MinioUSTATCollector(
configs, metrics_scopes, polling_interval_secs, num_iterations,
whitelist_patterns, filter
)
)

while True:
time.sleep(1)
except Exception as error:
print(f"Failed to start Metrics http server: {str(error)}")
logger.info(f"Failed to start Metrics http server: {str(error)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can call logger.exception to capture the backtrace.

6 changes: 6 additions & 0 deletions dss_metrics/metrics_scope.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"target.subsystem\\d+.kvio.(puts|gets|dels|putBandwidth|getBandwidth)": "target",
"minio_upstream.outstanding.s3_get_req": "minio cluster",
"minio_disk_storage_used_bytes": "minio cluster",
"minio_disk_storage_total_capacity_bytes": "minio cluster"
}
Loading