Skip to content

Commit

Permalink
Merge pull request #2204 from Agenta-AI/feature/observability-analytics
Browse files Browse the repository at this point in the history
[Feature] Observability Analytics (MongoDB replacement)
  • Loading branch information
jp-agenta authored Nov 19, 2024
2 parents 4b66d82 + 406a3be commit 64d69c3
Show file tree
Hide file tree
Showing 9 changed files with 766 additions and 14 deletions.
31 changes: 31 additions & 0 deletions agenta-backend/agenta_backend/apis/fastapi/observability/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import List, Optional
from datetime import datetime

from pydantic import BaseModel

from agenta_backend.apis.fastapi.shared.models import VersionedModel
Expand All @@ -8,6 +10,7 @@
SpanDTO,
TreeDTO,
RootDTO,
BucketDTO,
)


Expand Down Expand Up @@ -58,3 +61,31 @@ class AgentaTreesResponse(VersionedModel, AgentaTreesDTO):

class AgentaRootsResponse(VersionedModel, AgentaRootsDTO):
count: Optional[int] = None


class LegacySummary(BaseModel):
    """Aggregate analytics metrics in the legacy (MongoDB-era) response shape.

    Populated by ``parse_legacy_analytics`` from the new bucket-based
    analytics results.
    """

    total_count: int  # total number of observed invocations across all buckets
    failure_rate: float  # failed invocations as a percentage (0-100)
    total_cost: float  # summed cost across all buckets
    avg_cost: float  # total_cost / total_count (0.0 when there is no data)
    avg_latency: float  # mean latency; duration is divided by 1_000 downstream — presumably ms -> s, confirm
    total_tokens: int  # summed token usage across all buckets
    avg_tokens: float  # total_tokens / total_count (0.0 when there is no data)


class LegacyDataPoint(BaseModel):
    """One time-window entry in the legacy analytics time series."""

    timestamp: datetime  # timestamp labeling this bucket/window
    success_count: int  # total invocations minus failures in the window
    failure_count: int  # failed invocations in the window
    cost: float  # summed cost for the window
    latency: float  # summed duration for the window (unit as reported by the bucket)
    total_tokens: int  # summed token usage for the window


class LegacyAnalyticsResponse(LegacySummary):
    """Legacy-format analytics payload: summary fields (inherited from
    LegacySummary) plus the per-window time series."""

    data: List[LegacyDataPoint]  # chronological data points backing the summary


class AnalyticsResponse(VersionedModel):
    """Current analytics payload: a versioned list of aggregation buckets."""

    count: Optional[int] = None  # number of buckets; omitted from JSON when None
    buckets: List[BucketDTO]  # aggregated metrics per time window
68 changes: 65 additions & 3 deletions agenta-backend/agenta_backend/apis/fastapi/observability/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
from fastapi import APIRouter, Request, Depends, Query, status, HTTPException

from agenta_backend.core.observability.service import ObservabilityService
from agenta_backend.core.observability.dtos import QueryDTO
from agenta_backend.core.observability.dtos import (
QueryDTO,
AnalyticsDTO,
TreeDTO,
RootDTO,
)
from agenta_backend.core.observability.utils import FilteringException

from agenta_backend.apis.fastapi.shared.utils import handle_exceptions
Expand All @@ -13,9 +18,12 @@
)
from agenta_backend.apis.fastapi.observability.utils import (
parse_query_dto,
parse_analytics_dto,
parse_from_otel_span_dto,
parse_to_otel_span_dto,
parse_to_agenta_span_dto,
parse_legacy_analytics_dto,
parse_legacy_analytics,
)
from agenta_backend.apis.fastapi.observability.models import (
CollectStatusResponse,
Expand All @@ -26,8 +34,8 @@
AgentaNodeDTO,
AgentaTreeDTO,
AgentaRootDTO,
TreeDTO,
RootDTO,
LegacyAnalyticsResponse,
AnalyticsResponse,
)


Expand Down Expand Up @@ -85,6 +93,20 @@ def __init__(
response_model_exclude_none=True,
)

self.router.add_api_route(
"/analytics",
self.query_analytics,
methods=["GET"],
operation_id="query_analytics",
summary="Query analytics, with optional grouping, windowing, filtering.",
status_code=status.HTTP_200_OK,
response_model=Union[
LegacyAnalyticsResponse,
AnalyticsResponse,
],
response_model_exclude_none=True,
)

### MUTATIONS

