Skip to content

Commit

Permalink
Refactor retrievers, paginators, and pagination strategy to be stateless
Browse files Browse the repository at this point in the history
  • Loading branch information
brianjlai committed Dec 21, 2024
1 parent ae1211f commit ef51643
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,40 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)
if isinstance(self.url_base, str):
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token: Optional[Any] = self.pagination_strategy.initial_token
# self._token: Optional[Any] = self.pagination_strategy.initial_token

def get_initial_token(self) -> Optional[Any]:
"""
Return the page token that should be used for the first request of a stream
WARNING: get_initial_token() should not be used by Resumable Full Refresh (RFR) streams that checkpoint
state using page numbers. Because paginators are stateless, this always returns the pagination strategy's
initial token rather than a checkpointed page.
"""
return self.pagination_strategy.initial_token

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(
response, last_page_size, last_record
next_page_token = self.pagination_strategy.next_page_token(
response=response,
last_page_size=last_page_size,
last_record=last_record,
last_page_token_value=last_page_token_value,
)
if self._token:
return {"next_page_token": self._token}
if next_page_token:
return {"next_page_token": next_page_token}
else:
return None

def path(self) -> Optional[str]:
if (
self._token
and self.page_token_option
and isinstance(self.page_token_option, RequestPath)
):
def path(self, next_page_token: Mapping[str, Any]) -> Optional[str]:
token = next_page_token.get("next_page_token") if next_page_token else None
if token and self.page_token_option and isinstance(self.page_token_option, RequestPath):
# Replace url base to only return the path
return str(self._token).replace(self.url_base.eval(self.config), "") # type: ignore # url_base is casted to a InterpolatedString in __post_init__
return str(token).replace(self.url_base.eval(self.config), "") # type: ignore # url_base is casted to a InterpolatedString in __post_init__
else:
return None

Expand All @@ -143,7 +156,7 @@ def get_request_params(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter)
return self._get_request_options(RequestOptionType.request_parameter, next_page_token)

def get_request_headers(
self,
Expand All @@ -152,7 +165,7 @@ def get_request_headers(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
return self._get_request_options(RequestOptionType.header)
return self._get_request_options(RequestOptionType.header, next_page_token)

def get_request_body_data(
self,
Expand All @@ -161,7 +174,7 @@ def get_request_body_data(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_data)
return self._get_request_options(RequestOptionType.body_data, next_page_token)

def get_request_body_json(
self,
Expand All @@ -170,25 +183,21 @@ def get_request_body_json(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value:
self.pagination_strategy.reset(reset_value=reset_value)
else:
self.pagination_strategy.reset()
self._token = self.pagination_strategy.initial_token
return self._get_request_options(RequestOptionType.body_json, next_page_token)

def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]:
def _get_request_options(
self, option_type: RequestOptionType, next_page_token: Optional[Mapping[str, Any]]
) -> MutableMapping[str, Any]:
options = {}

token = next_page_token.get("next_page_token") if next_page_token else None
if (
self.page_token_option
and self._token is not None
and token is not None
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name.eval(config=self.config)] = self._token # type: ignore # field_name is always cast to an interpolated string
options[self.page_token_option.field_name.eval(config=self.config)] = token # type: ignore # field_name is always cast to an interpolated string
if (
self.page_size_option
and self.pagination_strategy.get_page_size()
Expand Down Expand Up @@ -217,17 +226,26 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No
self._decorated = decorated
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

def get_initial_token(self) -> Optional[Any]:
return self._decorated.get_initial_token()

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Mapping[str, Any]]:
if self._page_count >= self._maximum_number_of_pages:
return None

self._page_count += 1
return self._decorated.next_page_token(response, last_page_size, last_record)
return self._decorated.next_page_token(
response, last_page_size, last_record, last_page_token_value
)

def path(self) -> Optional[str]:
return self._decorated.path()
def path(self, next_page_token: Mapping[str, Any]) -> Optional[str]:
return self._decorated.path(next_page_token)

def get_request_params(
self,
Expand Down Expand Up @@ -272,7 +290,3 @@ def get_request_body_json(
return self._decorated.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def reset(self, reset_value: Optional[Any] = None) -> None:
self._decorated.reset()
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class NoPagination(Paginator):

parameters: InitVar[Mapping[str, Any]]

def path(self) -> Optional[str]:
def path(self, next_page_token: Mapping[str, Any]) -> Optional[str]:
return None

def get_request_params(
Expand Down Expand Up @@ -58,11 +58,14 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return {}

def get_initial_token(self) -> Optional[Any]:
return None

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Mapping[str, Any]:
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
return {}

def reset(self, reset_value: Optional[Any] = None) -> None:
# No state to reset
pass
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,32 @@ class Paginator(ABC, RequestOptionsProvider):
"""

@abstractmethod
def reset(self, reset_value: Optional[Any] = None) -> None:
def get_initial_token(self) -> Optional[Any]:
"""
Reset the pagination's inner state
Get the page token that should be included in the request to get the first page of records
"""

@abstractmethod
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
"""
Returns the next_page_token to use to fetch the next page of records.
:param response: the response to process
:param last_page_size: the number of records read from the response
:param last_record: the last record extracted from the response
:param last_page_token_value: The value of the page token that was used for the last request
:return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
"""
pass

@abstractmethod
def path(self) -> Optional[str]:
def path(self, next_page_token: Mapping[str, Any]) -> Optional[str]:
"""
Returns the URL path to hit to fetch the next page of records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class CursorPaginationStrategy(PaginationStrategy):
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._initial_cursor = None
if isinstance(self.cursor_value, str):
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
else:
Expand All @@ -57,10 +56,19 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

@property
def initial_token(self) -> Optional[Any]:
return self._initial_cursor
"""
CursorPaginationStrategy does not have an initial value because the next cursor is typically included
in the response of the first request. For Resumable Full Refresh streams that checkpoint the page
cursor, the next cursor should be read from the state or stream slice object.
"""
return None

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))

Expand All @@ -87,8 +95,5 @@ def next_page_token(
)
return token if token else None

def reset(self, reset_value: Optional[Any] = None) -> None:
self._initial_cursor = reset_value

def get_page_size(self) -> Optional[int]:
return self.page_size
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class OffsetIncrement(PaginationStrategy):
inject_on_first_request: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._offset = 0
page_size = str(self.page_size) if isinstance(self.page_size, int) else self.page_size
if page_size:
self._page_size: Optional[InterpolatedString] = InterpolatedString(
Expand All @@ -64,11 +63,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
@property
def initial_token(self) -> Optional[Any]:
if self.inject_on_first_request:
return self._offset
return 0
return None

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
decoded_response = next(self.decoder.decode(response))

Expand All @@ -78,9 +81,17 @@ def next_page_token(
and last_page_size < self._page_size.eval(self.config, response=decoded_response)
) or last_page_size == 0:
return None
elif last_page_token_value is None:
# If the OffsetIncrement strategy does not inject on the first request, the incoming last_page_token_value
# will be None. For this case, we assume that None was the first page and progress to the next offset
return 0 + last_page_size
elif not isinstance(last_page_token_value, int):
raise ValueError(
"The page token for a OffsetIncrement pagination strategy must be an integer"
)
else:
self._offset += last_page_size
return self._offset
next_page_token_value = last_page_token_value + last_page_size
return next_page_token_value

def reset(self, reset_value: Optional[Any] = 0) -> None:
if not isinstance(reset_value, int):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class PageIncrement(PaginationStrategy):
inject_on_first_request: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._page = self.start_from_page
if isinstance(self.page_size, int) or (self.page_size is None):
self._page_size = self.page_size
else:
Expand All @@ -43,28 +42,30 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
@property
def initial_token(self) -> Optional[Any]:
if self.inject_on_first_request:
return self._page
return self.start_from_page
return None

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any],
) -> Optional[Any]:
# Stop paginating when there are fewer records than the page size or the current page has no records
if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
return None
else:
self._page += 1
return self._page

def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value is None:
self._page = self.start_from_page
elif not isinstance(reset_value, int):
elif last_page_token_value is None:
# If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
# may be None. When this is the case, we assume we've already requested the first page specified by
# start_from_page and must now get the next page
return self.start_from_page + 1
elif not isinstance(last_page_token_value, int):
raise ValueError(
f"Reset value {reset_value} for PageIncrement pagination strategy was not an integer"
"The page token for a PageIncrement pagination strategy must be an integer"
)
else:
self._page = reset_value
return last_page_token_value + 1

def get_page_size(self) -> Optional[int]:
return self._page_size
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Optional
from typing import Any, Mapping, Optional

import requests

Expand All @@ -26,22 +26,21 @@ def initial_token(self) -> Optional[Any]:

@abstractmethod
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
self,
response: requests.Response,
last_page_size: int,
last_record: Optional[Record],
last_page_token_value: Optional[Any],
) -> Optional[Any]:
"""
:param response: response to process
:param last_page_size: the number of records read from the response
:param last_record: the last record extracted from the response
:param last_page_token_value: The value of the page token that was used for the last request
:return: next page token. Returns None if there are no more pages to fetch
"""
pass

@abstractmethod
def reset(self, reset_value: Optional[Any] = None) -> None:
"""
Reset the pagination's inner state
"""

@abstractmethod
def get_page_size(self) -> Optional[int]:
"""
Expand Down
Loading

0 comments on commit ef51643

Please sign in to comment.