Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add flatten fields #181

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
FlattenFields:
title: Flatten Fields
description: A transformation that flatten record to single level format.
type: object
required:
- type
properties:
type:
type: string
enum: [FlattenFields]
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
Expand Down Expand Up @@ -387,6 +390,9 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -472,6 +478,7 @@ def _init_mappings(self) -> None:
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
Expand Down Expand Up @@ -587,6 +594,11 @@ def create_keys_to_lower_transformation(
) -> KeysToLowerTransformation:
return KeysToLowerTransformation()

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
return FlattenFields()

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
Expand Down
50 changes: 50 additions & 0 deletions airbyte_cdk/sources/declarative/transformations/flatten_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class FlattenFields(RecordTransformation):
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
transformed_record = self.flatten_record(record)
record.clear()
record.update(transformed_record)

lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
stack = [(record, "_")]
transformed_record = {}
force_with_parent_name = False

while stack:
current_record, parent_key = stack.pop()

if isinstance(current_record, dict):
for current_key, value in current_record.items():
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))

elif isinstance(current_record, list):
for i, item in enumerate(current_record):
force_with_parent_name = True
stack.append((item, f"{parent_key}.{i}"))

else:
transformed_record[parent_key] = current_record

return transformed_record
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import pytest

from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)


@pytest.mark.parametrize(
"input_record, expected_output",
[
({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}),
({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}),
(
{
"NestedRecord": {"FirstName": "John", "LastName": "Doe"},
"456Another123": 456,
},
{
"FirstName": "John",
"LastName": "Doe",
"456Another123": 456,
},
),
(
{"ListExample": [{"A": "a"}, {"A": "b"}]},
{"ListExample.0.A": "a", "ListExample.1.A": "b"},
),
(
{
"MixedCase123": {
"Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}]
},
"SimpleKey": "SimpleValue",
},
{
"Nested.0.Key.Value": "test1",
"Nested.1.Key.Value": "test2",
"SimpleKey": "SimpleValue",
},
),
(
{"List": ["Item1", "Item2", "Item3"]},
{"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"},
),
],
)
def test_flatten_fields(input_record, expected_output):
flattener = FlattenFields()
flattener.transform(input_record)
assert input_record == expected_output
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
Loading