self.router.add_api_route(
Expand Down Expand Up @@ -268,6 +290,46 @@ async def query_traces(
nodes=[AgentaNodeDTO(**span.model_dump()) for span in spans],
)

@handle_exceptions()
async def query_analytics(
    self,
    request: Request,
    analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto),
    legacy_analytics_dto: AnalyticsDTO = Depends(parse_legacy_analytics_dto),
    format: Literal[  # pylint: disable=W0622
        "legacy",
        "agenta",
    ] = Query("agenta"),
):
    """Query analytics buckets, optionally rendered in the legacy format.

    When legacy query parameters are detected, the DTO built from them
    takes precedence over the regular analytics parameters. Filtering
    errors are surfaced to the caller as HTTP 400.
    """
    try:
        # Legacy parameters win when present.
        effective_dto = (
            legacy_analytics_dto
            if legacy_analytics_dto is not None
            else analytics_dto
        )

        bucket_dtos, count = await self.service.analytics(
            project_id=UUID(request.state.project_id),
            analytics_dto=effective_dto,
        )

        if format == "legacy":
            data, summary = parse_legacy_analytics(bucket_dtos)

            return LegacyAnalyticsResponse(
                data=data,
                **summary.model_dump(),
            )

        return AnalyticsResponse(
            version=self.VERSION,
            count=count,
            buckets=bucket_dtos,
        )

    except FilteringException as e:
        # Bad filtering input is a client error, not a server fault.
        raise HTTPException(
            status_code=400,
            detail=str(e),
        ) from e

### MUTATIONS

@handle_exceptions()
Expand Down
185 changes: 180 additions & 5 deletions agenta-backend/agenta_backend/apis/fastapi/observability/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
from collections import OrderedDict
from json import loads, JSONDecodeError, dumps
from copy import copy
from datetime import datetime, timedelta, time

from fastapi import Query, HTTPException

from agenta_backend.apis.fastapi.observability.opentelemetry.semconv import CODEX

from agenta_backend.apis.fastapi.observability.models import (
LegacyDataPoint,
LegacySummary,
)

from agenta_backend.core.observability.dtos import (
TimeDTO,
StatusDTO,
Expand All @@ -24,6 +30,7 @@
OTelSpanDTO,
OTelContextDTO,
OTelLinkDTO,
BucketDTO,
NodeType,
)
from agenta_backend.core.observability.dtos import (
Expand All @@ -32,20 +39,23 @@
FilteringDTO,
PaginationDTO,
QueryDTO,
AnalyticsDTO,
ConditionDTO,
)


# --- PARSE QUERY DTO ---
# --- PARSE QUERY / ANALYTICS DTO ---


def _parse_windowing(
oldest: Optional[str] = None,
newest: Optional[str] = None,
window: Optional[int] = None,
) -> Optional[WindowingDTO]:
_windowing = None

if oldest or newest:
_windowing = WindowingDTO(oldest=oldest, newest=newest)
_windowing = WindowingDTO(oldest=oldest, newest=newest, window=window)

return _windowing

Expand Down Expand Up @@ -88,9 +98,6 @@ def _parse_pagination(
) -> Optional[PaginationDTO]:
_pagination = None

print("---------------------------------")
print(page, size, next, stop)

