Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenTelemetry trace id to logstash entries #62

Merged
merged 9 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions projects/etos_suite_runner/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
PyScaffold==3.2.3
packageurl-python~=0.11
cryptography>=42.0.4,<43.0.0
etos_lib==4.2.0
etos_environment_provider~=4.2
etos_lib==4.3.1
etos_environment_provider==4.4.1
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21
4 changes: 2 additions & 2 deletions projects/etos_suite_runner/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ install_requires =
PyScaffold==3.2.3
packageurl-python~=0.11
cryptography>=42.0.4,<43.0.0
etos_lib==4.2.0
etos_environment_provider~=4.2
etos_lib==4.3.1
etos_environment_provider==4.4.1
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21
Expand Down
43 changes: 36 additions & 7 deletions projects/etos_suite_runner/src/etos_suite_runner/esr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from etos_lib.logging.logger import FORMAT_CONFIG
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .lib.esr_parameters import ESRParameters
from .lib.exceptions import EnvironmentProviderException
Expand All @@ -52,6 +53,8 @@ def __init__(self) -> None:
"""Initialize ESR by creating a rabbitmq publisher."""
self.logger = logging.getLogger("ESR")
self.otel_tracer = opentelemetry.trace.get_tracer(__name__)
self.otel_context = get_current_context()
self.otel_context_token = opentelemetry.context.attach(self.otel_context)
t-persson marked this conversation as resolved.
Show resolved Hide resolved
self.etos = ETOS("ETOS Suite Runner", os.getenv("SOURCE_HOST"), "ETOS Suite Runner")
signal.signal(signal.SIGTERM, self.graceful_exit)
self.params = ESRParameters(self.etos)
Expand All @@ -64,21 +67,24 @@ def __init__(self) -> None:
int(os.getenv("ESR_WAIT_FOR_ENVIRONMENT_TIMEOUT")),
)

def _request_environment(self, ids: list[str]) -> None:
def __del__(self):
"""Destructor."""
if self.otel_context_token is not None:
opentelemetry.context.detach(self.otel_context_token)

def __request_environment(self, ids: list[str]) -> None:
"""Request an environment from the environment provider.

:param ids: Generated suite runner IDs used to correlate environments and the suite
runners.
"""
span_name = "request_environment"
suite_context = get_current_context()
with self.otel_tracer.start_as_current_span(
span_name,
context=suite_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
try:
provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids, copy=False)
provider = EnvironmentProvider(self.params.tercc.meta.event_id, ids)
result = provider.run()
except Exception as exc:
self.params.set_status("FAILURE", "Failed to run environment provider")
Expand All @@ -103,17 +109,33 @@ def _request_environment(self, ids: list[str]) -> None:
extra={"user_log": True},
)

def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> None:
"""Request an environment from the environment provider (OpenTelemetry wrapper).

:param ids: Generated suite runner IDs used to correlate environments and the suite
runners.
:param otel_context_carrier: a dict carrying current OpenTelemetry context.
"""
# OpenTelemetry contexts aren't propagated to threads automatically.
# For this reason otel_context needs to be reinstantiated due to
# this method running in a separate thread.
otel_context = TraceContextTextMapPropagator().extract(carrier=otel_context_carrier)
otel_context_token = opentelemetry.context.attach(otel_context)
try:
self.__request_environment(ids)
finally:
opentelemetry.context.detach(otel_context_token)

