From e187c1756ca784ce4c457f031dc5b37c92986d8c Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Wed, 10 Apr 2024 07:41:14 +0200 Subject: [PATCH] Add OpenTelemetry instrumentation --- Dockerfile | 12 ++- projects/etos_suite_runner/requirements.txt | 7 +- projects/etos_suite_runner/setup.cfg | 7 +- .../src/etos_suite_runner/__init__.py | 29 ++++++ .../src/etos_suite_runner/__main__.py | 3 +- .../src/etos_suite_runner/esr.py | 69 +++++++------- .../src/etos_suite_runner/lib/executor.py | 32 ++++--- .../src/etos_suite_runner/lib/runner.py | 18 ++-- .../src/etos_suite_runner/lib/suite.py | 90 +++++++++++-------- .../src/etos_suite_runner/otel_tracing.py | 34 +++++++ 10 files changed, 207 insertions(+), 94 deletions(-) create mode 100644 projects/etos_suite_runner/src/etos_suite_runner/otel_tracing.py diff --git a/Dockerfile b/Dockerfile index ea9067a..90c9f9d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,20 @@ -FROM python:3.9.0-buster AS build +FROM python:3.9-buster AS build COPY . /src WORKDIR /src/projects/etos_suite_runner RUN python3 setup.py bdist_wheel -FROM python:3.9.0-slim-buster +FROM python:3.9-slim-buster COPY --from=build /src/projects/etos_suite_runner/dist/*.whl /tmp # hadolint ignore=DL3013 -RUN pip install --no-cache-dir /tmp/*.whl && groupadd -r etos && useradd -r -m -s /bin/false -g etos etos +RUN apt-get update && \ + apt-get install -y gcc libc-dev --no-install-recommends && \ + pip install --no-cache-dir /tmp/*.whl && \ + apt-get purge -y --auto-remove gcc libc-dev && \ + rm -rf /var/lib/apt/lists/* && \ + groupadd -r etos && useradd -r -m -s /bin/false -g etos etos + USER etos LABEL org.opencontainers.image.source=https://github.com/eiffel-community/etos-suite-runner diff --git a/projects/etos_suite_runner/requirements.txt b/projects/etos_suite_runner/requirements.txt index 1df9bb6..0e97f26 100644 --- a/projects/etos_suite_runner/requirements.txt +++ b/projects/etos_suite_runner/requirements.txt @@ -18,5 +18,8 @@ PyScaffold==3.2.3 packageurl-python~=0.11 cryptography>=42.0.4,<43.0.0 -etos_lib==4.0.0 -etos_environment_provider~=3.2 +etos_lib==4.1.1 +#etos_environment_provider~=3.2 +opentelemetry-api~=1.21 +opentelemetry-exporter-otlp~=1.21 +opentelemetry-sdk~=1.21 diff --git a/projects/etos_suite_runner/setup.cfg b/projects/etos_suite_runner/setup.cfg index 0ca05dc..ed6d447 100644 --- a/projects/etos_suite_runner/setup.cfg +++ b/projects/etos_suite_runner/setup.cfg @@ -28,8 +28,11 @@ install_requires = PyScaffold==3.2.3 packageurl-python~=0.11 cryptography>=42.0.4,<43.0.0 - etos_lib==4.0.0 - etos_environment_provider~=4.0 + etos_lib==4.1.1 + #etos_environment_provider~=4.0 + opentelemetry-api~=1.21 + opentelemetry-exporter-otlp~=1.21 + opentelemetry-sdk~=1.21 python_requires = >=3.4 diff --git a/projects/etos_suite_runner/src/etos_suite_runner/__init__.py b/projects/etos_suite_runner/src/etos_suite_runner/__init__.py index 3fae8f6..dfe8232 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/__init__.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/__init__.py @@ -14,9 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS suite runner module.""" +import logging import os from importlib.metadata import PackageNotFoundError, version +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, SERVICE_VERSION, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + from etos_lib.logging.logger import setup_logging try: @@ -30,3 +37,25 @@ ENVIRONMENT = "development" if DEV else "production" os.environ["ENVIRONMENT_PROVIDER_DISABLE_LOGGING"] = "true" setup_logging("ETOS Suite Runner", VERSION, ENVIRONMENT) + + +LOGGER = logging.getLogger(__name__) + +# Setting OTEL_COLLECTOR_HOST will override the default OTEL collector endpoint. +# This is needed when using the centralized cluster-level OTEL collector instead of sidecar collector. +if os.getenv("OTEL_COLLECTOR_HOST"): + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = os.getenv("OTEL_COLLECTOR_HOST") + LOGGER.info("Using OTEL collector: %s", os.getenv("OTEL_COLLECTOR_HOST")) + +if os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"): + PROVIDER = TracerProvider( + resource=Resource.create( + {SERVICE_NAME: "etos-suite-runner", SERVICE_VERSION: VERSION, SERVICE_NAMESPACE: ENVIRONMENT} + ) + ) + EXPORTER = OTLPSpanExporter() + PROCESSOR = BatchSpanProcessor(EXPORTER) + PROVIDER.add_span_processor(PROCESSOR) + trace.set_tracer_provider(PROVIDER) +else: + LOGGER.info("Suite runner OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set!") \ No newline at end of file diff --git a/projects/etos_suite_runner/src/etos_suite_runner/__main__.py b/projects/etos_suite_runner/src/etos_suite_runner/__main__.py index 2891532..42478c5 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/__main__.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/__main__.py @@ -21,6 +21,7 @@ from .esr import ESR + LOGGER = logging.getLogger(__name__) @@ -38,11 +39,9 @@ def main(): esr.etos.publisher.stop() LOGGER.info("ESR Finished Executing.", extra={"user_log": True}) - def run(): """Entry point for console_scripts.""" main() - if __name__ == "__main__": run() diff --git a/projects/etos_suite_runner/src/etos_suite_runner/esr.py b/projects/etos_suite_runner/src/etos_suite_runner/esr.py index 212e2e9..611278f 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/esr.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/esr.py @@ -15,6 +15,7 @@ # limitations under the License. # -*- coding: utf-8 -*- """ETOS suite runner module.""" +import json import logging import os import signal @@ -28,14 +29,17 @@ from etos_lib import ETOS from etos_lib.logging.logger import FORMAT_CONFIG from jsontas.jsontas import JsonTas +import opentelemetry from .lib.esr_parameters import ESRParameters from .lib.exceptions import EnvironmentProviderException from .lib.runner import SuiteRunner +from .otel_tracing import get_current_context # Remove spam from pika. logging.getLogger("pika").setLevel(logging.WARNING) +SUBSUITE_CONTEXT = None class ESR: # pylint:disable=too-many-instance-attributes """Suite runner for ETOS main program. @@ -49,6 +53,7 @@ class ESR: # pylint:disable=too-many-instance-attributes def __init__(self) -> None: """Initialize ESR by creating a rabbitmq publisher.""" self.logger = logging.getLogger("ESR") + self.otel_tracer = opentelemetry.trace.get_tracer(__name__) self.etos = ETOS("ETOS Suite Runner", os.getenv("SOURCE_HOST"), "ETOS Suite Runner") signal.signal(signal.SIGTERM, self.graceful_exit) self.params = ESRParameters(self.etos) @@ -67,28 +72,32 @@ def _request_environment(self, ids: list[str]) -> None: :param ids: Generated suite runner IDs used to correlate environments and the suite runners. """ - try: - provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids, copy=False) - result = provider.run() - except Exception: - self.params.set_status("FAILURE", "Failed to run environment provider") - self.logger.error( - "Environment provider has failed in creating an environment for test.", - extra={"user_log": True}, - ) - raise - if result.get("error") is not None: - self.params.set_status("FAILURE", result.get("error")) - self.logger.error( - "Environment provider has failed in creating an environment for test.", - extra={"user_log": True}, - ) - else: - self.params.set_status("SUCCESS", result.get("error")) - self.logger.info( - "Environment provider has finished creating an environment for test.", - extra={"user_log": True}, - ) + span_name = "request_environment" + suite_context = get_current_context() + with self.otel_tracer.start_as_current_span(span_name, context=suite_context) as span: + try: + provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids, copy=False) + result = provider.run() + except Exception as exc: + self.params.set_status("FAILURE", "Failed to run environment provider") + self.logger.error( + "Environment provider has failed in creating an environment for test.", + extra={"user_log": True}, + ) + span.set_attribute("result", traceback.format_exc()) + raise + if result.get("error") is not None: + self.params.set_status("FAILURE", result.get("error")) + self.logger.error( + "Environment provider has failed in creating an environment for test.", + extra={"user_log": True}, + ) + else: + self.params.set_status("SUCCESS", result.get("error")) + self.logger.info( + "Environment provider has finished creating an environment for test.", + extra={"user_log": True}, + ) def _release_environment(self) -> None: """Release an environment from the environment provider.""" @@ -96,11 +105,14 @@ def _release_environment(self) -> None: # Passing variables as keyword argument to make it easier to transition to a function where # jsontas is not required. jsontas = JsonTas() - status, message = release_full_environment( - etos=self.etos, jsontas=jsontas, suite_id=self.params.tercc.meta.event_id - ) - if not status: - self.logger.error(message) + span_name = "release_full_environment" + suite_context = get_current_context() + with self.otel_tracer.start_as_current_span(span_name, context=suite_context) as span: + status, message = release_full_environment( + etos=self.etos, jsontas=jsontas, suite_id=self.params.tercc.meta.event_id + ) + if not status: + self.logger.error(message) def run_suites(self, triggered: EiffelActivityTriggeredEvent) -> list[str]: """Start up a suite runner handling multiple suites that execute within test runners. @@ -117,13 +129,11 @@ def run_suites(self, triggered: EiffelActivityTriggeredEvent) -> list[str]: "ESR Docker", {"CONTEXT": context}, image=os.getenv("SUITE_RUNNER") ) runner = SuiteRunner(self.params, self.etos) - ids = [] for suite in self.params.test_suite: suite["test_suite_started_id"] = str(uuid4()) ids.append(suite["test_suite_started_id"]) self.logger.info("Number of test suites to run: %d", len(ids), extra={"user_log": True}) - try: self.logger.info("Get test environment.") threading.Thread( @@ -176,7 +186,6 @@ def run(self) -> list[str]: executionType="AUTOMATED", triggers=[{"type": "EIFFEL_EVENT"}], ) - self.verify_input() context = triggered.meta.event_id except: # noqa diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py index 04bb38e..3cfdc0e 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py @@ -16,15 +16,17 @@ """Executor handler module.""" import logging import os -from json import JSONDecodeError +from json import JSONDecodeError, dumps from typing import Union from cryptography.fernet import Fernet from etos_lib import ETOS +from opentelemetry import trace from requests.auth import HTTPBasicAuth, HTTPDigestAuth from requests.exceptions import ConnectionError as RequestsConnectionError from requests.exceptions import HTTPError +from ..otel_tracing import get_current_context class TestStartException(Exception): """Exception when starting tests.""" @@ -47,6 +49,7 @@ def __init__(self, etos: ETOS) -> None: """ self.etos = etos self.etos.config.set("build_urls", []) + self.tracer = trace.get_tracer(__name__) def __decrypt(self, password: Union[str, dict]) -> str: """Decrypt a password using an encryption key. @@ -89,15 +92,20 @@ def run_tests(self, test_suite: dict) -> None: if request.get("auth"): request["auth"] = self.__auth(**request["auth"]) method = getattr(self.etos.http, request.pop("method").lower()) - try: - response = method(**request) - response.raise_for_status() - except HTTPError as http_error: + span_name = "start_execution_space" + with self.tracer.start_as_current_span(span_name) as span: + span.set_attribute("executor_id", executor['id']) + span.set_attribute("request", dumps(request, indent=4)) try: - raise TestStartException(http_error.response.json()) from http_error - except JSONDecodeError: - raise TestStartException({"error": http_error.response.text}) from http_error - except RequestsConnectionError as connection_error: - raise TestStartException({"error": str(connection_error)}) from connection_error - self.logger.info("%r", response) - self.logger.debug("%r", response.text) + response = method(**request) + response.raise_for_status() + except HTTPError as http_error: + span.set_attribute("http_error", str(http_error)) + try: + raise TestStartException(http_error.response.json()) from http_error + except JSONDecodeError: + raise TestStartException({"error": http_error.response.text}) from http_error + except RequestsConnectionError as connection_error: + raise TestStartException({"error": str(connection_error)}) from connection_error + self.logger.info("%r", response) + self.logger.debug("%r", response.text) diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py index fd694c3..3093bd5 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py @@ -20,8 +20,10 @@ from environment_provider.environment import release_full_environment from etos_lib.logging.logger import FORMAT_CONFIG from jsontas.jsontas import JsonTas +import opentelemetry from .exceptions import EnvironmentProviderException +from ..otel_tracing import get_current_context from .suite import TestSuite @@ -44,6 +46,8 @@ def __init__(self, params, etos): """ self.params = params self.etos = etos + self.otel_tracer = opentelemetry.trace.get_tracer(__name__) + self.otel_suite_context = get_current_context() def _release_environment(self): """Release an environment from the environment provider.""" @@ -51,17 +55,19 @@ def _release_environment(self): # Passing variables as keyword argument to make it easier to transition to a function where # jsontas is not required. jsontas = JsonTas() - status, message = release_full_environment( - etos=self.etos, jsontas=jsontas, suite_id=self.params.tercc.meta.event_id - ) - if not status: - self.logger.error(message) + span_name = "release_full_environment" + with self.otel_tracer.start_as_current_span(span_name, context=self.otel_suite_context): + status, message = release_full_environment( + etos=self.etos, jsontas=jsontas, suite_id=self.params.tercc.meta.event_id + ) + if not status: + self.logger.error(message) def start_suites_and_wait(self): """Get environments and start all test suites.""" try: test_suites = [ - TestSuite(self.etos, self.params, suite) for suite in self.params.test_suite + TestSuite(self.etos, self.params, suite, otel_context=self.otel_suite_context) for suite in self.params.test_suite ] with ThreadPool() as pool: pool.map(self.run, test_suites) diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py index 782066c..8d9b468 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Test suite handler.""" +import json import logging import threading import time @@ -25,6 +26,7 @@ from etos_lib import ETOS from etos_lib.logging.logger import FORMAT_CONFIG from jsontas.jsontas import JsonTas +import opentelemetry from .esr_parameters import ESRParameters from .exceptions import EnvironmentProviderException @@ -37,6 +39,7 @@ request_test_suite_started, ) from .log_filter import DuplicateFilter +from ..otel_tracing import get_current_context class SubSuite: # pylint:disable=too-many-instance-attributes @@ -52,6 +55,7 @@ def __init__(self, etos: ETOS, environment: dict, main_suite_id: str) -> None: self.main_suite_id = main_suite_id self.logger = logging.getLogger(f"SubSuite - {self.environment.get('name')}") self.logger.addFilter(DuplicateFilter(self.logger)) + self.otel_tracer = opentelemetry.trace.get_tracer(__name__) self.test_suite_started = {} self.test_suite_finished = {} @@ -93,36 +97,41 @@ def outcome(self) -> dict: return self.test_suite_finished.get("data", {}).get("testSuiteOutcome", {}) return {} - def start(self, identifier: str) -> None: + def start(self, identifier: str, otel_context: opentelemetry.context.context.Context) -> None: """Start ETR for this sub suite. :param identifier: An identifier for logs in this sub suite. """ - FORMAT_CONFIG.identifier = identifier - self.logger.info("Starting up the ETOS test runner", extra={"user_log": True}) - executor = Executor(self.etos) - try: - executor.run_tests(self.environment) - except TestStartException as exception: - self.failed = True - self.logger.error( - "Failed to start sub suite: %s", exception.error, extra={"user_log": True} - ) - raise - self.logger.info("ETR triggered.") - timeout = time.time() + self.etos.debug.default_test_result_timeout - try: - while time.time() < timeout: - time.sleep(10) - if not self.started: - continue - self.logger.info("ETOS test runner has started", extra={"user_log": True}) - self.request_finished_event() - if self.finished: - self.logger.info("ETOS test runner has finished", extra={"user_log": True}) - break - finally: - self.release(identifier) + # OpenTelemetry context needs to be retrieved here, since the subsuite is running in a separate process + span_name = "execute_testrunner" + with self.otel_tracer.start_as_current_span(span_name, context=otel_context) as span: + span.set_attribute("subsuite_id", identifier) + FORMAT_CONFIG.identifier = identifier + self.logger.info("Starting up the ETOS test runner", extra={"user_log": True}) + executor = Executor(self.etos) + try: + executor.run_tests(self.environment) + except TestStartException as exception: + self.failed = True + self.logger.error( + "Failed to start sub suite: %s", exception.error, extra={"user_log": True} + ) + span.set_attribute("failed", str(exception)) + raise + self.logger.info("ETR triggered.") + timeout = time.time() + self.etos.debug.default_test_result_timeout + try: + while time.time() < timeout: + time.sleep(10) + if not self.started: + continue + self.logger.info("ETOS test runner has started", extra={"user_log": True}) + self.request_finished_event() + if self.finished: + self.logger.info("ETOS test runner has finished", extra={"user_log": True}) + break + finally: + self.release(identifier) def release(self, testrun_id) -> None: """Release this sub suite.""" @@ -136,16 +145,22 @@ def release(self, testrun_id) -> None: registry = ProviderRegistry(etos=self.etos, jsontas=jsontas, suite_id=testrun_id) self.logger.info(self.environment) - success = release_environment( - etos=self.etos, jsontas=jsontas, provider_registry=registry, sub_suite=self.environment - ) - if not success: - self.logger.exception( - "Failed to check in %r", self.environment["id"], extra={"user_log": True} + + span_name = "release_environment" + with self.otel_tracer.start_as_current_span(span_name) as span: + failure = release_environment( + etos=self.etos, jsontas=jsontas, provider_registry=registry, sub_suite=self.environment ) - return - self.logger.info("Checked in %r", self.environment["id"], extra={"user_log": True}) - self.released = True + span.set_attribute("testrun_id", testrun_id) + span.set_attribute("failure", str(failure)) + span.set_attribute("environment", json.dumps(self.environment, indent=4)) + if failure is not None: + self.logger.exception( + "Failed to check in %r", self.environment["id"], extra={"user_log": True} + ) + return + self.logger.info("Checked in %r", self.environment["id"], extra={"user_log": True}) + self.released = True class TestSuite: # pylint:disable=too-many-instance-attributes @@ -157,7 +172,7 @@ class TestSuite: # pylint:disable=too-many-instance-attributes __activity_triggered = None __activity_finished = None - def __init__(self, etos: ETOS, params: ESRParameters, suite: dict) -> None: + def __init__(self, etos: ETOS, params: ESRParameters, suite: dict, otel_context: opentelemetry.context.context.Context = None) -> None: """Initialize a TestSuite instance.""" self.etos = etos self.params = params @@ -165,6 +180,7 @@ def __init__(self, etos: ETOS, params: ESRParameters, suite: dict) -> None: self.logger = logging.getLogger(f"TestSuite - {self.suite.get('name')}") self.logger.addFilter(DuplicateFilter(self.logger)) self.sub_suites = [] + self.otel_context = otel_context @property def sub_suite_environments(self) -> Iterator[dict]: @@ -324,7 +340,7 @@ def start(self) -> None: ) self.sub_suites.append(sub_suite) thread = threading.Thread( - target=sub_suite.start, args=(self.params.tercc.meta.event_id,) + target=sub_suite.start, args=(self.params.tercc.meta.event_id, self.otel_context) ) threads.append(thread) thread.start() diff --git a/projects/etos_suite_runner/src/etos_suite_runner/otel_tracing.py b/projects/etos_suite_runner/src/etos_suite_runner/otel_tracing.py new file mode 100644 index 0000000..4c0cbd2 --- /dev/null +++ b/projects/etos_suite_runner/src/etos_suite_runner/otel_tracing.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# -*- coding: utf-8 -*- +import logging +import os +import opentelemetry + +LOGGER = logging.getLogger(__name__) + +def get_current_context() -> opentelemetry.context.context.Context: + """Get current context (propagated via environment variable OTEL_CONTEXT).""" + carrier = {} + LOGGER.info("Current OpenTelemetry context env: %s", os.environ.get("OTEL_CONTEXT")) + for kv in os.environ.get("OTEL_CONTEXT", "").split(","): + if kv: + k, v = kv.split("=", 1) + carrier[k] = v + ctx = opentelemetry.propagate.extract(carrier) + LOGGER.info("Current OpenTelemetry context %s", ctx) + return ctx