Merge pull request #115 from DUNE-DAQ/mroda/opmon_protobuf
Opmon protobuf microservice
mroda88 authored Jul 25, 2024
2 parents 622cead + e24f34e commit 999ac29
Showing 6 changed files with 371 additions and 4 deletions.
14 changes: 11 additions & 3 deletions dockerfiles/microservices-dependencies.dockerfile
@@ -1,7 +1,10 @@
FROM cern/alma9-base

-ARG ERSVERSION=v1.5.1 # For issue.proto from ers
+ARG ERSVERSION=v1.5.2 # For issue.proto from ers
ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka
+ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib
+ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon

ARG LOCALPYDIR=/microservices_python

RUN yum clean all \
@@ -30,10 +33,15 @@ RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3
unzip protoc-24.3-linux-x86_64.zip && \
curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto && \
mkdir -p $LOCALPYDIR/ers && \
-protoc --python_out=$LOCALPYDIR/ers issue.proto
+protoc --python_out=$LOCALPYDIR/ers issue.proto && \
+curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto && \
+mkdir -p $LOCALPYDIR/opmonlib && \
+protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto

RUN mkdir -p $LOCALPYDIR/erskafka && \
-curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py
+curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py && \
+mkdir -p $LOCALPYDIR/kafkaopmon && \
+curl https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o $LOCALPYDIR/kafkaopmon/OpMonSubscriber.py

ENV PYTHONPATH=$LOCALPYDIR:$PYTHONPATH

2 changes: 1 addition & 1 deletion docs/README.md
@@ -6,7 +6,7 @@ docker run --rm -e MICROSERVICE=<name of microservice> ghcr.io/dune-daq/microser
```

There are a couple of points to note:
-* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Oct-6-2023, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter`, `runnumber-rest` and `runregistry-rest`.
+* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-25-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`.
* Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=<variable value>`
* If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing
* The microservices image tag will be `microservices:<name-of-branch>` or `microservices:<version-tag>`, i.e. `microservices:develop`.
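As a sketch, a hypothetical launch of the new `opmon-protobuf-dbwriter` could look like the following; the variable names are the ones checked by its `entrypoint.sh`, and every value shown is a placeholder, not a recommended default:
```
docker run --rm -e MICROSERVICE=opmon-protobuf-dbwriter \
  -e OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER=monkafka.cern.ch:30092 \
  -e OPMON_DBWRITER_KAFKA_GROUP=opmon \
  -e OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS=500 \
  -e OPMON_DBWRITER_TOPIC=opmon_stream \
  -e OPMON_DBWRITER_INFLUX_HOST=localhost \
  -e OPMON_DBWRITER_INFLUX_PORT=8086 \
  -e OPMON_DBWRITER_TABLE=test_influx \
  -e OPMON_DBWRITER_BATCH_SIZE_MS=500 \
  -e OPMON_DBWRITER_INFLUX_USER=admin \
  -e OPMON_DBWRITER_INFLUX_PASSWORD=admin \
  ghcr.io/dune-daq/microservices:develop
```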
15 changes: 15 additions & 0 deletions docs/README_opmon-protobuf-dbwriter.md
@@ -0,0 +1,15 @@
`dbwriter.py` is the script responsible for taking the opmon messages via the OpMonSubscriber
and writing them to an InfluxDB database so that the data can be displayed in a
Grafana dashboard. To run it manually, do:
```python3 dbwriter.py [options]```


# Running locally
The script can be run locally, which can be useful for debugging or for a quick start. After setting up a working area and cloning this repo, run:
```
python3 dbwriter.py
```
passing the appropriate options.
As this script requires opmonlib and kafkaopmon, it has to be launched from a development environment.
It can run locally and in Kubernetes at the same time.
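As a sketch, a local run pointing at a test InfluxDB instance might look like the following (the host names, port and database name here are placeholders, not recommended defaults):
```
python3 dbwriter.py \
    --subscriber-bootstrap monkafka.cern.ch:30092 \
    --subscriber-topic opmon_stream \
    --influxdb-address localhost \
    --influxdb-port 8086 \
    --influxdb-name test_influx \
    --debug True
```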

244 changes: 244 additions & 0 deletions opmon-protobuf-dbwriter/dbwriter.py
@@ -0,0 +1,244 @@
# @file dbwriter.py Writing Opmon entries into InfluxDB
# This is part of the DUNE DAQ software, copyright 2020.
# Licensing/copyright details are in the COPYING file that you should have
# received with this code.
#

import kafkaopmon.OpMonSubscriber as opmon_sub
import google.protobuf.json_format as pb_json
from google.protobuf.timestamp_pb2 import Timestamp
import opmonlib.opmon_entry_pb2 as opmon_schema

from influxdb import InfluxDBClient
import influxdb
from functools import partial
import json
import click
import logging
import queue
import threading


CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


@click.command(context_settings=CONTEXT_SETTINGS)
# subscriber options
@click.option(
"--subscriber-bootstrap",
type=click.STRING,
default="monkafka.cern.ch:30092",
help="boostrap server and port of the OpMonSubscriber",
)
@click.option(
"--subscriber-group",
type=click.STRING,
default=None,
help="group ID of the OpMonSubscriber",
)
@click.option(
"--subscriber-timeout",
type=click.INT,
default=500,
help="timeout in ms used in the OpMonSubscriber",
)
@click.option(
"--subscriber-topic",
type=click.STRING,
multiple=True,
default=["opmon_stream"],
help='The system will add the "monitoring." prefix',
)

# influx options
@click.option(
"--influxdb-address",
type=click.STRING,
default="monkafka.cern.ch",
help="address of the influx db",
)
@click.option(
"--influxdb-port", type=click.INT, default=31002, help="port of the influxdb"
)
@click.option(
"--influxdb-name",
type=click.STRING,
default="test_influx",
help="Table name destination inside influxdb",
)
@click.option(
"--influxdb-create",
type=click.BOOL,
default=True,
help="Creates the influxdb if it does not exists",
)
@click.option(
"--influxdb-timeout",
type=click.INT,
default=500,
help="Size in ms of the batches sent to influx",
)
@click.option(
"--influxdb-username",
type=click.STRING,
default=None,
help="Username to acces influxdb",
)
@click.option(
"--influxdb-password",
type=click.STRING,
default=None,
help="Password to acces influxdb",
)
@click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels")
def cli(
subscriber_bootstrap,
subscriber_group,
subscriber_timeout,
subscriber_topic,
influxdb_address,
influxdb_port,
influxdb_name,
influxdb_create,
influxdb_timeout,
influxdb_username,
influxdb_password,
debug,
):

logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.DEBUG if debug else logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)

kwargs = dict()
if influxdb_username:
kwargs["username"] = influxdb_username
if influxdb_password:
kwargs["password"] = influxdb_password
influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs)
db_list = influx.get_list_database()
logging.info("Available DBs: %s", db_list)
if {"name": influxdb_name} not in db_list:
logging.warning("%s DB not available", influxdb_name )
if influxdb_create:
influx.create_database(influxdb_name)
logging.info("New list of DBs: %s", influx.get_list_database())

influx.switch_database(influxdb_name)

sub = opmon_sub.OpMonSubscriber(
bootstrap=subscriber_bootstrap,
topics=subscriber_topic,
group_id=subscriber_group,
timeout_ms=subscriber_timeout,
)

    # queue of Entry objects, filled by the subscriber callback
    # and drained by the consumer thread
    q = queue.Queue()

callback_function = partial(process_entry, q=q)

sub.add_callback(name="to_influx", function=callback_function)

thread = threading.Thread(
target=consume, daemon=True, args=(q, influxdb_timeout, influx)
)
thread.start()

sub.start()


def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None):
logging.info("Starting consumer thread")
batch = []
batch_ms = 0
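    # Batching: entries are accumulated until one arrives more than
    # timeout_ms after the earliest timestamp in the current batch,
    # at which point the batch is flushed; an empty queue also flushes.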
while True:
try:
entry = q.get(timeout=1) ## timeout here is in seconds

            if entry.ms - batch_ms < timeout_ms:
                # batch_ms cannot be zero here: with a fresh batch
                # (batch_ms == 0) the difference is a full timestamp,
                # which always exceeds timeout_ms
                batch.append(entry.json)
                batch_ms = min(batch_ms, entry.ms)

            if entry.ms - batch_ms >= timeout_ms:
                # for a late arrival (entry.ms smaller than batch_ms),
                # batch_ms has just been lowered to entry.ms above, so the
                # difference is 0 and this branch is skipped: no double insertion
                send_batch(batch, influx)
                batch = [entry.json]
                batch_ms = entry.ms

except queue.Empty:
logging.debug("Queue is empty")
send_batch(batch, influx)
batch = []
batch_ms = 0


def send_batch(batch: list, influx: InfluxDBClient = None):
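    # if no InfluxDB client is provided, the batch is only printed,
    # which is useful when debugging locally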
if len(batch) > 0:
logging.info("Sending %s points", len(batch))
if influx:
try:
influx.write_points(batch)
except influxdb.exceptions.InfluxDBClientError as e:
logging.error(e)
except Exception as e:
logging.error("Something went wrong: json batch not sent")
logging.error("Details: {}".format(str(e)))
else:
print(batch)


def process_entry(entry: opmon_schema.OpMonEntry, q: queue.Queue):
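    # callback registered on the OpMonSubscriber: convert each OpMonEntry
    # into an influx point and hand it to the consumer thread via the queue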
d = to_dict(entry)
e = Entry(json=d, ms=entry.time.ToMilliseconds())
q.put(e)


def to_dict(entry: opmon_schema.OpMonEntry) -> dict:
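    # convert an OpMonEntry protobuf message into an InfluxDB point, e.g.
    # {"measurement": ..., "fields": {...}, "tags": {...}, "time": "2024-07-25T12:00:00Z"}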
ret = dict(measurement=entry.measurement)
ret["fields"] = unpack_payload(entry)
ret["tags"] = create_tags(entry)
ret["time"] = entry.time.ToJsonString()
return ret


def unpack_payload(entry: opmon_schema.OpMonEntry) -> dict:
data = entry.data
ret = dict()
for key in data:
value = data[key]
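        # each value is a protobuf oneof named "kind": WhichOneof returns the
        # name of whichever field is set, and getattr extracts its content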
casted_value = getattr(value, value.WhichOneof("kind"))
ret[key] = casted_value

return ret


def create_tags(entry: opmon_schema.OpMonEntry) -> dict:
opmon_id = entry.origin
# session and application
tags = dict(session=opmon_id.session, application=opmon_id.application)

# element and subelements
struct = opmon_id.substructure
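    # struct[0] becomes tag "element", struct[1] "subelement",
    # struct[2] "subsubelement", and so on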
for i in range(len(struct)):
name = "sub" * i + "element"
tags[name] = struct[i]

# custom origin
tags |= entry.custom_origin

return tags


class Entry:
    """Container pairing the influx-ready point (a dict) with its timestamp in ms."""
def __init__(self, json: dict, ms: int):
self.json = json
self.ms = ms


if __name__ == "__main__":
cli()
16 changes: 16 additions & 0 deletions opmon-protobuf-dbwriter/entrypoint.sh
@@ -0,0 +1,16 @@
#!/bin/bash

cd $(dirname $0)
source ../entrypoint_functions.sh

ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD"

python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \
--subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \
--subscriber-topic $OPMON_DBWRITER_TOPIC \
--influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \
--influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \
--influxdb-create True \
--debug False \
--influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD

