Skip to content

Commit

Permalink
Readded original comments and completion_start_time
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-stoica committed Nov 12, 2024
1 parent 394f7e1 commit d85c384
Showing 1 changed file with 53 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,67 +121,70 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public:
self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"

@contextlib.contextmanager
def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
"""
Start and manage a new trace span.
:param operation_name: The name of the operation.
:param tags: A dictionary of tags to attach to the span.
:return: A context manager yielding the span.
"""
def trace(
self, operation_name: str, tags: Optional[Dict[str, Any]] = None,
parent_span: Optional[Span] = None
) -> Iterator[Span]:
tags = tags or {}
span_name = tags.get("haystack.component.name", operation_name)

if parent_span is not None:
parent_raw_span = parent_span.raw_span()
else:
current_span = self.current_span()
parent_raw_span = current_span.raw_span() if current_span else None

if tags.get("haystack.component.type") in _ALL_SUPPORTED_GENERATORS:
span = LangfuseSpan(self.current_span().raw_span().generation(name=span_name))
if parent_raw_span:
new_span = parent_raw_span.generation(name=span_name)
else:
new_span = self._langfuse.generation(name=span_name)
else:
span = LangfuseSpan(self.current_span().raw_span().span(name=span_name))
if parent_raw_span:
new_span = parent_raw_span.span(name=span_name)
else:
new_span = self._langfuse.span(name=span_name)

span = LangfuseSpan(new_span)
self._context.append(span)
span.set_tags(tags)

yield span

if tags.get("haystack.component.type") in _SUPPORTED_GENERATORS:
meta = span._data.get("haystack.component.output", {}).get("meta")
if meta:
# Haystack returns one meta dict for each message, but the 'usage' value
# is always the same, let's just pick the first item
m = meta[0]
span._span.update(usage=m.get("usage") or None, model=m.get("model"))
elif tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
completion_start_time = meta.get("completion_start_time")
if completion_start_time:
try:
completion_start_time = datetime.fromisoformat(completion_start_time)
except ValueError:
logger.error(f"Failed to parse completion_start_time: {completion_start_time}")
completion_start_time = None
span._span.update(
usage=meta.get("usage") or None,
model=meta.get("model"),
completion_start_time=completion_start_time,
)

pipeline_input = tags.get("haystack.pipeline.input_data", None)
if pipeline_input:
span._span.update(input=tags["haystack.pipeline.input_data"])
pipeline_output = tags.get("haystack.pipeline.output_data", None)
if pipeline_output:
span._span.update(output=tags["haystack.pipeline.output_data"])

span.raw_span().end()
self._context.pop()

if len(self._context) == 1:
# The root span has to be a trace, which need to be removed from the context after the pipeline run
try:
yield span
finally:
self._context.pop()

if self.enforce_flush:
self.flush()

if tags.get("haystack.component.type") in _SUPPORTED_GENERATORS:
meta = span._data.get("haystack.component.output", {}).get("meta")
if meta:
# Haystack returns one meta dict for each message, but the 'usage' value
# is always the same, let's just pick the first item
m = meta[0]
span._span.update(usage=m.get("usage") or None, model=m.get("model"))
# add prompt object to generator #1154
# if m.get("prompt_name") is not None:
# prompt_name = m["prompt_name"]
# prompt_obj = self.get_pipeline_run_context().get(prompt_name)
# if prompt_obj:
# span._span.update(prompt=prompt_obj)
span._span.update(usage=m.get("usage") or None, model=m.get("model"))
elif tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
replies = span._data.get("haystack.component.output", {}).get("replies")
if replies:
meta = replies[0].meta
completion_start_time = meta.get("completion_start_time")
if completion_start_time:
try:
completion_start_time = datetime.fromisoformat(completion_start_time)
except ValueError:
logger.error(f"Failed to parse completion_start_time: {completion_start_time}")
completion_start_time = None
span._span.update(
usage=meta.get("usage") or None,
model=meta.get("model"),
completion_start_time=completion_start_time,
)

def flush(self):
self._tracer.flush()

Expand Down

0 comments on commit d85c384

Please sign in to comment.