Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(HttpComponentsResolver): added stream slices to HttpComponentsResolver #175

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,7 @@ definitions:
interpolation_context:
- config
- components_values
- stream_slice
- stream_template_config
examples:
- ["data"]
Expand All @@ -3065,10 +3066,13 @@ definitions:
- config
- stream_template_config
- components_values
- stream_slice
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
- "{{ stream_slice['parent_id'] }}"
- "{{ stream_slice['extra_fields']['name'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,7 @@ def create_http_components_resolver(
config=config,
name="",
primary_key=None,
stream_slicer=combined_slicers,
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
transformations=[],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,25 @@ def resolve_components(
"""
kwargs = {"stream_template_config": stream_template_config}

for components_values in self.retriever.read_records({}):
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)
for stream_slice in self.retriever.stream_slices():
for components_values in self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
):
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
dpath.set(updated_config, path, value)
path = [
path.eval(self.config, **kwargs) for path in resolved_component.field_path
]
dpath.set(updated_config, path, value)

yield updated_config
yield updated_config
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,133 @@
],
}

_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = {
"version": "6.7.0",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"dynamic_streams": [
{
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"ABC": {"type": "number"},
"AED": {"type": "number"},
},
"type": "object",
},
},
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
},
},
"components_resolver": {
"type": "HttpComponentsResolver",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "parent/{{ stream_partition.parent_id }}/items",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
"partition_router": {
"type": "SubstreamPartitionRouter",
"parent_stream_configs": [
{
"type": "ParentStreamConfig",
"parent_key": "id",
"partition_field": "parent_id",
"stream": {
"type": "DeclarativeStream",
"name": "parent",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "/parents",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {
"type": "DpathExtractor",
"field_path": [],
},
},
},
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {"id": {"type": "integer"}},
"type": "object",
},
},
},
}
],
},
},
"components_mapping": [
{
"type": "ComponentMappingDefinition",
"field_path": ["name"],
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
},
{
"type": "ComponentMappingDefinition",
"field_path": [
"retriever",
"requester",
"path",
],
"value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}",
},
],
},
}
],
}


@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
Expand All @@ -221,6 +348,44 @@ def test_http_components_resolver(
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{}]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
config = {}

resolver = HttpComponentsResolver(
retriever=mock_retriever,
config=config,
components_mapping=components_mapping,
parameters={},
)

result = list(resolver.resolve_components(stream_template_config=stream_template_config))
assert result == expected_result


@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
[
(
[
ComponentMappingDefinition(
field_path=[InterpolatedString.create("path", parameters={})],
value="{{stream_slice['parent_id']}}/{{components_values['id']}}",
value_type=str,
parameters={},
)
],
[{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}],
{"path": None},
[{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}],
)
],
)
def test_http_components_resolver_with_stream_slices(
components_mapping, retriever_data, stream_template_config, expected_result
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}]
config = {}

resolver = HttpComponentsResolver(
Expand Down Expand Up @@ -305,3 +470,62 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver():
str(exc_info.value)
== "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
)


def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream():
expected_stream_names = [
"parent_1_item_1",
"parent_1_item_2",
"parent_2_item_1",
"parent_2_item_2",
]
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/parents"),
HttpResponse(body=json.dumps([{"id": 1}, {"id": 2}])),
)
parent_ids = [1, 2]
for parent_id in parent_ids:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
]
)
),
)
dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"]
for dynamic_stream_path in dynamic_stream_paths:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"),
HttpResponse(body=json.dumps([{"ABC": 1, "AED": 2}])),
)

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM,
config=_CONFIG,
catalog=None,
state=None,
)

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

configured_streams = [
to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
for stream in actual_catalog.streams
]
configured_catalog = to_configured_catalog(configured_streams)

records = [
message.record
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
if message.type == Type.RECORD
]

assert len(actual_catalog.streams) == 4
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
assert len(records) == 4
assert [record.stream for record in records] == expected_stream_names
Loading