diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile
index 1689a0f..6bee1f3 100644
--- a/dockerfiles/microservices-dependencies.dockerfile
+++ b/dockerfiles/microservices-dependencies.dockerfile
@@ -1,7 +1,10 @@
 FROM cern/alma9-base
 
-ARG ERSVERSION=v1.5.1 # For issue.proto from ers
+ARG ERSVERSION=v1.5.2 # For issue.proto from ers
 ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka
+ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib
+ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon
+
 ARG LOCALPYDIR=/microservices_python
 
 RUN yum clean all \
@@ -30,10 +33,15 @@
 RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip && \
     unzip protoc-24.3-linux-x86_64.zip && \
     curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \
     mkdir -p $LOCALPYDIR/ers && \
-    protoc --python_out=$LOCALPYDIR/ers issue.proto
+    protoc --python_out=$LOCALPYDIR/ers issue.proto && \
+    curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \
+    mkdir -p $LOCALPYDIR/opmonlib && \
+    protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto
 
 RUN mkdir -p $LOCALPYDIR/erskafka && \
-    curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py
+    curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py && \
+    mkdir -p $LOCALPYDIR/kafkaopmon && \
+    curl https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o $LOCALPYDIR/kafkaopmon/OpMonSubscriber.py
 
 ENV PYTHONPATH=$LOCALPYDIR:$PYTHONPATH
diff --git a/docs/README.md b/docs/README.md
index 141d308..e8794eb 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -6,7 +6,7 @@
 docker run --rm -e MICROSERVICE= ghcr.io/dune-daq/microser
 ```
 There are a couple of points to note:
-* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Oct-6-2023, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter`, `runnumber-rest` and `runregistry-rest`.
+* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-25-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`.
 * Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=`
 * If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing
 * The microservices image tag will be `microservices:` or `microservices:`, i.e. `microservices:develop`.
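For orientation, the protoc calls above only generate Python bindings (`issue_pb2.py`, `opmon_entry_pb2.py`); nothing is compiled. Below is a minimal sketch of how a consumer would use the opmon bindings, assuming `$LOCALPYDIR` is on `PYTHONPATH` as set by the Dockerfile so that the generated module is importable as `opmonlib.opmon_entry_pb2`; the helper name `decode` is hypothetical.

```python
# Sketch: consuming the protobuf bindings generated by the protoc step above.
# Assumes the generated module lands at $LOCALPYDIR/opmonlib/opmon_entry_pb2.py.
import opmonlib.opmon_entry_pb2 as opmon_schema

def decode(raw: bytes) -> opmon_schema.OpMonEntry:  # hypothetical helper
    """Deserialize one raw Kafka message payload into an OpMonEntry."""
    entry = opmon_schema.OpMonEntry()
    entry.ParseFromString(raw)  # standard protobuf deserialization API
    return entry
```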
diff --git a/docs/README_opmon-protobuf-dbwriter.md b/docs/README_opmon-protobuf-dbwriter.md
new file mode 100644
index 0000000..a4fa5e2
--- /dev/null
+++ b/docs/README_opmon-protobuf-dbwriter.md
@@ -0,0 +1,15 @@
+`dbwriter.py` is the script responsible for taking the opmon messages via the OpMonSubscriber
+and writing them to an InfluxDB database so that the data can be displayed in a
+Grafana dashboard. To run it manually, do:
+```python dbwriter.py [options]```
+
+
+# Running locally
+The script can be run locally, which can be useful for debugging or for starting up quickly. After setting up a working area and cloning this repo, run:
+```
+python3 dbwriter.py
+```
+passing the appropriate options.
+As this script requires opmonlib and kafkaopmon, it has to be launched from a development environment.
+It can run locally and in Kubernetes at the same time.
+
diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py
new file mode 100644
index 0000000..4c3551f
--- /dev/null
+++ b/opmon-protobuf-dbwriter/dbwriter.py
@@ -0,0 +1,244 @@
+# @file dbwriter.py Writing Opmon entries into InfluxDB
+# This is part of the DUNE DAQ software, copyright 2020.
+# Licensing/copyright details are in the COPYING file that you should have
+# received with this code.
+#
+
+import kafkaopmon.OpMonSubscriber as opmon_sub
+import google.protobuf.json_format as pb_json
+from google.protobuf.timestamp_pb2 import Timestamp
+import opmonlib.opmon_entry_pb2 as opmon_schema
+
+from influxdb import InfluxDBClient
+import influxdb
+from functools import partial
+import json
+import click
+import logging
+import queue
+import threading
+
+
+CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+
+
+@click.command(context_settings=CONTEXT_SETTINGS)
+# subscriber options
+@click.option(
+    "--subscriber-bootstrap",
+    type=click.STRING,
+    default="monkafka.cern.ch:30092",
+    help="bootstrap server and port of the OpMonSubscriber",
+)
+@click.option(
+    "--subscriber-group",
+    type=click.STRING,
+    default=None,
+    help="group ID of the OpMonSubscriber",
+)
+@click.option(
+    "--subscriber-timeout",
+    type=click.INT,
+    default=500,
+    help="timeout in ms used in the OpMonSubscriber",
+)
+@click.option(
+    "--subscriber-topic",
+    type=click.STRING,
+    multiple=True,
+    default=["opmon_stream"],
+    help='The system will add the "monitoring." prefix',
+)
+
+# influx options
+@click.option(
+    "--influxdb-address",
+    type=click.STRING,
+    default="monkafka.cern.ch",
+    help="address of the influx db",
+)
+@click.option(
+    "--influxdb-port", type=click.INT, default=31002, help="port of the influxdb"
+)
+@click.option(
+    "--influxdb-name",
+    type=click.STRING,
+    default="test_influx",
+    help="name of the destination database inside influxdb",
+)
+@click.option(
+    "--influxdb-create",
+    type=click.BOOL,
+    default=True,
+    help="Creates the influxdb database if it does not exist",
+)
+@click.option(
+    "--influxdb-timeout",
+    type=click.INT,
+    default=500,
+    help="Time span in ms covered by each batch sent to influx",
+)
+@click.option(
+    "--influxdb-username",
+    type=click.STRING,
+    default=None,
+    help="Username to access influxdb",
+)
+@click.option(
+    "--influxdb-password",
+    type=click.STRING,
+    default=None,
+    help="Password to access influxdb",
+)
+@click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels")
+def cli(
+    subscriber_bootstrap,
+    subscriber_group,
+    subscriber_timeout,
+    subscriber_topic,
+    influxdb_address,
+    influxdb_port,
+    influxdb_name,
+    influxdb_create,
+    influxdb_timeout,
+    influxdb_username,
+    influxdb_password,
+    debug,
+):
+
+    logging.basicConfig(
+        format="%(asctime)s %(levelname)-8s %(message)s",
+        level=logging.DEBUG if debug else logging.INFO,
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+
+    kwargs = dict()
+    if influxdb_username:
+        kwargs["username"] = influxdb_username
+    if influxdb_password:
+        kwargs["password"] = influxdb_password
+    influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs)
+    db_list = influx.get_list_database()
+    logging.info("Available DBs: %s", db_list)
+    if {"name": influxdb_name} not in db_list:
+        logging.warning("%s DB not available", influxdb_name)
+        if influxdb_create:
+            influx.create_database(influxdb_name)
+            logging.info("New list of DBs: %s", influx.get_list_database())
+
+    influx.switch_database(influxdb_name)
+
+    sub = opmon_sub.OpMonSubscriber(
+        bootstrap=subscriber_bootstrap,
+        topics=subscriber_topic,
+        group_id=subscriber_group,
+        timeout_ms=subscriber_timeout,
+    )
+
+    # queue of Entry objects, filled by the subscriber callback
+    q = queue.Queue()
+
+    callback_function = partial(process_entry, q=q)
+
+    sub.add_callback(name="to_influx", function=callback_function)
+
+    thread = threading.Thread(
+        target=consume, daemon=True, args=(q, influxdb_timeout, influx)
+    )
+    thread.start()
+
+    sub.start()
+
+
+def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None):
+    logging.info("Starting consumer thread")
+    batch = []
+    batch_ms = 0
+    while True:
+        try:
+            entry = q.get(timeout=1)  ## timeout here is in seconds
+
+            if entry.ms - batch_ms < timeout_ms:
+                # because of the surrounding if, batch_ms is not zero here
+                batch.append(entry.json)
+                batch_ms = min(batch_ms, entry.ms)
+
+            if entry.ms - batch_ms >= timeout_ms:
+                # note that for a late arrival, i.e. when entry.ms was smaller than batch_ms, the difference is 0, so this if is skipped,
+                # i.e. there is no double insertion
+                send_batch(batch, influx)
+                batch = [entry.json]
+                batch_ms = entry.ms
+
+        except queue.Empty:
+            logging.debug("Queue is empty")
+            send_batch(batch, influx)
+            batch = []
+            batch_ms = 0
+
+
+def send_batch(batch: list, influx: InfluxDBClient = None):
+    if len(batch) > 0:
+        logging.info("Sending %s points", len(batch))
+        if influx:
+            try:
+                influx.write_points(batch)
+            except influxdb.exceptions.InfluxDBClientError as e:
+                logging.error(e)
+            except Exception as e:
+                logging.error("Something went wrong: json batch not sent")
+                logging.error("Details: {}".format(str(e)))
+        else:
+            print(batch)
+
+
+def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue):
+    d = to_dict(entry)
+    e = Entry(json=d, ms=entry.time.ToMilliseconds())
+    q.put(e)
+
+
+def to_dict(entry: opmon_schema.OpMonEntry) -> dict:
+    ret = dict(measurement=entry.measurement)
+    ret["fields"] = unpack_payload(entry)
+    ret["tags"] = create_tags(entry)
+    ret["time"] = entry.time.ToJsonString()
+    return ret
+
+
+def unpack_payload(entry: opmon_schema.OpMonEntry) -> dict:
+    data = entry.data
+    ret = dict()
+    for key in data:
+        value = data[key]
+        casted_value = getattr(value, value.WhichOneof("kind"))
+        ret[key] = casted_value
+
+    return ret
+
+
+def create_tags(entry: opmon_schema.OpMonEntry) -> dict:
+    opmon_id = entry.origin
+    # session and application
+    tags = dict(session=opmon_id.session, application=opmon_id.application)
+
+    # element and subelements
+    struct = opmon_id.substructure
+    for i in range(len(struct)):
+        name = "sub" * i + "element"
+        tags[name] = struct[i]
+
+    # custom origin
+    tags |= entry.custom_origin
+
+    return tags
+
+
+class Entry:
+    def __init__(self, json: dict, ms: int):
+        self.json = json
+        self.ms = ms
+
+
+if __name__ == "__main__":
+    cli()
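To make the conversion in `to_dict()` concrete, here is the shape of a single point as it is handed to `InfluxDBClient.write_points()`. All values are invented for illustration; only the key layout follows the code above.

```python
# Illustration (made-up values): one point produced by to_dict().
# A substructure of ["trb", "fragments"] flattens to the tags
# "element" and "subelement" via the "sub" * i + "element" rule
# in create_tags().
example_point = {
    "measurement": "dfapplication",      # entry.measurement
    "tags": {
        "session": "test-session",       # origin.session
        "application": "df-01",          # origin.application
        "element": "trb",                # origin.substructure[0]
        "subelement": "fragments",       # origin.substructure[1]
    },
    "fields": {"bytes_received": 1024},  # one field per oneof "kind" in entry.data
    "time": "2024-07-25T12:00:00Z",      # entry.time.ToJsonString()
}
```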
diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh
new file mode 100755
index 0000000..70382e8
--- /dev/null
+++ b/opmon-protobuf-dbwriter/entrypoint.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+cd $(dirname $0)
+source ../entrypoint_functions.sh
+
+ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD"
+
+python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \
+       --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
+       --subscriber-topic $OPMON_DBWRITER_TOPIC \
+       --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
+       --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
+       --influxdb-create True \
+       --debug False \
+       --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD
+
\ No newline at end of file
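Besides the container entrypoint, the conversion path can be exercised locally without Kafka or InfluxDB, since `send_batch()` falls back to printing when no client is passed. A rough sketch, assuming `dbwriter.py` and the generated `opmonlib` bindings are importable (e.g. inside the development environment mentioned in the README); the field names follow their use in `dbwriter.py`:

```python
# Quick local smoke test of the conversion path: process_entry() converts an
# OpMonEntry into an influx point and enqueues it; send_batch() with no
# client prints the point instead of writing it.
import queue

import opmonlib.opmon_entry_pb2 as opmon_schema
from dbwriter import process_entry, send_batch

entry = opmon_schema.OpMonEntry()
entry.measurement = "smoke_test"   # assumed string field, as read by to_dict()
entry.time.GetCurrentTime()        # stamp the entry with "now"

q = queue.Queue()
process_entry(entry, q)            # convert + enqueue
send_batch([q.get().json])         # influx=None, so the point is printed
```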
diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
new file mode 100644
index 0000000..d82893e
--- /dev/null
+++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml
@@ -0,0 +1,84 @@
+---
+apiVersion: v1
+kind: Namespace
+metadata:
+  annotations:
+    kluctl.io/skip-delete-if-tags: "true"
+  labels:
+    pod-security.kubernetes.io/audit: baseline
+    pod-security.kubernetes.io/audit-version: latest
+    pod-security.kubernetes.io/enforce: baseline # unified image runs as root :(
+    pod-security.kubernetes.io/enforce-version: latest
+    pod-security.kubernetes.io/warn: baseline
+    pod-security.kubernetes.io/warn-version: latest
+  name: opmon
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app.kubernetes.io/app: opmon-protobuf-dbwriter
+    app.kubernetes.io/component: opmon-protobuf-dbwriter
+  name: opmonstream-dbwriter
+  namespace: opmon
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: opmon-protobuf-dbwriter
+  template:
+    metadata:
+      labels:
+        app: opmon-protobuf-dbwriter
+        app.kubernetes.io/app: opmon-protobuf-dbwriter
+        app.kubernetes.io/component: opmon-protobuf-dbwriter
+    spec:
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: node-role.kubernetes.io/worker
+                operator: Exists
+      containers:
+      - image: ghcr.io/dune-daq/microservices:mroda-opmon_protobuf ## TBC
+        imagePullPolicy: Always
+        name: opmon-protobuf-dbwriter
+        env:
+        - name: MICROSERVICE
+          value: opmon-protobuf-dbwriter
+        - name: OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER
+          value: dune-daq.kafka.svc.cluster.local:9092
+        - name: OPMON_DBWRITER_INFLUX_USER
+          value: user
+        - name: OPMON_DBWRITER_INFLUX_PASSWORD
+          value: pass
+        - name: OPMON_DBWRITER_KAFKA_GROUP
+          value: opmon-protobuf-dbwriter
+        - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS
+          value: "5000"
+        - name: OPMON_DBWRITER_TOPIC
+          value: opmon_stream
+        - name: OPMON_DBWRITER_INFLUX_HOST
+          value: opmon-influxdb.opmon.svc
+        - name: OPMON_DBWRITER_INFLUX_PORT
+          value: "8086"
+        - name: OPMON_DBWRITER_TABLE
+          value: opmon_protobuf_v1
+        - name: OPMON_DBWRITER_BATCH_SIZE_MS
+          value: "800"
+        resources:
+          limits:
+            memory: 1Gi
+          requests:
+            memory: 8Mi
+        securityContext:
+          allowPrivilegeEscalation: false
+          capabilities:
+            drop:
+            - ALL
+          runAsGroup: 11000
+          seccompProfile:
+            type: RuntimeDefault
+      securityContext:
+        fsGroup: 11000