From d85c38443c5aaf8605cb6438a7298ed0e6aae801 Mon Sep 17 00:00:00 2001
From: alex-stoica
Date: Tue, 12 Nov 2024 20:54:08 +0200
Subject: [PATCH] Re-added original comments and completion_start_time

---
 .../tracing/langfuse/tracer.py                | 103 +++++++++---------
 1 file changed, 53 insertions(+), 50 deletions(-)

diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
index c9c8a354e..895a9955e 100644
--- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
+++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py
@@ -121,67 +121,70 @@ def __init__(self, tracer: "langfuse.Langfuse", name: str = "Haystack", public:
         self.enforce_flush = os.getenv(HAYSTACK_LANGFUSE_ENFORCE_FLUSH_ENV_VAR, "true").lower() == "true"
 
     @contextlib.contextmanager
-    def trace(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]:
-        """
-        Start and manage a new trace span.
-        :param operation_name: The name of the operation.
-        :param tags: A dictionary of tags to attach to the span.
-        :return: A context manager yielding the span.
-        """
+    def trace(
+        self, operation_name: str, tags: Optional[Dict[str, Any]] = None,
+        parent_span: Optional[Span] = None
+    ) -> Iterator[Span]:
         tags = tags or {}
         span_name = tags.get("haystack.component.name", operation_name)
 
+        if parent_span is not None:
+            parent_raw_span = parent_span.raw_span()
+        else:
+            current_span = self.current_span()
+            parent_raw_span = current_span.raw_span() if current_span else None
+
         if tags.get("haystack.component.type") in _ALL_SUPPORTED_GENERATORS:
-            span = LangfuseSpan(self.current_span().raw_span().generation(name=span_name))
+            if parent_raw_span:
+                new_span = parent_raw_span.generation(name=span_name)
+            else:
+                new_span = self._langfuse.generation(name=span_name)
         else:
-            span = LangfuseSpan(self.current_span().raw_span().span(name=span_name))
+            if parent_raw_span:
+                new_span = parent_raw_span.span(name=span_name)
+            else:
+                new_span = self._langfuse.span(name=span_name)
 
+        span = LangfuseSpan(new_span)
         self._context.append(span)
         span.set_tags(tags)
-        yield span
-
-        if tags.get("haystack.component.type") in _SUPPORTED_GENERATORS:
-            meta = span._data.get("haystack.component.output", {}).get("meta")
-            if meta:
-                # Haystack returns one meta dict for each message, but the 'usage' value
-                # is always the same, let's just pick the first item
-                m = meta[0]
-                span._span.update(usage=m.get("usage") or None, model=m.get("model"))
-        elif tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
-            replies = span._data.get("haystack.component.output", {}).get("replies")
-            if replies:
-                meta = replies[0].meta
-                completion_start_time = meta.get("completion_start_time")
-                if completion_start_time:
-                    try:
-                        completion_start_time = datetime.fromisoformat(completion_start_time)
-                    except ValueError:
-                        logger.error(f"Failed to parse completion_start_time: {completion_start_time}")
-                        completion_start_time = None
-                span._span.update(
-                    usage=meta.get("usage") or None,
-                    model=meta.get("model"),
-                    completion_start_time=completion_start_time,
-                )
-
-        pipeline_input = tags.get("haystack.pipeline.input_data", None)
-        if pipeline_input:
-            span._span.update(input=tags["haystack.pipeline.input_data"])
-        pipeline_output = tags.get("haystack.pipeline.output_data", None)
-        if pipeline_output:
-            span._span.update(output=tags["haystack.pipeline.output_data"])
-
-        span.raw_span().end()
-        self._context.pop()
-
-        if len(self._context) == 1:
-            # The root span has to be a trace, which need to be removed from the context after the pipeline run
+        try:
+            yield span
+        finally:
             self._context.pop()
-            if self.enforce_flush:
-                self.flush()
-
+            if tags.get("haystack.component.type") in _SUPPORTED_GENERATORS:
+                meta = span._data.get("haystack.component.output", {}).get("meta")
+                if meta:
+                    # Haystack returns one meta dict for each message, but the 'usage' value
+                    # is always the same, let's just pick the first item
+                    m = meta[0]
+                    span._span.update(usage=m.get("usage") or None, model=m.get("model"))
+                    # add prompt object to generator #1154
+                    # if m.get("prompt_name") is not None:
+                    #     prompt_name = m["prompt_name"]
+                    #     prompt_obj = self.get_pipeline_run_context().get(prompt_name)
+                    #     if prompt_obj:
+                    #         span._span.update(prompt=prompt_obj)
+            elif tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
+                replies = span._data.get("haystack.component.output", {}).get("replies")
+                if replies:
+                    meta = replies[0].meta
+                    completion_start_time = meta.get("completion_start_time")
+                    if completion_start_time:
+                        try:
+                            completion_start_time = datetime.fromisoformat(completion_start_time)
+                        except ValueError:
+                            logger.error(f"Failed to parse completion_start_time: {completion_start_time}")
+                            completion_start_time = None
+                    span._span.update(
+                        usage=meta.get("usage") or None,
+                        model=meta.get("model"),
+                        completion_start_time=completion_start_time,
+                    )
+            span.raw_span().end()
+
     def flush(self):
         self._tracer.flush()