Skip to content

Commit

Permalink
Added condition for failure_type
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Dec 12, 2024
1 parent e02ee97 commit 079d32a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,17 @@ def _dynamic_stream_configs(
name = dynamic_stream.get("name")

if name in seen_dynamic_streams:
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
failure_type = FailureType.system_error

if resolver_type == "ConfigComponentsResolverModel":
error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
failure_type = FailureType.config_error

raise AirbyteTracedException(
message=error_message,
internal_message=error_message,
failure_type=FailureType.config_error,
failure_type=failure_type,
)

seen_dynamic_streams.add(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
to_configured_stream,
)
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"}

Expand Down Expand Up @@ -192,3 +193,29 @@ def test_dynamic_streams_read_with_http_components_resolver():
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
assert len(records) == 2
assert [record.stream for record in records] == expected_stream_names


def test_duplicated_dynamic_streams_read_with_http_components_resolver():
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
{"id": 3, "name": "item_2"},
]
)
),
)

with pytest.raises(AirbyteTracedException) as exc_info:
source = ConcurrentDeclarativeSource(
source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
)
source.discover(logger=source.logger, config=_CONFIG)
assert (
str(exc_info.value)
== "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
)

0 comments on commit 079d32a

Please sign in to comment.