if page and next:
raise HTTPException(
status_code=400,
Expand Down Expand Up @@ -145,6 +152,26 @@ def parse_query_dto(
)


def parse_analytics_dto(
    # GROUPING
    # - Option 2: Flat query parameters
    focus: Optional[str] = Query(None),
    # WINDOWING
    # - Option 2: Flat query parameters
    oldest: Optional[str] = Query(None),
    newest: Optional[str] = Query(None),
    window: Optional[int] = Query(None),
    # FILTERING
    # - Option 1: Single query parameter as JSON
    filtering: Optional[str] = Query(None),
) -> AnalyticsDTO:
    """Assemble an AnalyticsDTO from flat analytics query parameters."""
    grouping_dto = _parse_grouping(focus=focus)
    windowing_dto = _parse_windowing(oldest=oldest, newest=newest, window=window)
    filtering_dto = _parse_filtering(filtering=filtering)

    return AnalyticsDTO(
        grouping=grouping_dto,
        windowing=windowing_dto,
        filtering=filtering_dto,
    )


# --- PARSE SPAN DTO ---


Expand Down Expand Up @@ -760,3 +787,151 @@ def parse_to_agenta_span_dto(
# ----------------------

return span_dto


# --- PARSE LEGACY ANALYTICS ---


def _parse_time_range(
window_text: str,
) -> Tuple[datetime, datetime, int]:
quantity, unit = window_text.split("_")
quantity = int(quantity)

today = datetime.now()
newest = datetime.combine(today.date(), time.max)

if unit == "hours":
oldest = newest - timedelta(hours=quantity)
window = 60 # 1 hour
return newest, oldest, window

elif unit == "days":
oldest = newest - timedelta(days=quantity)
window = 1440 # 1 day
return newest, oldest, window

else:
raise ValueError(f"Unknown time unit: {unit}")


def parse_legacy_analytics_dto(
    timeRange: Optional[str] = Query(None),  # pylint: disable=invalid-name
    app_id: Optional[str] = Query(None),
    environment: Optional[str] = Query(None),
    variant: Optional[str] = Query(None),
) -> Optional[AnalyticsDTO]:
    """Map legacy (MongoDB-era) analytics query parameters onto an AnalyticsDTO.

    Returns None when no legacy-specific parameter is present, so the caller
    can fall back to the regular analytics parameters.

    NOTE(review): app_id alone does not mark the request as legacy — only
    timeRange / environment / variant do. Confirm this is intended.

    Raises ValueError (via _parse_time_range) for an unknown timeRange unit.
    """
    if not timeRange and not environment and not variant:
        return None

    # Collect only the filter conditions that were actually provided.
    conditions = []

    if app_id:
        conditions.append(
            ConditionDTO(
                key="refs.application.id",  # ID ?
                operator="is",
                value=app_id,
            )
        )

    if environment:
        conditions.append(
            ConditionDTO(
                key="refs.environment.slug",  # SLUG ?
                operator="is",
                value=environment,
            )
        )

    if variant:
        conditions.append(
            ConditionDTO(
                key="refs.variant.id",  # ID ?
                operator="is",
                value=variant,
            )
        )

    filtering = FilteringDTO(conditions=conditions) if conditions else None

    windowing = None

    if timeRange:
        newest, oldest, window = _parse_time_range(timeRange)

        windowing = WindowingDTO(newest=newest, oldest=oldest, window=window)

    # Legacy analytics were always aggregated per trace (tree).
    grouping = GroupingDTO(focus="tree")

    return AnalyticsDTO(
        grouping=grouping,
        windowing=windowing,
        filtering=filtering,
    )


def parse_legacy_analytics(
    bucket_dtos: List[BucketDTO],
) -> Tuple[List[LegacyDataPoint], LegacySummary]:
    """Flatten analytics buckets into the legacy (data points, summary) shape.

    Optional bucket metrics are normalized to 0 / 0.0 once per bucket, so the
    per-window data points and the aggregate summary treat missing values
    identically (the original guarded None only in the summary accumulation,
    so a None count would have raised TypeError in the data-point math).

    avg_latency divides the accumulated duration by 1_000 — presumably a
    milliseconds-to-seconds conversion; confirm against BucketDTO's unit.
    """
    data_points: List[LegacyDataPoint] = []

    total_failure = 0
    total_latency = 0.0

    summary = LegacySummary(
        total_count=0,
        failure_rate=0.0,
        total_cost=0.0,
        avg_cost=0.0,
        avg_latency=0.0,
        total_tokens=0,
        avg_tokens=0.0,
    )

    for bucket_dto in bucket_dtos:
        # Normalize optional metrics once so all consumers below agree.
        bucket_count = bucket_dto.total.count or 0
        error_count = bucket_dto.error.count or 0
        bucket_cost = bucket_dto.total.cost or 0.0
        bucket_duration = bucket_dto.total.duration or 0.0
        bucket_tokens = bucket_dto.total.tokens or 0

        data_points.append(
            LegacyDataPoint(
                timestamp=bucket_dto.timestamp,
                success_count=bucket_count - error_count,
                failure_count=error_count,
                cost=bucket_cost,
                latency=bucket_duration,
                total_tokens=bucket_tokens,
            )
        )

        summary.total_count += bucket_count
        summary.total_cost += bucket_cost
        summary.total_tokens += bucket_tokens

        total_failure += error_count
        total_latency += bucket_duration

    # Averages and rates are only defined when at least one call was seen;
    # otherwise the zero-initialized summary is returned as-is.
    if summary.total_count:
        summary.failure_rate = (total_failure / summary.total_count) * 100

        summary.avg_cost = summary.total_cost / summary.total_count
        summary.avg_latency = (total_latency / summary.total_count) / 1_000
        summary.avg_tokens = summary.total_tokens / summary.total_count

    return data_points, summary
Loading

0 comments on commit 64d69c3

Please sign in to comment.