Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix observability SDK #1716

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions agenta-cli/agenta/sdk/decorators/llm_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ async def wrapper(*args, **kwargs) -> Any:

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": config_params}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": "playground"}, is_parent_span=True
{"config": config_params, "environment": "playground"}
)

llm_result = await self.execute_function(
Expand All @@ -108,10 +105,7 @@ async def wrapper_deployed(*args, **kwargs) -> Any:

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": config_params}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": kwargs["environment"]}, is_parent_span=True
{"config": config_params, "environment": kwargs["environment"]}
)

llm_result = await self.execute_function(
Expand Down Expand Up @@ -397,11 +391,8 @@ def handle_terminal_run(

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": agenta.config.all()}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": "bash"}, is_parent_span=True
)
{"config": agenta.config.all(), "environment": "bash"}
)

loop = asyncio.get_event_loop()
result = loop.run_until_complete(
Expand Down
6 changes: 2 additions & 4 deletions agenta-cli/agenta/sdk/decorators/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ async def async_wrapper(*args, **kwargs):
self.tracing.end_span(
outputs=(
{"message": result} if not isinstance(result, dict) else result
),
span=span,
)
)
return result

Expand All @@ -85,8 +84,7 @@ def sync_wrapper(*args, **kwargs):
self.tracing.end_span(
outputs=(
{"message": result} if not isinstance(result, dict) else result
),
span=span,
)
)

return async_wrapper if is_coroutine_function else sync_wrapper
120 changes: 47 additions & 73 deletions agenta-cli/agenta/sdk/tracing/llm_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ def __init__(
max_workers if max_workers else 4, logger=llm_logger
)
self.active_span: Optional[CreateSpan] = None
self.active_trace: Optional[CreateSpan] = None
self.recording_trace_id: Union[str, None] = None
self.recorded_spans: List[CreateSpan] = []
self.active_trace_id: Optional[str] = None
self.pending_spans: List[CreateSpan] = []
self.tags: List[str] = []
self.trace_config: Dict[str, Any] = {}
self.trace_config_cache: Dict[str, Any] = {} # used to save the trace configuration before starting the first span
self.span_dict: Dict[str, CreateSpan] = {} # type: ignore

@property
Expand All @@ -94,35 +93,14 @@ def client(self) -> AsyncObservabilityClient:

def set_span_attribute(
self,
parent_key: Optional[str] = None,
attributes: Dict[str, Any] = {},
is_parent_span: bool = False,
):
if is_parent_span:
if self.active_span is None: # This is the case where entrypoint wants to save the trace information but the parent span has not been initialized yet
for key, value in attributes.items():
self.trace_config[key] = value
return

# set the span attributes
span = self.span_dict[self.active_span.id] # type: ignore
for key, value in attributes.items():
self.set_attribute(span.attributes, key, value, parent_key) # type: ignore
return

def set_attribute(
self,
span_attributes: Dict[str, Any],
key: str,
value: Any,
parent_key: Optional[str] = None,
):
if parent_key is not None:
model_config = span_attributes.get(parent_key, None)
if not model_config:
span_attributes[parent_key] = {}
span_attributes[parent_key][key] = value
self.trace_config_cache[key] = value
else:
span_attributes[key] = value
for key, value in attributes.items():
self.active_span.attributes[key] = value

