Skip to content

Commit

Permalink
Add OpenTelemetry trace id to logstash entries (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
andmat900 authored Jun 12, 2024
1 parent 8adbde0 commit 1de8386
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 21 deletions.
4 changes: 2 additions & 2 deletions projects/etos_suite_runner/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
PyScaffold==3.2.3
packageurl-python~=0.11
cryptography>=42.0.4,<43.0.0
etos_lib==4.2.0
etos_environment_provider~=4.2
etos_lib==4.3.1
etos_environment_provider==4.4.1
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21
4 changes: 2 additions & 2 deletions projects/etos_suite_runner/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ install_requires =
PyScaffold==3.2.3
packageurl-python~=0.11
cryptography>=42.0.4,<43.0.0
etos_lib==4.2.0
etos_environment_provider~=4.2
etos_lib==4.3.1
etos_environment_provider==4.4.1
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21
Expand Down
43 changes: 36 additions & 7 deletions projects/etos_suite_runner/src/etos_suite_runner/esr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from etos_lib.logging.logger import FORMAT_CONFIG
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .lib.esr_parameters import ESRParameters
from .lib.exceptions import EnvironmentProviderException
Expand All @@ -52,6 +53,8 @@ def __init__(self) -> None:
"""Initialize ESR by creating a rabbitmq publisher."""
self.logger = logging.getLogger("ESR")
self.otel_tracer = opentelemetry.trace.get_tracer(__name__)
self.otel_context = get_current_context()
self.otel_context_token = opentelemetry.context.attach(self.otel_context)
self.etos = ETOS("ETOS Suite Runner", os.getenv("SOURCE_HOST"), "ETOS Suite Runner")
signal.signal(signal.SIGTERM, self.graceful_exit)
self.params = ESRParameters(self.etos)
Expand All @@ -64,21 +67,24 @@ def __init__(self) -> None:
int(os.getenv("ESR_WAIT_FOR_ENVIRONMENT_TIMEOUT")),
)

def _request_environment(self, ids: list[str]) -> None:
def __del__(self):
    """Detach the OpenTelemetry context token stored on this instance.

    The token is the value returned by ``opentelemetry.context.attach`` in
    ``__init__``. Nothing is done when no token is available.
    """
    # ``__del__`` is also invoked for instances whose ``__init__`` raised before
    # ``otel_context_token`` was assigned, so the attribute may be missing.
    # NOTE(review): finalizers may run in a different thread/context than the one
    # that attached the token - confirm that detaching here is still desired.
    token = getattr(self, "otel_context_token", None)
    if token is not None:
        opentelemetry.context.detach(token)

def __request_environment(self, ids: list[str]) -> None:
"""Request an environment from the environment provider.
:param ids: Generated suite runner IDs used to correlate environments and the suite
runners.
"""
span_name = "request_environment"
suite_context = get_current_context()
with self.otel_tracer.start_as_current_span(
span_name,
context=suite_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
try:
provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids, copy=False)
provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids)
result = provider.run()
except Exception as exc:
self.params.set_status("FAILURE", "Failed to run environment provider")
Expand All @@ -103,17 +109,33 @@ def _request_environment(self, ids: list[str]) -> None:
extra={"user_log": True},
)

def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> None:
    """Request an environment from the environment provider (OpenTelemetry wrapper).

    Restores the OpenTelemetry context found in the carrier, delegates to
    ``__request_environment`` and detaches the context again afterwards.

    :param ids: Generated suite runner IDs used to correlate environments and the suite
                runners.
    :param otel_context_carrier: a dict carrying current OpenTelemetry context.
    """
    # This method runs in a separate thread and OpenTelemetry contexts are not
    # propagated to threads automatically, so the context is rebuilt from the
    # carrier and attached for the duration of the request.
    propagator = TraceContextTextMapPropagator()
    token = opentelemetry.context.attach(propagator.extract(carrier=otel_context_carrier))
    try:
        self.__request_environment(ids)
    finally:
        # Always undo the attach, even when the request raised.
        opentelemetry.context.detach(token)

