From 4641bd2edc422c927179df5fc9465fbcd652223e Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Mon, 16 Dec 2024 17:16:18 +0200 Subject: [PATCH 1/6] added stream slices to http component resolver --- .../declarative_component_schema.yaml | 4 +++ .../parsers/model_to_component_factory.py | 2 +- .../resolvers/http_components_resolver.py | 30 +++++++++++-------- .../test_http_components_resolver.py | 1 + 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 461cfa76..a0c4469b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3007,6 +3007,7 @@ definitions: interpolation_context: - config - components_values + - stream_slice - stream_template_config examples: - ["data"] @@ -3023,10 +3024,13 @@ definitions: - config - stream_template_config - components_values + - stream_slice examples: - "{{ components_values['updates'] }}" - "{{ components_values['MetaData']['LastUpdatedTime'] }}" - "{{ config['segment_id'] }}" + - "{{ stream_slice['parent_id'] }}" + - "{{ stream_slice['extra_fields']['name'] }}" value_type: title: Value Type description: The expected data type of the value. If omitted, the type will be inferred from the value provided. diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 215d6fff..91ad409f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2353,7 +2353,7 @@ def create_http_components_resolver( config=config, name="", primary_key=None, - stream_slicer=combined_slicers, + stream_slicer=stream_slicer if stream_slicer else combined_slicers, transformations=[], ) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 322b4368..4633513b 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -88,19 +88,23 @@ def resolve_components( """ kwargs = {"stream_template_config": stream_template_config} - for components_values in self.retriever.read_records({}): - updated_config = deepcopy(stream_template_config) - kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] + stream_slices = self.retriever.stream_slices() if self.retriever.stream_slicer else [{}] - for resolved_component in self._resolved_components: - valid_types = ( - (resolved_component.value_type,) if resolved_component.value_type else None - ) - value = resolved_component.value.eval( - self.config, valid_types=valid_types, **kwargs - ) + for stream_slice in stream_slices: + for components_values in self.retriever.read_records({}, stream_slice): + updated_config = deepcopy(stream_template_config) + kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] + kwargs["stream_slice"] = stream_slice + + for resolved_component in self._resolved_components: + valid_types = ( + (resolved_component.value_type,) if resolved_component.value_type else None + ) + value = resolved_component.value.eval( + self.config, valid_types=valid_types, **kwargs + ) - path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] - dpath.set(updated_config, path, value) + path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] + dpath.set(updated_config, path, value) - yield updated_config + yield updated_config diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 1bbf2a41..0a9c12a5 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -221,6 +221,7 @@ def test_http_components_resolver( ): mock_retriever = MagicMock() mock_retriever.read_records.return_value = retriever_data + mock_retriever.stream_slices.return_value = [{}] config = {} resolver = HttpComponentsResolver( From d9291e0bf57d4b43218837a7c8227b1a12a75d35 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 16 Dec 2024 15:27:28 +0000 Subject: [PATCH 2/6] Auto-fix lint and format issues --- .../sources/declarative/resolvers/http_components_resolver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 4633513b..f1c154ad 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -104,7 +104,9 @@ def resolve_components( self.config, valid_types=valid_types, **kwargs ) - path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] + path = [ + path.eval(self.config, **kwargs) for path in resolved_component.field_path + ] dpath.set(updated_config, path, value) yield updated_config From 18f948d83712412516b4b67ca441b402a3c63f8e Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 18 Dec 2024 16:20:31 +0200 Subject: [PATCH 3/6] removed condition from resolve_components() --- .../declarative/resolvers/http_components_resolver.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index f1c154ad..b18433cc 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -88,13 +88,11 @@ def resolve_components( """ kwargs = {"stream_template_config": stream_template_config} - stream_slices = self.retriever.stream_slices() if self.retriever.stream_slicer else [{}] - - for stream_slice in stream_slices: - for components_values in self.retriever.read_records({}, stream_slice): + for stream_slice in self.retriever.stream_slices(): + for components_values in self.retriever.read_records(records_schema={}, stream_slice=stream_slice): updated_config = deepcopy(stream_template_config) kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] - kwargs["stream_slice"] = stream_slice + kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any] for resolved_component in self._resolved_components: valid_types = ( From fb21cd407334c9e95fce843771690a46df4f9976 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 18 Dec 2024 16:22:53 +0200 Subject: [PATCH 4/6] added unit tests --- .../test_http_components_resolver.py | 217 +++++++++++++++++- 1 file changed, 216 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 0a9c12a5..5582d517 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -197,7 +197,131 @@ ], } - +_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "parent/{{ stream_partition.parent_id }}/items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "parent_id", + "stream": { + "type": "DeclarativeStream", + "name": "parent", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"} + }, + "type": "object", + }, + }, + } + } + ] + } + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "path", + ], + "value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}", + }, + ], + }, + } + ], +} @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", [ @@ -234,6 +358,41 @@ def test_http_components_resolver( result = list(resolver.resolve_components(stream_template_config=stream_template_config)) assert result == expected_result +@pytest.mark.parametrize( + "components_mapping, retriever_data, stream_template_config, expected_result", + [ + ( + [ + ComponentMappingDefinition( + field_path=[InterpolatedString.create("path", parameters={})], + value="{{stream_slice['parent_id']}}/{{components_values['id']}}", + value_type=str, + parameters={}, + ) + ], + [{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}], + {"path": None}, + [{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}], + ) + ], +) +def test_http_components_resolver_with_stream_slices( + components_mapping, retriever_data, stream_template_config, expected_result +): + mock_retriever = MagicMock() + mock_retriever.read_records.return_value = retriever_data + mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}] + config = {} + + resolver = HttpComponentsResolver( + retriever=mock_retriever, + config=config, + components_mapping=components_mapping, + parameters={}, + ) + + result = list(resolver.resolve_components(stream_template_config=stream_template_config)) + assert result == expected_result def test_dynamic_streams_read_with_http_components_resolver(): expected_stream_names = ["item_1", "item_2"] @@ -306,3 +465,59 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver(): str(exc_info.value) == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." ) + +def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream(): + expected_stream_names = [ + "parent_1_item_1", "parent_1_item_2", "parent_2_item_1", "parent_2_item_2" + ] + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps([{"id": 1}, {"id": 2}]) + ), + ) + parent_ids = [1, 2] + for parent_id in parent_ids: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ] + ) + ), + ) + dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"] + for dynamic_stream_path in dynamic_stream_paths: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"), + HttpResponse( + body=json.dumps([{"ABC": 1, "AED": 2}]) + ), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + records = [ + message.record + for message in source.read(MagicMock(), _CONFIG, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == 4 + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 4 + assert [record.stream for record in records] == expected_stream_names \ No newline at end of file From ec9e480b98760b3f91e5c8e30a656c96515c537a Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 18 Dec 2024 14:26:58 +0000 Subject: [PATCH 5/6] Auto-fix lint and format issues --- .../resolvers/http_components_resolver.py | 4 +- .../test_http_components_resolver.py | 40 +++++++++++-------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index b18433cc..6e85fc57 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -89,7 +89,9 @@ def resolve_components( kwargs = {"stream_template_config": stream_template_config} for stream_slice in self.retriever.stream_slices(): - for components_values in self.retriever.read_records(records_schema={}, stream_slice=stream_slice): + for components_values in self.retriever.read_records( + records_schema={}, stream_slice=stream_slice + ): updated_config = deepcopy(stream_template_config) kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any] diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 5582d517..00d0ddc5 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -284,23 +284,24 @@ }, "record_selector": { "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, }, }, "schema_loader": { "type": "InlineSchemaLoader", "schema": { "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"} - }, + "properties": {"id": {"type": "integer"}}, "type": "object", }, }, - } + }, } - ] - } + ], + }, }, "components_mapping": [ { @@ -322,6 +323,8 @@ } ], } + + @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", [ @@ -358,6 +361,7 @@ def test_http_components_resolver( result = list(resolver.resolve_components(stream_template_config=stream_template_config)) assert result == expected_result + @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", [ @@ -394,6 +398,7 @@ def test_http_components_resolver_with_stream_slices( result = list(resolver.resolve_components(stream_template_config=stream_template_config)) assert result == expected_result + def test_dynamic_streams_read_with_http_components_resolver(): expected_stream_names = ["item_1", "item_2"] with HttpMocker() as http_mocker: @@ -466,16 +471,18 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver(): == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." ) + def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream(): expected_stream_names = [ - "parent_1_item_1", "parent_1_item_2", "parent_2_item_1", "parent_2_item_2" + "parent_1_item_1", + "parent_1_item_2", + "parent_2_item_1", + "parent_2_item_2", ] with HttpMocker() as http_mocker: http_mocker.get( HttpRequest(url="https://api.test.com/parents"), - HttpResponse( - body=json.dumps([{"id": 1}, {"id": 2}]) - ), + HttpResponse(body=json.dumps([{"id": 1}, {"id": 2}])), ) parent_ids = [1, 2] for parent_id in parent_ids: @@ -494,13 +501,14 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str for dynamic_stream_path in dynamic_stream_paths: http_mocker.get( HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"), - HttpResponse( - body=json.dumps([{"ABC": 1, "AED": 2}]) - ), + HttpResponse(body=json.dumps([{"ABC": 1, "AED": 2}])), ) source = ConcurrentDeclarativeSource( - source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, config=_CONFIG, catalog=None, state=None + source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, + config=_CONFIG, + catalog=None, + state=None, ) actual_catalog = source.discover(logger=source.logger, config=_CONFIG) @@ -520,4 +528,4 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str assert len(actual_catalog.streams) == 4 assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 4 - assert [record.stream for record in records] == expected_stream_names \ No newline at end of file + assert [record.stream for record in records] == expected_stream_names From 43a1651fc4979339cdbdcbbcab891d7dfa4c9d2c Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 20 Dec 2024 13:35:02 +0200 Subject: [PATCH 6/6] updated unit tests --- .../declarative/resolvers/test_http_components_resolver.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 00d0ddc5..de3d3794 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -528,4 +528,8 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str assert len(actual_catalog.streams) == 4 assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 4 - assert [record.stream for record in records] == expected_stream_names + + actual_record_stream_names = [record.stream for record in records] + actual_record_stream_names.sort() + + assert actual_record_stream_names == expected_stream_names