From 4759654e5f840d3eaf82c9e71bd1eb6bfe361b1a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:48:57 +0100 Subject: [PATCH 1/2] fix: (HttpClient) rate limit fix unlimited tries (#171) Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/streams/http/http_client.py | 6 ++++-- unit_tests/sources/streams/http/test_http_client.py | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 71548441..c4fa8686 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -262,7 +262,7 @@ def _send_with_retry( user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)( self._send ) - rate_limit_backoff_handler = rate_limit_default_backoff_handler() + rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries) backoff_handler = http_client_default_backoff_handler( max_tries=max_tries, max_time=max_time ) @@ -472,7 +472,9 @@ def _handle_error_resolution( elif retry_endlessly: raise RateLimitBackoffException( - request=request, response=response or exc, error_message=error_message + request=request, + response=(response if response is not None else exc), + error_message=error_message, ) raise DefaultBackoffException( diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 29bac0ec..5cc6d20e 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -20,6 +20,7 @@ ) from airbyte_cdk.sources.streams.http.exceptions import ( DefaultBackoffException, + RateLimitBackoffException, RequestBodyException, UserDefinedBackoffException, ) @@ -690,10 +691,12 @@ def backoff_time(self, *args, **kwargs): @pytest.mark.parametrize( "exit_on_rate_limit, expected_call_count, expected_error", - [[True, 6, DefaultBackoffException], [False, 38, OverflowError]], + [[True, 6, DefaultBackoffException], [False, 6, RateLimitBackoffException]], ) @pytest.mark.usefixtures("mock_sleep") -def test_backoff_strategy_endless(exit_on_rate_limit, expected_call_count, expected_error): +def test_backoff_strategy_endless( + exit_on_rate_limit: bool, expected_call_count: int, expected_error: Exception +): http_client = HttpClient( name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock()) ) From f222fccfc782b2cc40e7badb85d46e0271e4d606 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:09:41 +0100 Subject: [PATCH 2/2] feat(low-code cdk): add flatten fields (#181) --- .../declarative_component_schema.yaml | 13 +++++ .../models/declarative_component_schema.py | 5 ++ .../parsers/model_to_component_factory.py | 12 +++++ .../transformations/flatten_fields.py | 50 +++++++++++++++++ .../transformations/test_flatten_fields.py | 54 +++++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/flatten_fields.py create mode 100644 unit_tests/sources/declarative/transformations/test_flatten_fields.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 10fa2331..ff8ee683 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1852,6 +1852,19 @@ definitions: $parameters: type: object additionalProperties: true + FlattenFields: + title: Flatten Fields + description: A transformation that flatten record to single level format. + type: object + required: + - type + properties: + type: + type: string + enum: [FlattenFields] + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e45f3afc..8bd29fdb 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class FlattenFields(BaseModel): + type: Literal["FlattenFields"] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class IterableDecoder(BaseModel): type: Literal["IterableDecoder"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 215d6fff..3ff6a871 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -197,6 +197,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + FlattenFields as FlattenFieldsModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) @@ -387,6 +390,9 @@ RemoveFields, ) from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( + FlattenFields, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) @@ -472,6 +478,7 @@ def _init_mappings(self) -> None: JsonlDecoderModel: self.create_jsonl_decoder, GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, + FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, @@ -587,6 +594,11 @@ def create_keys_to_lower_transformation( ) -> KeysToLowerTransformation: return KeysToLowerTransformation() + def create_flatten_fields( + self, model: FlattenFieldsModel, config: Config, **kwargs: Any + ) -> FlattenFields: + return FlattenFields() + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: diff --git a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py new file mode 100644 index 00000000..0cc30839 --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py @@ -0,0 +1,50 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class FlattenFields(RecordTransformation): + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + transformed_record = self.flatten_record(record) + record.clear() + record.update(transformed_record) + + def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]: + stack = [(record, "_")] + transformed_record: Dict[str, Any] = {} + force_with_parent_name = False + + while stack: + current_record, parent_key = stack.pop() + + if isinstance(current_record, dict): + for current_key, value in current_record.items(): + new_key = ( + f"{parent_key}.{current_key}" + if (current_key in transformed_record or force_with_parent_name) + else current_key + ) + stack.append((value, new_key)) + + elif isinstance(current_record, list): + for i, item in enumerate(current_record): + force_with_parent_name = True + stack.append((item, f"{parent_key}.{i}")) + + else: + transformed_record[parent_key] = current_record + + return transformed_record diff --git a/unit_tests/sources/declarative/transformations/test_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_flatten_fields.py new file mode 100644 index 00000000..4cf53a54 --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_flatten_fields.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import pytest + +from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( + FlattenFields, +) + + +@pytest.mark.parametrize( + "input_record, expected_output", + [ + ({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}), + ({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}), + ( + { + "NestedRecord": {"FirstName": "John", "LastName": "Doe"}, + "456Another123": 456, + }, + { + "FirstName": "John", + "LastName": "Doe", + "456Another123": 456, + }, + ), + ( + {"ListExample": [{"A": "a"}, {"A": "b"}]}, + {"ListExample.0.A": "a", "ListExample.1.A": "b"}, + ), + ( + { + "MixedCase123": { + "Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}] + }, + "SimpleKey": "SimpleValue", + }, + { + "Nested.0.Key.Value": "test1", + "Nested.1.Key.Value": "test2", + "SimpleKey": "SimpleValue", + }, + ), + ( + {"List": ["Item1", "Item2", "Item3"]}, + {"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"}, + ), + ], +) +def test_flatten_fields(input_record, expected_output): + flattener = FlattenFields() + flattener.transform(input_record) + assert input_record == expected_output