diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index 7f8c2cfbed..e6b631ff63 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -31,12 +31,16 @@ class Pipeline(PipelineBase): Orchestrates component execution according to the execution graph, one after the other. """ - def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]: + def _run_component( + self, name: str, inputs: Dict[str, Any], parent_span: Optional[tracing.Span] = None + ) -> Dict[str, Any]: """ Runs a Component with the given inputs. :param name: Name of the Component as defined in the Pipeline. :param inputs: Inputs for the Component. + :param parent_span: The parent span to use for the newly created span. + This is to allow tracing to be correctly linked to the pipeline run. :raises PipelineRuntimeError: If Component doesn't return a dictionary. :return: The output of the Component. """ @@ -63,6 +67,7 @@ def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]: for key, value in instance.__haystack_output__._sockets_dict.items() # type: ignore }, }, + parent_span=parent_span, ) as span: # We deepcopy the inputs otherwise we might lose that information # when we delete them in case they're sent to other Components @@ -412,7 +417,7 @@ def run( # noqa: PLR0915, PLR0912 "haystack.pipeline.metadata": self.metadata, "haystack.pipeline.max_runs_per_component": self._max_runs_per_component, }, - ): + ) as span: # Cache for extra outputs, if enabled. extra_outputs: Dict[Any, Any] = {} @@ -463,7 +468,7 @@ def run( # noqa: PLR0915, PLR0912 msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'" raise PipelineMaxComponentRuns(msg) - res: Dict[str, Any] = self._run_component(name, components_inputs[name]) + res: Dict[str, Any] = self._run_component(name, components_inputs[name], parent_span=span) # Delete the inputs that were consumed by the Component and are not received from the user sockets = list(components_inputs[name].keys()) diff --git a/haystack/tracing/datadog.py b/haystack/tracing/datadog.py index 40d1cda81a..a1d0808e55 100644 --- a/haystack/tracing/datadog.py +++ b/haystack/tracing/datadog.py @@ -59,7 +59,9 @@ def __init__(self, tracer: "ddtrace.Tracer") -> None: self._tracer = tracer @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """Activate and return a new span that inherits from the current active span.""" with self._tracer.trace(operation_name) as span: custom_span = DatadogSpan(span) diff --git a/haystack/tracing/logging_tracer.py b/haystack/tracing/logging_tracer.py index 9ae4114428..166c484a90 100644 --- a/haystack/tracing/logging_tracer.py +++ b/haystack/tracing/logging_tracer.py @@ -49,12 +49,15 @@ def __init__(self, tags_color_strings: Optional[Dict[str, str]] = None) -> None: self.tags_color_strings = tags_color_strings or {} @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """ Trace the execution of a block of code. :param operation_name: the name of the operation being traced. :param tags: tags to apply to the newly created span. + :param parent_span: the parent span to use for the newly created span. Not used in this simple tracer. :returns: the newly created span. """ diff --git a/haystack/tracing/opentelemetry.py b/haystack/tracing/opentelemetry.py index 5eb9b6e173..88c61301cd 100644 --- a/haystack/tracing/opentelemetry.py +++ b/haystack/tracing/opentelemetry.py @@ -48,7 +48,9 @@ def __init__(self, tracer: "opentelemetry.trace.Tracer") -> None: self._tracer = tracer @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """Activate and return a new span that inherits from the current active span.""" with self._tracer.start_as_current_span(operation_name) as raw_span: span = OpenTelemetrySpan(raw_span) diff --git a/haystack/tracing/tracer.py b/haystack/tracing/tracer.py index b1f147ef34..4afe7e3db7 100644 --- a/haystack/tracing/tracer.py +++ b/haystack/tracing/tracer.py @@ -83,12 +83,16 @@ class Tracer(abc.ABC): @abc.abstractmethod @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """ Trace the execution of a block of code. :param operation_name: the name of the operation being traced. :param tags: tags to apply to the newly created span. + :param parent_span: the parent span to use for the newly created span. + If `None`, the newly created span will be a root span. :return: the newly created span. """ pass @@ -117,9 +121,11 @@ def __init__(self, provided_tracer: Tracer) -> None: self.is_content_tracing_enabled = os.getenv(HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR, "false").lower() == "true" @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """Activate and return a new span that inherits from the current active span.""" - with self.actual_tracer.trace(operation_name, tags=tags) as span: + with self.actual_tracer.trace(operation_name, tags=tags, parent_span=parent_span) as span: yield span def current_span(self) -> Optional[Span]: @@ -139,7 +145,9 @@ class NullTracer(Tracer): """A no-op implementation of the `Tracer` interface. This is used when tracing is disabled.""" @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: """Activate and return a new span that inherits from the current active span.""" yield NullSpan() diff --git a/releasenotes/notes/tracing-with-concurrency-5daadfde0a36f94a.yaml b/releasenotes/notes/tracing-with-concurrency-5daadfde0a36f94a.yaml new file mode 100644 index 0000000000..d319c98091 --- /dev/null +++ b/releasenotes/notes/tracing-with-concurrency-5daadfde0a36f94a.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Each tracing span of a component run is now attached with the pipeline run span object. This allows users to trace + the execution of multiple pipeline runs concurrently. diff --git a/test/core/pipeline/test_tracing.py b/test/core/pipeline/test_tracing.py index 83294bcc2c..6b5493383d 100644 --- a/test/core/pipeline/test_tracing.py +++ b/test/core/pipeline/test_tracing.py @@ -39,7 +39,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra assert len(spying_tracer.spans) == 3 assert spying_tracer.spans == [ - SpyingSpan( + pipeline_span := SpyingSpan( operation_name="haystack.pipeline.run", tags={ "haystack.pipeline.input_data": {"hello": {"word": "world"}}, @@ -47,6 +47,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra "haystack.pipeline.metadata": {}, "haystack.pipeline.max_runs_per_component": 100, }, + parent_span=None, trace_id=ANY, span_id=ANY, ), @@ -60,6 +61,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra "haystack.component.output_spec": {"output": {"type": "str", "receivers": ["hello2"]}}, "haystack.component.visits": 1, }, + parent_span=pipeline_span, trace_id=ANY, span_id=ANY, ), @@ -73,6 +75,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra "haystack.component.output_spec": {"output": {"type": "str", "receivers": []}}, "haystack.component.visits": 1, }, + parent_span=pipeline_span, trace_id=ANY, span_id=ANY, ), @@ -95,7 +98,7 @@ def test_with_enabled_content_tracing( assert len(spying_tracer.spans) == 3 assert spying_tracer.spans == [ - SpyingSpan( + pipeline_span := SpyingSpan( operation_name="haystack.pipeline.run", tags={ "haystack.pipeline.metadata": {}, @@ -118,6 +121,7 @@ def test_with_enabled_content_tracing( "haystack.component.visits": 1, "haystack.component.output": {"output": "Hello, world!"}, }, + parent_span=pipeline_span, trace_id=ANY, span_id=ANY, ), @@ -133,6 +137,7 @@ def test_with_enabled_content_tracing( "haystack.component.visits": 1, "haystack.component.output": {"output": "Hello, Hello, world!!"}, }, + parent_span=pipeline_span, trace_id=ANY, span_id=ANY, ), diff --git a/test/tracing/test_tracer.py b/test/tracing/test_tracer.py index 0593a27fa3..3ef63c7b26 100644 --- a/test/tracing/test_tracer.py +++ b/test/tracing/test_tracer.py @@ -29,7 +29,7 @@ tracer, HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR, ) -from test.tracing.utils import SpyingTracer +from test.tracing.utils import SpyingTracer, SpyingSpan class TestNullTracer: @@ -37,7 +37,7 @@ def test_tracing(self) -> None: assert isinstance(tracer.actual_tracer, NullTracer) # None of this raises - with tracer.trace("operation", {"key": "value"}) as span: + with tracer.trace("operation", {"key": "value"}, parent_span=None) as span: span.set_tag("key", "value") span.set_tags({"key": "value"}) @@ -50,12 +50,14 @@ def test_tracing(self) -> None: spying_tracer = SpyingTracer() my_tracer = ProxyTracer(provided_tracer=spying_tracer) - with my_tracer.trace("operation", {"key": "value"}) as span: + parent_span = Mock(spec=SpyingSpan) + with my_tracer.trace("operation", {"key": "value"}, parent_span=parent_span) as span: span.set_tag("key", "value") span.set_tags({"key2": "value2"}) assert len(spying_tracer.spans) == 1 assert spying_tracer.spans[0].operation_name == "operation" + assert spying_tracer.spans[0].parent_span == parent_span assert spying_tracer.spans[0].tags == {"key": "value", "key2": "value2"} diff --git a/test/tracing/utils.py b/test/tracing/utils.py index 8a1472da8b..c2261baacf 100644 --- a/test/tracing/utils.py +++ b/test/tracing/utils.py @@ -12,6 +12,7 @@ @dataclasses.dataclass class SpyingSpan(Span): operation_name: str + parent_span: Optional[Span] = None tags: Dict[str, Any] = dataclasses.field(default_factory=dict) trace_id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4())) @@ -32,8 +33,10 @@ def __init__(self) -> None: self.spans: List[SpyingSpan] = [] @contextlib.contextmanager - def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: - new_span = SpyingSpan(operation_name) + def trace( + self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + ) -> Iterator[Span]: + new_span = SpyingSpan(operation_name, parent_span) for key, value in (tags or {}).items(): new_span.set_tag(key, value)