Skip to content

Commit

Permalink
feat!: tracing with concurrency (#8489)
Browse files Browse the repository at this point in the history
  • Loading branch information
LastRemote authored Oct 29, 2024
1 parent 2045f6f commit 081b143
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 17 deletions.
11 changes: 8 additions & 3 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ class Pipeline(PipelineBase):
Orchestrates component execution according to the execution graph, one after the other.
"""

def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
def _run_component(
self, name: str, inputs: Dict[str, Any], parent_span: Optional[tracing.Span] = None
) -> Dict[str, Any]:
"""
Runs a Component with the given inputs.
:param name: Name of the Component as defined in the Pipeline.
:param inputs: Inputs for the Component.
:param parent_span: The parent span to use for the newly created span.
This is to allow tracing to be correctly linked to the pipeline run.
:raises PipelineRuntimeError: If Component doesn't return a dictionary.
:return: The output of the Component.
"""
Expand All @@ -63,6 +67,7 @@ def _run_component(self, name: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
for key, value in instance.__haystack_output__._sockets_dict.items() # type: ignore
},
},
parent_span=parent_span,
) as span:
# We deepcopy the inputs otherwise we might lose that information
# when we delete them in case they're sent to other Components
Expand Down Expand Up @@ -412,7 +417,7 @@ def run( # noqa: PLR0915, PLR0912
"haystack.pipeline.metadata": self.metadata,
"haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
},
):
) as span:
# Cache for extra outputs, if enabled.
extra_outputs: Dict[Any, Any] = {}

Expand Down Expand Up @@ -463,7 +468,7 @@ def run( # noqa: PLR0915, PLR0912
msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
raise PipelineMaxComponentRuns(msg)

res: Dict[str, Any] = self._run_component(name, components_inputs[name])
res: Dict[str, Any] = self._run_component(name, components_inputs[name], parent_span=span)

# Delete the inputs that were consumed by the Component and are not received from the user
sockets = list(components_inputs[name].keys())
Expand Down
4 changes: 3 additions & 1 deletion haystack/tracing/datadog.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, tracer: "ddtrace.Tracer") -> None:
self._tracer = tracer

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""Activate and return a new span that inherits from the current active span."""
with self._tracer.trace(operation_name) as span:
custom_span = DatadogSpan(span)
Expand Down
5 changes: 4 additions & 1 deletion haystack/tracing/logging_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ def __init__(self, tags_color_strings: Optional[Dict[str, str]] = None) -> None:
self.tags_color_strings = tags_color_strings or {}

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""
Trace the execution of a block of code.
:param operation_name: the name of the operation being traced.
:param tags: tags to apply to the newly created span.
:param parent_span: the parent span to use for the newly created span. Not used in this simple tracer.
:returns: the newly created span.
"""

Expand Down
4 changes: 3 additions & 1 deletion haystack/tracing/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def __init__(self, tracer: "opentelemetry.trace.Tracer") -> None:
self._tracer = tracer

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""Activate and return a new span that inherits from the current active span."""
with self._tracer.start_as_current_span(operation_name) as raw_span:
span = OpenTelemetrySpan(raw_span)
Expand Down
16 changes: 12 additions & 4 deletions haystack/tracing/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,16 @@ class Tracer(abc.ABC):

@abc.abstractmethod
@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""
Trace the execution of a block of code.
:param operation_name: the name of the operation being traced.
:param tags: tags to apply to the newly created span.
:param parent_span: the parent span to use for the newly created span.
If `None`, the newly created span will be a root span.
:return: the newly created span.
"""
pass
Expand Down Expand Up @@ -117,9 +121,11 @@ def __init__(self, provided_tracer: Tracer) -> None:
self.is_content_tracing_enabled = os.getenv(HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR, "false").lower() == "true"

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""Activate and return a new span that inherits from the current active span."""
with self.actual_tracer.trace(operation_name, tags=tags) as span:
with self.actual_tracer.trace(operation_name, tags=tags, parent_span=parent_span) as span:
yield span

def current_span(self) -> Optional[Span]:
Expand All @@ -139,7 +145,9 @@ class NullTracer(Tracer):
"""A no-op implementation of the `Tracer` interface. This is used when tracing is disabled."""

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
"""Activate and return a new span that inherits from the current active span."""
yield NullSpan()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Each tracing span of a component run is now attached with the pipeline run span object. This allows users to trace
the execution of multiple pipeline runs concurrently.
9 changes: 7 additions & 2 deletions test/core/pipeline/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra
assert len(spying_tracer.spans) == 3

assert spying_tracer.spans == [
SpyingSpan(
pipeline_span := SpyingSpan(
operation_name="haystack.pipeline.run",
tags={
"haystack.pipeline.input_data": {"hello": {"word": "world"}},
"haystack.pipeline.output_data": {"hello2": {"output": "Hello, Hello, world!!"}},
"haystack.pipeline.metadata": {},
"haystack.pipeline.max_runs_per_component": 100,
},
parent_span=None,
trace_id=ANY,
span_id=ANY,
),
Expand All @@ -60,6 +61,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra
"haystack.component.output_spec": {"output": {"type": "str", "receivers": ["hello2"]}},
"haystack.component.visits": 1,
},
parent_span=pipeline_span,
trace_id=ANY,
span_id=ANY,
),
Expand All @@ -73,6 +75,7 @@ def test_with_enabled_tracing(self, pipeline: Pipeline, spying_tracer: SpyingTra
"haystack.component.output_spec": {"output": {"type": "str", "receivers": []}},
"haystack.component.visits": 1,
},
parent_span=pipeline_span,
trace_id=ANY,
span_id=ANY,
),
Expand All @@ -95,7 +98,7 @@ def test_with_enabled_content_tracing(

assert len(spying_tracer.spans) == 3
assert spying_tracer.spans == [
SpyingSpan(
pipeline_span := SpyingSpan(
operation_name="haystack.pipeline.run",
tags={
"haystack.pipeline.metadata": {},
Expand All @@ -118,6 +121,7 @@ def test_with_enabled_content_tracing(
"haystack.component.visits": 1,
"haystack.component.output": {"output": "Hello, world!"},
},
parent_span=pipeline_span,
trace_id=ANY,
span_id=ANY,
),
Expand All @@ -133,6 +137,7 @@ def test_with_enabled_content_tracing(
"haystack.component.visits": 1,
"haystack.component.output": {"output": "Hello, Hello, world!!"},
},
parent_span=pipeline_span,
trace_id=ANY,
span_id=ANY,
),
Expand Down
8 changes: 5 additions & 3 deletions test/tracing/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
tracer,
HAYSTACK_CONTENT_TRACING_ENABLED_ENV_VAR,
)
from test.tracing.utils import SpyingTracer
from test.tracing.utils import SpyingTracer, SpyingSpan


class TestNullTracer:
def test_tracing(self) -> None:
assert isinstance(tracer.actual_tracer, NullTracer)

# None of this raises
with tracer.trace("operation", {"key": "value"}) as span:
with tracer.trace("operation", {"key": "value"}, parent_span=None) as span:
span.set_tag("key", "value")
span.set_tags({"key": "value"})

Expand All @@ -50,12 +50,14 @@ def test_tracing(self) -> None:
spying_tracer = SpyingTracer()
my_tracer = ProxyTracer(provided_tracer=spying_tracer)

with my_tracer.trace("operation", {"key": "value"}) as span:
parent_span = Mock(spec=SpyingSpan)
with my_tracer.trace("operation", {"key": "value"}, parent_span=parent_span) as span:
span.set_tag("key", "value")
span.set_tags({"key2": "value2"})

assert len(spying_tracer.spans) == 1
assert spying_tracer.spans[0].operation_name == "operation"
assert spying_tracer.spans[0].parent_span == parent_span
assert spying_tracer.spans[0].tags == {"key": "value", "key2": "value2"}


Expand Down
7 changes: 5 additions & 2 deletions test/tracing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
@dataclasses.dataclass
class SpyingSpan(Span):
operation_name: str
parent_span: Optional[Span] = None
tags: Dict[str, Any] = dataclasses.field(default_factory=dict)

trace_id: Optional[str] = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
Expand All @@ -32,8 +33,10 @@ def __init__(self) -> None:
self.spans: List[SpyingSpan] = []

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
new_span = SpyingSpan(operation_name)
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None
) -> Iterator[Span]:
new_span = SpyingSpan(operation_name, parent_span)

for key, value in (tags or {}).items():
new_span.set_tag(key, value)
Expand Down

0 comments on commit 081b143

Please sign in to comment.