From 3888fe581d0e44a0d5e8552226a16ceacf2b20b6 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 8 Jul 2024 18:04:28 +0200 Subject: [PATCH 01/39] Initial file for the opmon microservice --- opmon-protobuf-dbwriter/dbwriter.py | 219 ++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 opmon-protobuf-dbwriter/dbwriter.py diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py new file mode 100644 index 0000000..4ee492f --- /dev/null +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -0,0 +1,219 @@ +# @file dbwriter.py Writing Opmon entries into InfluxDB +# This is part of the DUNE DAQ software, copyright 2020. +# Licensing/copyright details are in the COPYING file that you should have +# received with this code. +# + +import kafkaopmon.OpMonSubscriber as opmonsub +import google.protobuf.json_format as pb_json +from influxdb import InfluxDBClient +from functools import partial +import psycopg2 +import json +import click +import logging + + +CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--subscriber-bootstrap', type=click.STRING, default="monkafka.cern.ch:30092", help="boostrap server and port of the OpMonSubscriber") +@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') +@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') + +@click.option('--influxdb-address', type=click.STRING, default='opmondb.cern.ch', help='address of the influx db') +@click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') +@click.option('--influxdb-name', type=click.STRING, default='influxv3', help='name used in the influxdb query') +@click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') + +@click.option('--debug', type=click.BOOL, default=True, help='Set debug print levels') + +def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, + influxdb_address, influxdb_port, influxdb_name, influxdb_create, + debug): + + logging.basicConfig( + format='%(asctime)s %(levelname)-8s %(message)s', + level=logging.DEBUG if debug else logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S') + + try: + con = psycopg2.connect(host=db_address, + port=db_port, + user=db_user, + password=db_password, + dbname=db_name) + except Exception as e: + logging.error(e) + logging.fatal('Connection to the database failed, aborting...') + exit() + + global table_name + table_name = '"' + db_table + '"' + + cur = con.cursor() + + try: # try to make sure tables exist + create_database(cursor=cur, connection=con) + except: + con.rollback() + logging.info( "Database was already created" ) + else : + logging.info( "Database creation: Success" ) + finally: + logging.info( "Database is ready" ) + + check_tables(cursor=cur, connection=con) + + subscriber_conf = json.loads("{}") + subscriber_conf["bootstrap"] = subscriber_bootstrap + subscriber_conf["timeout"] = subscriber_timeout + if subscriber_group: + subscriber_conf["group_id"] = subscriber_group + + sub = erssub.ERSSubscriber(subscriber_conf) + + callback_function = partial(process_chain, + cursor=cur, + connection=con) + + sub.add_callback(name="postgres", + function=callback_function) + + sub.start() + + +def process_chain( chain, cursor, connection ) : + logging.debug(chain) + + counter = 0; + success = False + while(not success) : + counter += 1 + 
try : + for cause in reversed(chain.causes) : + process_issue(issue=cause, + session=chain.session, + cursor=cursor) + + process_issue(issue=chain.final, + session=chain.session, + cursor=cursor) + connection.commit() + except psycopg2.errors.UndefinedTable as e: + logging.error(e) + logging.error("Table was undefined yet it was supposed to be defined at this point") + connection.rollback() + create_database(cursor=cursor, + connection=connection) + except psycopg2.errors.UndefinedColumn as e: + logging.warning(e) + connection.rollback() + clean_database(cursor=cursor, + connection=connection) + create_database(cursor=cursor, + connection=connection) + except Exception as e: + logging.error("Something unexpected happened") + logging.error(e) + + else: + success=True + logging.debug(f"Entry sent after {counter} attempts") + + if (counter > 2) : + if not success : + logging.error("Issue failed to be delivered") + logging.error(pb_json.MessageToJson(chain)) + break + + +def process_issue( issue, session, cursor ) : + fields = [] + values = [] + + ## top level info + add_entry("session", session, fields, values) + add_entry("issue_name", issue.name, fields, values) + add_entry("severity", issue.severity, fields, values) + add_entry("time", issue.time, fields, values) + + ## context related info + add_entry("cwd", issue.context.cwd, fields, values) + add_entry("file_name", issue.context.file_name, fields, values) + add_entry("function_name", issue.context.function_name, fields, values) + add_entry("host_name", issue.context.host_name, fields, values) + add_entry("line_number", issue.context.line_number, fields, values) + add_entry("package_name", issue.context.package_name, fields, values) + + add_entry("process_id", issue.context.process_id, fields, values) + add_entry("thread_id", issue.context.thread_id, fields, values) + add_entry("user_id", issue.context.user_id, fields, values) + add_entry("user_name", issue.context.user_name, fields, values) + add_entry("application_name", issue.context.application_name, fields, values) + + # heavy information + add_entry("inheritance", '/'.join(issue.inheritance), fields, values) + add_entry("message", issue.message, fields, values) + add_entry("params", issue.parameters, fields, values) + + command = f"INSERT INTO {table_name} ({','.join(fields)}) VALUES ({('%s, ' * len(values))[:-2]});" + + logging.debug(command) + cursor.execute(command, values) + + +def add_entry(field, value, fields, values): + fields.append(field) + values.append(str(value)) + + +def clean_database(cursor, connection): + command = f"DROP TABLE {table_name} ;" + + logging.debug(command) + cursor.execute(command) + connection.commit() + +def check_tables(cursor, connection) : + command = """SELECT relname FROM pg_class WHERE relkind='r' + AND relname !~ '^(pg_|sql_)';""" + + logging.debug(command) + cursor.execute(command) + tables = [i[0] for i in cursor.fetchall()] # A list() of tables. 
+ logging.info(f"Tables: {tables}") + return tables + +def create_database(cursor, connection): + command = f"CREATE TABLE {table_name} (" + command += ''' + session TEXT, + issue_name TEXT, + inheritance TEXT, + message TEXT, + params TEXT, + severity TEXT, + time BIGINT, + cwd TEXT, + file_name TEXT, + function_name TEXT, + host_name TEXT, + package_name TEXT, + user_name TEXT, + application_name TEXT, + user_id INT, + process_id INT, + thread_id INT, + line_number INT + ); ''' + + logging.debug(command) + cursor.execute(command) + connection.commit() + + + +if __name__ == '__main__': + cli() + From e6cd1a93d6370abef9d146d50d6f2d28b77901db Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 12:53:38 +0200 Subject: [PATCH 02/39] initial draft for testing --- opmon-protobuf-dbwriter/dbwriter.py | 232 +++++++--------------------- 1 file changed, 60 insertions(+), 172 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index 4ee492f..aa466f7 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -4,23 +4,30 @@ # received with this code. # -import kafkaopmon.OpMonSubscriber as opmonsub +import kafkaopmon.OpMonSubscriber as opmon_sub import google.protobuf.json_format as pb_json +from google.protobuf.timestamp_pb2 import Timestamp +import opmonlib.opmon_entry_pb2 as opmon_schema + from influxdb import InfluxDBClient from functools import partial -import psycopg2 import json import click import logging +import queue +import threading CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.command(context_settings=CONTEXT_SETTINGS) +# subscriber options @click.option('--subscriber-bootstrap', type=click.STRING, default="monkafka.cern.ch:30092", help="boostrap server and port of the OpMonSubscriber") -@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') -@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') +@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') +@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') +@click.option('--subscriber-topic', type=click.STRING, multiple=True, default=['opmon_stream'] ) +#influx options @click.option('--influxdb-address', type=click.STRING, default='opmondb.cern.ch', help='address of the influx db') @click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') @click.option('--influxdb-name', type=click.STRING, default='influxv3', help='name used in the influxdb query') @@ -37,182 +44,63 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, level=logging.DEBUG if debug else logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') - try: - con = psycopg2.connect(host=db_address, - port=db_port, - user=db_user, - password=db_password, - dbname=db_name) - except Exception as e: - logging.error(e) - logging.fatal('Connection to the database failed, aborting...') - exit() - - global table_name - table_name = '"' + db_table + '"' - - cur = con.cursor() - - try: # try to make sure tables exist - create_database(cursor=cur, connection=con) - except: - con.rollback() - logging.info( "Database was already created" ) - else : - logging.info( "Database creation: Success" ) - finally: - logging.info( "Database is ready" ) - - check_tables(cursor=cur, connection=con) - - subscriber_conf = 
json.loads("{}") - subscriber_conf["bootstrap"] = subscriber_bootstrap - subscriber_conf["timeout"] = subscriber_timeout - if subscriber_group: - subscriber_conf["group_id"] = subscriber_group - - sub = erssub.ERSSubscriber(subscriber_conf) - - callback_function = partial(process_chain, - cursor=cur, - connection=con) + influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) + db_list = influx.get_list_database() + logging.info("Available DBs:",db_list) + if {"name":influxdb_name} not in db_list: + logging.warning(influxdb_name, "DB not available") + if influxdb_create: + influx.create_database(influxdb_name); + logging.info("New list of DBs:", influx.get_list_database()) + + influx.switch_database(influxdb_name) + + sub = opmon_sub.OpMonSubscriber( bootstrap=subscriber_bootstrap, + topics=topic, + group_id = subscriber_group, + timeout_ms = subscriber_timeout) + + # this is a list of single json entries + q = queue.Queue() + + callback_function = partial(process_entry, + q = q ) - sub.add_callback(name="postgres", + sub.add_callback(name="to_influx", function=callback_function) sub.start() - -def process_chain( chain, cursor, connection ) : - logging.debug(chain) - - counter = 0; - success = False - while(not success) : - counter += 1 - try : - for cause in reversed(chain.causes) : - process_issue(issue=cause, - session=chain.session, - cursor=cursor) - - process_issue(issue=chain.final, - session=chain.session, - cursor=cursor) - connection.commit() - except psycopg2.errors.UndefinedTable as e: - logging.error(e) - logging.error("Table was undefined yet it was supposed to be defined at this point") - connection.rollback() - create_database(cursor=cursor, - connection=connection) - except psycopg2.errors.UndefinedColumn as e: - logging.warning(e) - connection.rollback() - clean_database(cursor=cursor, - connection=connection) - create_database(cursor=cursor, - connection=connection) - except Exception as e: - logging.error("Something unexpected happened") - logging.error(e) - - else: - success=True - logging.debug(f"Entry sent after {counter} attempts") - - if (counter > 2) : - if not success : - logging.error("Issue failed to be delivered") - logging.error(pb_json.MessageToJson(chain)) - break - - -def process_issue( issue, session, cursor ) : - fields = [] - values = [] - - ## top level info - add_entry("session", session, fields, values) - add_entry("issue_name", issue.name, fields, values) - add_entry("severity", issue.severity, fields, values) - add_entry("time", issue.time, fields, values) - - ## context related info - add_entry("cwd", issue.context.cwd, fields, values) - add_entry("file_name", issue.context.file_name, fields, values) - add_entry("function_name", issue.context.function_name, fields, values) - add_entry("host_name", issue.context.host_name, fields, values) - add_entry("line_number", issue.context.line_number, fields, values) - add_entry("package_name", issue.context.package_name, fields, values) - - add_entry("process_id", issue.context.process_id, fields, values) - add_entry("thread_id", issue.context.thread_id, fields, values) - add_entry("user_id", issue.context.user_id, fields, values) - add_entry("user_name", issue.context.user_name, fields, values) - add_entry("application_name", issue.context.application_name, fields, values) - - # heavy information - add_entry("inheritance", '/'.join(issue.inheritance), fields, values) - add_entry("message", issue.message, fields, values) - add_entry("params", issue.parameters, fields, values) - - command = 
f"INSERT INTO {table_name} ({','.join(fields)}) VALUES ({('%s, ' * len(values))[:-2]});" - - logging.debug(command) - cursor.execute(command, values) - - -def add_entry(field, value, fields, values): - fields.append(field) - values.append(str(value)) - - -def clean_database(cursor, connection): - command = f"DROP TABLE {table_name} ;" - - logging.debug(command) - cursor.execute(command) - connection.commit() - -def check_tables(cursor, connection) : - command = """SELECT relname FROM pg_class WHERE relkind='r' - AND relname !~ '^(pg_|sql_)';""" +def process_entry( entry : opmon_schema.OpMonEntry, + q : queue.Queue ) : + d = to_dict(entry) + js = json.dumps(d) + #q.put(js) + logging.debug(js) + +def to_dict( entry : opmon_schema.OpMonEntry ) -> dict : + ret = dict(measurement = entry.measurement) + ret['fields'] = entry.data ## will this work as expected? + ret['tags'] = create_tags(entry) + ret['time'] = entry.time.ToJsonString() + + +def create_tags( entry : opmon_schema.OpMonEntry ) -> dict : + opmon_id = entry.origin + #session and application + tags = dict(session = opmon_id.session, + application = opmon_id.application) - logging.debug(command) - cursor.execute(command) - tables = [i[0] for i in cursor.fetchall()] # A list() of tables. - logging.info(f"Tables: {tables}") - return tables - -def create_database(cursor, connection): - command = f"CREATE TABLE {table_name} (" - command += ''' - session TEXT, - issue_name TEXT, - inheritance TEXT, - message TEXT, - params TEXT, - severity TEXT, - time BIGINT, - cwd TEXT, - file_name TEXT, - function_name TEXT, - host_name TEXT, - package_name TEXT, - user_name TEXT, - application_name TEXT, - user_id INT, - process_id INT, - thread_id INT, - line_number INT - ); ''' - - logging.debug(command) - cursor.execute(command) - connection.commit() + #element and subelements + struct = opmon_id.substructure + for i in range(len(struct)) + name='sub'*i + 'element' + tags[name] = struct[i] + #custom origin + tags |= entry.custom_origin + return tags if __name__ == '__main__': cli() From 8546ed725c2f5b82875de9fa98d64fbfb0599647 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 13:00:20 +0200 Subject: [PATCH 03/39] Running state for tests --- opmon-protobuf-dbwriter/dbwriter.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index aa466f7..ba6170a 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -35,7 +35,7 @@ @click.option('--debug', type=click.BOOL, default=True, help='Set debug print levels') -def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, +def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_topic, influxdb_address, influxdb_port, influxdb_name, influxdb_create, debug): @@ -44,19 +44,19 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, level=logging.DEBUG if debug else logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') - influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) - db_list = influx.get_list_database() - logging.info("Available DBs:",db_list) - if {"name":influxdb_name} not in db_list: - logging.warning(influxdb_name, "DB not available") - if influxdb_create: - influx.create_database(influxdb_name); - logging.info("New list of DBs:", influx.get_list_database()) +# influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) +# db_list = influx.get_list_database() +# 
logging.info("Available DBs:",db_list) +# if {"name":influxdb_name} not in db_list: +# logging.warning(influxdb_name, "DB not available") +# if influxdb_create: +# influx.create_database(influxdb_name); +# logging.info("New list of DBs:", influx.get_list_database()) - influx.switch_database(influxdb_name) +# influx.switch_database(influxdb_name) sub = opmon_sub.OpMonSubscriber( bootstrap=subscriber_bootstrap, - topics=topic, + topics=subscriber_topic, group_id = subscriber_group, timeout_ms = subscriber_timeout) @@ -93,7 +93,7 @@ def create_tags( entry : opmon_schema.OpMonEntry ) -> dict : #element and subelements struct = opmon_id.substructure - for i in range(len(struct)) + for i in range(len(struct)) : name='sub'*i + 'element' tags[name] = struct[i] From 4142862648981973138cfd342f9abc462625ef6a Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 14:47:01 +0200 Subject: [PATCH 04/39] correct creation of the json structure --- opmon-protobuf-dbwriter/dbwriter.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index ba6170a..ac9ce94 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -75,14 +75,27 @@ def process_entry( entry : opmon_schema.OpMonEntry, q : queue.Queue ) : d = to_dict(entry) js = json.dumps(d) - #q.put(js) logging.debug(js) + #q.put(js) + def to_dict( entry : opmon_schema.OpMonEntry ) -> dict : ret = dict(measurement = entry.measurement) - ret['fields'] = entry.data ## will this work as expected? + ret['fields'] = unpack_payload(entry) ret['tags'] = create_tags(entry) ret['time'] = entry.time.ToJsonString() + return ret + +def unpack_payload( entry : opmon_schema.OpMonEntry ) -> dict : + data = entry.data + ret = dict() + for key in data : + value = data[key] + kind = value.WhichOneof('kind') + casted_value = getattr(value, value.WhichOneof('kind')) + ret[key] = casted_value + + return ret def create_tags( entry : opmon_schema.OpMonEntry ) -> dict : From 4b974992309986912097230f030ec49923fd6f02 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 14:54:37 +0200 Subject: [PATCH 05/39] a bit of cleanup --- opmon-protobuf-dbwriter/dbwriter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index ac9ce94..b76fb9d 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -30,7 +30,7 @@ #influx options @click.option('--influxdb-address', type=click.STRING, default='opmondb.cern.ch', help='address of the influx db') @click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') -@click.option('--influxdb-name', type=click.STRING, default='influxv3', help='name used in the influxdb query') +@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='name used in the influxdb query') @click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') @click.option('--debug', type=click.BOOL, default=True, help='Set debug print levels') @@ -91,7 +91,6 @@ def unpack_payload( entry : opmon_schema.OpMonEntry ) -> dict : ret = dict() for key in data : value = data[key] - kind = value.WhichOneof('kind') casted_value = getattr(value, value.WhichOneof('kind')) ret[key] = casted_value From 1851d7702408893904bc681b606148321a6764c7 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 
2024 18:04:04 +0200 Subject: [PATCH 06/39] Great progress in the microservice --- opmon-protobuf-dbwriter/dbwriter.py | 53 +++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index b76fb9d..a62e058 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -60,7 +60,7 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_t group_id = subscriber_group, timeout_ms = subscriber_timeout) - # this is a list of single json entries + # this is a list of Entries q = queue.Queue() callback_function = partial(process_entry, @@ -68,15 +68,57 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_t sub.add_callback(name="to_influx", function=callback_function) + + thread = threading.Thread(target=consume, daemon=True, args=(q,1,) ) + thread.start() sub.start() +def consume( q : queue.Queue, timeout_ms, + influx : InfluxDBClient = None ) : + logging.info("Starting consumer thread") + batch=[] + batch_ms = 0 + while True : + try : + entry = q.get(timeout=1) + + if ( entry.ms - batch_ms < timeout_ms ) : + batch.append(entry.json) + batch_ms = min(batch_ms, entry.ms) + + if ( entry.ms - batch_ms >= timeout_ms ) : + send_batch(batch) + batch=[entry.json] + batch_ms = entry.ms + + except queue.Empty : + logging.debug("Queue is empty") + send_batch(batch, influx) + batch=[] + batch_ms=0 + +def send_batch( batch : list, + influx : InfluxDBClient = None ) : + if len(batch) > 0 : + logging.debug("Sending %s points", len(batch) ) + if influx : + try : + influx.write_points(batch) + except influxdb.exceptions.InfluxDBClientError as e: + logging.error(e) + except : + logging.error("Something went wrong: json batch not sent") + else : + print(batch) + + def process_entry( entry : opmon_schema.OpMonEntry, q : queue.Queue ) : d = to_dict(entry) js = json.dumps(d) - logging.debug(js) - #q.put(js) + e = Entry(json=js, ms=entry.time.ToMilliseconds()) + q.put(e) def to_dict( entry : opmon_schema.OpMonEntry ) -> dict : @@ -114,6 +156,11 @@ def create_tags( entry : opmon_schema.OpMonEntry ) -> dict : return tags +class Entry : + def __init__(self, json, ms) : + self.json = json + self.ms = ms + if __name__ == '__main__': cli() From dc6b778e179e2b3cca9267838d80692b5d9978b0 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 11 Jul 2024 11:10:38 +0200 Subject: [PATCH 07/39] Some clenup --- opmon-protobuf-dbwriter/dbwriter.py | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index a62e058..39e932f 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -22,21 +22,22 @@ @click.command(context_settings=CONTEXT_SETTINGS) # subscriber options -@click.option('--subscriber-bootstrap', type=click.STRING, default="monkafka.cern.ch:30092", help="boostrap server and port of the OpMonSubscriber") -@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') -@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') -@click.option('--subscriber-topic', type=click.STRING, multiple=True, default=['opmon_stream'] ) +@click.option('--subscriber-bootstrap', type=click.STRING, default="monkafka.cern.ch:30092", help="boostrap server and port of the OpMonSubscriber") 
+@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') +@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') +@click.option('--subscriber-topic', type=click.STRING, multiple=True, default=['opmon_stream'], help='The system will add the "monitoring." prefix' ) #influx options -@click.option('--influxdb-address', type=click.STRING, default='opmondb.cern.ch', help='address of the influx db') -@click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') -@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='name used in the influxdb query') -@click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') +@click.option('--influxdb-address', type=click.STRING, default='np04-srv-017', help='address of the influx db') +@click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') +@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='name used in the influxdb query') +@click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') +@click.option('--influxdb-timeout', type=click.INT, default=500, help='Size in ms of the batches sent to influx') @click.option('--debug', type=click.BOOL, default=True, help='Set debug print levels') def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_topic, - influxdb_address, influxdb_port, influxdb_name, influxdb_create, + influxdb_address, influxdb_port, influxdb_name, influxdb_create, influxdb_timeout, debug): logging.basicConfig( @@ -44,16 +45,16 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_t level=logging.DEBUG if debug else logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') -# influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) -# db_list = influx.get_list_database() -# logging.info("Available DBs:",db_list) -# if {"name":influxdb_name} not in db_list: -# logging.warning(influxdb_name, "DB not available") -# if influxdb_create: -# influx.create_database(influxdb_name); -# logging.info("New list of DBs:", influx.get_list_database()) + influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) + db_list = influx.get_list_database() + logging.info("Available DBs:",db_list) + if {"name":influxdb_name} not in db_list: + logging.warning(influxdb_name, "DB not available") + if influxdb_create: + influx.create_database(influxdb_name); + logging.info("New list of DBs:", influx.get_list_database()) -# influx.switch_database(influxdb_name) + influx.switch_database(influxdb_name) sub = opmon_sub.OpMonSubscriber( bootstrap=subscriber_bootstrap, topics=subscriber_topic, @@ -69,7 +70,8 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_t sub.add_callback(name="to_influx", function=callback_function) - thread = threading.Thread(target=consume, daemon=True, args=(q,1,) ) + thread = threading.Thread(target=consume, daemon=True, + args=(q,influxdb_timeout,) ) thread.start() sub.start() @@ -81,7 +83,7 @@ def consume( q : queue.Queue, timeout_ms, batch_ms = 0 while True : try : - entry = q.get(timeout=1) + entry = q.get(timeout=1) ## timeout here is in seconds if ( entry.ms - batch_ms < timeout_ms ) : batch.append(entry.json) @@ -101,7 +103,7 @@ def consume( q : queue.Queue, timeout_ms, def send_batch( batch : list, influx : 
InfluxDBClient = None ) : if len(batch) > 0 : - logging.debug("Sending %s points", len(batch) ) + logging.info("Sending %s points", len(batch) ) if influx : try : influx.write_points(batch) From 085bff10d1c3b8a0328cf144621d58d207efd6f5 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 11 Jul 2024 11:13:15 +0200 Subject: [PATCH 08/39] Passing the client to the consumer --- opmon-protobuf-dbwriter/dbwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index 39e932f..e0b91e7 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -71,7 +71,7 @@ def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_t function=callback_function) thread = threading.Thread(target=consume, daemon=True, - args=(q,influxdb_timeout,) ) + args=(q,influxdb_timeout,influx) ) thread.start() sub.start() From 8faa41a7ae2961e3f9ddc914a44274b7e4c63255 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 11 Jul 2024 11:46:27 +0200 Subject: [PATCH 09/39] Tentative entry file --- opmon-protobuf-dbwriter/dbwriter.py | 2 +- opmon-protobuf-dbwriter/entrypoint.sh | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100755 opmon-protobuf-dbwriter/entrypoint.sh diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index e0b91e7..86d82be 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -30,7 +30,7 @@ #influx options @click.option('--influxdb-address', type=click.STRING, default='np04-srv-017', help='address of the influx db') @click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') -@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='name used in the influxdb query') +@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='Table name destination inside influxdb') @click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') @click.option('--influxdb-timeout', type=click.INT, default=500, help='Size in ms of the batches sent to influx') diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh new file mode 100755 index 0000000..b3bbfab --- /dev/null +++ b/opmon-protobuf-dbwriter/entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +cd $(dirname $0) +source ../entrypoint_functions.sh + +ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS " + +python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \ + --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \ + --subscriber-topic $OPMON_DBWRITER_TOPIC \ + --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \ + --influx-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \ + --debug False --influxdb-create True + + From c479da72827b0668f7e5eef084a07f7e1df3f968 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 11 Jul 2024 12:30:39 +0200 Subject: [PATCH 10/39] Draft of yaml file --- .../opmon-dbwriter-deployment.yaml | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 
opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml new file mode 100644 index 0000000..ea781b3 --- /dev/null +++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml @@ -0,0 +1,80 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + annotations: + kluctl.io/skip-delete-if-tags: "true" + labels: + pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit-version: latest + pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce-version: latest + pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn-version: latest + name: ers +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/app: opmon-protobuf-dbwriter + app.kubernetes.io/component: opmon-protobuf-dbwriter + name: opmonstream-dbwriter + namespace: opmon +spec: + replicas: 1 + selector: + matchLabels: + app: opmon-protobuf-replica + template: + metadata: + labels: + app: opmon-protobuf-replica + app.kubernetes.io/app: opmon-protobuf-dbwriter + app.kubernetes.io/component: opmon-protobuf-dbwriter + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/worker + operator: Exists + containers: + - image: ghcr.io/dune-daq/microservices:9685 ## TBC + imagePullPolicy: Always + name: opmon-protobuf-replica + env: + - name: MICROSERVICE + value: opmon-protobuf-dbwriter + - name: ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER + value: dune-daq.kafka.svc.cluster.local:9092 + - name: OPMON_DBWRITER_KAFKA_GROUP + value: opmon-protobuf-dbwriter + - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS + value: 1500 + - name: OPMON_DBWRITER_TOPIC + value: opmon_stream + - name: OPMON_DBWRITER_INFLUX_HOST + value: opmon-influxdb.opmon.svc + - name: OPMON_DBWRITER_INFLUX_PORT + value: 8086 + - name: OPMON_DBWRITER_TABLE + value: influxv5 + - name: OPMON_DBWRITER_BATCH_SIZE_MS + value: 800 + resources: + limits: + memory: 1Gi + requests: + memory: 8Mi + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + runAsGroup: 11000 + seccompProfile: + type: RuntimeDefault + securityContext: + fsGroup: 11000 From b08dccdc225509b977c41158e8ad57ded974de69 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 11 Jul 2024 12:53:44 +0200 Subject: [PATCH 11/39] Update dependencies --- dockerfiles/microservices-dependencies.dockerfile | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile index 1689a0f..03921d7 100644 --- a/dockerfiles/microservices-dependencies.dockerfile +++ b/dockerfiles/microservices-dependencies.dockerfile @@ -2,6 +2,9 @@ FROM cern/alma9-base ARG ERSVERSION=v1.5.1 # For issue.proto from ers ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka +ARG OPMONLIBVERSION=v1.5.1 # For opmon_entry.proto from opmonlib +ARG KAFKAOPMONVERSION=v1.5.4 # For OpMonSubscriber.py from kafkaopmon + ARG LOCALPYDIR=/microservices_python RUN yum clean all \ @@ -29,11 +32,15 @@ RUN git clone https://github.com/DUNE-DAQ/elisa_client_api.git && \ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip && \ unzip protoc-24.3-linux-x86_64.zip && \ curl -O 
https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \ + curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/oopmonlib/opmon_entry.proto && \ mkdir -p $LOCALPYDIR/ers && \ - protoc --python_out=$LOCALPYDIR/ers issue.proto + protoc --python_out=$LOCALPYDIR/ers issue.proto && \ + protoc --python_out=$LOCALPYDIR/opmonlib opmon_entry.proto RUN mkdir -p $LOCALPYDIR/erskafka && \ - curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py + curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py && \ + mkdir -p $LOCALPYDIR/kafkaopmon && \ + curl https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o $LOCALPYDIR/kafkaopmon/OpMonSubscriber.py ENV PYTHONPATH=$LOCALPYDIR:$PYTHONPATH From 964a0ffbe629233d14e81fdd922f677c2ec7a199 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 12 Jul 2024 16:21:02 +0200 Subject: [PATCH 12/39] Implement suggestions from Pat --- opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml index ea781b3..3ff0755 100644 --- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml +++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml @@ -52,7 +52,7 @@ spec: - name: OPMON_DBWRITER_KAFKA_GROUP value: opmon-protobuf-dbwriter - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS - value: 1500 + value: 5000 - name: OPMON_DBWRITER_TOPIC value: opmon_stream - name: OPMON_DBWRITER_INFLUX_HOST @@ -60,7 +60,7 @@ spec: - name: OPMON_DBWRITER_INFLUX_PORT value: 8086 - name: OPMON_DBWRITER_TABLE - value: influxv5 + value: opmon_protobuf_v1 - name: OPMON_DBWRITER_BATCH_SIZE_MS value: 800 resources: From 83a1b6c455ab3d9d831258e6188b96cb04c4cbd5 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 12 Jul 2024 16:23:38 +0200 Subject: [PATCH 13/39] Correct dependencies --- dockerfiles/microservices-dependencies.dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile index 03921d7..a0f8030 100644 --- a/dockerfiles/microservices-dependencies.dockerfile +++ b/dockerfiles/microservices-dependencies.dockerfile @@ -2,8 +2,8 @@ FROM cern/alma9-base ARG ERSVERSION=v1.5.1 # For issue.proto from ers ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka -ARG OPMONLIBVERSION=v1.5.1 # For opmon_entry.proto from opmonlib -ARG KAFKAOPMONVERSION=v1.5.4 # For OpMonSubscriber.py from kafkaopmon +ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib +ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon ARG LOCALPYDIR=/microservices_python From 41393222b249d432547484ac1c2968dbb39b7f68 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 12 Jul 2024 16:35:52 +0200 Subject: [PATCH 14/39] fix typo --- dockerfiles/microservices-dependencies.dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile index a0f8030..e8be65a 100644 --- a/dockerfiles/microservices-dependencies.dockerfile +++ b/dockerfiles/microservices-dependencies.dockerfile @@ -32,7 +32,7 @@ RUN 
git clone https://github.com/DUNE-DAQ/elisa_client_api.git && \ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip && \ unzip protoc-24.3-linux-x86_64.zip && \ curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \ - curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/oopmonlib/opmon_entry.proto && \ + curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \ mkdir -p $LOCALPYDIR/ers && \ protoc --python_out=$LOCALPYDIR/ers issue.proto && \ protoc --python_out=$LOCALPYDIR/opmonlib opmon_entry.proto From 0f41c4ff7ab8d3989b732a093248a91e57220bb7 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 15 Jul 2024 09:48:35 +0200 Subject: [PATCH 15/39] Buildable dependency --- dockerfiles/microservices-dependencies.dockerfile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile index e8be65a..ce33bf1 100644 --- a/dockerfiles/microservices-dependencies.dockerfile +++ b/dockerfiles/microservices-dependencies.dockerfile @@ -32,10 +32,14 @@ RUN git clone https://github.com/DUNE-DAQ/elisa_client_api.git && \ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip && \ unzip protoc-24.3-linux-x86_64.zip && \ curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \ - curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \ mkdir -p $LOCALPYDIR/ers && \ protoc --python_out=$LOCALPYDIR/ers issue.proto && \ - protoc --python_out=$LOCALPYDIR/opmonlib opmon_entry.proto + curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \ + mkdir -p $LOCALPYDIR/opmonlib && \ + pwd && \ + ls -l . && \ + ls -lR /include microservices_python && \ + protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto RUN mkdir -p $LOCALPYDIR/erskafka && \ curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py && \ From 3b34b285b4a6a68de0110ec079bc343637cfee26 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 15 Jul 2024 14:08:01 +0200 Subject: [PATCH 16/39] Remove debug printouts --- dockerfiles/microservices-dependencies.dockerfile | 3 --- 1 file changed, 3 deletions(-) diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile index ce33bf1..f467a68 100644 --- a/dockerfiles/microservices-dependencies.dockerfile +++ b/dockerfiles/microservices-dependencies.dockerfile @@ -36,9 +36,6 @@ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3 protoc --python_out=$LOCALPYDIR/ers issue.proto && \ curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \ mkdir -p $LOCALPYDIR/opmonlib && \ - pwd && \ - ls -l . 
&& \ - ls -lR /include microservices_python && \ protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto RUN mkdir -p $LOCALPYDIR/erskafka && \ From 0ac1f09196d3229331b1c27eaa8862f4312af944 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 15 Jul 2024 15:29:16 +0200 Subject: [PATCH 17/39] add the right image for documentation --- opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml index 3ff0755..16e5788 100644 --- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml +++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml @@ -41,7 +41,7 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists containers: - - image: ghcr.io/dune-daq/microservices:9685 ## TBC + - image: ghcr.io/dune-daq/microservices:mroda-opmon_protobuf ## TBC imagePullPolicy: Always name: opmon-protobuf-replica env: From e3c15612ed7fdc959687eb1433f8f46267b443df Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 15 Jul 2024 17:06:22 +0200 Subject: [PATCH 18/39] black format and update in the servers --- opmon-protobuf-dbwriter/dbwriter.py | 224 +++++++++++++++++----------- 1 file changed, 140 insertions(+), 84 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index 86d82be..d1fb60d 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -18,151 +18,207 @@ import threading -CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) +CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) + @click.command(context_settings=CONTEXT_SETTINGS) # subscriber options -@click.option('--subscriber-bootstrap', type=click.STRING, default="monkafka.cern.ch:30092", help="boostrap server and port of the OpMonSubscriber") -@click.option('--subscriber-group', type=click.STRING, default=None, help='group ID of the OpMonSubscriber') -@click.option('--subscriber-timeout', type=click.INT, default=500, help='timeout in ms used in the OpMonSubscriber') -@click.option('--subscriber-topic', type=click.STRING, multiple=True, default=['opmon_stream'], help='The system will add the "monitoring." 
prefix' ) - -#influx options -@click.option('--influxdb-address', type=click.STRING, default='np04-srv-017', help='address of the influx db') -@click.option('--influxdb-port', type=click.INT, default=31002, help='port of the influxdb') -@click.option('--influxdb-name', type=click.STRING, default='test_influx', help='Table name destination inside influxdb') -@click.option('--influxdb-create', type=click.BOOL, default=True, help='Creates the influxdb if it does not exists') -@click.option('--influxdb-timeout', type=click.INT, default=500, help='Size in ms of the batches sent to influx') - -@click.option('--debug', type=click.BOOL, default=True, help='Set debug print levels') - -def cli(subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_topic, - influxdb_address, influxdb_port, influxdb_name, influxdb_create, influxdb_timeout, - debug): +@click.option( + "--subscriber-bootstrap", + type=click.STRING, + default="monkafka.cern.ch:30092", + help="boostrap server and port of the OpMonSubscriber", +) +@click.option( + "--subscriber-group", + type=click.STRING, + default=None, + help="group ID of the OpMonSubscriber", +) +@click.option( + "--subscriber-timeout", + type=click.INT, + default=500, + help="timeout in ms used in the OpMonSubscriber", +) +@click.option( + "--subscriber-topic", + type=click.STRING, + multiple=True, + default=["opmon_stream"], + help='The system will add the "monitoring." prefix', +) + +# influx options +@click.option( + "--influxdb-address", + type=click.STRING, + default="monkafka.cern.ch:30092", + help="address of the influx db", +) +@click.option( + "--influxdb-port", type=click.INT, default=31002, help="port of the influxdb" +) +@click.option( + "--influxdb-name", + type=click.STRING, + default="test_influx", + help="Table name destination inside influxdb", +) +@click.option( + "--influxdb-create", + type=click.BOOL, + default=True, + help="Creates the influxdb if it does not exists", +) +@click.option( + "--influxdb-timeout", + type=click.INT, + default=500, + help="Size in ms of the batches sent to influx", +) +@click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels") +def cli( + subscriber_bootstrap, + subscriber_group, + subscriber_timeout, + subscriber_topic, + influxdb_address, + influxdb_port, + influxdb_name, + influxdb_create, + influxdb_timeout, + debug, +): logging.basicConfig( - format='%(asctime)s %(levelname)-8s %(message)s', + format="%(asctime)s %(levelname)-8s %(message)s", level=logging.DEBUG if debug else logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S') + datefmt="%Y-%m-%d %H:%M:%S", + ) influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) db_list = influx.get_list_database() - logging.info("Available DBs:",db_list) - if {"name":influxdb_name} not in db_list: + logging.info("Available DBs:", db_list) + if {"name": influxdb_name} not in db_list: logging.warning(influxdb_name, "DB not available") if influxdb_create: - influx.create_database(influxdb_name); + influx.create_database(influxdb_name) logging.info("New list of DBs:", influx.get_list_database()) influx.switch_database(influxdb_name) - sub = opmon_sub.OpMonSubscriber( bootstrap=subscriber_bootstrap, - topics=subscriber_topic, - group_id = subscriber_group, - timeout_ms = subscriber_timeout) + sub = opmon_sub.OpMonSubscriber( + bootstrap=subscriber_bootstrap, + topics=subscriber_topic, + group_id=subscriber_group, + timeout_ms=subscriber_timeout, + ) # this is a list of Entries q = queue.Queue() - callback_function = 
partial(process_entry, - q = q ) - - sub.add_callback(name="to_influx", - function=callback_function) + callback_function = partial(process_entry, q=q) + + sub.add_callback(name="to_influx", function=callback_function) - thread = threading.Thread(target=consume, daemon=True, - args=(q,influxdb_timeout,influx) ) + thread = threading.Thread( + target=consume, daemon=True, args=(q, influxdb_timeout, influx) + ) thread.start() - + sub.start() -def consume( q : queue.Queue, timeout_ms, - influx : InfluxDBClient = None ) : + +def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None): logging.info("Starting consumer thread") - batch=[] + batch = [] batch_ms = 0 - while True : - try : - entry = q.get(timeout=1) ## timeout here is in seconds + while True: + try: + entry = q.get(timeout=1) ## timeout here is in seconds - if ( entry.ms - batch_ms < timeout_ms ) : + if entry.ms - batch_ms < timeout_ms: + # because of the if, batch_ms is not zero. batch.append(entry.json) batch_ms = min(batch_ms, entry.ms) - - if ( entry.ms - batch_ms >= timeout_ms ) : + + if entry.ms - batch_ms >= timeout_ms: + # note that if we are facing with a late arrival, i.e. entry.ms was smaller than batch_ms, the difference is 0, so this if is skipped + # i.e. there is not double insertion send_batch(batch) - batch=[entry.json] + batch = [entry.json] batch_ms = entry.ms - - except queue.Empty : + + except queue.Empty: logging.debug("Queue is empty") send_batch(batch, influx) - batch=[] - batch_ms=0 - -def send_batch( batch : list, - influx : InfluxDBClient = None ) : - if len(batch) > 0 : - logging.info("Sending %s points", len(batch) ) - if influx : - try : + batch = [] + batch_ms = 0 + + +def send_batch(batch: list, influx: InfluxDBClient = None): + if len(batch) > 0: + logging.info("Sending %s points", len(batch)) + if influx: + try: influx.write_points(batch) - except influxdb.exceptions.InfluxDBClientError as e: + except influxdb.exceptions.InfluxDBClientError as e: logging.error(e) - except : + except: logging.error("Something went wrong: json batch not sent") - else : + else: print(batch) - -def process_entry( entry : opmon_schema.OpMonEntry, - q : queue.Queue ) : + +def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue): d = to_dict(entry) js = json.dumps(d) e = Entry(json=js, ms=entry.time.ToMilliseconds()) q.put(e) -def to_dict( entry : opmon_schema.OpMonEntry ) -> dict : - ret = dict(measurement = entry.measurement) - ret['fields'] = unpack_payload(entry) - ret['tags'] = create_tags(entry) - ret['time'] = entry.time.ToJsonString() +def to_dict(entry: opmon_schema.OpMonEntry) -> dict: + ret = dict(measurement=entry.measurement) + ret["fields"] = unpack_payload(entry) + ret["tags"] = create_tags(entry) + ret["time"] = entry.time.ToJsonString() return ret -def unpack_payload( entry : opmon_schema.OpMonEntry ) -> dict : + +def unpack_payload(entry: opmon_schema.OpMonEntry) -> dict: data = entry.data ret = dict() - for key in data : + for key in data: value = data[key] - casted_value = getattr(value, value.WhichOneof('kind')) + casted_value = getattr(value, value.WhichOneof("kind")) ret[key] = casted_value - + return ret -def create_tags( entry : opmon_schema.OpMonEntry ) -> dict : +def create_tags(entry: opmon_schema.OpMonEntry) -> dict: opmon_id = entry.origin - #session and application - tags = dict(session = opmon_id.session, - application = opmon_id.application) - - #element and subelements + # session and application + tags = dict(session=opmon_id.session, application=opmon_id.application) + 
+ # element and subelements struct = opmon_id.substructure - for i in range(len(struct)) : - name='sub'*i + 'element' + for i in range(len(struct)): + name = "sub" * i + "element" tags[name] = struct[i] - #custom origin + # custom origin tags |= entry.custom_origin return tags -class Entry : - def __init__(self, json, ms) : + +class Entry: + def __init__(self, json: str, ms: int): self.json = json self.ms = ms -if __name__ == '__main__': - cli() +if __name__ == "__main__": + cli() From eb62cf5a1bc25e26bb6621c40c0d90b77a9cd179 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 16 Jul 2024 11:32:23 +0200 Subject: [PATCH 19/39] Fix dbnmae in entrypoint --- opmon-protobuf-dbwriter/entrypoint.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh index b3bbfab..3760853 100755 --- a/opmon-protobuf-dbwriter/entrypoint.sh +++ b/opmon-protobuf-dbwriter/entrypoint.sh @@ -9,7 +9,7 @@ python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SER --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \ --subscriber-topic $OPMON_DBWRITER_TOPIC \ --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \ - --influx-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \ + --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \ --debug False --influxdb-create True From 1f157b57a97e7dd6f616efb5208b6b06d1306b8b Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 16 Jul 2024 12:04:18 +0200 Subject: [PATCH 20/39] Add username and password --- opmon-protobuf-dbwriter/dbwriter.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index d1fb60d..992ddb5 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -77,6 +77,18 @@ default=500, help="Size in ms of the batches sent to influx", ) +@click.option( + "--influxdb-username", + type=click.STRING, + default=None, + help="Username to acces influxdb", +) +@click.option( + "--influxdb-password", + type=click.STRING, + default=None, + help="Password to acces influxdb", +) @click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels") def cli( subscriber_bootstrap, @@ -88,6 +100,8 @@ def cli( influxdb_name, influxdb_create, influxdb_timeout, + influxdb_username, + influxdb_password, debug, ): @@ -97,7 +111,10 @@ def cli( datefmt="%Y-%m-%d %H:%M:%S", ) - influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) + kwargs = dict() + if influxdb_username : kwargs["username"]=influxdb_username + if influxdb_password : kwargs["password"]=influxdb_password + influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs) db_list = influx.get_list_database() logging.info("Available DBs:", db_list) if {"name": influxdb_name} not in db_list: From 8e53ef8b3f0bead755366e1330948d756a732394 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 16 Jul 2024 12:05:20 +0200 Subject: [PATCH 21/39] Balck format --- opmon-protobuf-dbwriter/dbwriter.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index 992ddb5..a1de35c 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -100,7 +100,7 @@ def cli( 
influxdb_name, influxdb_create, influxdb_timeout, - influxdb_username, + influxdb_username, influxdb_password, debug, ): @@ -112,8 +112,10 @@ def cli( ) kwargs = dict() - if influxdb_username : kwargs["username"]=influxdb_username - if influxdb_password : kwargs["password"]=influxdb_password + if influxdb_username: + kwargs["username"] = influxdb_username + if influxdb_password: + kwargs["password"] = influxdb_password influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs) db_list = influx.get_list_database() logging.info("Available DBs:", db_list) From 4d538aeb5d8df0ddcfba9138c0f85ad69f33ff6c Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 16 Jul 2024 12:10:37 +0200 Subject: [PATCH 22/39] Correct inputs --- opmon-protobuf-dbwriter/dbwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index a1de35c..09edb93 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -53,7 +53,7 @@ @click.option( "--influxdb-address", type=click.STRING, - default="monkafka.cern.ch:30092", + default="monkafka.cern.ch", help="address of the influx db", ) @click.option( From fb5375552fdc064c5a7588ec3d99d8f26aa6f3d7 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 16 Jul 2024 12:18:26 +0200 Subject: [PATCH 23/39] Add documentation --- docs/README.md | 2 +- docs/README_opmon-protobuf-dbwriter.md | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 docs/README_opmon-protobuf-dbwriter.md diff --git a/docs/README.md b/docs/README.md index 141d308..304532e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,7 +6,7 @@ docker run --rm -e MICROSERVICE= ghcr.io/dune-daq/microser ``` There are a couple of points to note: -* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Oct-6-2023, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter`, `runnumber-rest` and `runregistry-rest`. +* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-16-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`. * Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=` * If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing * The microservices image tag will be `microservices:` or `microservices:`, i.e. `microservices:develop`. diff --git a/docs/README_opmon-protobuf-dbwriter.md b/docs/README_opmon-protobuf-dbwriter.md new file mode 100644 index 0000000..a4fa5e2 --- /dev/null +++ b/docs/README_opmon-protobuf-dbwriter.md @@ -0,0 +1,15 @@ +`dbwriter.py` is the script responsible for taking the opmon messages via the OpMonSubscriber +and writing to an InfluxDB database so that the data can be displayed in a +grafana dashboard. To run it manually do: +```python dbwriter.py [options]``` + + +# Running locally +The script can be run locally which can be useful to debug or start up quickly. 
+```
+python3 dbwriter.py
+```
+Passing the appropriate variables.
+As this script requires opmonlib and kafkaopmon, it has to be launched from a development environment.
+It can run at the same time locally and in kubernetes.
+

From a914582feb37bdf0efb0f31fd3176e50b95f7e48 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Tue, 16 Jul 2024 16:16:47 +0200
Subject: [PATCH 24/39] Correct indentation

---
 .../opmon-dbwriter-deployment.yaml | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index 16e5788..c38c636 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -51,18 +51,18 @@ spec:
           value: dune-daq.kafka.svc.cluster.local:9092
         - name: OPMON_DBWRITER_KAFKA_GROUP
           value: opmon-protobuf-dbwriter
-          - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS
-            value: 5000
-          - name: OPMON_DBWRITER_TOPIC
-            value: opmon_stream
-          - name: OPMON_DBWRITER_INFLUX_HOST
-            value: opmon-influxdb.opmon.svc
-          - name: OPMON_DBWRITER_INFLUX_PORT
-            value: 8086
-          - name: OPMON_DBWRITER_TABLE
-            value: opmon_protobuf_v1
+        - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS
+          value: 5000
+        - name: OPMON_DBWRITER_TOPIC
+          value: opmon_stream
+        - name: OPMON_DBWRITER_INFLUX_HOST
+          value: opmon-influxdb.opmon.svc
+        - name: OPMON_DBWRITER_INFLUX_PORT
+          value: 8086
+        - name: OPMON_DBWRITER_TABLE
+          value: opmon_protobuf_v1
         - name: OPMON_DBWRITER_BATCH_SIZE_MS
-            value: 800
+          value: 800
         resources:
           limits:
             memory: 1Gi

From 962b73dced3a5ce1e92a27fe2144031703d39afb Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Wed, 24 Jul 2024 10:29:14 +0200
Subject: [PATCH 25/39] Add user pass for influx

---
 opmon-protobuf-dbwriter/entrypoint.sh                  | 7 +++----
 opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 4 ++++
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh
index 3760853..a39cf81 100755
--- a/opmon-protobuf-dbwriter/entrypoint.sh
+++ b/opmon-protobuf-dbwriter/entrypoint.sh
@@ -3,13 +3,12 @@
 cd $(dirname $0)
 
 source ../entrypoint_functions.sh
 
-ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS "
+ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD"
 
 python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \
                       --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
                       --subscriber-topic $OPMON_DBWRITER_TOPIC \
                       --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
                       --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
-                      --debug False --influxdb-create True
-
-
+                      --debug False --influxdb-create True --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password OPMON_DBWRITER_INFLUX_PASSWORD
+
\ No newline at end of file
diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index c38c636..7e6b957 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -63,6 +63,10 @@ spec:
           value: opmon_protobuf_v1
         - name: OPMON_DBWRITER_BATCH_SIZE_MS
           value: 800
+        - name: OPMON_DBWRITER_INFLUX_USER
+          value: user
+        - name: OPMON_DBWRITER_INFLUX_PASSWORD
+          value: pass
         resources:
           limits:
             memory: 1Gi

From 3fcf1b327479f307177b4622ae39cecadb292f66 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Wed, 24 Jul 2024 10:30:25 +0200
Subject: [PATCH 26/39] Update dependency, ers version update.

The change is not related to any use in the microservice, but it would be
odd to keep using an old version
---
 dockerfiles/microservices-dependencies.dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile
index f467a68..6bee1f3 100644
--- a/dockerfiles/microservices-dependencies.dockerfile
+++ b/dockerfiles/microservices-dependencies.dockerfile
@@ -1,6 +1,6 @@
 FROM cern/alma9-base
 
-ARG ERSVERSION=v1.5.1 # For issue.proto from ers
+ARG ERSVERSION=v1.5.2 # For issue.proto from ers
 ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka
 ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib
 ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon

From d750acad1773e0478bb32ef41da85771a213bbec Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Wed, 24 Jul 2024 15:20:22 +0200
Subject: [PATCH 27/39] Fix mistake

---
 opmon-protobuf-dbwriter/entrypoint.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh
index a39cf81..d8a4bda 100755
--- a/opmon-protobuf-dbwriter/entrypoint.sh
+++ b/opmon-protobuf-dbwriter/entrypoint.sh
@@ -10,5 +10,5 @@ python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SER
                       --subscriber-topic $OPMON_DBWRITER_TOPIC \
                       --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
                       --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
-                      --debug False --influxdb-create True --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password OPMON_DBWRITER_INFLUX_PASSWORD
+                      --debug False --influxdb-create True --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD
 
\ No newline at end of file

From d420767370569e35ee433f37f47e91c04d318767 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Wed, 24 Jul 2024 15:23:33 +0200
Subject: [PATCH 28/39] Fix ordering of variables

---
 opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index 7e6b957..b19e13d 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -49,6 +49,10 @@ spec:
           value: opmon-protobuf-dbwriter
         - name: ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER
           value: dune-daq.kafka.svc.cluster.local:9092
+        - name: OPMON_DBWRITER_INFLUX_USER
+          value: user
+        - name: OPMON_DBWRITER_INFLUX_PASSWORD
+          value: pass
         - name: OPMON_DBWRITER_KAFKA_GROUP
           value: opmon-protobuf-dbwriter
         - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS
           value: 5000
         - name: OPMON_DBWRITER_TOPIC
           value: opmon_stream
         - name: OPMON_DBWRITER_INFLUX_HOST
           value: opmon-influxdb.opmon.svc
         - name: OPMON_DBWRITER_INFLUX_PORT
           value: 8086
         - name: OPMON_DBWRITER_TABLE
           value: opmon_protobuf_v1
         - name: OPMON_DBWRITER_BATCH_SIZE_MS
           value: 800
-        - name: OPMON_DBWRITER_INFLUX_USER
-          value: user
-        - name: OPMON_DBWRITER_INFLUX_PASSWORD
-          value: pass
         resources:
           limits:
             memory: 1Gi

From f6441bb65e57787de54417ce53391555c4ac2941 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 10:37:25 +0200
Subject: [PATCH 29/39] Better entry point formatting

---
 opmon-protobuf-dbwriter/entrypoint.sh | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh
index d8a4bda..70382e8 100755
--- a/opmon-protobuf-dbwriter/entrypoint.sh
+++ b/opmon-protobuf-dbwriter/entrypoint.sh
@@ -6,9 +6,11 @@
 source ../entrypoint_functions.sh
 
 ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD"
 
 python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \
-    --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
+    --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
     --subscriber-topic $OPMON_DBWRITER_TOPIC \
     --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
     --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
-    --debug False --influxdb-create True --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD
+    --influxdb-create True \
+    --debug False \
+    --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD
\ No newline at end of file

From ea464b59938b9c0ae95e2325114209859d079bc7 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 10:44:26 +0200
Subject: [PATCH 30/39] Better matchLabels

---
 opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index b19e13d..f31f235 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -25,11 +25,11 @@ spec:
   replicas: 1
   selector:
     matchLabels:
-      app: opmon-protobuf-replica
+      app: opmon-protobuf-dbwriter
   template:
     metadata:
       labels:
-        app: opmon-protobuf-replica
+        app: opmon-protobuf-dbwriter
         app.kubernetes.io/app: opmon-protobuf-dbwriter
         app.kubernetes.io/component: opmon-protobuf-dbwriter
     spec:
@@ -43,7 +43,7 @@ spec:
       containers:
       - image: ghcr.io/dune-daq/microservices:mroda-opmon_protobuf ## TBC
         imagePullPolicy: Always
-        name: opmon-protobuf-replica
+        name: opmon-protobuf-dbwriter
         env:
         - name: MICROSERVICE
           value: opmon-protobuf-dbwriter

From 8fe16648f49446e485ce0edb1e76b3924ad2f60c Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 11:03:39 +0200
Subject: [PATCH 31/39] Correct name in deployment

---
 opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index f31f235..85071c3 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -47,7 +47,7 @@ spec:
         env:
         - name: MICROSERVICE
           value: opmon-protobuf-dbwriter
-        - name: ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER
+        - name: OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER
           value: dune-daq.kafka.svc.cluster.local:9092
         - name: OPMON_DBWRITER_INFLUX_USER
           value: user

From 84d1d1566caee5a1b0a98d5b349b4961bcfbbd32 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:23:49 +0200
Subject: [PATCH 32/39] Fix type

---
 opmon-protobuf-dbwriter/dbwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 09edb93..ccd3558 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -234,7 +234,7 @@ def create_tags(entry: opmon_schema.OpMonEntry) -> dict:
 
 
 class Entry:
-    def __init__(self, json: str, ms: int):
+    def __init__(self, json: json, ms: int):
         self.json = json
         self.ms = ms
 

From 26147efd8fa8cb5484ab023079460c90addef92c Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:34:42 +0200
Subject: [PATCH 33/39] Correct use of logging

---
 opmon-protobuf-dbwriter/dbwriter.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index ccd3558..32148f0 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -118,12 +118,12 @@ def cli(
         kwargs["password"] = influxdb_password
     influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs)
     db_list = influx.get_list_database()
-    logging.info("Available DBs:", db_list)
+    logging.info("Available DBs: %s", db_list)
     if {"name": influxdb_name} not in db_list:
-        logging.warning(influxdb_name, "DB not available")
+        logging.warning("%s DB not available", influxdb_name )
     if influxdb_create:
         influx.create_database(influxdb_name)
-        logging.info("New list of DBs:", influx.get_list_database())
+        logging.info("New list of DBs: %s", influx.get_list_database())
 
     influx.switch_database(influxdb_name)

From 483ca788d2ba6127333f508fda136257548e1996 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:43:16 +0200
Subject: [PATCH 34/39] Correct queue timeout handling

---
 opmon-protobuf-dbwriter/dbwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 32148f0..255856d 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -169,7 +169,7 @@ def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None):
                 batch = [entry.json]
                 batch_ms = entry.ms
 
-        except queue.Empty:
+        except Exception :
             logging.debug("Queue is empty")
             send_batch(batch, influx)
             batch = []

From 0781f622fe3e3e6c7eeef8b167e2ea60b931a0d9 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:49:48 +0200
Subject: [PATCH 35/39] Adding influx exception

---
 opmon-protobuf-dbwriter/dbwriter.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 255856d..79dfecf 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -10,6 +10,7 @@
 import opmonlib.opmon_entry_pb2 as opmon_schema
 
 from influxdb import InfluxDBClient
+import influxdb
 from functools import partial
 import json
 import click
@@ -169,7 +170,7 @@ def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None):
                 batch = [entry.json]
                 batch_ms = entry.ms
 
-        except Exception :
+        except queue.Empty:
             logging.debug("Queue is empty")
             send_batch(batch, influx)
             batch = []

From 4f8a2b34e04b388f671a766fab3ab6adbef47b74 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:53:12 +0200
Subject: [PATCH 36/39] Better exception handling

---
 opmon-protobuf-dbwriter/dbwriter.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 79dfecf..263ba2b 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -185,8 +185,9 @@ def send_batch(batch: list, influx: InfluxDBClient = None):
             influx.write_points(batch)
         except influxdb.exceptions.InfluxDBClientError as e:
             logging.error(e)
-        except:
+        except Exception as e:
             logging.error("Something went wrong: json batch not sent")
+            logging.error("Details: {}".format(str(e)))
     else:
         print(batch)

From 18366aa96edb5e326568a46553cdc1ce6af057c8 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 16:57:04 +0200
Subject: [PATCH 37/39] Testing dict approach

---
 opmon-protobuf-dbwriter/dbwriter.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 263ba2b..2ebc90d 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -194,8 +194,8 @@ def send_batch(batch: list, influx: InfluxDBClient = None):
 
 def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue):
     d = to_dict(entry)
-    js = json.dumps(d)
-    e = Entry(json=js, ms=entry.time.ToMilliseconds())
+    # js = json.dumps(d)
+    e = Entry(json=d, ms=entry.time.ToMilliseconds())
     q.put(e)
 
 
@@ -236,7 +236,7 @@ def create_tags(entry: opmon_schema.OpMonEntry) -> dict:
 
 
 class Entry:
-    def __init__(self, json: json, ms: int):
+    def __init__(self, json: dict, ms: int):
         self.json = json
         self.ms = ms
 

From 65091f7b3221f38f7bc62b449893f3b376296373 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 17:05:07 +0200
Subject: [PATCH 38/39] Cleanup

---
 opmon-protobuf-dbwriter/dbwriter.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
index 2ebc90d..4c3551f 100644
--- a/opmon-protobuf-dbwriter/dbwriter.py
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -194,7 +194,6 @@ def send_batch(batch: list, influx: InfluxDBClient = None):
 
 def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue):
     d = to_dict(entry)
-    # js = json.dumps(d)
     e = Entry(json=d, ms=entry.time.ToMilliseconds())
     q.put(e)
 

From e24f34ede8ac5d09e12f13baafe13c6eb3b8b5a2 Mon Sep 17 00:00:00 2001
From: Marco Roda
Date: Thu, 25 Jul 2024 17:13:22 +0200
Subject: [PATCH 39/39] Final decoration

---
 docs/README.md                                         | 2 +-
 opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 304532e..e8794eb 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -6,7 +6,7 @@ docker run --rm -e MICROSERVICE= ghcr.io/dune-daq/microser
 ```
 
 There are a couple of points to note:
-* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-16-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`.
+* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-25-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`.
 * Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=`
 * If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing
 * The microservices image tag will be `microservices:` or `microservices:`, i.e. `microservices:develop`.
diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
index 85071c3..d82893e 100644
--- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -11,7 +11,7 @@ metadata:
     pod-security.kubernetes.io/enforce-version: latest
     pod-security.kubernetes.io/warn: baseline
     pod-security.kubernetes.io/warn-version: latest
-  name: ers
+  name: opmon
 ---
 apiVersion: apps/v1
 kind: Deployment
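Taken together, the last few patches settle on a simple batching flow: protobuf entries are converted to dicts, queued, and flushed to InfluxDB either when the queue goes quiet or when the current batch is old enough. A minimal sketch of that flow, assuming the `influxdb` Python client and Python's standard `queue` module; the helper names `flush_batch` and `drain_once` are illustrative, not the microservice's API:

```python
# Illustrative sketch only, not part of the patch series.
import logging
import queue

import influxdb
from influxdb import InfluxDBClient


def flush_batch(batch: list, influx: InfluxDBClient) -> None:
    # Mirror of send_batch(): write the accumulated point dicts,
    # log and drop the batch on failure instead of crashing the loop.
    if not batch:
        return
    try:
        influx.write_points(batch)
    except influxdb.exceptions.InfluxDBClientError as e:
        logging.error(e)
    except Exception as e:
        logging.error("Batch not sent: %s", e)


def drain_once(q: queue.Queue, influx: InfluxDBClient, timeout_ms: int, batch: list) -> list:
    # One step of the consumer loop: pull an entry if one arrives within the
    # timeout, otherwise treat the idle period as the signal to flush.
    try:
        entry = q.get(timeout=timeout_ms / 1000)
        batch.append(entry)
    except queue.Empty:
        flush_batch(batch, influx)
        batch = []
    return batch
```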