From f8054a82deecf2e00d507664772f04698e64b110 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com> Date: Fri, 20 Dec 2024 14:10:27 +0200 Subject: [PATCH] feat(HttpComponentsResolver): added stream slices to HttpComponentsResolver (#175) Co-authored-by: octavia-squidington-iii --- .../declarative_component_schema.yaml | 4 + .../parsers/model_to_component_factory.py | 2 +- .../resolvers/http_components_resolver.py | 34 +-- .../test_http_components_resolver.py | 228 ++++++++++++++++++ 4 files changed, 253 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 662dce34..c47cffa9 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3049,6 +3049,7 @@ definitions: interpolation_context: - config - components_values + - stream_slice - stream_template_config examples: - ["data"] @@ -3065,10 +3066,13 @@ definitions: - config - stream_template_config - components_values + - stream_slice examples: - "{{ components_values['updates'] }}" - "{{ components_values['MetaData']['LastUpdatedTime'] }}" - "{{ config['segment_id'] }}" + - "{{ stream_slice['parent_id'] }}" + - "{{ stream_slice['extra_fields']['name'] }}" value_type: title: Value Type description: The expected data type of the value. If omitted, the type will be inferred from the value provided. diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a0551be2..ad25c250 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2394,7 +2394,7 @@ def create_http_components_resolver( config=config, name="", primary_key=None, - stream_slicer=combined_slicers, + stream_slicer=stream_slicer if stream_slicer else combined_slicers, transformations=[], ) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 322b4368..6e85fc57 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -88,19 +88,25 @@ def resolve_components( """ kwargs = {"stream_template_config": stream_template_config} - for components_values in self.retriever.read_records({}): - updated_config = deepcopy(stream_template_config) - kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] - - for resolved_component in self._resolved_components: - valid_types = ( - (resolved_component.value_type,) if resolved_component.value_type else None - ) - value = resolved_component.value.eval( - self.config, valid_types=valid_types, **kwargs - ) + for stream_slice in self.retriever.stream_slices(): + for components_values in self.retriever.read_records( + records_schema={}, stream_slice=stream_slice + ): + updated_config = deepcopy(stream_template_config) + kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] + kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any] + + for resolved_component in self._resolved_components: + valid_types = ( + (resolved_component.value_type,) if resolved_component.value_type else None + ) + value = resolved_component.value.eval( + self.config, valid_types=valid_types, **kwargs + ) - path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] - dpath.set(updated_config, path, value) + path = [ + path.eval(self.config, **kwargs) for path in resolved_component.field_path + ] + dpath.set(updated_config, path, value) - yield updated_config + yield updated_config diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 1bbf2a41..de3d3794 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -197,6 +197,133 @@ ], } +_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "parent/{{ stream_partition.parent_id }}/items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "parent_id", + "stream": { + "type": "DeclarativeStream", + "name": "parent", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {"id": {"type": "integer"}}, + "type": "object", + }, + }, + }, + } + ], + }, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "path", + ], + "value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}", + }, + ], + }, + } + ], +} + @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", @@ -221,6 +348,44 @@ def test_http_components_resolver( ): mock_retriever = MagicMock() mock_retriever.read_records.return_value = retriever_data + mock_retriever.stream_slices.return_value = [{}] + config = {} + + resolver = HttpComponentsResolver( + retriever=mock_retriever, + config=config, + components_mapping=components_mapping, + parameters={}, + ) + + result = list(resolver.resolve_components(stream_template_config=stream_template_config)) + assert result == expected_result + + +@pytest.mark.parametrize( + "components_mapping, retriever_data, stream_template_config, expected_result", + [ + ( + [ + ComponentMappingDefinition( + field_path=[InterpolatedString.create("path", parameters={})], + value="{{stream_slice['parent_id']}}/{{components_values['id']}}", + value_type=str, + parameters={}, + ) + ], + [{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}], + {"path": None}, + [{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}], + ) + ], +) +def test_http_components_resolver_with_stream_slices( + components_mapping, retriever_data, stream_template_config, expected_result +): + mock_retriever = MagicMock() + mock_retriever.read_records.return_value = retriever_data + mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}] config = {} resolver = HttpComponentsResolver( @@ -305,3 +470,66 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver(): str(exc_info.value) == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." ) + + +def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream(): + expected_stream_names = [ + "parent_1_item_1", + "parent_1_item_2", + "parent_2_item_1", + "parent_2_item_2", + ] + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse(body=json.dumps([{"id": 1}, {"id": 2}])), + ) + parent_ids = [1, 2] + for parent_id in parent_ids: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ] + ) + ), + ) + dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"] + for dynamic_stream_path in dynamic_stream_paths: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"), + HttpResponse(body=json.dumps([{"ABC": 1, "AED": 2}])), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, + config=_CONFIG, + catalog=None, + state=None, + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + records = [ + message.record + for message in source.read(MagicMock(), _CONFIG, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == 4 + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 4 + + actual_record_stream_names = [record.stream for record in records] + actual_record_stream_names.sort() + + assert actual_record_stream_names == expected_stream_names