Skip to content

Commit

Permalink
Merge pull request #1716 from Agenta-AI/fix-obs-pr
Browse files Browse the repository at this point in the history
Fix observability SDK
  • Loading branch information
aybruhm authored May 29, 2024
2 parents 9b76c72 + bf561e1 commit 754b4d8
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 98 deletions.
17 changes: 4 additions & 13 deletions agenta-cli/agenta/sdk/decorators/llm_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ async def wrapper(*args, **kwargs) -> Any:

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": config_params}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": "playground"}, is_parent_span=True
{"config": config_params, "environment": "playground"}
)

llm_result = await self.execute_function(
Expand All @@ -108,10 +105,7 @@ async def wrapper_deployed(*args, **kwargs) -> Any:

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": config_params}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": kwargs["environment"]}, is_parent_span=True
{"config": config_params, "environment": kwargs["environment"]}
)

llm_result = await self.execute_function(
Expand Down Expand Up @@ -397,11 +391,8 @@ def handle_terminal_run(

# Set the configuration and environment of the LLM app parent span at run-time
agenta.tracing.set_span_attribute(
attributes={"config": agenta.config.all()}, is_parent_span=True
)
agenta.tracing.set_span_attribute(
attributes={"environment": "bash"}, is_parent_span=True
)
{"config": agenta.config.all(), "environment": "bash"}
)

loop = asyncio.get_event_loop()
result = loop.run_until_complete(
Expand Down
6 changes: 2 additions & 4 deletions agenta-cli/agenta/sdk/decorators/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ async def async_wrapper(*args, **kwargs):
self.tracing.end_span(
outputs=(
{"message": result} if not isinstance(result, dict) else result
),
span=span,
)
)
return result

Expand All @@ -85,8 +84,7 @@ def sync_wrapper(*args, **kwargs):
self.tracing.end_span(
outputs=(
{"message": result} if not isinstance(result, dict) else result
),
span=span,
)
)

return async_wrapper if is_coroutine_function else sync_wrapper
120 changes: 47 additions & 73 deletions agenta-cli/agenta/sdk/tracing/llm_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ def __init__(
max_workers if max_workers else 4, logger=llm_logger
)
self.active_span: Optional[CreateSpan] = None
self.active_trace: Optional[CreateSpan] = None
self.recording_trace_id: Union[str, None] = None
self.recorded_spans: List[CreateSpan] = []
self.active_trace_id: Optional[str] = None
self.pending_spans: List[CreateSpan] = []
self.tags: List[str] = []
self.trace_config: Dict[str, Any] = {}
self.trace_config_cache: Dict[str, Any] = {} # used to save the trace configuration before starting the first span
self.span_dict: Dict[str, CreateSpan] = {} # type: ignore

@property
Expand All @@ -94,35 +93,14 @@ def client(self) -> AsyncObservabilityClient:

def set_span_attribute(
self,
parent_key: Optional[str] = None,
attributes: Dict[str, Any] = {},
is_parent_span: bool = False,
):
if is_parent_span:
if self.active_span is None: # This is the case where entrypoint wants to save the trace information but the parent span has not been initialized yet
for key, value in attributes.items():
self.trace_config[key] = value
return

# set the span attributes
span = self.span_dict[self.active_span.id] # type: ignore
for key, value in attributes.items():
self.set_attribute(span.attributes, key, value, parent_key) # type: ignore
return

def set_attribute(
self,
span_attributes: Dict[str, Any],
key: str,
value: Any,
parent_key: Optional[str] = None,
):
if parent_key is not None:
model_config = span_attributes.get(parent_key, None)
if not model_config:
span_attributes[parent_key] = {}
span_attributes[parent_key][key] = value
self.trace_config_cache[key] = value
else:
span_attributes[key] = value
for key, value in attributes.items():
self.active_span.attributes[key] = value

