Skip to content

Commit

Permalink
feat: ArizePhoenixTracer v2 - Enhanced Session Tracking and Flow Orga…
Browse files Browse the repository at this point in the history
…nization (#5336)

Updated ArizePhoenixTracer
  • Loading branch information
ialisaleh authored Dec 18, 2024
1 parent 5167766 commit 82080eb
Showing 1 changed file with 73 additions and 33 deletions.
106 changes: 73 additions & 33 deletions src/backend/base/langflow/services/tracing/arize_phoenix.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from loguru import logger
from openinference.semconv.trace import OpenInferenceMimeTypeValues, SpanAttributes
from opentelemetry.semconv.trace import SpanAttributes as OTELSpanAttributes
from opentelemetry.trace import Span, Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode, use_span
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from typing_extensions import override

Expand All @@ -34,15 +34,21 @@


class ArizePhoenixTracer(BaseTracer):
flow_name: str
flow_id: str
chat_input_value: str
chat_output_value: str

def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
"""Initializes the ArizePhoenixTracer instance and sets up a root span."""
self.trace_name = trace_name
self.trace_type = trace_type
self.project_name = project_name
self.trace_id = trace_id
self.flow_name = trace_name.split(" - ")[0]
self.flow_id = trace_name.split(" - ")[-1]
self.chat_input_value = ""
self.chat_output_value = ""

try:
self._ready = self.setup_arize_phoenix()
Expand All @@ -53,12 +59,17 @@ def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id
self.propagator = TraceContextTextMapPropagator()
self.carrier: dict[Any, CarrierT] = {}

with self.tracer.start_as_current_span(
self.root_span = self.tracer.start_span(
name=self.flow_id,
start_time=self._get_current_timestamp(),
) as root_span:
root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type)
root_span.set_status(Status(StatusCode.OK))
)
self.root_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id)
self.root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, self.trace_type)
self.root_span.set_attribute("langflow.project.name", self.project_name)
self.root_span.set_attribute("langflow.flow.name", self.flow_name)
self.root_span.set_attribute("langflow.flow.id", self.flow_id)

with use_span(self.root_span, end_on_exit=False):
self.propagator.inject(carrier=self.carrier)

self.child_spans: dict[str, Span] = {}
Expand Down Expand Up @@ -118,7 +129,8 @@ def setup_arize_phoenix(self) -> bool:
TracerProvider,
)

project_name = self.project_name or self.flow_id
name_without_space = self.flow_name.replace(" ", "-")
project_name = self.project_name if name_without_space == "None" else name_without_space
attributes = {PROJECT_NAME: project_name, "model_id": project_name}
resource = Resource.create(attributes=attributes)
tracer_provider = TracerProvider(resource=resource, verbose=False)
Expand Down Expand Up @@ -187,11 +199,6 @@ def add_trace(
else:
child_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, trace_type)

if "session_id" in inputs and len(inputs["session_id"]) > 0 and inputs["session_id"] != self.flow_id:
child_span.set_attribute(SpanAttributes.SESSION_ID, inputs["session_id"])
else:
child_span.set_attribute(SpanAttributes.SESSION_ID, self.flow_id)

processed_inputs = self._convert_to_arize_phoenix_types(inputs) if inputs else {}
if processed_inputs:
child_span.set_attribute(SpanAttributes.INPUT_VALUE, self._safe_json_dumps(processed_inputs))
Expand All @@ -202,6 +209,12 @@ def add_trace(
for key, value in processed_metadata.items():
child_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value)

component_name = trace_id.split("-")[0]
if component_name == "ChatInput":
self.chat_input_value = processed_inputs["input_value"]
elif component_name == "ChatOutput":
self.chat_output_value = processed_inputs["input_value"]

self.child_spans[trace_id] = child_span

@override
Expand Down Expand Up @@ -232,28 +245,7 @@ def end_trace(
for key, value in processed_logs.items():
child_span.set_attribute(f"logs.{key}", value)

if error:
error_string = self._error_to_string(error)
child_span.set_status(Status(StatusCode.ERROR, error_string))
child_span.set_attribute("error.message", error_string)

if isinstance(error, Exception):
child_span.record_exception(error)
else:
exception_type = error.__class__.__name__
exception_message = str(error)
if not exception_message:
exception_message = repr(error)
attributes: dict[str, AttributeValue] = {
OTELSpanAttributes.EXCEPTION_TYPE: exception_type,
OTELSpanAttributes.EXCEPTION_MESSAGE: exception_message,
OTELSpanAttributes.EXCEPTION_ESCAPED: False,
OTELSpanAttributes.EXCEPTION_STACKTRACE: error_string,
}
child_span.add_event(name="exception", attributes=attributes)
else:
child_span.set_status(Status(StatusCode.OK))

self._set_span_status(child_span, error)
child_span.end(end_time=self._get_current_timestamp())
self.child_spans.pop(trace_id)

Expand All @@ -269,6 +261,30 @@ def end(
if not self._ready:
return

if self.root_span:
self.root_span.set_attribute(SpanAttributes.INPUT_VALUE, self.chat_input_value)
self.root_span.set_attribute(SpanAttributes.INPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value)
self.root_span.set_attribute(SpanAttributes.OUTPUT_VALUE, self.chat_output_value)
self.root_span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, OpenInferenceMimeTypeValues.TEXT.value)

processed_metadata = self._convert_to_arize_phoenix_types(metadata) if metadata else {}
if processed_metadata:
for key, value in processed_metadata.items():
self.root_span.set_attribute(f"{SpanAttributes.METADATA}.{key}", value)

self._set_span_status(self.root_span, error)
self.root_span.end()

try:
from openinference.instrumentation.langchain import LangChainInstrumentor

LangChainInstrumentor().uninstrument(tracer_provider=self.tracer_provider, skip_dep_check=True)
except ImportError:
logger.exception(
"Could not import LangChainInstrumentor."
"Please install it with `pip install openinference-instrumentation-langchain`."
)

def _convert_to_arize_phoenix_types(self, io_dict: dict[str | Any, Any]) -> dict[str, Any]:
"""Converts data types to Arize/Phoenix compatible formats."""
return {
Expand Down Expand Up @@ -319,6 +335,30 @@ def _safe_json_dumps(self, obj: Any, **kwargs: Any) -> str:
"""A convenience wrapper around `json.dumps` that ensures that any object can be safely encoded."""
return json.dumps(obj, default=str, ensure_ascii=False, **kwargs)

def _set_span_status(self, current_span: Span, error: Exception | None = None):
"""Sets the status and attributes of the current span based on the presence of an error."""
if error:
error_string = self._error_to_string(error)
current_span.set_status(Status(StatusCode.ERROR, error_string))
current_span.set_attribute("error.message", error_string)

if isinstance(error, Exception):
current_span.record_exception(error)
else:
exception_type = error.__class__.__name__
exception_message = str(error)
if not exception_message:
exception_message = repr(error)
attributes: dict[str, AttributeValue] = {
OTELSpanAttributes.EXCEPTION_TYPE: exception_type,
OTELSpanAttributes.EXCEPTION_MESSAGE: exception_message,
OTELSpanAttributes.EXCEPTION_ESCAPED: False,
OTELSpanAttributes.EXCEPTION_STACKTRACE: error_string,
}
current_span.add_event(name="exception", attributes=attributes)
else:
current_span.set_status(Status(StatusCode.OK))

def get_langchain_callback(self) -> BaseCallbackHandler | None:
"""Returns the LangChain callback handler if applicable."""
return None

0 comments on commit 82080eb

Please sign in to comment.