-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* logging tracer: first draft * progress * more tests * license header * avoid interference with other tests * release note * incorporate feedback from review * Update haystack/tracing/logging_tracer.py Co-authored-by: Madeesh Kannan <[email protected]> --------- Co-authored-by: Madeesh Kannan <[email protected]>
- Loading branch information
Showing
3 changed files
with
262 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import contextlib | ||
import dataclasses | ||
from typing import Any, Dict, Iterator, Optional | ||
|
||
from haystack import logging | ||
from haystack.tracing import Span, Tracer | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
RESET_COLOR = "\033[0m" | ||
|
||
|
||
@dataclasses.dataclass | ||
class LoggingSpan(Span): | ||
operation_name: str | ||
tags: Dict[str, Any] = dataclasses.field(default_factory=dict) | ||
|
||
def set_tag(self, key: str, value: Any) -> None: | ||
""" | ||
Set a single tag on the span. | ||
:param key: the name of the tag. | ||
:param value: the value of the tag. | ||
""" | ||
self.tags[key] = value | ||
|
||
|
||
class LoggingTracer(Tracer): | ||
""" | ||
A simple tracer that logs the operation name and tags of a span. | ||
""" | ||
|
||
def __init__(self, tags_color_strings: Optional[Dict[str, str]] = None) -> None: | ||
""" | ||
Initialize the LoggingTracer. | ||
:param tags_color_strings: | ||
A dictionary that maps tag names to color strings that should be used when logging the tags. | ||
The color strings should be in the format of | ||
[ANSI escape codes](https://en.wikipedia.org/wiki/ANSI_escape_code#Colors). | ||
For example, to color the tag "haystack.component.input" in red, you would pass | ||
`tags_color_strings={"haystack.component.input": "\x1b[1;31m"}`. | ||
""" | ||
|
||
self.tags_color_strings = tags_color_strings or {} | ||
|
||
@contextlib.contextmanager | ||
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]: | ||
""" | ||
Trace the execution of a block of code. | ||
:param operation_name: the name of the operation being traced. | ||
:param tags: tags to apply to the newly created span. | ||
:returns: the newly created span. | ||
""" | ||
|
||
custom_span = LoggingSpan(operation_name, tags=tags or {}) | ||
|
||
try: | ||
yield custom_span | ||
except Exception as e: | ||
raise e | ||
# we make sure to log the operation name and tags of the span when the context manager exits | ||
# both in case of success and error | ||
finally: | ||
operation_name = custom_span.operation_name | ||
tags = custom_span.tags or {} | ||
logger.debug("Operation: {operation_name}", operation_name=operation_name) | ||
for tag_name, tag_value in tags.items(): | ||
color_string = self.tags_color_strings.get(tag_name, "") | ||
logger.debug( | ||
color_string + "{tag_name}={tag_value}" + RESET_COLOR, tag_name=tag_name, tag_value=tag_value | ||
) | ||
|
||
def current_span(self) -> Optional[Span]: | ||
"""Return the current active span, if any.""" | ||
# we don't store spans in this simple tracer | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
--- | ||
highlights: > | ||
With the new Logging Tracer, users can inspect in the logs everything | ||
that is happening in their Pipelines in real time. This feature aims to | ||
improve the user experience during experimentation and prototyping. | ||
features: | ||
- | | ||
Introduce a Logging Tracer, that sends all traces to the logs. | ||
It can enabled as follows: | ||
```python | ||
import logging | ||
from haystack import tracing | ||
from haystack.tracing.logging_tracer import LoggingTracer | ||
logging.basicConfig(format="%(levelname)s - %(name)s - %(message)s", level=logging.WARNING) | ||
logging.getLogger("haystack").setLevel(logging.DEBUG) | ||
tracing.tracer.is_content_tracing_enabled = True # to enable tracing/logging content (inputs/outputs) | ||
tracing.enable_tracing(LoggingTracer()) | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||
# | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from typing import Optional | ||
import logging | ||
|
||
|
||
from haystack.tracing.logging_tracer import LoggingTracer | ||
from haystack import component, Pipeline | ||
from haystack import tracing | ||
|
||
|
||
@component | ||
class Hello: | ||
@component.output_types(output=str) | ||
def run(self, word: Optional[str]): | ||
return {"output": f"Hello, {word}!"} | ||
|
||
|
||
@component | ||
class FailingComponent: | ||
@component.output_types(output=str) | ||
def run(self, word: Optional[str]): | ||
raise Exception("Failing component") | ||
|
||
|
||
class TestLoggingTracer: | ||
def test_init(self) -> None: | ||
tracer = LoggingTracer() | ||
assert tracer.tags_color_strings == {} | ||
|
||
tracer = LoggingTracer(tags_color_strings={"tag_name": "color_string"}) | ||
assert tracer.tags_color_strings == {"tag_name": "color_string"} | ||
|
||
def test_logging_tracer(self, caplog) -> None: | ||
tracer = LoggingTracer() | ||
|
||
caplog.set_level(logging.DEBUG) | ||
with tracer.trace("test") as span: | ||
span.set_tag("key", "value") | ||
|
||
assert "Operation: test" in caplog.text | ||
assert "key=value" in caplog.text | ||
assert len(caplog.records) == 2 | ||
|
||
# structured logging | ||
assert caplog.records[0].operation_name == "test" | ||
assert caplog.records[1].tag_name == "key" | ||
assert caplog.records[1].tag_value == "value" | ||
|
||
def test_tracing_complex_values(self, caplog) -> None: | ||
tracer = LoggingTracer() | ||
|
||
caplog.set_level(logging.DEBUG) | ||
|
||
with tracer.trace("test") as span: | ||
span.set_tag("key", {"a": 1, "b": [2, 3, 4]}) | ||
|
||
assert "Operation: test" in caplog.text | ||
assert "key={'a': 1, 'b': [2, 3, 4]}" in caplog.text | ||
assert len(caplog.records) == 2 | ||
|
||
# structured logging | ||
assert caplog.records[0].operation_name == "test" | ||
assert caplog.records[1].tag_name == "key" | ||
assert caplog.records[1].tag_value == {"a": 1, "b": [2, 3, 4]} | ||
|
||
def test_apply_color_strings(self, caplog) -> None: | ||
tracer = LoggingTracer(tags_color_strings={"key": "color_string"}) | ||
|
||
caplog.set_level(logging.DEBUG) | ||
|
||
with tracer.trace("test") as span: | ||
span.set_tag("key", "value") | ||
|
||
assert "color_string" in caplog.text | ||
|
||
def test_logging_pipeline(self, caplog) -> None: | ||
pipeline = Pipeline() | ||
pipeline.add_component("hello", Hello()) | ||
pipeline.add_component("hello2", Hello()) | ||
pipeline.connect("hello.output", "hello2.word") | ||
|
||
tracing.enable_tracing(LoggingTracer()) | ||
caplog.set_level(logging.DEBUG) | ||
|
||
pipeline.run(data={"word": "world"}) | ||
|
||
records = caplog.records | ||
|
||
assert any( | ||
record.operation_name == "haystack.component.run" for record in records if hasattr(record, "operation_name") | ||
) | ||
assert any( | ||
record.operation_name == "haystack.pipeline.run" for record in records if hasattr(record, "operation_name") | ||
) | ||
|
||
tags_records = [record for record in records if hasattr(record, "tag_name")] | ||
assert any(record.tag_name == "haystack.component.name" for record in tags_records) | ||
assert any(record.tag_value == "hello" for record in tags_records) | ||
|
||
tracing.disable_tracing() | ||
|
||
def test_logging_pipeline_with_content_tracing(self, caplog) -> None: | ||
pipeline = Pipeline() | ||
pipeline.add_component("hello", Hello()) | ||
|
||
tracing.tracer.is_content_tracing_enabled = True | ||
tracing.enable_tracing(LoggingTracer()) | ||
|
||
caplog.set_level(logging.DEBUG) | ||
|
||
pipeline.run(data={"word": "world"}) | ||
records = caplog.records | ||
|
||
tags_records = [record for record in records if hasattr(record, "tag_name")] | ||
|
||
input_tag_value = [ | ||
record.tag_value for record in tags_records if record.tag_name == "haystack.component.input" | ||
][0] | ||
assert input_tag_value == {"word": "world"} | ||
|
||
output_tag_value = [ | ||
record.tag_value for record in tags_records if record.tag_name == "haystack.component.output" | ||
][0] | ||
assert output_tag_value == {"output": "Hello, world!"} | ||
|
||
tracing.tracer.is_content_tracing_enabled = False | ||
tracing.disable_tracing() | ||
|
||
def test_logging_pipeline_on_failure(self, caplog) -> None: | ||
""" | ||
Test that the LoggingTracer also logs events when a component fails. | ||
""" | ||
pipeline = Pipeline() | ||
pipeline.add_component("failing_component", FailingComponent()) | ||
|
||
tracing.enable_tracing(LoggingTracer()) | ||
caplog.set_level(logging.DEBUG) | ||
|
||
try: | ||
pipeline.run(data={"word": "world"}) | ||
except: | ||
pass | ||
|
||
records = caplog.records | ||
|
||
assert any( | ||
record.operation_name == "haystack.component.run" for record in records if hasattr(record, "operation_name") | ||
) | ||
assert any( | ||
record.operation_name == "haystack.pipeline.run" for record in records if hasattr(record, "operation_name") | ||
) | ||
|
||
tags_records = [record for record in records if hasattr(record, "tag_name")] | ||
assert any(record.tag_name == "haystack.component.name" for record in tags_records) | ||
assert any(record.tag_value == "failing_component" for record in tags_records) | ||
|
||
tracing.disable_tracing() |