def set_trace_tags(self, tags: List[str]):
self.tags.extend(tags)
Expand All @@ -147,11 +125,6 @@ def start_span(
variant_id=self.variant_id,
variant_name=self.variant_name,
config=config,
environment=(
self.active_trace.environment
if self.active_trace
else os.environ.get("AGENTA_LLM_RUN_ENVIRONMENT", "unset")
),
spankind=spankind.upper(),
attributes={},
status=SpanStatusCode.UNSET.value,
Expand All @@ -166,73 +139,74 @@ def start_span(
parent_span_id=None,
)

if span.spankind == SpanKind.WORKFLOW.value:
self.active_trace = span
self.environment = (
self.trace_config.get("environment")
if self.trace_config is not None
if self.active_trace_id is None: # This is a parent span
self.active_trace_id = self._create_trace_id()
span.environment = (
self.trace_config_cache.get("environment")
if self.trace_config_cache is not None
else os.environ.get("environment", "unset")
)
self.config = (
self.trace_config.get("config")
if not config and self.trace_config is not None
span.config = (
self.trace_config_cache.get("config")
if not config and self.trace_config_cache is not None
else None
)
self.parent_span_id = span.id
self.recording_trace_id = self._create_trace_id()
else:
self.active_span = span
self.active_span = span
self.span_dict[span.id] = span
span.parent_span_id = (
self.parent_span_id
) # set the parent_span_id to the present span
self.parent_span_id = span.id # update parent_span_id to active span
span.parent_span_id = self.active_span.id
self.span_dict[span.id] = span
self.active_span = span

self.llm_logger.info(f"Recorded span and setting parent_span_id: {span.id}")
return span

def update_span_status(self, span: CreateSpan, value: str):
span.status = value
self.active_span = span

def end_span(self, outputs: Dict[str, Any], span: CreateSpan):
span.end_time = datetime.now(timezone.utc)
span.outputs = [outputs["message"]]
span.cost = outputs.get("cost", None)
span.tokens = outputs.get("usage")
def end_span(self, outputs: Dict[str, Any]):
"""
Ends the active span; if it is a parent span, ends the trace too.
"""
if self.active_span is None:
raise ValueError("There is no active span to end.")
self.active_span.end_time = datetime.now(timezone.utc)
self.active_span.outputs = [outputs.get("message", "")]
self.active_span.cost = outputs.get("cost", None)
self.active_span.tokens = outputs.get("usage", None)

# Push span to list of recorded spans
self.recorded_spans.append(span)
self.pending_spans.append(self.active_span)
self.llm_logger.info(
f"Pushed {span.spankind} span {span.id} to recorded spans."
f"Pushed {self.active_span.spankind} span {self.active_span.id} to recorded spans."
)
if self.active_span.parent_span_id is None:
self.end_trace(parent_span=self.active_span)
else:
self.active_span = self.span_dict[self.active_span.parent_span_id]

# End tracing if spankind is workflow
if span.spankind == SpanKind.WORKFLOW.value:
self.end_recording(span=span)

def end_recording(self, span: CreateSpan):
def end_trace(self, parent_span: CreateSpan):
if self.api_key == "":
return

if not self.active_trace:
if not self.active_trace_id:
raise RuntimeError("No active trace to end.")

self.llm_logger.info("Preparing to send recorded spans for processing.")
self.llm_logger.info(f"Recorded spans => {len(self.recorded_spans)}")
self.llm_logger.info(f"Recorded spans => {len(self.pending_spans)}")
self.tasks_manager.add_task(
self.active_trace.id,
self.active_trace_id,
"trace",
self.client.create_traces(
trace=self.recording_trace_id, spans=self.recorded_spans # type: ignore
trace=self.active_trace_id, spans=self.pending_spans # type: ignore
),
self.client,
)
self.llm_logger.info(
f"Tracing for {span.id} recorded successfully and sent for processing."
f"Tracing for {parent_span.id} recorded successfully and sent for processing."
)
self._clear_recorded_spans()
self._clear_pending_spans()
self.active_trace_id = None
self.active_span = None
self.trace_config_cache.clear()

def _create_trace_id(self) -> str:
"""Creates a unique mongo id for the trace object.
Expand All @@ -252,12 +226,12 @@ def _create_span_id(self) -> str:

return str(ObjectId())

def _clear_recorded_spans(self) -> None:
def _clear_pending_spans(self) -> None:
"""
Clear the list of recorded spans to prepare for next batch processing.
"""

self.recorded_spans = []
self.pending_spans = []
self.llm_logger.info(
f"Cleared all recorded spans from batch: {self.recorded_spans}"
f"Cleared all recorded spans from batch: {self.pending_spans}"
)
2 changes: 1 addition & 1 deletion examples/app_with_observability/app_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config":{"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down
2 changes: 1 addition & 1 deletion examples/app_with_observability/app_nested_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def llm_call(
presence_penalty=presence_penalty,
)
ag.tracing.set_span_attribute(
"model_config", {"model": model, "temperature": temperature}
{"model_config": {"model": model, "temperature": temperature}}
)
tokens_usage = response.usage.dict() # type: ignore
return {
Expand Down
2 changes: 1 addition & 1 deletion examples/app_with_observability/dict_app_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def litellm_call(prompt_system: str, prompt_user: str):
response_format=response_format,
)
ag.tracing.set_span_attribute(
"model_config", {"model": ag.config.model, "temperature": ag.config.temperature}
{"model_config": {"model": ag.config.model, "temperature": ag.config.temperature}}
)
tokens_usage = response.usage.dict()
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config": {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


def hosted_platform_call(content: str):
span = ag.tracing.start_span(
ag.tracing.start_span(
name="gpt3.5-llm-call",
spankind="llm",
input={"content": content},
Expand All @@ -26,7 +26,7 @@ def hosted_platform_call(content: str):
"environment": llm_config["environment"],
},
)
ag.tracing.end_span(outputs=response.json(), span=span)
ag.tracing.end_span(outputs=response.json())
return response.json()


Expand All @@ -38,7 +38,7 @@ def query(content: str):
config=llm_config,
)
response = hosted_platform_call(content=content)
ag.tracing.end_span(outputs=response, span=ag.tracing.active_trace)
ag.tracing.end_span(outputs=response)
return response


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config": {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down
Loading