def _release_environment(self) -> None:
"""Release an environment from the environment provider."""
# TODO: We should remove jsontas as a requirement for this function.
# Passing variables as keyword argument to make it easier to transition to a function where
# jsontas is not required.
jsontas = JsonTas()
span_name = "release_full_environment"
suite_context = get_current_context()
with self.otel_tracer.start_as_current_span(
span_name,
context=suite_context,
context=self.otel_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
status, message = release_full_environment(
Expand Down Expand Up @@ -144,8 +166,15 @@ def run_suites(self, triggered: EiffelActivityTriggeredEvent) -> list[str]:
self.logger.info("Number of test suites to run: %d", len(ids), extra={"user_log": True})
try:
self.logger.info("Get test environment.")
carrier = {}
TraceContextTextMapPropagator().inject(carrier)
threading.Thread(
target=self._request_environment, args=(ids.copy(),), daemon=True
target=self._request_environment,
args=(
ids.copy(),
carrier,
),
daemon=True,
).start()

self.etos.events.send_activity_started(triggered, {"CONTEXT": context})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from etos_lib.logging.logger import FORMAT_CONFIG
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .exceptions import EnvironmentProviderException
from .otel_tracing import get_current_context, OpenTelemetryBase
Expand Down Expand Up @@ -70,8 +71,10 @@ def _release_environment(self):
def start_suites_and_wait(self):
"""Get environments and start all test suites."""
try:
otel_context_carrier = {}
TraceContextTextMapPropagator().inject(otel_context_carrier)
test_suites = [
TestSuite(self.etos, self.params, suite, otel_context=self.otel_suite_context)
TestSuite(self.etos, self.params, suite, otel_context_carrier=otel_context_carrier)
for suite in self.params.test_suite
]
with ThreadPool() as pool:
Expand Down
58 changes: 49 additions & 9 deletions projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import threading
import time
from typing import Iterator
from typing import Iterator, Union

from eiffellib.events import EiffelTestSuiteStartedEvent
from environment_provider.lib.registry import ProviderRegistry
Expand All @@ -28,6 +28,7 @@
from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from .esr_parameters import ESRParameters
from .exceptions import EnvironmentProviderException
Expand Down Expand Up @@ -98,17 +99,14 @@ def outcome(self) -> dict:
return self.test_suite_finished.get("data", {}).get("testSuiteOutcome", {})
return {}

def start(self, identifier: str, otel_context: opentelemetry.context.context.Context) -> None:
def _start(self, identifier: str) -> None:
"""Start ETR for this sub suite.

:param identifier: An identifier for logs in this sub suite.
"""
# OpenTelemetry context needs to be explicitly given here when creating this new span.
# This is because the subsuite is running in a separate thread.
span_name = "execute_testrunner"
with self.otel_tracer.start_as_current_span(
span_name,
context=otel_context,
kind=opentelemetry.trace.SpanKind.CLIENT,
) as span:
span.set_attribute(SemConvAttributes.SUBSUITE_ID, identifier)
Expand Down Expand Up @@ -139,6 +137,21 @@ def start(self, identifier: str, otel_context: opentelemetry.context.context.Con
finally:
self.release(identifier)

def start(self, identifier: str, otel_context_carrier: dict) -> None:
"""Start ETR for this sub suite (OpenTelemetry wrapper method).

:param identifier: An identifier for logs in this sub suite.
:otel_context_carrier: a dict propagating OpenTelemetry context from the parent thread.
"""
# OpenTelemetry context needs to be explicitly given here when creating this new span.
# This is because the subsuite is running in a separate thread.
otel_context = TraceContextTextMapPropagator().extract(carrier=otel_context_carrier)
otel_context_token = opentelemetry.context.attach(otel_context)
try:
self._start(identifier)
finally:
opentelemetry.context.detach(otel_context_token)

def release(self, testrun_id) -> None:
"""Release this sub suite."""
# TODO: This whole method is now a bit of a hack that needs to be cleaned up.
Expand Down Expand Up @@ -189,7 +202,7 @@ def __init__(
etos: ETOS,
params: ESRParameters,
suite: dict,
otel_context: opentelemetry.context.context.Context = None,
otel_context_carrier: Union[dict, None] = None,
) -> None:
"""Initialize a TestSuite instance."""
self.etos = etos
Expand All @@ -198,7 +211,20 @@ def __init__(
self.logger = logging.getLogger(f"TestSuite - {self.suite.get('name')}")
self.logger.addFilter(DuplicateFilter(self.logger))
self.sub_suites = []
self.otel_context = otel_context

if otel_context_carrier is None:
otel_context_carrier = {}

self.otel_context_carrier = otel_context_carrier
self.otel_context = TraceContextTextMapPropagator().extract(
carrier=self.otel_context_carrier
)
self.otel_context_token = opentelemetry.context.attach(self.otel_context)
TraceContextTextMapPropagator().inject(self.otel_context_carrier)

def __del__(self):
"""Destructor."""
opentelemetry.context.detach(self.otel_context_token)

@property
def sub_suite_environments(self) -> Iterator[dict]:
Expand Down Expand Up @@ -329,7 +355,7 @@ def _send_test_suite_started(self) -> EiffelTestSuiteStartedEvent:
}
return self.etos.events.send(test_suite_started, links, data)

def start(self) -> None:
def _start(self):
"""Send test suite started, trigger and wait for all sub suites to start."""
self._announce("Starting tests", f"Starting up sub suites for '{self.suite.get('name')}'")

Expand Down Expand Up @@ -365,7 +391,7 @@ def start(self) -> None:
self.sub_suites.append(sub_suite)
thread = threading.Thread(
target=sub_suite.start,
args=(self.params.tercc.meta.event_id, self.otel_context),
args=(self.params.tercc.meta.event_id, self.otel_context_carrier),
)
threads.append(thread)
thread.start()
Expand Down Expand Up @@ -394,6 +420,20 @@ def start(self) -> None:
extra={"user_log": True},
)

def start(self) -> None:
"""Send test suite started, trigger and wait for all sub suites to start.

This is an OpenTelemetry wrapper method for _start().
"""
# OpenTelemetry contexts aren't automatically propagated to threads.
# For this reason OpenTelemetry context needs to be reinstantiated here.
otel_context = TraceContextTextMapPropagator().extract(carrier=self.otel_context_carrier)
otel_context_token = opentelemetry.context.attach(otel_context)
try:
self._start()
finally:
opentelemetry.context.detach(otel_context_token)

def release_all(self) -> None:
"""Release all, unreleased, sub suites."""
self.logger.info("Releasing all sub suite environments")
Expand Down
Loading