Skip to content

Commit

Permalink
Upgrade pagination to allow for next and stop, useful for continuous …
Browse files Browse the repository at this point in the history
…loading and refresh, without page number.
  • Loading branch information
jp-agenta committed Oct 29, 2024
1 parent a00cd3f commit f7c4448
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 17 deletions.
52 changes: 41 additions & 11 deletions agenta-backend/agenta_backend/apis/fastapi/observability/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _parse_filtering(
raise HTTPException(
status_code=400,
detail=f"Invalid JSON filtering provided: {str(e)}",
)
) from e

return _filtering

Expand All @@ -82,11 +82,38 @@ def _parse_grouping(
def _parse_pagination(
page: Optional[int] = None,
size: Optional[int] = None,
next: Optional[str] = None, # pylint: disable=W0622:redefined-builtin
stop: Optional[str] = None,
) -> Optional[PaginationDTO]:
_pagination = None

if page and size:
_pagination = PaginationDTO(page=page, size=size)
print("---------------------------------")
print(page, size, next, stop)

if page and next:
raise HTTPException(
status_code=400,
detail="Both 'page' and 'next' cannot be provided at the same time",
)

if size and stop:
raise HTTPException(
status_code=400,
detail="Both 'size' and 'stop' cannot be provided at the same time",
)

if page and not size:
raise HTTPException(
status_code=400,
detail="'size' is required when 'page' is provided",
)

_pagination = PaginationDTO(
page=page,
size=size,
next=next,
stop=stop,
)

return _pagination

Expand All @@ -106,12 +133,14 @@ def parse_query_dto(
# - Option 2: Flat query parameters
page: Optional[int] = Query(None),
size: Optional[int] = Query(None),
next: Optional[str] = Query(None), # pylint: disable=W0622:redefined-builtin
stop: Optional[str] = Query(None),
) -> QueryDTO:
return QueryDTO(
grouping=_parse_grouping(focus=focus),
windowing=_parse_windowing(earliest=earliest, latest=latest),
filtering=_parse_filtering(filtering=filtering),
pagination=_parse_pagination(page=page, size=size),
pagination=_parse_pagination(page=page, size=size, next=next, stop=stop),
)


Expand Down Expand Up @@ -372,7 +401,7 @@ def _parse_from_attributes(
if key.endswith(".id"):
try:
_refs[key] = str(UUID(_refs[key]))
except:
except: # pylint: disable=W0702:bare-except
_refs[key] = None

_refs[key] = str(_refs[key])
Expand Down Expand Up @@ -403,9 +432,9 @@ def _parse_from_events(
attributes=event.attributes,
)

del exception.attributes["exception.type"]
del exception.attributes["exception.message"]
del exception.attributes["exception.stacktrace"]
del event.attributes["exception.type"]
del event.attributes["exception.message"]
del event.attributes["exception.stacktrace"]

else:
_other_events.append(event)
Expand Down Expand Up @@ -512,8 +541,6 @@ def _parse_to_attributes(
) -> Attributes:
attributes = dict()

MAX_DEPTH = 4

# DATA
if span_dto.data:
_data = span_dto.data
Expand Down Expand Up @@ -718,7 +745,10 @@ def parse_to_agenta_span_dto(
# ------------------

# MASK LIFECYCLE FOR NOW
span_dto.lifecycle = None
# span_dto.lifecycle = None
if span_dto.lifecycle:
span_dto.lifecycle.updated_at = None
span_dto.lifecycle.updated_by_id = None
# ----------------------

return span_dto
7 changes: 5 additions & 2 deletions agenta-backend/agenta_backend/core/observability/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,11 @@ class GroupingDTO(BaseModel):


class PaginationDTO(BaseModel):
page: int
size: int
page: Optional[int] = None
size: Optional[int] = None

next: Optional[datetime] = None
stop: Optional[datetime] = None


class QueryDTO(BaseModel):
Expand Down
55 changes: 51 additions & 4 deletions agenta-backend/agenta_backend/dbs/postgres/observability/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def query(

if windowing.latest:
query = query.filter(
InvocationSpanDBE.created_at <= windowing.latest
InvocationSpanDBE.created_at < windowing.latest
)
# ---------

Expand All @@ -96,20 +96,67 @@ async def query(
query = query.order_by(InvocationSpanDBE.created_at.desc())
# -------

# WHERE created_at <= stop
# WHERE next < created_at AND created_at <= stop

# PAGINATION
pagination = query_dto.pagination
count = None
# ----------
if pagination:
print("-----------------")
print(pagination)

count_query = select(
func.count() # pylint: disable=E1102:not-callable
).select_from(query.subquery())
count = (await session.execute(count_query)).scalar()

limit = pagination.size
offset = (pagination.page - 1) * pagination.size
# 1. LIMIT size OFFSET (page - 1) * size
# -> unstable if windowing.latest is not set
if pagination.page and pagination.size:
limit = pagination.size
offset = (pagination.page - 1) * pagination.size

query = query.limit(limit).offset(offset)

# 2. WHERE next > created_at LIMIT size
# -> unstable if created_at is not unique
elif pagination.next and pagination.size:
query = query.filter(
InvocationSpanDBE.created_at < pagination.next
)
query = query.limit(pagination.size)

# 3. WHERE next > created_at AND created_at >= stop
# -> stable thanks to the </<= combination
elif pagination.next and pagination.stop:

query = query.filter(
InvocationSpanDBE.created_at < pagination.next
)
query = query.filter(
InvocationSpanDBE.created_at >= pagination.stop
)

# 4. WHERE LIMIT size
# -> useful as a starter query
elif pagination.size:
query = query.limit(pagination.size)

query = query.limit(limit).offset(offset)
# 5. WHERE created_at >= stop
# -> useful as a starter query
elif pagination.stop:
query = query.filter(
InvocationSpanDBE.created_at >= pagination.stop
)

# 6. WHERE next > created_at
# -> rather useless
elif pagination.next:
query = query.filter(
InvocationSpanDBE.created_at < pagination.next
)
# ----------

# GROUPING
Expand Down

0 comments on commit f7c4448

Please sign in to comment.