From 0640d2b3338d24ce0e88f1d0e5da1032b6674182 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Tue, 5 Nov 2024 14:45:40 +0100 Subject: [PATCH 01/17] WIP add analytics --- .../apis/fastapi/observability/models.py | 6 + .../apis/fastapi/observability/router.py | 38 ++- .../apis/fastapi/observability/utils.py | 27 +- .../agenta_backend/core/observability/dtos.py | 22 ++ .../core/observability/interfaces.py | 17 +- .../core/observability/service.py | 23 +- .../dbs/postgres/observability/dao.py | 256 +++++++++++++++++- agenta-cli/agenta/sdk/tracing/inline.py | 2 +- 8 files changed, 379 insertions(+), 12 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py index 46da4214e7..8d3c10c055 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py @@ -8,6 +8,7 @@ SpanDTO, TreeDTO, RootDTO, + BucketDTO, ) @@ -58,3 +59,8 @@ class AgentaTreesResponse(VersionedModel, AgentaTreesDTO): class AgentaRootsResponse(VersionedModel, AgentaRootsDTO): count: Optional[int] = None + + +class AnalyticsResponse(VersionedModel): + width: int + buckets: List[BucketDTO] diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py index 72e8a2c7f6..ecb51321d6 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, Request, Depends, Query, status, HTTPException from agenta_backend.core.observability.service import ObservabilityService -from agenta_backend.core.observability.dtos import QueryDTO +from agenta_backend.core.observability.dtos import QueryDTO, AnalyticsDTO from agenta_backend.core.observability.utils import FilteringException from agenta_backend.apis.fastapi.shared.utils import handle_exceptions @@ -13,6 +13,7 @@ ) from agenta_backend.apis.fastapi.observability.utils import ( parse_query_dto, + parse_analytics_dto, parse_from_otel_span_dto, parse_to_otel_span_dto, parse_to_agenta_span_dto, @@ -23,6 +24,7 @@ AgentaNodesResponse, AgentaTreesResponse, AgentaRootsResponse, + AnalyticsResponse, AgentaNodeDTO, AgentaTreeDTO, AgentaRootDTO, @@ -85,6 +87,17 @@ def __init__( response_model_exclude_none=True, ) + self.router.add_api_route( + "/analytics", + self.query_analytics, + methods=["GET"], + operation_id="query_analytics", + summary="Query analytics, with optional grouping, windowing, filtering.", + status_code=status.HTTP_200_OK, + response_model=AnalyticsResponse, + response_model_exclude_none=True, + ) + ### MUTATIONS self.router.add_api_route( @@ -268,6 +281,29 @@ async def query_traces( nodes=[AgentaNodeDTO(**span.model_dump()) for span in spans], ) + @handle_exceptions() + async def query_analytics( + self, + request: Request, + analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto), + ): + try: + bucket_dtos, width = await self.service.analytics( + project_id=UUID(request.state.project_id), + analytics_dto=analytics_dto, + ) + except FilteringException as e: + raise HTTPException( + status_code=400, + detail=str(e), + ) from e + + return AnalyticsResponse( + version=self.VERSION, + width=width, + buckets=bucket_dtos, + ) + ### MUTATIONS @handle_exceptions() diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index d839971eea..7cd545b41e 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -31,6 +31,7 @@ FilteringDTO, PaginationDTO, QueryDTO, + AnalyticsDTO, ) @@ -40,11 +41,12 @@ def _parse_windowing( oldest: Optional[str] = None, newest: Optional[str] = None, + bucket: Optional[int] = None, ) -> Optional[WindowingDTO]: _windowing = None if oldest or newest: - _windowing = WindowingDTO(oldest=oldest, newest=newest) + _windowing = WindowingDTO(oldest=oldest, newest=newest, bucket=bucket) return _windowing @@ -87,9 +89,6 @@ def _parse_pagination( ) -> Optional[PaginationDTO]: _pagination = None - print("---------------------------------") - print(page, size, next, stop) - if page and next: raise HTTPException( status_code=400, @@ -144,6 +143,26 @@ def parse_query_dto( ) +def parse_analytics_dto( + # GROUPING + # - Option 2: Flat query parameters + focus: Optional[str] = Query(None), + # WINDOWING + # - Option 2: Flat query parameters + oldest: Optional[str] = Query(None), + newest: Optional[str] = Query(None), + bucket: Optional[int] = Query(None), + # FILTERING + # - Option 1: Single query parameter as JSON + filtering: Optional[str] = Query(None), +) -> AnalyticsDTO: + return AnalyticsDTO( + grouping=_parse_grouping(focus=focus), + windowing=_parse_windowing(oldest=oldest, newest=newest, bucket=bucket), + filtering=_parse_filtering(filtering=filtering), + ) + + # --- PARSE SPAN DTO --- diff --git a/agenta-backend/agenta_backend/core/observability/dtos.py b/agenta-backend/agenta_backend/core/observability/dtos.py index 001a215cb5..611c7c9bee 100644 --- a/agenta-backend/agenta_backend/core/observability/dtos.py +++ b/agenta-backend/agenta_backend/core/observability/dtos.py @@ -229,6 +229,7 @@ class OTelSpanDTO(BaseModel): class WindowingDTO(BaseModel): oldest: Optional[datetime] = None newest: Optional[datetime] = None + bucket: Optional[int] = None class LogicalOperator(Enum): @@ -327,3 +328,24 @@ class QueryDTO(BaseModel): windowing: Optional[WindowingDTO] = None filtering: Optional[FilteringDTO] = None pagination: Optional[PaginationDTO] = None + + +class AnalyticsDTO(BaseModel): + grouping: Optional[GroupingDTO] = None + windowing: Optional[WindowingDTO] = None + filtering: Optional[FilteringDTO] = None + + +class MetricsDTO(BaseModel): + count: int + duration: float + cost: float + tokens: int + + +class BucketDTO(BaseModel): + timestamp: datetime + timespan: int + total: MetricsDTO + error: MetricsDTO + success: MetricsDTO diff --git a/agenta-backend/agenta_backend/core/observability/interfaces.py b/agenta-backend/agenta_backend/core/observability/interfaces.py index 4d608a1d84..a404811f82 100644 --- a/agenta-backend/agenta_backend/core/observability/interfaces.py +++ b/agenta-backend/agenta_backend/core/observability/interfaces.py @@ -1,14 +1,19 @@ from typing import List, Tuple, Optional from uuid import UUID -from agenta_backend.core.observability.dtos import QueryDTO, SpanDTO +from agenta_backend.core.observability.dtos import ( + QueryDTO, + SpanDTO, + AnalyticsDTO, + BucketDTO, +) class ObservabilityDAOInterface: def __init__(self): raise NotImplementedError - # ANALYTICS + # QUERIES async def query( self, @@ -18,6 +23,14 @@ async def query( ) -> Tuple[List[SpanDTO], Optional[int]]: raise NotImplementedError + async def analytics( + self, + *, + project_id: UUID, + analytics_dto: AnalyticsDTO, + ) -> Tuple[List[BucketDTO], Optional[int]]: + raise NotImplementedError + # TRANSACTIONS async def create_one( diff --git a/agenta-backend/agenta_backend/core/observability/service.py b/agenta-backend/agenta_backend/core/observability/service.py index 981fec9e35..59a9a00475 100644 --- a/agenta-backend/agenta_backend/core/observability/service.py +++ b/agenta-backend/agenta_backend/core/observability/service.py @@ -2,7 +2,12 @@ from uuid import UUID from agenta_backend.core.observability.interfaces import ObservabilityDAOInterface -from agenta_backend.core.observability.dtos import QueryDTO, SpanDTO +from agenta_backend.core.observability.dtos import ( + QueryDTO, + AnalyticsDTO, + SpanDTO, + BucketDTO, +) from agenta_backend.core.observability.utils import ( parse_span_dtos_to_span_idx, parse_span_idx_to_span_id_tree, @@ -49,6 +54,22 @@ async def query( return span_dtos, count + async def analytics( + self, + *, + project_id: UUID, + analytics_dto: AnalyticsDTO, + ) -> Tuple[List[BucketDTO], Optional[int]]: + if analytics_dto.filtering: + parse_filtering(analytics_dto.filtering) + + bucket_dtos, width = await self.observability_dao.analytics( + project_id=project_id, + analytics_dto=analytics_dto, + ) + + return bucket_dtos, width + async def ingest( self, *, diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index fb4302b2ea..3b0c5af12e 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -1,8 +1,8 @@ from typing import Optional, List, Tuple, Union -from datetime import datetime +from datetime import datetime, timezone, timedelta from uuid import UUID -from sqlalchemy import and_, or_, not_, distinct, Column, func, cast +from sqlalchemy import and_, or_, not_, distinct, Column, func, cast, text from sqlalchemy import TIMESTAMP, Enum, UUID as SQLUUID, Integer, Numeric from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.future import select @@ -16,7 +16,12 @@ ) from agenta_backend.core.observability.interfaces import ObservabilityDAOInterface -from agenta_backend.core.observability.dtos import QueryDTO, SpanDTO +from agenta_backend.core.observability.dtos import ( + QueryDTO, + SpanDTO, + AnalyticsDTO, + BucketDTO, +) from agenta_backend.core.observability.dtos import ( FilteringDTO, ConditionDTO, @@ -37,6 +42,19 @@ _is_string_key, ) +_DEFAULT_TIME_DELTA = timedelta(days=30) +_DEFAULT_BUCKET = "1 hour" +_MAX_ALLOWED_BUCKETS = 1024 +_SUGGESTED_BUCKET_LIST = [ + (1, "1 minute"), + (5, "5 minutes"), + (15, "15 minutes"), + (30, "30 minutes"), + (60, "1 hour"), + (720, "12 hours"), + (1440, "1 day"), +] + class ObservabilityDAO(ObservabilityDAOInterface): def __init__(self): @@ -175,6 +193,238 @@ async def query( "Failed to run query due to non-existent key(s)." ) from e + async def analytics( + self, + *, + project_id: UUID, + analytics_dto: AnalyticsDTO, + ) -> Tuple[List[BucketDTO], Optional[int]]: + try: + async with engine.session() as session: + # WINDOWING + today = datetime.now(timezone.utc) + oldest = None + newest = None + bucket = None + # --------- + if analytics_dto.windowing: + if analytics_dto.windowing.newest: + newest = analytics_dto.windowing.newest + else: + newest = today + + if analytics_dto.windowing.oldest: + if analytics_dto.windowing.oldest > newest: + oldest = newest - _DEFAULT_TIME_DELTA + else: + oldest = analytics_dto.windowing.oldest + else: + oldest = newest - _DEFAULT_TIME_DELTA + + if analytics_dto.windowing.bucket: + _desired_bucket = analytics_dto.windowing.bucket + else: + _desired_bucket = 1 + + _window_minutes = (newest - oldest).total_seconds() // 60 + + _desired_buckets = _window_minutes // _desired_bucket + + bucket = None + + if _desired_buckets > _MAX_ALLOWED_BUCKETS: + for ( + _suggested_minutes, + _suggest_bucket, + ) in _SUGGESTED_BUCKET_LIST: + _suggested_buckets = _window_minutes // _suggested_minutes + + if _suggested_buckets <= _MAX_ALLOWED_BUCKETS: + bucket = _suggest_bucket + break + + if not bucket: + bucket = _SUGGESTED_BUCKET_LIST[-1][1] + + else: + bucket = f"{_desired_bucket} minute{'s' if _desired_bucket > 1 else ''}" + + else: + newest = today + oldest = newest - _DEFAULT_TIME_DELTA + bucket = _DEFAULT_BUCKET + + # --------- + + # BASE QUERY + _count = func.count().label("count") # pylint: disable=not-callable + _duration = func.sum( + cast( + InvocationSpanDBE.metrics["acc.duration.total"], + Numeric, + ) + ).label("duration") + _cost = func.sum( + cast( + InvocationSpanDBE.metrics["acc.costs.total"], + Numeric, + ) + ).label("cost") + _tokens = func.sum( + cast( + InvocationSpanDBE.metrics["acc.tokens.total"], + Integer, + ) + ).label("tokens") + _bucket = func.date_bin( + text(f"'{bucket}'"), + InvocationSpanDBE.created_at, + oldest, + ).label("bucket") + # ---------- + no_error_query = select( + _count, + _duration, + _cost, + _tokens, + _bucket, + ).select_from(InvocationSpanDBE) + error_query = select( + _count, + _duration, + _cost, + _tokens, + _bucket, + ).select_from(InvocationSpanDBE) + # ---------- + + # WINDOWING + no_error_query = no_error_query.filter( + InvocationSpanDBE.created_at >= oldest, + InvocationSpanDBE.created_at < newest, + ) + error_query = error_query.filter( + InvocationSpanDBE.created_at >= oldest, + InvocationSpanDBE.created_at < newest, + ) + # --------- + + # SCOPING + no_error_query = no_error_query.filter_by( + project_id=project_id, + ) + error_query = error_query.filter_by( + project_id=project_id, + ) + # ------- + + # SUCESS / FAILURE + no_error_query = no_error_query.filter( + InvocationSpanDBE.exception.is_(None), + ) + error_query = error_query.filter( + InvocationSpanDBE.exception.isnot(None), + ) + # ---------------- + + # GROUPING + grouping = analytics_dto.grouping + grouping_column: Optional[Column] = None + # -------- + if grouping and grouping.focus.value != "node": + grouping_column = getattr( + InvocationSpanDBE, grouping.focus.value + "_id" + ) + + query = select( + distinct(grouping_column).label("grouping_key"), + InvocationSpanDBE.created_at, + ) + # -------- + + # FILTERING + filtering = analytics_dto.filtering + # --------- + if filtering: + operator = filtering.operator + conditions = filtering.conditions + + query = query.filter( + _combine( + operator, + _filters(conditions), + ) + ) + # --------- + + # SORTING + no_error_query = no_error_query.order_by("bucket").group_by("bucket") + error_query = error_query.order_by("bucket").group_by("bucket") + # ------- + + # DEBUGGING + # TODO: HIDE THIS BEFORE RELEASING + print( + str( + no_error_query.compile( + dialect=postgresql.dialect(), + compile_kwargs={"literal_binds": True}, + ) + ) + ) + print("...") + print( + str( + error_query.compile( + dialect=postgresql.dialect(), + compile_kwargs={"literal_binds": True}, + ) + ) + ) + # --------- + + # QUERY EXECUTION + no_error_buckets = ( + (await session.execute(no_error_query)).scalars().all() + ) + error_buckets = (await session.execute(error_query)).scalars().all() + # --------------- + + print("---") + print(no_error_buckets) + print(error_buckets) + + no_error_buckets = [ + BucketDTO( + bucket=bucket.bucket, + count=bucket.count, + duration=bucket.duration, + cost=bucket.cost, + tokens=bucket.tokens, + ) + for bucket in no_error_buckets + ] + + error_buckets = [ + BucketDTO( + bucket=bucket.bucket, + count=bucket.count, + duration=bucket.duration, + cost=bucket.cost, + tokens=bucket.tokens, + ) + for bucket in error_buckets + ] + + print(no_error_buckets) + print(error_buckets) + + return None # [map_span_dbe_to_dto(span) for span in spans], count + except AttributeError as e: + raise FilteringException( + "Failed to run analytics due to non-existent key(s)." + ) from e + async def create_one( self, *, diff --git a/agenta-cli/agenta/sdk/tracing/inline.py b/agenta-cli/agenta/sdk/tracing/inline.py index 2d03389d76..2efbe3a31f 100644 --- a/agenta-cli/agenta/sdk/tracing/inline.py +++ b/agenta-cli/agenta/sdk/tracing/inline.py @@ -1146,7 +1146,7 @@ def _parse_to_legacy_span(span: SpanDTO) -> CreateSpan: ), # app_id=( - span.refs.get("application.id", "missing-app-id") + span.refs.get("application", {}).get("id") if span.refs else "missing-app-id" ), From 382b034d0d9be6429207aa56f3eb70d6139180b4 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Tue, 5 Nov 2024 22:33:29 +0100 Subject: [PATCH 02/17] fix analytics --- .../apis/fastapi/observability/models.py | 2 +- .../apis/fastapi/observability/router.py | 4 +- .../apis/fastapi/observability/utils.py | 8 +- .../agenta_backend/core/observability/dtos.py | 21 +- .../core/observability/service.py | 4 +- .../dbs/postgres/observability/dao.py | 244 +++++++++++------- .../dbs/postgres/observability/mappings.py | 48 ++++ 7 files changed, 216 insertions(+), 115 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py index 8d3c10c055..6f87476665 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py @@ -62,5 +62,5 @@ class AgentaRootsResponse(VersionedModel, AgentaRootsDTO): class AnalyticsResponse(VersionedModel): - width: int + count: Optional[int] = None buckets: List[BucketDTO] diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py index ecb51321d6..f51ae9154c 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py @@ -288,7 +288,7 @@ async def query_analytics( analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto), ): try: - bucket_dtos, width = await self.service.analytics( + bucket_dtos, count = await self.service.analytics( project_id=UUID(request.state.project_id), analytics_dto=analytics_dto, ) @@ -300,7 +300,7 @@ async def query_analytics( return AnalyticsResponse( version=self.VERSION, - width=width, + count=count, buckets=bucket_dtos, ) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index 7cd545b41e..e7050f5041 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -41,12 +41,12 @@ def _parse_windowing( oldest: Optional[str] = None, newest: Optional[str] = None, - bucket: Optional[int] = None, + window: Optional[int] = None, ) -> Optional[WindowingDTO]: _windowing = None if oldest or newest: - _windowing = WindowingDTO(oldest=oldest, newest=newest, bucket=bucket) + _windowing = WindowingDTO(oldest=oldest, newest=newest, window=window) return _windowing @@ -151,14 +151,14 @@ def parse_analytics_dto( # - Option 2: Flat query parameters oldest: Optional[str] = Query(None), newest: Optional[str] = Query(None), - bucket: Optional[int] = Query(None), + window: Optional[int] = Query(None), # FILTERING # - Option 1: Single query parameter as JSON filtering: Optional[str] = Query(None), ) -> AnalyticsDTO: return AnalyticsDTO( grouping=_parse_grouping(focus=focus), - windowing=_parse_windowing(oldest=oldest, newest=newest, bucket=bucket), + windowing=_parse_windowing(oldest=oldest, newest=newest, window=window), filtering=_parse_filtering(filtering=filtering), ) diff --git a/agenta-backend/agenta_backend/core/observability/dtos.py b/agenta-backend/agenta_backend/core/observability/dtos.py index 611c7c9bee..cc8e546bad 100644 --- a/agenta-backend/agenta_backend/core/observability/dtos.py +++ b/agenta-backend/agenta_backend/core/observability/dtos.py @@ -229,7 +229,7 @@ class OTelSpanDTO(BaseModel): class WindowingDTO(BaseModel): oldest: Optional[datetime] = None newest: Optional[datetime] = None - bucket: Optional[int] = None + window: Optional[int] = None class LogicalOperator(Enum): @@ -337,15 +337,22 @@ class AnalyticsDTO(BaseModel): class MetricsDTO(BaseModel): - count: int - duration: float - cost: float - tokens: int + count: Optional[int] = 0 + duration: Optional[float] = 0.0 + cost: Optional[float] = 0.0 + tokens: Optional[int] = 0 + + def plus(self, other: "MetricsDTO") -> "MetricsDTO": + self.count += other.count + self.duration += other.duration + self.cost += other.cost + self.tokens += other.tokens + + return self class BucketDTO(BaseModel): timestamp: datetime - timespan: int + window: int total: MetricsDTO error: MetricsDTO - success: MetricsDTO diff --git a/agenta-backend/agenta_backend/core/observability/service.py b/agenta-backend/agenta_backend/core/observability/service.py index 59a9a00475..c326d010ed 100644 --- a/agenta-backend/agenta_backend/core/observability/service.py +++ b/agenta-backend/agenta_backend/core/observability/service.py @@ -63,12 +63,12 @@ async def analytics( if analytics_dto.filtering: parse_filtering(analytics_dto.filtering) - bucket_dtos, width = await self.observability_dao.analytics( + bucket_dtos, count = await self.observability_dao.analytics( project_id=project_id, analytics_dto=analytics_dto, ) - return bucket_dtos, width + return bucket_dtos, count async def ingest( self, diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index 3b0c5af12e..d16220e67c 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -1,5 +1,6 @@ from typing import Optional, List, Tuple, Union -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta +from traceback import print_exc from uuid import UUID from sqlalchemy import and_, or_, not_, distinct, Column, func, cast, text @@ -13,6 +14,7 @@ from agenta_backend.dbs.postgres.observability.mappings import ( map_span_dto_to_dbe, map_span_dbe_to_dto, + map_bucket_dbes_to_dtos, ) from agenta_backend.core.observability.interfaces import ObservabilityDAOInterface @@ -43,9 +45,9 @@ ) _DEFAULT_TIME_DELTA = timedelta(days=30) -_DEFAULT_BUCKET = "1 hour" +_DEFAULT_WINDOW = "15 minutes" _MAX_ALLOWED_BUCKETS = 1024 -_SUGGESTED_BUCKET_LIST = [ +_SUGGESTED_BUCKETS_LIST = [ (1, "1 minute"), (5, "5 minutes"), (15, "15 minutes"), @@ -188,7 +190,10 @@ async def query( # --------------- return [map_span_dbe_to_dto(span) for span in spans], count + except AttributeError as e: + print_exc() + raise FilteringException( "Failed to run query due to non-existent key(s)." ) from e @@ -202,10 +207,10 @@ async def analytics( try: async with engine.session() as session: # WINDOWING - today = datetime.now(timezone.utc) + today = datetime.now() oldest = None newest = None - bucket = None + window_text = None # --------- if analytics_dto.windowing: if analytics_dto.windowing.newest: @@ -221,88 +226,124 @@ async def analytics( else: oldest = newest - _DEFAULT_TIME_DELTA - if analytics_dto.windowing.bucket: - _desired_bucket = analytics_dto.windowing.bucket + if analytics_dto.windowing.window: + _desired_window = analytics_dto.windowing.window else: - _desired_bucket = 1 + _desired_window = 15 _window_minutes = (newest - oldest).total_seconds() // 60 - _desired_buckets = _window_minutes // _desired_bucket - - bucket = None + _desired_buckets = _window_minutes // _desired_window if _desired_buckets > _MAX_ALLOWED_BUCKETS: for ( _suggested_minutes, - _suggest_bucket, - ) in _SUGGESTED_BUCKET_LIST: + _suggest_window_text, + ) in _SUGGESTED_BUCKETS_LIST: _suggested_buckets = _window_minutes // _suggested_minutes if _suggested_buckets <= _MAX_ALLOWED_BUCKETS: - bucket = _suggest_bucket + window_text = _suggest_window_text break - if not bucket: - bucket = _SUGGESTED_BUCKET_LIST[-1][1] + if not window_text: + window_text = _SUGGESTED_BUCKETS_LIST[-1][1] else: - bucket = f"{_desired_bucket} minute{'s' if _desired_bucket > 1 else ''}" + window_text = f"{_desired_window} minute{'s' if _desired_window > 1 else ''}" else: newest = today oldest = newest - _DEFAULT_TIME_DELTA - bucket = _DEFAULT_BUCKET + window_text = _DEFAULT_WINDOW # --------- # BASE QUERY _count = func.count().label("count") # pylint: disable=not-callable - _duration = func.sum( - cast( - InvocationSpanDBE.metrics["acc.duration.total"], - Numeric, - ) - ).label("duration") - _cost = func.sum( - cast( - InvocationSpanDBE.metrics["acc.costs.total"], - Numeric, - ) - ).label("cost") - _tokens = func.sum( - cast( - InvocationSpanDBE.metrics["acc.tokens.total"], - Integer, - ) - ).label("tokens") - _bucket = func.date_bin( - text(f"'{bucket}'"), + _duration = None + _cost = None + _tokens = None + _timestamp = func.date_bin( + text(f"'{window_text}'"), InvocationSpanDBE.created_at, oldest, - ).label("bucket") + ).label("timestamp") # ---------- - no_error_query = select( + + # GROUPING + if ( + analytics_dto.grouping + and analytics_dto.grouping.focus.value != "node" + ): + _duration = func.sum( + cast( + InvocationSpanDBE.metrics["acc.duration.total"], + Numeric, + ) + ).label("duration") + _cost = func.sum( + cast( + InvocationSpanDBE.metrics["acc.costs.total"], + Numeric, + ) + ).label("cost") + _tokens = func.sum( + cast( + InvocationSpanDBE.metrics["acc.tokens.total"], + Integer, + ) + ).label("tokens") + elif not analytics_dto.grouping or ( + analytics_dto.grouping + and analytics_dto.grouping.focus.value == "node" + ): + _duration = func.sum( + cast( + InvocationSpanDBE.metrics["unit.duration.total"], + Numeric, + ) + ).label("duration") + _cost = func.sum( + cast( + InvocationSpanDBE.metrics["unit.costs.total"], + Numeric, + ) + ).label("cost") + _tokens = func.sum( + cast( + InvocationSpanDBE.metrics["unit.tokens.total"], + Integer, + ) + ).label("tokens") + else: + raise ValueError("Unknown grouping focus.") + # -------- + + # BASE QUERY + total_query = select( _count, _duration, _cost, _tokens, - _bucket, + _timestamp, ).select_from(InvocationSpanDBE) + error_query = select( _count, _duration, _cost, _tokens, - _bucket, + _timestamp, ).select_from(InvocationSpanDBE) # ---------- # WINDOWING - no_error_query = no_error_query.filter( + total_query = total_query.filter( InvocationSpanDBE.created_at >= oldest, InvocationSpanDBE.created_at < newest, ) + error_query = error_query.filter( InvocationSpanDBE.created_at >= oldest, InvocationSpanDBE.created_at < newest, @@ -310,38 +351,21 @@ async def analytics( # --------- # SCOPING - no_error_query = no_error_query.filter_by( + total_query = total_query.filter_by( project_id=project_id, ) + error_query = error_query.filter_by( project_id=project_id, ) # ------- - # SUCESS / FAILURE - no_error_query = no_error_query.filter( - InvocationSpanDBE.exception.is_(None), - ) + # TOTAL vs ERROR error_query = error_query.filter( InvocationSpanDBE.exception.isnot(None), ) # ---------------- - # GROUPING - grouping = analytics_dto.grouping - grouping_column: Optional[Column] = None - # -------- - if grouping and grouping.focus.value != "node": - grouping_column = getattr( - InvocationSpanDBE, grouping.focus.value + "_id" - ) - - query = select( - distinct(grouping_column).label("grouping_key"), - InvocationSpanDBE.created_at, - ) - # -------- - # FILTERING filtering = analytics_dto.filtering # --------- @@ -349,7 +373,14 @@ async def analytics( operator = filtering.operator conditions = filtering.conditions - query = query.filter( + total_query = total_query.filter( + _combine( + operator, + _filters(conditions), + ) + ) + + error_query = error_query.filter( _combine( operator, _filters(conditions), @@ -357,16 +388,31 @@ async def analytics( ) # --------- + # GROUPING + if ( + analytics_dto.grouping + and analytics_dto.grouping.focus.value != "node" + ): + total_query = total_query.filter_by( + parent_id=None, + ) + + error_query = error_query.filter_by( + parent_id=None, + ) + # -------- + # SORTING - no_error_query = no_error_query.order_by("bucket").group_by("bucket") - error_query = error_query.order_by("bucket").group_by("bucket") + total_query = total_query.group_by("timestamp") + + error_query = error_query.group_by("timestamp") # ------- # DEBUGGING # TODO: HIDE THIS BEFORE RELEASING print( str( - no_error_query.compile( + total_query.compile( dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}, ) @@ -384,43 +430,23 @@ async def analytics( # --------- # QUERY EXECUTION - no_error_buckets = ( - (await session.execute(no_error_query)).scalars().all() - ) - error_buckets = (await session.execute(error_query)).scalars().all() + total_bucket_dbes = (await session.execute(total_query)).all() + error_bucket_dbes = (await session.execute(error_query)).all() # --------------- - print("---") - print(no_error_buckets) - print(error_buckets) - - no_error_buckets = [ - BucketDTO( - bucket=bucket.bucket, - count=bucket.count, - duration=bucket.duration, - cost=bucket.cost, - tokens=bucket.tokens, - ) - for bucket in no_error_buckets - ] - - error_buckets = [ - BucketDTO( - bucket=bucket.bucket, - count=bucket.count, - duration=bucket.duration, - cost=bucket.cost, - tokens=bucket.tokens, - ) - for bucket in error_buckets - ] + window = _to_minutes(window_text) + + bucket_dtos, count = map_bucket_dbes_to_dtos( + total_bucket_dbes=total_bucket_dbes, + error_bucket_dbes=error_bucket_dbes, + window=window, + ) - print(no_error_buckets) - print(error_buckets) + return bucket_dtos, count - return None # [map_span_dbe_to_dto(span) for span in spans], count except AttributeError as e: + print_exc() + raise FilteringException( "Failed to run analytics due to non-existent key(s)." ) from e @@ -804,3 +830,23 @@ def _filters(filtering: FilteringDTO) -> list: _conditions.append(attribute.is_(None)) return _conditions + + +def _to_minutes( + window_text: str, +) -> int: + quantity, unit = window_text.split() + quantity = int(quantity) + + if unit == "minute" or unit == "minutes": + return quantity + elif unit == "hour" or unit == "hours": + return quantity * 60 + elif unit == "day" or unit == "days": + return quantity * 1440 + elif unit == "week" or unit == "weeks": + return quantity * 10080 + elif unit == "month" or unit == "months": + return quantity * 43200 + else: + raise ValueError(f"Unknown time unit: {unit}") diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py index 90554b105f..0868658d33 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py @@ -1,3 +1,4 @@ +from typing import List, Tuple from json import dumps from agenta_backend.core.shared.dtos import LifecycleDTO @@ -11,6 +12,8 @@ ExceptionDTO, OTelExtraDTO, SpanDTO, + MetricsDTO, + BucketDTO, ) from agenta_backend.dbs.postgres.observability.dbes import InvocationSpanDBE @@ -119,3 +122,48 @@ def map_span_dto_to_dbe( ) return span_dbe + + +def map_bucket_dbes_to_dtos( + total_bucket_dbes: List[InvocationSpanDBE], + error_bucket_dbes: List[InvocationSpanDBE], + window: int, +) -> Tuple[List[BucketDTO], int]: + total_metrics = { + bucket.timestamp: MetricsDTO( + count=bucket.count, + duration=bucket.duration, + cost=bucket.cost, + tokens=bucket.tokens, + ) + for bucket in total_bucket_dbes + } + + error_metrics = { + bucket.timestamp: MetricsDTO( + count=bucket.count, + duration=bucket.duration, + cost=bucket.cost, + tokens=bucket.tokens, + ) + for bucket in error_bucket_dbes + } + + total_timestamps = list( + set(list(total_metrics.keys()) + list(error_metrics.keys())) + ) + total_timestamps.sort() + + bucket_dtos = [ + BucketDTO( + timestamp=timestamp, + window=window, + total=total_metrics.get(timestamp, MetricsDTO()), + error=error_metrics.get(timestamp, MetricsDTO()), + ) + for timestamp in total_timestamps + ] + + count = len(bucket_dtos) + + return bucket_dtos, count From b732d0b6fa5e1aab29f80f1bf56200ce6c001095 Mon Sep 17 00:00:00 2001 From: ashrafchowdury Date: Mon, 4 Nov 2024 20:16:27 +0600 Subject: [PATCH 03/17] fix(frontend): observability filter column labels --- agenta-web/src/components/Filters/Filters.tsx | 21 +++++++++++++++++-- .../observability/ObservabilityDashboard.tsx | 4 ++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/agenta-web/src/components/Filters/Filters.tsx b/agenta-web/src/components/Filters/Filters.tsx index faea7c2379..c396705b7d 100644 --- a/agenta-web/src/components/Filters/Filters.tsx +++ b/agenta-web/src/components/Filters/Filters.tsx @@ -115,6 +115,19 @@ const Filters: React.FC = ({filterData, columns, onApplyFilter, onClearFi setIsFilterOpen(false) } + const mapColumnLabel = useMemo( + () => + columns.reduce( + (acc, col) => { + acc[col.value] = col.label + return acc + }, + {} as Record, + ), + [columns], + ) + const getColumnLabelFromValue = (key: string) => mapColumnLabel[key] || key + return ( = ({filterData, columns, onApplyFilter, onClearFi labelRender={(label) => !label.value ? "Column" : label.label } - style={{width: 200}} popupMatchSelectWidth={220} + popupClassName="capitalize" + className="capitalize w-[200px]" suffixIcon={} onChange={(value) => onFilterChange({columnName: "key", value, idx}) } - value={item.key} + value={{ + value: item.key, + label: getColumnLabelFromValue(item.key), + }} options={filteredColumns} disabled={item.isPermanent} /> diff --git a/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx b/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx index 8053da96f2..47f2b4aaca 100644 --- a/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx +++ b/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx @@ -170,8 +170,8 @@ const ObservabilityDashboard = () => { }, }, { - title: "Latency", - key: "latency", + title: "Duration", + key: "duration", dataIndex: ["time", "span"], width: 80, onHeaderCell: () => ({ From 78798e2703ad256dd98478f74499fb2c85f981de Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Tue, 5 Nov 2024 17:38:28 +0100 Subject: [PATCH 04/17] fix refs --- .../agenta_backend/apis/fastapi/observability/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index e7050f5041..7cd8c63d79 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -516,7 +516,7 @@ def parse_from_otel_span_dto( exception = _parse_from_events(otel_span_dto) - root_id = refs.get("scenario.id", str(tree.id)) + root_id = refs.get("scenario.id", str(tree.id)) if refs else str(tree.id) root = RootDTO(id=UUID(root_id)) From aac793075a2d9e200117e778bd94dcd72fb53788 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Tue, 5 Nov 2024 17:39:14 +0100 Subject: [PATCH 05/17] updates to tests --- .../integrations/langchain/requirements.txt | 2 +- .../langchain/stateoftheunion.txt | 723 ++++++++++++++++++ .../integrations/litellm/01_sanity_check.py | 24 + .../integrations/litellm/requirements.txt | 5 + .../integrations/openai/requirements.txt | 2 +- .../sanity_check/requirements.txt | 2 +- 6 files changed, 755 insertions(+), 3 deletions(-) create mode 100644 agenta-cli/tests/observability_sdk/integrations/langchain/stateoftheunion.txt create mode 100644 agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py create mode 100644 agenta-cli/tests/observability_sdk/integrations/litellm/requirements.txt diff --git a/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt b/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt index 6395726368..75d25b8180 100644 --- a/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt +++ b/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt @@ -1,4 +1,4 @@ -agenta==0.27.0a5 +agenta==0.27.0a9 # opentelemetry.instrumentation.openai==0.31.2 openai opentelemetry-instrumentation-langchain diff --git a/agenta-cli/tests/observability_sdk/integrations/langchain/stateoftheunion.txt b/agenta-cli/tests/observability_sdk/integrations/langchain/stateoftheunion.txt new file mode 100644 index 0000000000..d50175de40 --- /dev/null +++ b/agenta-cli/tests/observability_sdk/integrations/langchain/stateoftheunion.txt @@ -0,0 +1,723 @@ +Madam Speaker, Madam Vice President, our First Lady and Second Gentleman. Members of Congress and the Cabinet. Justices of the Supreme Court. My fellow Americans. + +Last year COVID-19 kept us apart. This year we are finally together again. + +Tonight, we meet as Democrats Republicans and Independents. But most importantly as Americans. + +With a duty to one another to the American people to the Constitution. + +And with an unwavering resolve that freedom will always triumph over tyranny. + +Six days ago, Russia’s Vladimir Putin sought to shake the foundations of the free world thinking he could make it bend to his menacing ways. But he badly miscalculated. + +He thought he could roll into Ukraine and the world would roll over. Instead he met a wall of strength he never imagined. + +He met the Ukrainian people. + +From President Zelenskyy to every Ukrainian, their fearlessness, their courage, their determination, inspires the world. + +Groups of citizens blocking tanks with their bodies. Everyone from students to retirees teachers turned soldiers defending their homeland. + +In this struggle as President Zelenskyy said in his speech to the European Parliament “Light will win over darkness.” The Ukrainian Ambassador to the United States is here tonight. + +Let each of us here tonight in this Chamber send an unmistakable signal to Ukraine and to the world. + +Please rise if you are able and show that, Yes, we the United States of America stand with the Ukrainian people. + +Throughout our history we’ve learned this lesson when dictators do not pay a price for their aggression they cause more chaos. + +They keep moving. + +And the costs and the threats to America and the world keep rising. + +That’s why the NATO Alliance was created to secure peace and stability in Europe after World War 2. + +The United States is a member along with 29 other nations. + +It matters. American diplomacy matters. American resolve matters. + +Putin’s latest attack on Ukraine was premeditated and unprovoked. + +He rejected repeated efforts at diplomacy. + +He thought the West and NATO wouldn’t respond. And he thought he could divide us at home. Putin was wrong. We were ready. Here is what we did. + +We prepared extensively and carefully. + +We spent months building a coalition of other freedom-loving nations from Europe and the Americas to Asia and Africa to confront Putin. + +I spent countless hours unifying our European allies. We shared with the world in advance what we knew Putin was planning and precisely how he would try to falsely justify his aggression. + +We countered Russia’s lies with truth. + +And now that he has acted the free world is holding him accountable. + +Along with twenty-seven members of the European Union including France, Germany, Italy, as well as countries like the United Kingdom, Canada, Japan, Korea, Australia, New Zealand, and many others, even Switzerland. + +We are inflicting pain on Russia and supporting the people of Ukraine. Putin is now isolated from the world more than ever. + +Together with our allies –we are right now enforcing powerful economic sanctions. + +We are cutting off Russia’s largest banks from the international financial system. + +Preventing Russia’s central bank from defending the Russian Ruble making Putin’s $630 Billion “war fund” worthless. + +We are choking off Russia’s access to technology that will sap its economic strength and weaken its military for years to come. + +Tonight I say to the Russian oligarchs and corrupt leaders who have bilked billions of dollars off this violent regime no more. + +The U.S. Department of Justice is assembling a dedicated task force to go after the crimes of Russian oligarchs. + +We are joining with our European allies to find and seize your yachts your luxury apartments your private jets. We are coming for your ill-begotten gains. + +And tonight I am announcing that we will join our allies in closing off American air space to all Russian flights – further isolating Russia – and adding an additional squeeze –on their economy. The Ruble has lost 30% of its value. + +The Russian stock market has lost 40% of its value and trading remains suspended. Russia’s economy is reeling and Putin alone is to blame. + +Together with our allies we are providing support to the Ukrainians in their fight for freedom. Military assistance. Economic assistance. Humanitarian assistance. + +We are giving more than $1 Billion in direct assistance to Ukraine. + +And we will continue to aid the Ukrainian people as they defend their country and to help ease their suffering. + +Let me be clear, our forces are not engaged and will not engage in conflict with Russian forces in Ukraine. + +Our forces are not going to Europe to fight in Ukraine, but to defend our NATO Allies – in the event that Putin decides to keep moving west. + +For that purpose we’ve mobilized American ground forces, air squadrons, and ship deployments to protect NATO countries including Poland, Romania, Latvia, Lithuania, and Estonia. + +As I have made crystal clear the United States and our Allies will defend every inch of territory of NATO countries with the full force of our collective power. + +And we remain clear-eyed. The Ukrainians are fighting back with pure courage. But the next few days weeks, months, will be hard on them. + +Putin has unleashed violence and chaos. But while he may make gains on the battlefield – he will pay a continuing high price over the long run. + +And a proud Ukrainian people, who have known 30 years of independence, have repeatedly shown that they will not tolerate anyone who tries to take their country backwards. + +To all Americans, I will be honest with you, as I’ve always promised. A Russian dictator, invading a foreign country, has costs around the world. + +And I’m taking robust action to make sure the pain of our sanctions is targeted at Russia’s economy. And I will use every tool at our disposal to protect American businesses and consumers. + +Tonight, I can announce that the United States has worked with 30 other countries to release 60 Million barrels of oil from reserves around the world. + +America will lead that effort, releasing 30 Million barrels from our own Strategic Petroleum Reserve. And we stand ready to do more if necessary, unified with our allies. + +These steps will help blunt gas prices here at home. And I know the news about what’s happening can seem alarming. + +But I want you to know that we are going to be okay. + +When the history of this era is written Putin’s war on Ukraine will have left Russia weaker and the rest of the world stronger. + +While it shouldn’t have taken something so terrible for people around the world to see what’s at stake now everyone sees it clearly. + +We see the unity among leaders of nations and a more unified Europe a more unified West. And we see unity among the people who are gathering in cities in large crowds around the world even in Russia to demonstrate their support for Ukraine. + +In the battle between democracy and autocracy, democracies are rising to the moment, and the world is clearly choosing the side of peace and security. + +This is a real test. It’s going to take time. So let us continue to draw inspiration from the iron will of the Ukrainian people. + +To our fellow Ukrainian Americans who forge a deep bond that connects our two nations we stand with you. + +Putin may circle Kyiv with tanks, but he will never gain the hearts and souls of the Ukrainian people. + +He will never extinguish their love of freedom. He will never weaken the resolve of the free world. + +We meet tonight in an America that has lived through two of the hardest years this nation has ever faced. + +The pandemic has been punishing. + +And so many families are living paycheck to paycheck, struggling to keep up with the rising cost of food, gas, housing, and so much more. + +I understand. + +I remember when my Dad had to leave our home in Scranton, Pennsylvania to find work. I grew up in a family where if the price of food went up, you felt it. + +That’s why one of the first things I did as President was fight to pass the American Rescue Plan. + +Because people were hurting. We needed to act, and we did. + +Few pieces of legislation have done more in a critical moment in our history to lift us out of crisis. + +It fueled our efforts to vaccinate the nation and combat COVID-19. It delivered immediate economic relief for tens of millions of Americans. + +Helped put food on their table, keep a roof over their heads, and cut the cost of health insurance. + +And as my Dad used to say, it gave people a little breathing room. + +And unlike the $2 Trillion tax cut passed in the previous administration that benefitted the top 1% of Americans, the American Rescue Plan helped working people—and left no one behind. + +And it worked. It created jobs. Lots of jobs. + +In fact—our economy created over 6.5 Million new jobs just last year, more jobs created in one year +than ever before in the history of America. + +Our economy grew at a rate of 5.7% last year, the strongest growth in nearly 40 years, the first step in bringing fundamental change to an economy that hasn’t worked for the working people of this nation for too long. + +For the past 40 years we were told that if we gave tax breaks to those at the very top, the benefits would trickle down to everyone else. + +But that trickle-down theory led to weaker economic growth, lower wages, bigger deficits, and the widest gap between those at the top and everyone else in nearly a century. + +Vice President Harris and I ran for office with a new economic vision for America. + +Invest in America. Educate Americans. Grow the workforce. Build the economy from the bottom up +and the middle out, not from the top down. + +Because we know that when the middle class grows, the poor have a ladder up and the wealthy do very well. + +America used to have the best roads, bridges, and airports on Earth. + +Now our infrastructure is ranked 13th in the world. + +We won’t be able to compete for the jobs of the 21st Century if we don’t fix that. + +That’s why it was so important to pass the Bipartisan Infrastructure Law—the most sweeping investment to rebuild America in history. + +This was a bipartisan effort, and I want to thank the members of both parties who worked to make it happen. + +We’re done talking about infrastructure weeks. + +We’re going to have an infrastructure decade. + +It is going to transform America and put us on a path to win the economic competition of the 21st Century that we face with the rest of the world—particularly with China. + +As I’ve told Xi Jinping, it is never a good bet to bet against the American people. + +We’ll create good jobs for millions of Americans, modernizing roads, airports, ports, and waterways all across America. + +And we’ll do it all to withstand the devastating effects of the climate crisis and promote environmental justice. + +We’ll build a national network of 500,000 electric vehicle charging stations, begin to replace poisonous lead pipes—so every child—and every American—has clean water to drink at home and at school, provide affordable high-speed internet for every American—urban, suburban, rural, and tribal communities. + +4,000 projects have already been announced. + +And tonight, I’m announcing that this year we will start fixing over 65,000 miles of highway and 1,500 bridges in disrepair. + +When we use taxpayer dollars to rebuild America – we are going to Buy American: buy American products to support American jobs. + +The federal government spends about $600 Billion a year to keep the country safe and secure. + +There’s been a law on the books for almost a century +to make sure taxpayers’ dollars support American jobs and businesses. + +Every Administration says they’ll do it, but we are actually doing it. + +We will buy American to make sure everything from the deck of an aircraft carrier to the steel on highway guardrails are made in America. + +But to compete for the best jobs of the future, we also need to level the playing field with China and other competitors. + +That’s why it is so important to pass the Bipartisan Innovation Act sitting in Congress that will make record investments in emerging technologies and American manufacturing. + +Let me give you one example of why it’s so important to pass it. + +If you travel 20 miles east of Columbus, Ohio, you’ll find 1,000 empty acres of land. + +It won’t look like much, but if you stop and look closely, you’ll see a “Field of dreams,” the ground on which America’s future will be built. + +This is where Intel, the American company that helped build Silicon Valley, is going to build its $20 billion semiconductor “mega site”. + +Up to eight state-of-the-art factories in one place. 10,000 new good-paying jobs. + +Some of the most sophisticated manufacturing in the world to make computer chips the size of a fingertip that power the world and our everyday lives. + +Smartphones. The Internet. Technology we have yet to invent. + +But that’s just the beginning. + +Intel’s CEO, Pat Gelsinger, who is here tonight, told me they are ready to increase their investment from +$20 billion to $100 billion. + +That would be one of the biggest investments in manufacturing in American history. + +And all they’re waiting for is for you to pass this bill. + +So let’s not wait any longer. Send it to my desk. I’ll sign it. + +And we will really take off. + +And Intel is not alone. + +There’s something happening in America. + +Just look around and you’ll see an amazing story. + +The rebirth of the pride that comes from stamping products “Made In America.” The revitalization of American manufacturing. + +Companies are choosing to build new factories here, when just a few years ago, they would have built them overseas. + +That’s what is happening. Ford is investing $11 billion to build electric vehicles, creating 11,000 jobs across the country. + +GM is making the largest investment in its history—$7 billion to build electric vehicles, creating 4,000 jobs in Michigan. + +All told, we created 369,000 new manufacturing jobs in America just last year. + +Powered by people I’ve met like JoJo Burgess, from generations of union steelworkers from Pittsburgh, who’s here with us tonight. + +As Ohio Senator Sherrod Brown says, “It’s time to bury the label “Rust Belt.” + +It’s time. + +But with all the bright spots in our economy, record job growth and higher wages, too many families are struggling to keep up with the bills. + +Inflation is robbing them of the gains they might otherwise feel. + +I get it. That’s why my top priority is getting prices under control. + +Look, our economy roared back faster than most predicted, but the pandemic meant that businesses had a hard time hiring enough workers to keep up production in their factories. + +The pandemic also disrupted global supply chains. + +When factories close, it takes longer to make goods and get them from the warehouse to the store, and prices go up. + +Look at cars. + +Last year, there weren’t enough semiconductors to make all the cars that people wanted to buy. + +And guess what, prices of automobiles went up. + +So—we have a choice. + +One way to fight inflation is to drive down wages and make Americans poorer. + +I have a better plan to fight inflation. + +Lower your costs, not your wages. + +Make more cars and semiconductors in America. + +More infrastructure and innovation in America. + +More goods moving faster and cheaper in America. + +More jobs where you can earn a good living in America. + +And instead of relying on foreign supply chains, let’s make it in America. + +Economists call it “increasing the productive capacity of our economy.” + +I call it building a better America. + +My plan to fight inflation will lower your costs and lower the deficit. + +17 Nobel laureates in economics say my plan will ease long-term inflationary pressures. Top business leaders and most Americans support my plan. And here’s the plan: + +First – cut the cost of prescription drugs. Just look at insulin. One in ten Americans has diabetes. In Virginia, I met a 13-year-old boy named Joshua Davis. + +He and his Dad both have Type 1 diabetes, which means they need insulin every day. Insulin costs about $10 a vial to make. + +But drug companies charge families like Joshua and his Dad up to 30 times more. I spoke with Joshua’s mom. + +Imagine what it’s like to look at your child who needs insulin and have no idea how you’re going to pay for it. + +What it does to your dignity, your ability to look your child in the eye, to be the parent you expect to be. + +Joshua is here with us tonight. Yesterday was his birthday. Happy birthday, buddy. + +For Joshua, and for the 200,000 other young people with Type 1 diabetes, let’s cap the cost of insulin at $35 a month so everyone can afford it. + +Drug companies will still do very well. And while we’re at it let Medicare negotiate lower prices for prescription drugs, like the VA already does. + +Look, the American Rescue Plan is helping millions of families on Affordable Care Act plans save $2,400 a year on their health care premiums. Let’s close the coverage gap and make those savings permanent. + +Second – cut energy costs for families an average of $500 a year by combatting climate change. + +Let’s provide investments and tax credits to weatherize your homes and businesses to be energy efficient and you get a tax credit; double America’s clean energy production in solar, wind, and so much more; lower the price of electric vehicles, saving you another $80 a month because you’ll never have to pay at the gas pump again. + +Third – cut the cost of child care. Many families pay up to $14,000 a year for child care per child. + +Middle-class and working families shouldn’t have to pay more than 7% of their income for care of young children. + +My plan will cut the cost in half for most families and help parents, including millions of women, who left the workforce during the pandemic because they couldn’t afford child care, to be able to get back to work. + +My plan doesn’t stop there. It also includes home and long-term care. More affordable housing. And Pre-K for every 3- and 4-year-old. + +All of these will lower costs. + +And under my plan, nobody earning less than $400,000 a year will pay an additional penny in new taxes. Nobody. + +The one thing all Americans agree on is that the tax system is not fair. We have to fix it. + +I’m not looking to punish anyone. But let’s make sure corporations and the wealthiest Americans start paying their fair share. + +Just last year, 55 Fortune 500 corporations earned $40 billion in profits and paid zero dollars in federal income tax. + +That’s simply not fair. That’s why I’ve proposed a 15% minimum tax rate for corporations. + +We got more than 130 countries to agree on a global minimum tax rate so companies can’t get out of paying their taxes at home by shipping jobs and factories overseas. + +That’s why I’ve proposed closing loopholes so the very wealthy don’t pay a lower tax rate than a teacher or a firefighter. + +So that’s my plan. It will grow the economy and lower costs for families. + +So what are we waiting for? Let’s get this done. And while you’re at it, confirm my nominees to the Federal Reserve, which plays a critical role in fighting inflation. + +My plan will not only lower costs to give families a fair shot, it will lower the deficit. + +The previous Administration not only ballooned the deficit with tax cuts for the very wealthy and corporations, it undermined the watchdogs whose job was to keep pandemic relief funds from being wasted. + +But in my administration, the watchdogs have been welcomed back. + +We’re going after the criminals who stole billions in relief money meant for small businesses and millions of Americans. + +And tonight, I’m announcing that the Justice Department will name a chief prosecutor for pandemic fraud. + +By the end of this year, the deficit will be down to less than half what it was before I took office. + +The only president ever to cut the deficit by more than one trillion dollars in a single year. + +Lowering your costs also means demanding more competition. + +I’m a capitalist, but capitalism without competition isn’t capitalism. + +It’s exploitation—and it drives up prices. + +When corporations don’t have to compete, their profits go up, your prices go up, and small businesses and family farmers and ranchers go under. + +We see it happening with ocean carriers moving goods in and out of America. + +During the pandemic, these foreign-owned companies raised prices by as much as 1,000% and made record profits. + +Tonight, I’m announcing a crackdown on these companies overcharging American businesses and consumers. + +And as Wall Street firms take over more nursing homes, quality in those homes has gone down and costs have gone up. + +That ends on my watch. + +Medicare is going to set higher standards for nursing homes and make sure your loved ones get the care they deserve and expect. + +We’ll also cut costs and keep the economy going strong by giving workers a fair shot, provide more training and apprenticeships, hire them based on their skills not degrees. + +Let’s pass the Paycheck Fairness Act and paid leave. + +Raise the minimum wage to $15 an hour and extend the Child Tax Credit, so no one has to raise a family in poverty. + +Let’s increase Pell Grants and increase our historic support of HBCUs, and invest in what Jill—our First Lady who teaches full-time—calls America’s best-kept secret: community colleges. + +And let’s pass the PRO Act when a majority of workers want to form a union—they shouldn’t be stopped. + +When we invest in our workers, when we build the economy from the bottom up and the middle out together, we can do something we haven’t done in a long time: build a better America. + +For more than two years, COVID-19 has impacted every decision in our lives and the life of the nation. + +And I know you’re tired, frustrated, and exhausted. + +But I also know this. + +Because of the progress we’ve made, because of your resilience and the tools we have, tonight I can say +we are moving forward safely, back to more normal routines. + +We’ve reached a new moment in the fight against COVID-19, with severe cases down to a level not seen since last July. + +Just a few days ago, the Centers for Disease Control and Prevention—the CDC—issued new mask guidelines. + +Under these new guidelines, most Americans in most of the country can now be mask free. + +And based on the projections, more of the country will reach that point across the next couple of weeks. + +Thanks to the progress we have made this past year, COVID-19 need no longer control our lives. + +I know some are talking about “living with COVID-19”. Tonight – I say that we will never just accept living with COVID-19. + +We will continue to combat the virus as we do other diseases. And because this is a virus that mutates and spreads, we will stay on guard. + +Here are four common sense steps as we move forward safely. + +First, stay protected with vaccines and treatments. We know how incredibly effective vaccines are. If you’re vaccinated and boosted you have the highest degree of protection. + +We will never give up on vaccinating more Americans. Now, I know parents with kids under 5 are eager to see a vaccine authorized for their children. + +The scientists are working hard to get that done and we’ll be ready with plenty of vaccines when they do. + +We’re also ready with anti-viral treatments. If you get COVID-19, the Pfizer pill reduces your chances of ending up in the hospital by 90%. + +We’ve ordered more of these pills than anyone in the world. And Pfizer is working overtime to get us 1 Million pills this month and more than double that next month. + +And we’re launching the “Test to Treat” initiative so people can get tested at a pharmacy, and if they’re positive, receive antiviral pills on the spot at no cost. + +If you’re immunocompromised or have some other vulnerability, we have treatments and free high-quality masks. + +We’re leaving no one behind or ignoring anyone’s needs as we move forward. + +And on testing, we have made hundreds of millions of tests available for you to order for free. + +Even if you already ordered free tests tonight, I am announcing that you can order more from covidtests.gov starting next week. + +Second – we must prepare for new variants. Over the past year, we’ve gotten much better at detecting new variants. + +If necessary, we’ll be able to deploy new vaccines within 100 days instead of many more months or years. + +And, if Congress provides the funds we need, we’ll have new stockpiles of tests, masks, and pills ready if needed. + +I cannot promise a new variant won’t come. But I can promise you we’ll do everything within our power to be ready if it does. + +Third – we can end the shutdown of schools and businesses. We have the tools we need. + +It’s time for Americans to get back to work and fill our great downtowns again. People working from home can feel safe to begin to return to the office. + +We’re doing that here in the federal government. The vast majority of federal workers will once again work in person. + +Our schools are open. Let’s keep it that way. Our kids need to be in school. + +And with 75% of adult Americans fully vaccinated and hospitalizations down by 77%, most Americans can remove their masks, return to work, stay in the classroom, and move forward safely. + +We achieved this because we provided free vaccines, treatments, tests, and masks. + +Of course, continuing this costs money. + +I will soon send Congress a request. + +The vast majority of Americans have used these tools and may want to again, so I expect Congress to pass it quickly. + +Fourth, we will continue vaccinating the world. + +We’ve sent 475 Million vaccine doses to 112 countries, more than any other nation. + +And we won’t stop. + +We have lost so much to COVID-19. Time with one another. And worst of all, so much loss of life. + +Let’s use this moment to reset. Let’s stop looking at COVID-19 as a partisan dividing line and see it for what it is: A God-awful disease. + +Let’s stop seeing each other as enemies, and start seeing each other for who we really are: Fellow Americans. + +We can’t change how divided we’ve been. But we can change how we move forward—on COVID-19 and other issues we must face together. + +I recently visited the New York City Police Department days after the funerals of Officer Wilbert Mora and his partner, Officer Jason Rivera. + +They were responding to a 9-1-1 call when a man shot and killed them with a stolen gun. + +Officer Mora was 27 years old. + +Officer Rivera was 22. + +Both Dominican Americans who’d grown up on the same streets they later chose to patrol as police officers. + +I spoke with their families and told them that we are forever in debt for their sacrifice, and we will carry on their mission to restore the trust and safety every community deserves. + +I’ve worked on these issues a long time. + +I know what works: Investing in crime preventionand community police officers who’ll walk the beat, who’ll know the neighborhood, and who can restore trust and safety. + +So let’s not abandon our streets. Or choose between safety and equal justice. + +Let’s come together to protect our communities, restore trust, and hold law enforcement accountable. + +That’s why the Justice Department required body cameras, banned chokeholds, and restricted no-knock warrants for its officers. + +That’s why the American Rescue Plan provided $350 Billion that cities, states, and counties can use to hire more police and invest in proven strategies like community violence interruption—trusted messengers breaking the cycle of violence and trauma and giving young people hope. + +We should all agree: The answer is not to Defund the police. The answer is to FUND the police with the resources and training they need to protect our communities. + +I ask Democrats and Republicans alike: Pass my budget and keep our neighborhoods safe. + +And I will keep doing everything in my power to crack down on gun trafficking and ghost guns you can buy online and make at home—they have no serial numbers and can’t be traced. + +And I ask Congress to pass proven measures to reduce gun violence. Pass universal background checks. Why should anyone on a terrorist list be able to purchase a weapon? + +Ban assault weapons and high-capacity magazines. + +Repeal the liability shield that makes gun manufacturers the only industry in America that can’t be sued. + +These laws don’t infringe on the Second Amendment. They save lives. + +The most fundamental right in America is the right to vote – and to have it counted. And it’s under assault. + +In state after state, new laws have been passed, not only to suppress the vote, but to subvert entire elections. + +We cannot let this happen. + +Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you’re at it, pass the Disclose Act so Americans can know who is funding our elections. + +Tonight, I’d like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. + +One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. + +And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation’s top legal minds, who will continue Justice Breyer’s legacy of excellence. + +A former top litigator in private practice. A former federal public defender. And from a family of public school educators and police officers. A consensus builder. Since she’s been nominated, she’s received a broad range of support—from the Fraternal Order of Police to former judges appointed by Democrats and Republicans. + +And if we are to advance liberty and justice, we need to secure the Border and fix the immigration system. + +We can do both. At our border, we’ve installed new technology like cutting-edge scanners to better detect drug smuggling. + +We’ve set up joint patrols with Mexico and Guatemala to catch more human traffickers. + +We’re putting in place dedicated immigration judges so families fleeing persecution and violence can have their cases heard faster. + +We’re securing commitments and supporting partners in South and Central America to host more refugees and secure their own borders. + +We can do all this while keeping lit the torch of liberty that has led generations of immigrants to this land—my forefathers and so many of yours. + +Provide a pathway to citizenship for Dreamers, those on temporary status, farm workers, and essential workers. + +Revise our laws so businesses have the workers they need and families don’t wait decades to reunite. + +It’s not only the right thing to do—it’s the economically smart thing to do. + +That’s why immigration reform is supported by everyone from labor unions to religious leaders to the U.S. Chamber of Commerce. + +Let’s get it done once and for all. + +Advancing liberty and justice also requires protecting the rights of women. + +The constitutional right affirmed in Roe v. Wade—standing precedent for half a century—is under attack as never before. + +If we want to go forward—not backward—we must protect access to health care. Preserve a woman’s right to choose. And let’s continue to advance maternal health care in America. + +And for our LGBTQ+ Americans, let’s finally get the bipartisan Equality Act to my desk. The onslaught of state laws targeting transgender Americans and their families is wrong. + +As I said last year, especially to our younger transgender Americans, I will always have your back as your President, so you can be yourself and reach your God-given potential. + +While it often appears that we never agree, that isn’t true. I signed 80 bipartisan bills into law last year. From preventing government shutdowns to protecting Asian-Americans from still-too-common hate crimes to reforming military justice. + +And soon, we’ll strengthen the Violence Against Women Act that I first wrote three decades ago. It is important for us to show the nation that we can come together and do big things. + +So tonight I’m offering a Unity Agenda for the Nation. Four big things we can do together. + +First, beat the opioid epidemic. + +There is so much we can do. Increase funding for prevention, treatment, harm reduction, and recovery. + +Get rid of outdated rules that stop doctors from prescribing treatments. And stop the flow of illicit drugs by working with state and local law enforcement to go after traffickers. + +If you’re suffering from addiction, know you are not alone. I believe in recovery, and I celebrate the 23 million Americans in recovery. + +Second, let’s take on mental health. Especially among our children, whose lives and education have been turned upside down. + +The American Rescue Plan gave schools money to hire teachers and help students make up for lost learning. + +I urge every parent to make sure your school does just that. And we can all play a part—sign up to be a tutor or a mentor. + +Children were also struggling before the pandemic. Bullying, violence, trauma, and the harms of social media. + +As Frances Haugen, who is here with us tonight, has shown, we must hold social media platforms accountable for the national experiment they’re conducting on our children for profit. + +It’s time to strengthen privacy protections, ban targeted advertising to children, demand tech companies stop collecting personal data on our children. + +And let’s get all Americans the mental health services they need. More people they can turn to for help, and full parity between physical and mental health care. + +Third, support our veterans. + +Veterans are the best of us. + +I’ve always believed that we have a sacred obligation to equip all those we send to war and care for them and their families when they come home. + +My administration is providing assistance with job training and housing, and now helping lower-income veterans get VA care debt-free. + +Our troops in Iraq and Afghanistan faced many dangers. + +One was stationed at bases and breathing in toxic smoke from “burn pits” that incinerated wastes of war—medical and hazard material, jet fuel, and more. + +When they came home, many of the world’s fittest and best trained warriors were never the same. + +Headaches. Numbness. Dizziness. + +A cancer that would put them in a flag-draped coffin. + +I know. + +One of those soldiers was my son Major Beau Biden. + +We don’t know for sure if a burn pit was the cause of his brain cancer, or the diseases of so many of our troops. + +But I’m committed to finding out everything we can. + +Committed to military families like Danielle Robinson from Ohio. + +The widow of Sergeant First Class Heath Robinson. + +He was born a soldier. Army National Guard. Combat medic in Kosovo and Iraq. + +Stationed near Baghdad, just yards from burn pits the size of football fields. + +Heath’s widow Danielle is here with us tonight. They loved going to Ohio State football games. He loved building Legos with their daughter. + +But cancer from prolonged exposure to burn pits ravaged Heath’s lungs and body. + +Danielle says Heath was a fighter to the very end. + +He didn’t know how to stop fighting, and neither did she. + +Through her pain she found purpose to demand we do better. + +Tonight, Danielle—we are. + +The VA is pioneering new ways of linking toxic exposures to diseases, already helping more veterans get benefits. + +And tonight, I’m announcing we’re expanding eligibility to veterans suffering from nine respiratory cancers. + +I’m also calling on Congress: pass a law to make sure veterans devastated by toxic exposures in Iraq and Afghanistan finally get the benefits and comprehensive health care they deserve. + +And fourth, let’s end cancer as we know it. + +This is personal to me and Jill, to Kamala, and to so many of you. + +Cancer is the #2 cause of death in America–second only to heart disease. + +Last month, I announced our plan to supercharge +the Cancer Moonshot that President Obama asked me to lead six years ago. + +Our goal is to cut the cancer death rate by at least 50% over the next 25 years, turn more cancers from death sentences into treatable diseases. + +More support for patients and families. + +To get there, I call on Congress to fund ARPA-H, the Advanced Research Projects Agency for Health. + +It’s based on DARPA—the Defense Department project that led to the Internet, GPS, and so much more. + +ARPA-H will have a singular purpose—to drive breakthroughs in cancer, Alzheimer’s, diabetes, and more. + +A unity agenda for the nation. + +We can do this. + +My fellow Americans—tonight , we have gathered in a sacred space—the citadel of our democracy. + +In this Capitol, generation after generation, Americans have debated great questions amid great strife, and have done great things. + +We have fought for freedom, expanded liberty, defeated totalitarianism and terror. + +And built the strongest, freest, and most prosperous nation the world has ever known. + +Now is the hour. + +Our moment of responsibility. + +Our test of resolve and conscience, of history itself. + +It is in this moment that our character is formed. Our purpose is found. Our future is forged. + +Well I know this nation. + +We will meet the test. + +To protect freedom and liberty, to expand fairness and opportunity. + +We will save democracy. + +As hard as these times have been, I am more optimistic about America today than I have been my whole life. + +Because I see the future that is within our grasp. + +Because I know there is simply nothing beyond our capacity. + +We are the only nation on Earth that has always turned every crisis we have faced into an opportunity. + +The only nation that can be defined by a single word: possibilities. + +So on this night, in our 245th year as a nation, I have come to report on the State of the Union. + +And my report is this: the State of the Union is strong—because you, the American people, are strong. + +We are stronger today than we were a year ago. + +And we will be stronger a year from now than we are today. + +Now is our moment to meet and overcome the challenges of our time. + +And we will, as one people. + +One America. + +The United States of America. + +May God bless you all. May God protect our troops. \ No newline at end of file diff --git a/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py b/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py new file mode 100644 index 0000000000..bf93909ed7 --- /dev/null +++ b/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py @@ -0,0 +1,24 @@ +import litellm +import os +os.environ["OTEL_EXPORTER"]="otlp_http" +os.environ["OTEL_ENDPOINT"]="http://localhost/api/observability/v1/otlp/traces" +AGENTA_APP_ID="0192b441-ab58-7af3-91d0-2f1818690828" +AGENTA_API_KEY="xxx" +os.environ["OTEL_HEADERS"]=f"AG-APP-ID={AGENTA_APP_ID}" + +async def generate_completion(): + litellm.callbacks = ["otel"] + messages = [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Write a short story about AI Engineering."}] + temperature = 0.2 + max_tokens = 100 + chat_completion = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + ) + return chat_completion + +if __name__ == "__main__": + import asyncio + asyncio.run(generate_completion()) \ No newline at end of file diff --git a/agenta-cli/tests/observability_sdk/integrations/litellm/requirements.txt b/agenta-cli/tests/observability_sdk/integrations/litellm/requirements.txt new file mode 100644 index 0000000000..4a264bbf14 --- /dev/null +++ b/agenta-cli/tests/observability_sdk/integrations/litellm/requirements.txt @@ -0,0 +1,5 @@ +opentelemetry-api +opentelemetry-sdk +opentelemetry-exporter-otlp +agenta==0.27.0a9 +litellm \ No newline at end of file diff --git a/agenta-cli/tests/observability_sdk/integrations/openai/requirements.txt b/agenta-cli/tests/observability_sdk/integrations/openai/requirements.txt index 3dd64bb756..296e74c13b 100644 --- a/agenta-cli/tests/observability_sdk/integrations/openai/requirements.txt +++ b/agenta-cli/tests/observability_sdk/integrations/openai/requirements.txt @@ -1,3 +1,3 @@ -agenta==0.27.0a0 +agenta==0.27.0a9 opentelemetry.instrumentation.openai==0.31.2 openai \ No newline at end of file diff --git a/agenta-cli/tests/observability_sdk/sanity_check/requirements.txt b/agenta-cli/tests/observability_sdk/sanity_check/requirements.txt index 2fe0ce82c7..8f749fd51c 100644 --- a/agenta-cli/tests/observability_sdk/sanity_check/requirements.txt +++ b/agenta-cli/tests/observability_sdk/sanity_check/requirements.txt @@ -1 +1 @@ -agenta==0.27.0a5 \ No newline at end of file +agenta==0.27.0a9 \ No newline at end of file From 75251a96e13d0a15e20b60e0b4aa2e248134a20b Mon Sep 17 00:00:00 2001 From: ashrafchowdury Date: Tue, 5 Nov 2024 14:06:59 +0600 Subject: [PATCH 06/17] fix(frontend): replace data property with content for search filter --- .../pages/observability/ObservabilityDashboard.tsx | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx b/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx index 47f2b4aaca..11552149a6 100644 --- a/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx +++ b/agenta-web/src/components/pages/observability/ObservabilityDashboard.tsx @@ -308,7 +308,7 @@ const ObservabilityDashboard = () => { {type: "exists", value: "exception.type", label: "exception type"}, {type: "exists", value: "exception.message", label: "exception message"}, {type: "exists", value: "exception.stacktrace", label: "exception stacktrace"}, - {type: "string", value: "data", label: "data"}, + {type: "string", value: "content", label: "content"}, {type: "number", value: "metrics.acc.duration.total", label: "duration"}, {type: "number", value: "metrics.acc.costs.total", label: "total cost (accumulated)"}, {type: "number", value: "metrics.unit.costs.total", label: "total cost"}, @@ -416,26 +416,26 @@ const ObservabilityDashboard = () => { setSearchQuery(query) if (!query) { - setFilters((prevFilters) => prevFilters.filter((f) => f.key !== "data")) + setFilters((prevFilters) => prevFilters.filter((f) => f.key !== "content")) } } const onSearchQueryApply = () => { if (searchQuery) { - updateFilter({key: "data", operator: "contains", value: searchQuery}) + updateFilter({key: "content", operator: "contains", value: searchQuery}) } } const onSearchClear = () => { - const isSearchFilterExist = filters.some((item) => item.key === "data") + const isSearchFilterExist = filters.some((item) => item.key === "content") if (isSearchFilterExist) { - setFilters((prevFilters) => prevFilters.filter((f) => f.key !== "data")) + setFilters((prevFilters) => prevFilters.filter((f) => f.key !== "content")) } } // Sync searchQuery with filters state useUpdateEffect(() => { - const dataFilter = filters.find((f) => f.key === "data") + const dataFilter = filters.find((f) => f.key === "content") setSearchQuery(dataFilter ? dataFilter.value : "") }, [filters]) From 40dab4919266c0861de379dd19de3bf9f8ee03a6 Mon Sep 17 00:00:00 2001 From: Kaosiso Ezealigo Date: Tue, 5 Nov 2024 15:03:53 +0100 Subject: [PATCH 07/17] fix(frontend): removed action buttons in trace content and improved logic to display input tools --- .../pages/observability/drawer/TraceContent.tsx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/agenta-web/src/components/pages/observability/drawer/TraceContent.tsx b/agenta-web/src/components/pages/observability/drawer/TraceContent.tsx index d0ea680f68..a81f7d2b0e 100644 --- a/agenta-web/src/components/pages/observability/drawer/TraceContent.tsx +++ b/agenta-web/src/components/pages/observability/drawer/TraceContent.tsx @@ -156,14 +156,14 @@ const TraceContent = ({activeTrace}: TraceContentProps) => { )) : null } else { - return ( + return Array.isArray(values) && values.length > 0 ? ( - ) + ) : null } }, ) @@ -254,7 +254,7 @@ const TraceContent = ({activeTrace}: TraceContentProps) => { {activeTrace.node.name} - + {/* {!activeTrace.parent && activeTrace.refs?.application?.id && ( - + */} From aa6b7ab7a10494f33e71d7d992df767526a86833 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Tue, 5 Nov 2024 15:26:08 +0100 Subject: [PATCH 08/17] Fix traceloop string to json --- .../observability/opentelemetry/semconv.py | 17 ++++++- .../apis/fastapi/observability/utils.py | 12 +++++ agenta-cli/agenta/sdk/tracing/inline.py | 45 +++++++++++++++++++ agenta-cli/pyproject.toml | 2 +- 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/opentelemetry/semconv.py b/agenta-backend/agenta_backend/apis/fastapi/observability/opentelemetry/semconv.py index 9dbb87c66c..24bd9f6134 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/opentelemetry/semconv.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/opentelemetry/semconv.py @@ -1,3 +1,5 @@ +from json import loads + VERSION = "0.4.1" V_0_4_1_ATTRIBUTES_EXACT = [ @@ -27,8 +29,6 @@ ("pinecone.query.top_k", "ag.meta.request.top_k"), ("traceloop.span.kind", "ag.type.node"), ("traceloop.entity.name", "ag.node.name"), - ("traceloop.entity.input", "ag.data.inputs"), - ("traceloop.entity.output", "ag.data.outputs"), # OPENINFERENCE ("output.value", "ag.data.outputs"), ("input.value", "ag.data.inputs"), @@ -50,6 +50,13 @@ ("llm.output_messages", "ag.data.outputs.completion"), ] +V_0_4_1_ATTRIBUTES_DYNAMIC = [ + # OPENLLMETRY + ("traceloop.entity.input", lambda x: ("ag.data.inputs", loads(x).get("inputs"))), + ("traceloop.entity.output", lambda x: ("ag.data.outputs", loads(x).get("outputs"))), +] + + V_0_4_1_MAPS = { "attributes": { "exact": { @@ -60,6 +67,9 @@ "from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]}, "to": {agenta: otel for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]}, }, + "dynamic": { + "from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_DYNAMIC[::-1]} + }, }, } V_0_4_1_KEYS = { @@ -72,6 +82,9 @@ "from": list(V_0_4_1_MAPS["attributes"]["prefix"]["from"].keys()), "to": list(V_0_4_1_MAPS["attributes"]["prefix"]["to"].keys()), }, + "dynamic": { + "from": list(V_0_4_1_MAPS["attributes"]["dynamic"]["from"].keys()), + }, }, } diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index 7cd8c63d79..bf71353c19 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -340,6 +340,18 @@ def _parse_from_semconv( del attributes[old_key] + for dynamic_key in CODEX["keys"]["attributes"]["dynamic"]["from"]: + if old_key == dynamic_key: + try: + new_key, new_value = CODEX["maps"]["attributes"]["dynamic"][ + "from" + ][dynamic_key](value) + + attributes[new_key] = new_value + + except: # pylint: disable=bare-except + pass + def _parse_from_links( otel_span_dto: OTelSpanDTO, diff --git a/agenta-cli/agenta/sdk/tracing/inline.py b/agenta-cli/agenta/sdk/tracing/inline.py index 2efbe3a31f..2c094667d1 100644 --- a/agenta-cli/agenta/sdk/tracing/inline.py +++ b/agenta-cli/agenta/sdk/tracing/inline.py @@ -412,9 +412,12 @@ def _connect_tree_dfs( ### apis.fastapi.observability.opentelemetry.semconv ### ### ------------------------------------------------ ### +from json import loads + VERSION = "0.4.1" V_0_4_1_ATTRIBUTES_EXACT = [ + # OPENLLMETRY ("gen_ai.system", "ag.meta.system"), ("gen_ai.request.base_url", "ag.meta.request.base_url"), ("gen_ai.request.endpoint", "ag.meta.request.endpoint"), @@ -439,12 +442,35 @@ def _connect_tree_dfs( ("db.vector.query.top_k", "ag.meta.request.top_k"), ("pinecone.query.top_k", "ag.meta.request.top_k"), ("traceloop.span.kind", "ag.type.node"), + ("traceloop.entity.name", "ag.node.name"), + # OPENINFERENCE + ("output.value", "ag.data.outputs"), + ("input.value", "ag.data.inputs"), + ("embedding.model_name", "ag.meta.request.model"), + ("llm.invocation_parameters", "ag.meta.request"), + ("llm.model_name", "ag.meta.request.model"), + ("llm.provider", "ag.meta.provider"), + ("llm.system", "ag.meta.system"), ] V_0_4_1_ATTRIBUTES_PREFIX = [ + # OPENLLMETRY ("gen_ai.prompt", "ag.data.inputs.prompt"), ("gen_ai.completion", "ag.data.outputs.completion"), + ("llm.request.functions", "ag.data.inputs.functions"), + ("llm.request.tools", "ag.data.inputs.tools"), + # OPENINFERENCE + ("llm.token_count", "ag.metrics.unit.tokens"), + ("llm.input_messages", "ag.data.inputs.prompt"), + ("llm.output_messages", "ag.data.outputs.completion"), +] + +V_0_4_1_ATTRIBUTES_DYNAMIC = [ + # OPENLLMETRY + ("traceloop.entity.input", lambda x: ("ag.data.inputs", loads(x).get("inputs"))), + ("traceloop.entity.output", lambda x: ("ag.data.outputs", loads(x).get("outputs"))), ] + V_0_4_1_MAPS = { "attributes": { "exact": { @@ -455,6 +481,9 @@ def _connect_tree_dfs( "from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]}, "to": {agenta: otel for otel, agenta in V_0_4_1_ATTRIBUTES_PREFIX[::-1]}, }, + "dynamic": { + "from": {otel: agenta for otel, agenta in V_0_4_1_ATTRIBUTES_DYNAMIC[::-1]} + }, }, } V_0_4_1_KEYS = { @@ -467,6 +496,9 @@ def _connect_tree_dfs( "from": list(V_0_4_1_MAPS["attributes"]["prefix"]["from"].keys()), "to": list(V_0_4_1_MAPS["attributes"]["prefix"]["to"].keys()), }, + "dynamic": { + "from": list(V_0_4_1_MAPS["attributes"]["dynamic"]["from"].keys()), + }, }, } @@ -480,6 +512,7 @@ def _connect_tree_dfs( CODEX = {"maps": MAPS[VERSION], "keys": KEYS[VERSION]} + ### ------------------------------------------------ ### ### apis.fastapi.observability.opentelemetry.semconv ### ######################################################## @@ -653,6 +686,18 @@ def _parse_from_semconv( del attributes[old_key] + for dynamic_key in CODEX["keys"]["attributes"]["dynamic"]["from"]: + if old_key == dynamic_key: + try: + new_key, new_value = CODEX["maps"]["attributes"]["dynamic"][ + "from" + ][dynamic_key](value) + + attributes[new_key] = new_value + + except: # pylint: disable=bare-except + pass + def _parse_from_links( otel_span_dto: OTelSpanDTO, diff --git a/agenta-cli/pyproject.toml b/agenta-cli/pyproject.toml index 94754c6412..09f14e9927 100644 --- a/agenta-cli/pyproject.toml +++ b/agenta-cli/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "agenta" -version = "0.27.0a9" +version = "0.27.0a10" description = "The SDK for agenta is an open-source LLMOps platform." readme = "README.md" authors = ["Mahmoud Mabrouk "] From b87fc86bf19ef362fce913dcb814895040db8bd7 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Tue, 5 Nov 2024 23:13:17 +0100 Subject: [PATCH 09/17] fix dependency in langchain tests --- .../observability_sdk/integrations/langchain/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt b/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt index 75d25b8180..84be7f8bc6 100644 --- a/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt +++ b/agenta-cli/tests/observability_sdk/integrations/langchain/requirements.txt @@ -1,4 +1,4 @@ -agenta==0.27.0a9 +agenta==0.27.0a10 # opentelemetry.instrumentation.openai==0.31.2 openai opentelemetry-instrumentation-langchain From b6466ed6bb0ad275fe3e1294487b9c1ac16d57a4 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Tue, 5 Nov 2024 23:15:02 +0100 Subject: [PATCH 10/17] fix legacy analytics --- .../apis/fastapi/observability/models.py | 8 ++ .../apis/fastapi/observability/router.py | 42 +++++++-- .../apis/fastapi/observability/utils.py | 92 ++++++++++++++++--- 3 files changed, 118 insertions(+), 24 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py index 6f87476665..17820a91e9 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py @@ -3,6 +3,10 @@ from agenta_backend.apis.fastapi.shared.models import VersionedModel +from agenta_backend.apis.fastapi.observability.utils import ( + LegacyDataPoint, + LegacySummary, +) from agenta_backend.core.observability.dtos import ( OTelSpanDTO, SpanDTO, @@ -64,3 +68,7 @@ class AgentaRootsResponse(VersionedModel, AgentaRootsDTO): class AnalyticsResponse(VersionedModel): count: Optional[int] = None buckets: List[BucketDTO] + + +class LegacyAnalyticsResponse(LegacySummary): + data: List[LegacyDataPoint] diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py index f51ae9154c..c87d69dbf9 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py @@ -4,7 +4,12 @@ from fastapi import APIRouter, Request, Depends, Query, status, HTTPException from agenta_backend.core.observability.service import ObservabilityService -from agenta_backend.core.observability.dtos import QueryDTO, AnalyticsDTO +from agenta_backend.core.observability.dtos import ( + QueryDTO, + AnalyticsDTO, + TreeDTO, + RootDTO, +) from agenta_backend.core.observability.utils import FilteringException from agenta_backend.apis.fastapi.shared.utils import handle_exceptions @@ -17,6 +22,7 @@ parse_from_otel_span_dto, parse_to_otel_span_dto, parse_to_agenta_span_dto, + parse_legacy_analytics, ) from agenta_backend.apis.fastapi.observability.models import ( CollectStatusResponse, @@ -28,8 +34,8 @@ AgentaNodeDTO, AgentaTreeDTO, AgentaRootDTO, - TreeDTO, - RootDTO, + LegacyAnalyticsResponse, + AnalyticsResponse, ) @@ -94,7 +100,10 @@ def __init__( operation_id="query_analytics", summary="Query analytics, with optional grouping, windowing, filtering.", status_code=status.HTTP_200_OK, - response_model=AnalyticsResponse, + response_model=Union[ + LegacyAnalyticsResponse, + AnalyticsResponse, + ], response_model_exclude_none=True, ) @@ -286,24 +295,37 @@ async def query_analytics( self, request: Request, analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto), + format: Literal[ # pylint: disable=W0622 + "legacy", + "agenta", + ] = Query("agenta"), ): try: bucket_dtos, count = await self.service.analytics( project_id=UUID(request.state.project_id), analytics_dto=analytics_dto, ) + + if format == "legacy": + data, summary = parse_legacy_analytics(bucket_dtos) + + return LegacyAnalyticsResponse( + data=data, + **summary.model_dump(), + ) + + return AnalyticsResponse( + version=self.VERSION, + count=count, + buckets=bucket_dtos, + ) + except FilteringException as e: raise HTTPException( status_code=400, detail=str(e), ) from e - return AnalyticsResponse( - version=self.VERSION, - count=count, - buckets=bucket_dtos, - ) - ### MUTATIONS @handle_exceptions() diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index bf71353c19..7fa9291a2f 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -3,6 +3,9 @@ from collections import OrderedDict from json import loads, JSONDecodeError, dumps from copy import copy +from datetime import datetime + +from pydantic import BaseModel from fastapi import Query, HTTPException @@ -24,6 +27,8 @@ OTelSpanDTO, OTelContextDTO, OTelLinkDTO, + BucketDTO, + MetricsDTO, ) from agenta_backend.core.observability.dtos import ( GroupingDTO, @@ -35,7 +40,7 @@ ) -# --- PARSE QUERY DTO --- +# --- PARSE QUERY / ANALYTICS DTO --- def _parse_windowing( @@ -340,18 +345,6 @@ def _parse_from_semconv( del attributes[old_key] - for dynamic_key in CODEX["keys"]["attributes"]["dynamic"]["from"]: - if old_key == dynamic_key: - try: - new_key, new_value = CODEX["maps"]["attributes"]["dynamic"][ - "from" - ][dynamic_key](value) - - attributes[new_key] = new_value - - except: # pylint: disable=bare-except - pass - def _parse_from_links( otel_span_dto: OTelSpanDTO, @@ -528,7 +521,7 @@ def parse_from_otel_span_dto( exception = _parse_from_events(otel_span_dto) - root_id = refs.get("scenario.id", str(tree.id)) if refs else str(tree.id) + root_id = refs.get("scenario.id", str(tree.id)) root = RootDTO(id=UUID(root_id)) @@ -774,3 +767,74 @@ def parse_to_agenta_span_dto( # ---------------------- return span_dto + + +# --- PARSE BUCKET DTO --- + + +class LegacySummary(BaseModel): + total_count: int + failure_rate: float + total_cost: float + avg_cost: float + avg_latency: float + total_tokens: int + avg_tokens: float + + +class LegacyDataPoint(BaseModel): + timestamp: datetime + success_count: int + failure_count: int + cost: float + latency: float + total_tokens: int + + +def parse_legacy_analytics( + bucket_dtos: List[BucketDTO], +) -> Tuple[List[LegacyDataPoint], LegacySummary]: + + data_points = list() + + failure_count = 0 + total_latency = 0.0 + + summary = LegacySummary( + total_count=0, + failure_rate=0.0, + total_cost=0.0, + avg_cost=0.0, + avg_latency=0.0, + total_tokens=0, + avg_tokens=0.0, + ) + + for bucket_dto in bucket_dtos: + data_point = LegacyDataPoint( + timestamp=bucket_dto.timestamp, + success_count=bucket_dto.total.count - bucket_dto.error.count, + failure_count=bucket_dto.error.count, + cost=bucket_dto.total.cost, + latency=bucket_dto.total.duration, + total_tokens=bucket_dto.total.tokens, + ) + + data_points.append(data_point) + + summary.total_count += bucket_dto.total.count + summary.total_cost += bucket_dto.total.cost + summary.total_tokens += bucket_dto.total.tokens + + failure_count += bucket_dto.error.count + total_latency += bucket_dto.total.duration + + summary.failure_rate = ( + failure_count / summary.total_count if summary.total_count else 0.0 + ) + + summary.avg_cost = summary.total_cost / summary.total_count + summary.avg_latency = (total_latency / summary.total_count) / 1_000 + summary.avg_tokens = summary.total_tokens / summary.total_count + + return data_points, summary From 06c27dcc47c1773e91b332364114546895ea36a6 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Wed, 6 Nov 2024 10:38:26 +0100 Subject: [PATCH 11/17] Add legacy Query params parser for new analytics --- .../apis/fastapi/observability/models.py | 31 +++- .../apis/fastapi/observability/router.py | 6 +- .../apis/fastapi/observability/utils.py | 145 ++++++++++++++---- .../dbs/postgres/observability/dao.py | 8 +- 4 files changed, 147 insertions(+), 43 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py index 17820a91e9..2f4a77afe2 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/models.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/models.py @@ -1,12 +1,10 @@ from typing import List, Optional +from datetime import datetime + from pydantic import BaseModel from agenta_backend.apis.fastapi.shared.models import VersionedModel -from agenta_backend.apis.fastapi.observability.utils import ( - LegacyDataPoint, - LegacySummary, -) from agenta_backend.core.observability.dtos import ( OTelSpanDTO, SpanDTO, @@ -65,10 +63,29 @@ class AgentaRootsResponse(VersionedModel, AgentaRootsDTO): count: Optional[int] = None -class AnalyticsResponse(VersionedModel): - count: Optional[int] = None - buckets: List[BucketDTO] +class LegacySummary(BaseModel): + total_count: int + failure_rate: float + total_cost: float + avg_cost: float + avg_latency: float + total_tokens: int + avg_tokens: float + + +class LegacyDataPoint(BaseModel): + timestamp: datetime + success_count: int + failure_count: int + cost: float + latency: float + total_tokens: int class LegacyAnalyticsResponse(LegacySummary): data: List[LegacyDataPoint] + + +class AnalyticsResponse(VersionedModel): + count: Optional[int] = None + buckets: List[BucketDTO] diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py index c87d69dbf9..dec448fddc 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/router.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/router.py @@ -22,6 +22,7 @@ parse_from_otel_span_dto, parse_to_otel_span_dto, parse_to_agenta_span_dto, + parse_legacy_analytics_dto, parse_legacy_analytics, ) from agenta_backend.apis.fastapi.observability.models import ( @@ -30,7 +31,6 @@ AgentaNodesResponse, AgentaTreesResponse, AgentaRootsResponse, - AnalyticsResponse, AgentaNodeDTO, AgentaTreeDTO, AgentaRootDTO, @@ -295,12 +295,16 @@ async def query_analytics( self, request: Request, analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto), + legacy_analytics_dto: AnalyticsDTO = Depends(parse_legacy_analytics_dto), format: Literal[ # pylint: disable=W0622 "legacy", "agenta", ] = Query("agenta"), ): try: + if legacy_analytics_dto is not None: + analytics_dto = legacy_analytics_dto + bucket_dtos, count = await self.service.analytics( project_id=UUID(request.state.project_id), analytics_dto=analytics_dto, diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index 2eab20b1dc..0246d88993 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -3,14 +3,17 @@ from collections import OrderedDict from json import loads, JSONDecodeError, dumps from copy import copy -from datetime import datetime - -from pydantic import BaseModel +from datetime import datetime, timedelta, time from fastapi import Query, HTTPException from agenta_backend.apis.fastapi.observability.opentelemetry.semconv import CODEX +from agenta_backend.apis.fastapi.observability.models import ( + LegacyDataPoint, + LegacySummary, +) + from agenta_backend.core.observability.dtos import ( TimeDTO, StatusDTO, @@ -28,7 +31,6 @@ OTelContextDTO, OTelLinkDTO, BucketDTO, - MetricsDTO, ) from agenta_backend.core.observability.dtos import ( GroupingDTO, @@ -37,6 +39,7 @@ PaginationDTO, QueryDTO, AnalyticsDTO, + ConditionDTO, ) @@ -781,26 +784,103 @@ def parse_to_agenta_span_dto( return span_dto -# --- PARSE BUCKET DTO --- +# --- PARSE LEGACY ANALYTICS --- + + +def _parse_time_range( + window_text: str, +) -> Tuple[datetime, datetime, int]: + quantity, unit = window_text.split("_") + quantity = int(quantity) + + today = datetime.now() + newest = datetime.combine(today.date(), time.max) + + if unit == "hours": + oldest = newest - timedelta(hours=quantity) + window = 60 # 1 hour + return newest, oldest, window + elif unit == "days": + oldest = newest - timedelta(days=quantity) + window = 1440 # 1 day + return newest, oldest, window -class LegacySummary(BaseModel): - total_count: int - failure_rate: float - total_cost: float - avg_cost: float - avg_latency: float - total_tokens: int - avg_tokens: float + else: + raise ValueError(f"Unknown time unit: {unit}") -class LegacyDataPoint(BaseModel): - timestamp: datetime - success_count: int - failure_count: int - cost: float - latency: float - total_tokens: int +def parse_legacy_analytics_dto( + timeRange: Optional[str] = Query(None), # pylint: disable=invalid-name + app_id: Optional[str] = Query(None), + environment: Optional[str] = Query(None), + variant: Optional[str] = Query(None), +) -> Optional[AnalyticsDTO]: + if not timeRange and not environment and not variant: + return None + + print("timeRange: ", timeRange) + print("app_id: ", app_id) + print("environment: ", environment) + print("variant: ", variant) + + application_condition = None + environment_condition = None + variant_condition = None + filtering = None + + if app_id: + application_condition = ConditionDTO( + key="refs.application.id", # ID ? + operator="is", + value=app_id, + ) + + if environment: + environment_condition = ConditionDTO( + key="refs.environment.slug", # SLUG ? + operator="is", + value=environment, + ) + + if variant: + variant_condition = ConditionDTO( + key="refs.variant.id", # ID ? + operator="is", + value=variant, + ) + + if application_condition or environment_condition or variant_condition: + filtering = FilteringDTO( + conditions=[ + condition + for condition in [ + application_condition, + environment_condition, + variant_condition, + ] + if condition + ] + ) + + windowing = None + + if timeRange: + newest, oldest, window = _parse_time_range(timeRange) + + print("newest: ", newest) + print("oldest: ", oldest) + print("window: ", window) + + windowing = WindowingDTO(newest=newest, oldest=oldest, window=window) + + grouping = GroupingDTO(focus="tree") + + return AnalyticsDTO( + grouping=grouping, + windowing=windowing, + filtering=filtering, + ) def parse_legacy_analytics( @@ -809,7 +889,7 @@ def parse_legacy_analytics( data_points = list() - failure_count = 0 + total_failure = 0 total_latency = 0.0 summary = LegacySummary( @@ -834,19 +914,20 @@ def parse_legacy_analytics( data_points.append(data_point) - summary.total_count += bucket_dto.total.count - summary.total_cost += bucket_dto.total.cost - summary.total_tokens += bucket_dto.total.tokens + summary.total_count += bucket_dto.total.count if bucket_dto.total.count else 0 + summary.total_cost += bucket_dto.total.cost if bucket_dto.total.cost else 0.0 + summary.total_tokens += ( + bucket_dto.total.tokens if bucket_dto.total.tokens else 0 + ) - failure_count += bucket_dto.error.count - total_latency += bucket_dto.total.duration + total_failure += bucket_dto.error.count if bucket_dto.error.count else 0 + total_latency += bucket_dto.total.duration if bucket_dto.total.duration else 0.0 - summary.failure_rate = ( - failure_count / summary.total_count if summary.total_count else 0.0 - ) + if summary.total_count: + summary.failure_rate = total_failure / summary.total_count - summary.avg_cost = summary.total_cost / summary.total_count - summary.avg_latency = (total_latency / summary.total_count) / 1_000 - summary.avg_tokens = summary.total_tokens / summary.total_count + summary.avg_cost = summary.total_cost / summary.total_count + summary.avg_latency = (total_latency / summary.total_count) / 1_000 + summary.avg_tokens = summary.total_tokens / summary.total_count return data_points, summary diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index b3f040f2e2..0bf7aa4185 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -1,5 +1,5 @@ from typing import Optional, List, Tuple, Union -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time from traceback import print_exc from uuid import UUID @@ -208,6 +208,8 @@ async def analytics( async with engine.session() as session: # WINDOWING today = datetime.now() + end_of_today = datetime.combine(today, time.max) + oldest = None newest = None window_text = None @@ -216,7 +218,7 @@ async def analytics( if analytics_dto.windowing.newest: newest = analytics_dto.windowing.newest else: - newest = today + newest = end_of_today if analytics_dto.windowing.oldest: if analytics_dto.windowing.oldest > newest: @@ -253,7 +255,7 @@ async def analytics( window_text = f"{_desired_window} minute{'s' if _desired_window > 1 else ''}" else: - newest = today + newest = end_of_today oldest = newest - _DEFAULT_TIME_DELTA window_text = _DEFAULT_WINDOW From c678815f15b4e271f1abf5a20b347ba8921143eb Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Wed, 6 Nov 2024 10:43:53 +0100 Subject: [PATCH 12/17] fix failure_rate --- .../agenta_backend/apis/fastapi/observability/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index 0246d88993..5294df03e4 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -924,7 +924,7 @@ def parse_legacy_analytics( total_latency += bucket_dto.total.duration if bucket_dto.total.duration else 0.0 if summary.total_count: - summary.failure_rate = total_failure / summary.total_count + summary.failure_rate = (total_failure / summary.total_count) * 100 summary.avg_cost = summary.total_cost / summary.total_count summary.avg_latency = (total_latency / summary.total_count) / 1_000 From 489d2218aa2e375c9c63498588304e7f496fff5a Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Fri, 15 Nov 2024 14:12:24 +0100 Subject: [PATCH 13/17] apply black --- .../apis/fastapi/observability/utils.py | 1 - .../integrations/litellm/01_sanity_check.py | 21 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py index ee4858298d..6a8a6ec462 100644 --- a/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py +++ b/agenta-backend/agenta_backend/apis/fastapi/observability/utils.py @@ -891,7 +891,6 @@ def parse_legacy_analytics_dto( def parse_legacy_analytics( bucket_dtos: List[BucketDTO], ) -> Tuple[List[LegacyDataPoint], LegacySummary]: - data_points = list() total_failure = 0 diff --git a/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py b/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py index bf93909ed7..9bedc41087 100644 --- a/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py +++ b/agenta-cli/tests/observability_sdk/integrations/litellm/01_sanity_check.py @@ -1,14 +1,19 @@ import litellm import os -os.environ["OTEL_EXPORTER"]="otlp_http" -os.environ["OTEL_ENDPOINT"]="http://localhost/api/observability/v1/otlp/traces" -AGENTA_APP_ID="0192b441-ab58-7af3-91d0-2f1818690828" -AGENTA_API_KEY="xxx" -os.environ["OTEL_HEADERS"]=f"AG-APP-ID={AGENTA_APP_ID}" + +os.environ["OTEL_EXPORTER"] = "otlp_http" +os.environ["OTEL_ENDPOINT"] = "http://localhost/api/observability/v1/otlp/traces" +AGENTA_APP_ID = "0192b441-ab58-7af3-91d0-2f1818690828" +AGENTA_API_KEY = "xxx" +os.environ["OTEL_HEADERS"] = f"AG-APP-ID={AGENTA_APP_ID}" + async def generate_completion(): litellm.callbacks = ["otel"] - messages = [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Write a short story about AI Engineering."}] + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Write a short story about AI Engineering."}, + ] temperature = 0.2 max_tokens = 100 chat_completion = await litellm.acompletion( @@ -19,6 +24,8 @@ async def generate_completion(): ) return chat_completion + if __name__ == "__main__": import asyncio - asyncio.run(generate_completion()) \ No newline at end of file + + asyncio.run(generate_completion()) From 618e39b576fa18d1335d80e76173307bc7a24075 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Fri, 15 Nov 2024 14:14:04 +0100 Subject: [PATCH 14/17] fix merge --- agenta-cli/agenta/sdk/tracing/inline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/agenta-cli/agenta/sdk/tracing/inline.py b/agenta-cli/agenta/sdk/tracing/inline.py index 925b7c02cf..8c4165ea6f 100644 --- a/agenta-cli/agenta/sdk/tracing/inline.py +++ b/agenta-cli/agenta/sdk/tracing/inline.py @@ -1195,11 +1195,7 @@ def _parse_to_legacy_span(span: SpanDTO) -> CreateSpan: ), # app_id=( -<<<<<<< HEAD - span.refs.get("application", {}).get("id") -======= span.refs.get("application", {}).get("id", "missing-app-id") ->>>>>>> feature/observability-checkpoint-2 if span.refs else "missing-app-id" ), From dc40d70269a93459e36f10ffc9c45e28d9a6e31f Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Fri, 15 Nov 2024 15:40:07 +0100 Subject: [PATCH 15/17] add all buckets ? WIP --- .../dbs/postgres/observability/dao.py | 27 +++++++++++++++++++ .../dbs/postgres/observability/mappings.py | 4 +++ 2 files changed, 31 insertions(+) diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index 12b0f48dd0..50bb22ebe3 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -435,10 +435,37 @@ async def analytics( window = _to_minutes(window_text) + def generate_all_buckets( + oldest: datetime, newest: datetime, window: int + ) -> List[Tuple[datetime, datetime]]: + """ + Generate all buckets between the oldest and newest timestamps based on a given window. + + Args: + oldest (datetime): The start time of the bucket range. + newest (datetime): The end time of the bucket range. + window (int): The window size in minutes. + + Returns: + List[Tuple[datetime, datetime]]: A list of tuples representing bucket start and end times. + """ + bucket_start = oldest + buckets = [] + + while bucket_start < newest: + bucket_end = bucket_start + timedelta(minutes=window) + buckets.append((bucket_start, min(bucket_end, newest))) + bucket_start = bucket_end + + return buckets + + buckets = generate_all_buckets(oldest, newest, window) + bucket_dtos, count = map_bucket_dbes_to_dtos( total_bucket_dbes=total_bucket_dbes, error_bucket_dbes=error_bucket_dbes, window=window, + buckets=buckets, ) return bucket_dtos, count diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py index 017749b3ba..0f0bc6a620 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py @@ -1,5 +1,6 @@ from typing import List, Tuple from json import dumps, loads +from datetime import datetime from agenta_backend.core.shared.dtos import LifecycleDTO from agenta_backend.core.observability.dtos import ( @@ -134,6 +135,7 @@ def map_bucket_dbes_to_dtos( total_bucket_dbes: List[NodesDBE], error_bucket_dbes: List[NodesDBE], window: int, + buckets: List[datetime], ) -> Tuple[List[BucketDTO], int]: total_metrics = { bucket.timestamp: MetricsDTO( @@ -160,6 +162,8 @@ def map_bucket_dbes_to_dtos( ) total_timestamps.sort() + total_timestamps = bucket_dtos + bucket_dtos = [ BucketDTO( timestamp=timestamp, From 3da913304ec6f8a7b74d2ced4e86bb1037c41e8c Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Fri, 15 Nov 2024 16:19:59 +0100 Subject: [PATCH 16/17] fix missing buckets and bucket timestamps issue. --- .../dbs/postgres/observability/dao.py | 60 +++++++++---------- .../dbs/postgres/observability/mappings.py | 16 ++--- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index 50bb22ebe3..3268765b48 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -45,7 +45,8 @@ ) _DEFAULT_TIME_DELTA = timedelta(days=30) -_DEFAULT_WINDOW = "15 minutes" +_DEFAULT_WINDOW = 1440 # 1 day +_DEFAULT_WINDOW_TEXT = "1 day" _MAX_ALLOWED_BUCKETS = 1024 _SUGGESTED_BUCKETS_LIST = [ (1, "1 minute"), @@ -205,7 +206,9 @@ async def analytics( async with engine.session() as session: # WINDOWING today = datetime.now() - end_of_today = datetime.combine(today, time.max) + start_of_next_day = datetime.combine( + today + timedelta(days=1), time.min + ) oldest = None newest = None @@ -215,7 +218,7 @@ async def analytics( if analytics_dto.windowing.newest: newest = analytics_dto.windowing.newest else: - newest = end_of_today + newest = start_of_next_day if analytics_dto.windowing.oldest: if analytics_dto.windowing.oldest > newest: @@ -228,7 +231,7 @@ async def analytics( if analytics_dto.windowing.window: _desired_window = analytics_dto.windowing.window else: - _desired_window = 15 + _desired_window = _DEFAULT_WINDOW _window_minutes = (newest - oldest).total_seconds() // 60 @@ -252,9 +255,9 @@ async def analytics( window_text = f"{_desired_window} minute{'s' if _desired_window > 1 else ''}" else: - newest = end_of_today + newest = start_of_next_day oldest = newest - _DEFAULT_TIME_DELTA - window_text = _DEFAULT_WINDOW + window_text = _DEFAULT_WINDOW_TEXT # --------- @@ -435,37 +438,13 @@ async def analytics( window = _to_minutes(window_text) - def generate_all_buckets( - oldest: datetime, newest: datetime, window: int - ) -> List[Tuple[datetime, datetime]]: - """ - Generate all buckets between the oldest and newest timestamps based on a given window. - - Args: - oldest (datetime): The start time of the bucket range. - newest (datetime): The end time of the bucket range. - window (int): The window size in minutes. - - Returns: - List[Tuple[datetime, datetime]]: A list of tuples representing bucket start and end times. - """ - bucket_start = oldest - buckets = [] - - while bucket_start < newest: - bucket_end = bucket_start + timedelta(minutes=window) - buckets.append((bucket_start, min(bucket_end, newest))) - bucket_start = bucket_end - - return buckets - - buckets = generate_all_buckets(oldest, newest, window) + timestamps = _to_timestamps(oldest, newest, window) bucket_dtos, count = map_bucket_dbes_to_dtos( total_bucket_dbes=total_bucket_dbes, error_bucket_dbes=error_bucket_dbes, window=window, - buckets=buckets, + timestamps=timestamps, ) return bucket_dtos, count @@ -874,3 +853,20 @@ def _to_minutes( return quantity * 43200 else: raise ValueError(f"Unknown time unit: {unit}") + + +def _to_timestamps( + oldest: datetime, + newest: datetime, + window: int, +) -> List[datetime]: + buckets = [] + + bucket_start = oldest + + while bucket_start < newest: + buckets.append(bucket_start) + + bucket_start += timedelta(minutes=window) + + return buckets diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py index 0f0bc6a620..b494a08ad4 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from typing import List, Tuple, Optional from json import dumps, loads from datetime import datetime @@ -135,7 +135,7 @@ def map_bucket_dbes_to_dtos( total_bucket_dbes: List[NodesDBE], error_bucket_dbes: List[NodesDBE], window: int, - buckets: List[datetime], + timestamps: Optional[List[datetime]] = None, ) -> Tuple[List[BucketDTO], int]: total_metrics = { bucket.timestamp: MetricsDTO( @@ -157,12 +157,12 @@ def map_bucket_dbes_to_dtos( for bucket in error_bucket_dbes } - total_timestamps = list( - set(list(total_metrics.keys()) + list(error_metrics.keys())) - ) - total_timestamps.sort() - - total_timestamps = bucket_dtos + total_timestamps = timestamps + if not total_timestamps: + total_timestamps = list( + set(list(total_metrics.keys()) + list(error_metrics.keys())) + ) + total_timestamps.sort() bucket_dtos = [ BucketDTO( From c03c9b640754d331e34f226d743f10a976904345 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Fri, 15 Nov 2024 16:29:07 +0100 Subject: [PATCH 17/17] fix timezone --- .../dbs/postgres/observability/dao.py | 18 +++++++++++++++--- .../dbs/postgres/observability/mappings.py | 5 +++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py index 3268765b48..fc48e0098a 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/dao.py @@ -1,5 +1,5 @@ from typing import Optional, List, Tuple, Union -from datetime import datetime, timedelta, time +from datetime import datetime, timedelta, time, timezone from traceback import print_exc from uuid import UUID @@ -862,9 +862,21 @@ def _to_timestamps( ) -> List[datetime]: buckets = [] - bucket_start = oldest + _oldest = oldest + if oldest.tzinfo is None: + _oldest = oldest.replace(tzinfo=timezone.utc) + else: + _oldest = oldest.astimezone(timezone.utc) + + _newest = newest + if newest.tzinfo is None: + _newest = newest.replace(tzinfo=timezone.utc) + else: + _newest = newest.astimezone(timezone.utc) + + bucket_start = _oldest - while bucket_start < newest: + while bucket_start < _newest: buckets.append(bucket_start) bucket_start += timedelta(minutes=window) diff --git a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py index b494a08ad4..9f4c56f819 100644 --- a/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py +++ b/agenta-backend/agenta_backend/dbs/postgres/observability/mappings.py @@ -164,6 +164,11 @@ def map_bucket_dbes_to_dtos( ) total_timestamps.sort() + _total_timestamps = list( + set(list(total_metrics.keys()) + list(error_metrics.keys())) + ) + _total_timestamps.sort() + bucket_dtos = [ BucketDTO( timestamp=timestamp,