def _release_environment(self) -> None:
"""Release an environment from the environment provider."""
# TODO: We should remove jsontas as a requirement for this function.
# Passing variables as keyword argument to make it easier to transition to a function where
# jsontas is not required.
jsontas = JsonTas()
span_name = "release_full_environment"
suite_context = get_current_context()
with self.otel_tracer.start_as_current_span(
span_name,
context=suite_context,
context=self.otel_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
status, message = release_full_environment(
Expand Down Expand Up @@ -144,8 +166,15 @@ def run_suites(self, triggered: EiffelActivityTriggeredEvent) -> list[str]:
self.logger.info("Number of test suites to run: %d", len(ids), extra={"user_log": True})
try:
self.logger.info("Get test environment.")
carrier = {}
TraceContextTextMapPropagator().inject(carrier)
threading.Thread(
target=self._request_environment, args=(ids.copy(),), daemon=True
target=self._request_environment,
args=(
ids.copy(),
carrier,
),
daemon=True,
).start()

self.etos.events.send_activity_started(triggered, {"CONTEXT": context})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from etos_lib.logging.logger import FORMAT_CONFIG
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .exceptions import EnvironmentProviderException
from .otel_tracing import get_current_context, OpenTelemetryBase
Expand Down Expand Up @@ -70,8 +71,10 @@ def _release_environment(self):
def start_suites_and_wait(self):
"""Get environments and start all test suites."""
try:
otel_context_carrier = {}
TraceContextTextMapPropagator().inject(otel_context_carrier)
test_suites = [
TestSuite(self.etos, self.params, suite, otel_context=self.otel_suite_context)
TestSuite(self.etos, self.params, suite, otel_context_carrier=otel_context_carrier)
for suite in self.params.test_suite
]
with ThreadPool() as pool:
Expand Down
58 changes: 49 additions & 9 deletions projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import threading
import time
from typing import Iterator
from typing import Iterator, Union

from eiffellib.events import EiffelTestSuiteStartedEvent
from environment_provider.lib.registry import ProviderRegistry
Expand All @@ -28,6 +28,7 @@
from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .esr_parameters import ESRParameters
from .exceptions import EnvironmentProviderException
Expand Down Expand Up @@ -98,17 +99,14 @@ def outcome(self) -> dict:
return self.test_suite_finished.get("data", {}).get("testSuiteOutcome", {})
return {}

def start(self, identifier: str, otel_context: opentelemetry.context.context.Context) -> None:
def _start(self, identifier: str) -> None:
"""Start ETR for this sub suite.
:param identifier: An identifier for logs in this sub suite.
"""
# OpenTelemetry context needs to be explicitly given here when creating this new span.
# This is because the subsuite is running in a separate thread.
span_name = "execute_testrunner"
with self.otel_tracer.start_as_current_span(
span_name,
context=otel_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
) as span:
span.set_attribute(SemConvAttributes.SUBSUITE_ID, identifier)
Expand Down Expand Up @@ -139,6 +137,21 @@ def start(self, identifier: str, otel_context: opentelemetry.context.context.Con
finally:
self.release(identifier)

def start(self, identifier: str, otel_context_carrier: dict) -> None:
    """Start ETR for this sub suite (OpenTelemetry wrapper method).

    Attaches the OpenTelemetry context extracted from the carrier, runs
    ``_start`` and detaches the context again when it returns or raises.

    :param identifier: An identifier for logs in this sub suite.
    :param otel_context_carrier: a dict propagating OpenTelemetry context from the
                                 parent thread.
    """
    # The sub suite runs in a separate thread, so the parent's OpenTelemetry
    # context has to be restored from the carrier before any span is created.
    propagator = TraceContextTextMapPropagator()
    token = opentelemetry.context.attach(propagator.extract(carrier=otel_context_carrier))
    try:
        self._start(identifier)
    finally:
        # Always undo the attach, even when the sub suite failed.
        opentelemetry.context.detach(token)

def release(self, testrun_id) -> None:
"""Release this sub suite."""
# TODO: This whole method is now a bit of a hack that needs to be cleaned up.
Expand Down Expand Up @@ -189,7 +202,7 @@ def __init__(
etos: ETOS,
params: ESRParameters,
suite: dict,
otel_context: opentelemetry.context.context.Context = None,
otel_context_carrier: Union[dict, None] = None,
) -> None:
"""Initialize a TestSuite instance."""
self.etos = etos
Expand All @@ -198,7 +211,20 @@ def __init__(
self.logger = logging.getLogger(f"TestSuite - {self.suite.get('name')}")
self.logger.addFilter(DuplicateFilter(self.logger))
self.sub_suites = []
self.otel_context = otel_context

if otel_context_carrier is None:
otel_context_carrier = {}

self.otel_context_carrier = otel_context_carrier
self.otel_context = TraceContextTextMapPropagator().extract(
carrier=self.otel_context_carrier
)
self.otel_context_token = opentelemetry.context.attach(self.otel_context)
TraceContextTextMapPropagator().inject(self.otel_context_carrier)

def __del__(self):
    """Detach the OpenTelemetry context token stored on this instance.

    The token is the value returned by ``opentelemetry.context.attach`` in
    ``__init__``. Nothing is done when no token is available.
    """
    # ``__del__`` is also invoked for instances whose ``__init__`` raised before
    # ``otel_context_token`` was assigned, so the attribute may be missing.
    # NOTE(review): finalizers may run in a different thread/context than the one
    # that attached the token - confirm that detaching here is still desired.
    token = getattr(self, "otel_context_token", None)
    if token is not None:
        opentelemetry.context.detach(token)

@property
def sub_suite_environments(self) -> Iterator[dict]:
Expand Down Expand Up @@ -329,7 +355,7 @@ def _send_test_suite_started(self) -> EiffelTestSuiteStartedEvent:
}
return self.etos.events.send(test_suite_started, links, data)

def start(self) -> None:
def _start(self):
"""Send test suite started, trigger and wait for all sub suites to start."""
self._announce("Starting tests", f"Starting up sub suites for '{self.suite.get('name')}'")

Expand Down Expand Up @@ -365,7 +391,7 @@ def start(self) -> None:
self.sub_suites.append(sub_suite)
thread = threading.Thread(
target=sub_suite.start,
args=(self.params.tercc.meta.event_id, self.otel_context),
args=(self.params.tercc.meta.event_id, self.otel_context_carrier),
)
threads.append(thread)
thread.start()
Expand Down Expand Up @@ -394,6 +420,20 @@ def start(self) -> None:
extra={"user_log": True},
)

def start(self) -> None:
    """Send test suite started, trigger and wait for all sub suites to start.

    This is an OpenTelemetry wrapper method for _start(): it attaches the
    context extracted from ``self.otel_context_carrier`` around the call.
    """
    # OpenTelemetry contexts aren't automatically propagated to threads, so the
    # context is re-created from the stored carrier before delegating.
    propagator = TraceContextTextMapPropagator()
    token = opentelemetry.context.attach(
        propagator.extract(carrier=self.otel_context_carrier)
    )
    try:
        self._start()
    finally:
        # Always undo the attach, even when _start() raised.
        opentelemetry.context.detach(token)

def release_all(self) -> None:
"""Release all, unreleased, sub suites."""
self.logger.info("Releasing all sub suite environments")
Expand Down

0 comments on commit 1de8386

Please sign in to comment.