From d997cf0c84a4137d1fcaea164244c5c584edbac2 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 17 Dec 2024 05:18:52 +0100 Subject: [PATCH 1/2] Fix checkpointing for declarative streams --- airbyte_cdk/sources/streams/core.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/streams/core.py b/airbyte_cdk/sources/streams/core.py index a9aa8550..6cc5c8b5 100644 --- a/airbyte_cdk/sources/streams/core.py +++ b/airbyte_cdk/sources/streams/core.py @@ -223,17 +223,17 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o record_counter += 1 checkpoint_interval = self.state_checkpoint_interval - checkpoint = checkpoint_reader.get_checkpoint() if ( should_checkpoint and checkpoint_interval and record_counter % checkpoint_interval == 0 - and checkpoint is not None ): - airbyte_state_message = self._checkpoint_state( - checkpoint, state_manager=state_manager - ) - yield airbyte_state_message + checkpoint = checkpoint_reader.get_checkpoint() + if checkpoint: + airbyte_state_message = self._checkpoint_state( + checkpoint, state_manager=state_manager + ) + yield airbyte_state_message if internal_config.is_limit_reached(record_counter): break From 7a693a21cd0af219aa5be05da389c67a567a0d53 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 18 Dec 2024 20:03:01 +0100 Subject: [PATCH 2/2] Added unit tests --- .../test_manifest_declarative_source.py | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index ea92bac5..dfadc3da 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -1901,3 +1901,145 @@ def validate_refs(yaml_file: str) -> List[str]: / "airbyte_cdk/sources/declarative/declarative_component_schema.yaml" ) assert not validate_refs(yaml_file_path) + + +@pytest.mark.parametrize( + "test_name, manifest, pages, expected_states_qty", + [ + ( + "test_with_pagination_and_partition_router", + { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Rates", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + "partition": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.apilayer.com", + "path": "/exchangerates_data/latest", + "http_method": "GET", + "request_parameters": {}, + "request_headers": {}, + "request_body_json": {}, + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1"], + "cursor_field": "partition", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, + }, + "paginator": { + "type": "DefaultPaginator", + "page_size": 2, + "page_size_option": { + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 2, + }, + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%S.%fZ"], + "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ", + "cursor_field": "updated_at", + "start_datetime": { + "datetime": "{{ config.get('start_date', '2020-10-16T00:00:00.000Z') }}" + }, + }, + } + ], + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "title": "API Key", + "airbyte_secret": True, + }, + "start_date": { + "title": "Start Date", + "description": "UTC date and time in the format YYYY-MM-DDTHH:MM:SS.000Z. During incremental sync, any data generated before this date will not be replicated. If left blank, the start date will be set to 2 years before the present date.", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + "pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z", + "examples": ["2020-11-16T00:00:00.000Z"], + "type": "string", + "format": "date-time", + }, + }, + "additionalProperties": True, + }, + "documentation_url": "https://example.org", + "type": "Spec", + }, + }, + ( + _create_page( + { + "rates": [ + {"ABC": 0, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"}, + {"AED": 1, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"}, + ], + "_metadata": {"next": "next"}, + } + ), + _create_page( + { + "rates": [ + {"USD": 3, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"} + ], + "_metadata": {}, + } + ), + _create_page( + { + "rates": [ + {"ABC": 2, "partition": 1, "updated_at": "2020-11-16T00:00:00.000Z"} + ], + "_metadata": {}, + } + ), + ), + 2, + ), + ], +) +def test_slice_checkpoint(test_name, manifest, pages, expected_states_qty): + _stream_name = "Rates" + with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): + states = [message.state for message in _run_read(manifest, _stream_name) if message.state] + assert len(states) == expected_states_qty