Skip to content

Commit

Permalink
Add limit to list workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Dec 5, 2024
1 parent a90f6d4 commit ecf594c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
16 changes: 15 additions & 1 deletion temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ def list_workflows(
next_page_token: Optional[bytes] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
limit: Optional[int] = None,
) -> WorkflowExecutionAsyncIterator:
"""List workflows.
Expand All @@ -813,6 +814,8 @@ def list_workflows(
rpc_metadata: Headers used on each RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for each RPC call.
limit: Maximum number of workflows to return. If unset, all
workflows are returned.
Returns:
An async iterator that can be used with ``async for``.
Expand All @@ -824,6 +827,7 @@ def list_workflows(
next_page_token=next_page_token,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
limit=limit,
)
)

Expand Down Expand Up @@ -2483,6 +2487,8 @@ def __init__(
self._next_page_token = input.next_page_token
self._current_page: Optional[Sequence[WorkflowExecution]] = None
self._current_page_index = 0
self._limit = input.limit
self._yielded = 0

@property
def current_page_index(self) -> int:
Expand All @@ -2508,10 +2514,14 @@ async def fetch_next_page(self, *, page_size: Optional[int] = None) -> None:
page_size: Override the page size this iterator was originally
created with.
"""
page_size = page_size or self._input.page_size
if self._limit is not None and self._limit - self._yielded < page_size:
page_size = self._limit - self._yielded

resp = await self._client.workflow_service.list_workflow_executions(
temporalio.api.workflowservice.v1.ListWorkflowExecutionsRequest(
namespace=self._client.namespace,
page_size=page_size or self._input.page_size,
page_size=page_size,
next_page_token=self._next_page_token or b"",
query=self._input.query or "",
),
Expand All @@ -2534,6 +2544,8 @@ async def __anext__(self) -> WorkflowExecution:
"""Get the next execution on this iterator, fetching next page if
necessary.
"""
if self._limit is not None and self._yielded >= self._limit:
raise StopAsyncIteration
while True:
# No page? fetch and continue
if self._current_page is None:
Expand All @@ -2551,6 +2563,7 @@ async def __anext__(self) -> WorkflowExecution:
# Get current, increment page index, and return
ret = self._current_page[self._current_page_index]
self._current_page_index += 1
self._yielded += 1
return ret

async def map_histories(
Expand Down Expand Up @@ -4573,6 +4586,7 @@ class ListWorkflowsInput:
next_page_token: Optional[bytes]
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]
limit: Optional[int]


@dataclass
Expand Down
8 changes: 8 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,14 @@ async def test_list_workflows_and_fetch_history(
)
assert actual_id_and_input == expected_id_and_input

# Verify listing can limit results
limited = [w async for w in client.list_workflows(f"WorkflowId = '{workflow_id}'", limit=3)]
assert len(limited) == 3
# With a weird page size
limited = [w async for w in client.list_workflows(f"WorkflowId = '{workflow_id}'", page_size=2, limit=3)]
assert len(limited) == 3



@workflow.defn
class CountableWorkflow:
Expand Down

0 comments on commit ecf594c

Please sign in to comment.