Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): added keys replace transformation #183

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1780,6 +1781,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down Expand Up @@ -1878,6 +1880,35 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeysReplace:
title: Keys Replace
description: A transformation that replaces symbols in keys.
type: object
required:
- type
- old
- new
properties:
type:
type: string
enum: [KeysReplace]
old:
type: string
title: Old value
description: Old value to replace.
examples:
- " "
- "_"
new:
type: string
title: New value
description: New value to set.
examples:
- "_"
- " "
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,23 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
...,
description="Old value to replace.",
examples=[" ", "_"],
title="Old value",
)
new: str = Field(
...,
description="New value to set.",
examples=["_", " "],
title="New value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down Expand Up @@ -1672,6 +1689,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down Expand Up @@ -1846,6 +1864,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtPayload as JwtPayloadModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysReplace as KeysReplaceModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
Expand Down Expand Up @@ -399,6 +402,9 @@
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -488,6 +494,7 @@ def _init_mappings(self) -> None:
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -609,6 +616,11 @@ def create_keys_to_snake_transformation(
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_keys_replace_transformation(
self, model: KeysReplaceModel, config: Config, **kwargs: Any
) -> KeysReplaceTransformation:
return KeysReplaceTransformation(old=model.old, new=model.new)

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysReplaceTransformation(RecordTransformation):
old: str
new: str

def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
for key in set(record.keys()):
record[key.replace(self.old, self.new)] = record.pop(key)
Comment on lines +17 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider handling nested dictionaries recursively

The current implementation only handles top-level keys. Would you consider adding support for nested dictionaries? Here's a suggestion:

     def transform(
         self,
         record: Dict[str, Any],
         config: Optional[Config] = None,
         stream_state: Optional[StreamState] = None,
         stream_slice: Optional[StreamSlice] = None,
     ) -> None:
+        def transform_dict(d: Dict[str, Any]) -> Dict[str, Any]:
+            result = {}
+            for key, value in d.items():
+                new_key = key.replace(self.old, self.new)
+                if isinstance(value, dict):
+                    result[new_key] = transform_dict(value)
+                else:
+                    result[new_key] = value
+            return result
+
+        transformed = transform_dict(record)
+        record.clear()
+        record.update(transformed)
-        for key in set(record.keys()):
-            record[key.replace(self.old, self.new)] = record.pop(key)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
for key in set(record.keys()):
record[key.replace(self.old, self.new)] = record.pop(key)
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
def transform_dict(d: Dict[str, Any]) -> Dict[str, Any]:
result = {}
for key, value in d.items():
new_key = key.replace(self.old, self.new)
if isinstance(value, dict):
result[new_key] = transform_dict(value)
else:
result[new_key] = value
return result
transformed = transform_dict(record)
record.clear()
record.update(transformed)

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)

_ANY_VALUE = -1


def test_transform():
record = {"date time": _ANY_VALUE, "customer id": _ANY_VALUE}
KeysReplaceTransformation(old=" ", new="_").transform(record)
assert record == {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE}
Loading