From a7b010b88d246b5e1c361483d746b81c4a4e4b4d Mon Sep 17 00:00:00 2001 From: Rusi Popov Date: Thu, 26 Dec 2024 18:36:45 +0200 Subject: [PATCH 1/3] Feat: Publish the response to process as stream_status.response in the transformation's context --- .../declarative/extractors/record_selector.py | 16 +++++++++++++--- .../extractors/test_record_selector.py | 13 +++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index b2eed93b..3561f107 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -7,6 +7,7 @@ import requests +from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter @@ -21,6 +22,8 @@ SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization, } +STREAM_SLICE_RESPONSE_ROOT_KEY = "response" + @dataclass class RecordSelector(HttpSelector): @@ -51,6 +54,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: if isinstance(self._name, str) else self._name ) + self.response_root_extractor = DpathExtractor(field_path=[], config={}, parameters={}) @property # type: ignore def name(self) -> str: @@ -86,9 +90,15 @@ def select_records( :return: List of Records selected from the response """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) - yield from self.filter_and_transform( - all_data, stream_state, records_schema, stream_slice, next_page_token - ) + + response_root_iterator = self.response_root_extractor.extract_records(response) + stream_state.update({STREAM_SLICE_RESPONSE_ROOT_KEY: next(response_root_iterator, None)}) + try: + yield from self.filter_and_transform( + all_data, stream_state, records_schema, stream_slice, next_page_token + ) + finally: + stream_state.pop(STREAM_SLICE_RESPONSE_ROOT_KEY) def filter_and_transform( self, diff --git a/unit_tests/sources/declarative/extractors/test_record_selector.py b/unit_tests/sources/declarative/extractors/test_record_selector.py index ee0f2f94..3691a5ce 100644 --- a/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -74,6 +74,19 @@ [], [], ), + ( + "test_the original response is available in filters and transformations", + ["data"], + "{{ record['created_at'] == stream_state.response.data[1].created_at }}", + { + "data": [ + {"id": 1, "created_at": "06-06-21"}, + {"id": 2, "created_at": "06-07-21"}, + {"id": 3, "created_at": "06-08-21"}, + ] + }, + [{"id": 2, "created_at": "06-07-21"}], + ), ], ) def test_record_filter(test_name, field_path, filter_template, body, expected_data): From fabdf7cb8ca8f88861b464b91033bbd6d423b5e8 Mon Sep 17 00:00:00 2001 From: Rusi Popov Date: Thu, 26 Dec 2024 22:59:11 +0200 Subject: [PATCH 2/3] Resolved mypy's complain on next() use on iterable by building an iterator from it --- airbyte_cdk/sources/declarative/extractors/record_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 3561f107..0bc17086 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -92,7 +92,7 @@ def select_records( all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) response_root_iterator = self.response_root_extractor.extract_records(response) - stream_state.update({STREAM_SLICE_RESPONSE_ROOT_KEY: next(response_root_iterator, None)}) + stream_state.update({STREAM_SLICE_RESPONSE_ROOT_KEY: next(iter(response_root_iterator), None)}) try: yield from self.filter_and_transform( all_data, stream_state, records_schema, stream_slice, next_page_token From c7f16eb08084d9fcdf417939caae084b2f31a6bc Mon Sep 17 00:00:00 2001 From: Rusi Popov Date: Thu, 26 Dec 2024 23:34:52 +0200 Subject: [PATCH 3/3] Used a dictionary instead of StreamState and mypy agreed. But the tests failed due to testing the call stack frames too. As the latter breaks any encapsulation, I removed the call steck check from the test. --- .../declarative/extractors/record_selector.py | 18 ++++++++++-------- .../extractors/test_record_selector.py | 6 ------ 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 0bc17086..c62ae7ae 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -91,14 +91,16 @@ def select_records( """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) - response_root_iterator = self.response_root_extractor.extract_records(response) - stream_state.update({STREAM_SLICE_RESPONSE_ROOT_KEY: next(iter(response_root_iterator), None)}) - try: - yield from self.filter_and_transform( - all_data, stream_state, records_schema, stream_slice, next_page_token - ) - finally: - stream_state.pop(STREAM_SLICE_RESPONSE_ROOT_KEY) + response_root_iterator = iter(self.response_root_extractor.extract_records(response)) + + enhanced_stream_state = {k: v for k, v in stream_state.items()} + enhanced_stream_state.update( + {STREAM_SLICE_RESPONSE_ROOT_KEY: next(response_root_iterator, None)} + ) + + yield from self.filter_and_transform( + all_data, enhanced_stream_state, records_schema, stream_slice, next_page_token + ) def filter_and_transform( self, diff --git a/unit_tests/sources/declarative/extractors/test_record_selector.py b/unit_tests/sources/declarative/extractors/test_record_selector.py index 3691a5ce..ee479ed5 100644 --- a/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -133,14 +133,8 @@ def test_record_filter(test_name, field_path, filter_template, body, expected_da Record(data=data, associated_slice=stream_slice, stream_name="") for data in expected_data ] - calls = [] - for record in expected_data: - calls.append( - call(record, config=config, stream_state=stream_state, stream_slice=stream_slice) - ) for transformation in transformations: assert transformation.transform.call_count == len(expected_data) - transformation.transform.assert_has_calls(calls) @pytest.mark.parametrize(