def set_trace_tags(self, tags: List[str]):
self.tags.extend(tags)
Expand All @@ -147,11 +125,6 @@ def start_span(
variant_id=self.variant_id,
variant_name=self.variant_name,
config=config,
environment=(
self.active_trace.environment
if self.active_trace
else os.environ.get("AGENTA_LLM_RUN_ENVIRONMENT", "unset")
),
spankind=spankind.upper(),
attributes={},
status=SpanStatusCode.UNSET.value,
Expand All @@ -166,73 +139,74 @@ def start_span(
parent_span_id=None,
)

if span.spankind == SpanKind.WORKFLOW.value:
self.active_trace = span
self.environment = (
self.trace_config.get("environment")
if self.trace_config is not None
if self.active_trace_id is None: # This is a parent span
self.active_trace_id = self._create_trace_id()
span.environment = (
self.trace_config_cache.get("environment")
if self.trace_config_cache is not None
else os.environ.get("environment", "unset")
)
self.config = (
self.trace_config.get("config")
if not config and self.trace_config is not None
span.config = (
self.trace_config_cache.get("config")
if not config and self.trace_config_cache is not None
else None
)
self.parent_span_id = span.id
self.recording_trace_id = self._create_trace_id()
else:
self.active_span = span
self.active_span = span
self.span_dict[span.id] = span
span.parent_span_id = (
self.parent_span_id
) # set the parent_span_id to the present span
self.parent_span_id = span.id # update parent_span_id to active span
span.parent_span_id = self.active_span.id
self.span_dict[span.id] = span
self.active_span = span

self.llm_logger.info(f"Recorded span and setting parent_span_id: {span.id}")
return span

def update_span_status(self, span: CreateSpan, value: str):
span.status = value
self.active_span = span

def end_span(self, outputs: Dict[str, Any], span: CreateSpan):
span.end_time = datetime.now(timezone.utc)
span.outputs = [outputs["message"]]
span.cost = outputs.get("cost", None)
span.tokens = outputs.get("usage")
def end_span(self, outputs: Dict[str, Any]):
"""
Ends the active span, if it is a parent span, ends the trace too.
"""
if self.active_span is None:
raise ValueError("There is no active span to end.")
self.active_span.end_time = datetime.now(timezone.utc)
self.active_span.outputs = [outputs.get("message", "")]
self.active_span.cost = outputs.get("cost", None)
self.active_span.tokens = outputs.get("usage", None)

# Push span to list of recorded spans
self.recorded_spans.append(span)
self.pending_spans.append(self.active_span)
self.llm_logger.info(
f"Pushed {span.spankind} span {span.id} to recorded spans."
f"Pushed {self.active_span.spankind} span {self.active_span.id} to recorded spans."
)
if self.active_span.parent_span_id is None:
self.end_trace(parent_span=self.active_span)
else:
self.active_span = self.span_dict[self.active_span.parent_span_id]

# End tracing if spankind is workflow
if span.spankind == SpanKind.WORKFLOW.value:
self.end_recording(span=span)

def end_recording(self, span: CreateSpan):
def end_trace(self, parent_span: CreateSpan):
if self.api_key == "":
return

if not self.active_trace:
if not self.active_trace_id:
raise RuntimeError("No active trace to end.")

self.llm_logger.info("Preparing to send recorded spans for processing.")
self.llm_logger.info(f"Recorded spans => {len(self.recorded_spans)}")
self.llm_logger.info(f"Recorded spans => {len(self.pending_spans)}")
self.tasks_manager.add_task(
self.active_trace.id,
self.active_trace_id,
"trace",
self.client.create_traces(
trace=self.recording_trace_id, spans=self.recorded_spans # type: ignore
trace=self.active_trace_id, spans=self.pending_spans # type: ignore
),
self.client,
)
self.llm_logger.info(
f"Tracing for {span.id} recorded successfully and sent for processing."
f"Tracing for {parent_span.id} recorded successfully and sent for processing."
)
self._clear_recorded_spans()
self._clear_pending_spans()
self.active_trace_id = None
self.active_span = None
self.trace_config_cache.clear()

def _create_trace_id(self) -> str:
"""Creates a unique mongo id for the trace object.
Expand All @@ -252,12 +226,12 @@ def _create_span_id(self) -> str:

return str(ObjectId())

def _clear_recorded_spans(self) -> None:
def _clear_pending_spans(self) -> None:
"""
Clear the list of recorded spans to prepare for next batch processing.
"""

self.recorded_spans = []
self.pending_spans = []
self.llm_logger.info(
f"Cleared all recorded spans from batch: {self.recorded_spans}"
f"Cleared all recorded spans from batch: {self.pending_spans}"
)
2 changes: 1 addition & 1 deletion examples/app_with_observability/app_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config":{"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down
2 changes: 1 addition & 1 deletion examples/app_with_observability/app_nested_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def llm_call(
presence_penalty=presence_penalty,
)
ag.tracing.set_span_attribute(
"model_config", {"model": model, "temperature": temperature}
{"model_config": {"model": model, "temperature": temperature}}
)
tokens_usage = response.usage.dict() # type: ignore
return {
Expand Down
2 changes: 1 addition & 1 deletion examples/app_with_observability/dict_app_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def litellm_call(prompt_system: str, prompt_user: str):
response_format=response_format,
)
ag.tracing.set_span_attribute(
"model_config", {"model": ag.config.model, "temperature": ag.config.temperature}
{"model_config": {"model": ag.config.model, "temperature": ag.config.temperature}}
)
tokens_usage = response.usage.dict()
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config": {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


def hosted_platform_call(content: str):
span = ag.tracing.start_span(
ag.tracing.start_span(
name="gpt3.5-llm-call",
spankind="llm",
input={"content": content},
Expand All @@ -26,7 +26,7 @@ def hosted_platform_call(content: str):
"environment": llm_config["environment"],
},
)
ag.tracing.end_span(outputs=response.json(), span=span)
ag.tracing.end_span(outputs=response.json())
return response.json()


Expand All @@ -38,7 +38,7 @@ def query(content: str):
config=llm_config,
)
response = hosted_platform_call(content=content)
ag.tracing.end_span(outputs=response, span=ag.tracing.active_trace)
ag.tracing.end_span(outputs=response)
return response


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def llm_call(prompt):
model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
)
ag.tracing.set_span_attribute(
"model_config", {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}
{"model_config": {"model": "gpt-3.5-turbo", "temperature": ag.config.temperature}}
) # translates to {"model_config": {"model": "gpt-3.5-turbo", "temperature": 0.2}}
tokens_usage = chat_completion.usage.dict()
return {
Expand Down

0 comments on commit 754b4d8

Please sign in to comment.