From 9b364f377ce5142862afbe234a3ef39903a4c9de Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 18 Dec 2024 04:52:20 +0100 Subject: [PATCH 1/2] Add flatten fields --- .../declarative_component_schema.yaml | 13 +++++ .../models/declarative_component_schema.py | 5 ++ .../parsers/model_to_component_factory.py | 12 +++++ .../transformations/flatten_fields.py | 50 +++++++++++++++++ .../transformations/test_flatten_fields.py | 54 +++++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/flatten_fields.py create mode 100644 unit_tests/sources/declarative/transformations/test_flatten_fields.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 10fa2331..ff8ee683 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1852,6 +1852,19 @@ definitions: $parameters: type: object additionalProperties: true + FlattenFields: + title: Flatten Fields + description: A transformation that flattens a record to a single-level format. + type: object + required: + - type + properties: + type: + type: string + enum: [FlattenFields] + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. 
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e45f3afc..8bd29fdb 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class FlattenFields(BaseModel): + type: Literal["FlattenFields"] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class IterableDecoder(BaseModel): type: Literal["IterableDecoder"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 215d6fff..3ff6a871 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -197,6 +197,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + FlattenFields as FlattenFieldsModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) @@ -387,6 +390,9 @@ RemoveFields, ) from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( + FlattenFields, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) @@ -472,6 +478,7 @@ def _init_mappings(self) -> None: JsonlDecoderModel: self.create_jsonl_decoder, GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, + 
FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, @@ -587,6 +594,11 @@ def create_keys_to_lower_transformation( ) -> KeysToLowerTransformation: return KeysToLowerTransformation() + def create_flatten_fields( + self, model: FlattenFieldsModel, config: Config, **kwargs: Any + ) -> FlattenFields: + return FlattenFields() + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: diff --git a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py new file mode 100644 index 00000000..6224e109 --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py @@ -0,0 +1,50 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class FlattenFields(RecordTransformation): + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + transformed_record = self.flatten_record(record) + record.clear() + record.update(transformed_record) + + def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]: + stack = [(record, "_")] + transformed_record = {} + force_with_parent_name = False + + while stack: + current_record, parent_key = stack.pop() + + if isinstance(current_record, dict): + for current_key, value in current_record.items(): + new_key = ( + f"{parent_key}.{current_key}" + if (current_key in transformed_record or force_with_parent_name) + else current_key + ) + 
stack.append((value, new_key)) + + elif isinstance(current_record, list): + for i, item in enumerate(current_record): + force_with_parent_name = True + stack.append((item, f"{parent_key}.{i}")) + + else: + transformed_record[parent_key] = current_record + + return transformed_record diff --git a/unit_tests/sources/declarative/transformations/test_flatten_fields.py b/unit_tests/sources/declarative/transformations/test_flatten_fields.py new file mode 100644 index 00000000..4cf53a54 --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_flatten_fields.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import pytest + +from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( + FlattenFields, +) + + +@pytest.mark.parametrize( + "input_record, expected_output", + [ + ({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}), + ({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}), + ( + { + "NestedRecord": {"FirstName": "John", "LastName": "Doe"}, + "456Another123": 456, + }, + { + "FirstName": "John", + "LastName": "Doe", + "456Another123": 456, + }, + ), + ( + {"ListExample": [{"A": "a"}, {"A": "b"}]}, + {"ListExample.0.A": "a", "ListExample.1.A": "b"}, + ), + ( + { + "MixedCase123": { + "Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}] + }, + "SimpleKey": "SimpleValue", + }, + { + "Nested.0.Key.Value": "test1", + "Nested.1.Key.Value": "test2", + "SimpleKey": "SimpleValue", + }, + ), + ( + {"List": ["Item1", "Item2", "Item3"]}, + {"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"}, + ), + ], +) +def test_flatten_fields(input_record, expected_output): + flattener = FlattenFields() + flattener.transform(input_record) + assert input_record == expected_output From d5b4f651daae11f2dba852d0d7936a501547f651 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 18 Dec 2024 05:17:19 +0100 Subject: [PATCH 2/2] Fix mypy 
--- .../sources/declarative/transformations/flatten_fields.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py index 6224e109..0cc30839 100644 --- a/airbyte_cdk/sources/declarative/transformations/flatten_fields.py +++ b/airbyte_cdk/sources/declarative/transformations/flatten_fields.py @@ -24,7 +24,7 @@ def transform( def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]: stack = [(record, "_")] - transformed_record = {} + transformed_record: Dict[str, Any] = {} force_with_parent_name = False while stack: