From 3b99b6f0173eef9ead7070ee3de235639b172de2 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 20 Dec 2024 12:53:27 +0200 Subject: [PATCH 1/2] added keys replace transformation --- .../declarative_component_schema.yaml | 31 +++++++++++++++++++ .../models/declarative_component_schema.py | 19 ++++++++++++ .../parsers/model_to_component_factory.py | 12 +++++++ .../keys_replace_transformation.py | 25 +++++++++++++++ .../test_keys_replace_transformation.py | 15 +++++++++ 5 files changed, 102 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py create mode 100644 unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 662dce34..118b5bb5 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1236,6 +1236,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1780,6 +1781,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" $parameters: @@ -1878,6 +1880,35 @@ definitions: $parameters: type: object additionalProperties: true + KeysReplace: + title: Keys Replace + description: A transformation that replaces symbols in keys. + type: object + required: + - type + - old + - new + properties: + type: + type: string + enum: [KeysReplace] + old: + type: string + title: Old value + description: Old value to replace. + examples: + - " " + - "_" + new: + type: string + title: New value + description: New value to set. + examples: + - "_" + - " " + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 1d980ca6..ba4944e8 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -715,6 +715,23 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeysReplace(BaseModel): + type: Literal["KeysReplace"] + old: str = Field( + ..., + description="Old value to replace.", + examples=[" ", "_"], + title="Old value", + ) + new: str = Field( + ..., + description="New value to set.", + examples=["_", " "], + title="New value", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class FlattenFields(BaseModel): type: Literal["FlattenFields"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1672,6 +1689,7 @@ class Config: KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( @@ -1846,6 +1864,7 @@ class DynamicSchemaLoader(BaseModel): KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a0551be2..3da6dc51 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -242,6 +242,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToSnakeCase as KeysToSnakeCaseModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysReplace as KeysReplaceModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel, ) @@ -405,6 +408,9 @@ from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import ( KeysToSnakeCaseTransformation, ) +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.message import ( InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, @@ -488,6 +494,7 @@ def _init_mappings(self) -> None: GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, + KeysReplaceModel: self.create_keys_replace_transformation, FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -609,6 +616,11 @@ def create_keys_to_snake_transformation( ) -> KeysToSnakeCaseTransformation: return KeysToSnakeCaseTransformation() + def create_keys_replace_transformation( + self, model: KeysReplaceModel, config: Config, **kwargs: Any + ) -> KeysReplaceTransformation: + return KeysReplaceTransformation(old=model.old, new=model.new) + def create_flatten_fields( self, model: FlattenFieldsModel, config: Config, **kwargs: Any ) -> FlattenFields: diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py new file mode 100644 index 00000000..d648d3bc --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class KeysReplaceTransformation(RecordTransformation): + old: str + new: str + + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + for key in set(record.keys()): + record[key.replace(self.old, self.new)] = record.pop(key) diff --git a/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py new file mode 100644 index 00000000..700bae5d --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) + +_ANY_VALUE = -1 + + +def test_transform(): + record = {"date time": _ANY_VALUE, "customer id": _ANY_VALUE} + KeysReplaceTransformation(old=" ", new="_").transform(record) + assert record == {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE} From 1cceccdb26777bce88f7b8b3593598342551b60b Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 20 Dec 2024 11:04:07 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../parsers/model_to_component_factory.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3da6dc51..16125c4d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -237,13 +237,13 @@ JwtPayload as JwtPayloadModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - KeysToLower as KeysToLowerModel, + KeysReplace as KeysReplaceModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - KeysToSnakeCase as KeysToSnakeCaseModel, + KeysToLower as KeysToLowerModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - KeysReplace as KeysReplaceModel, + KeysToSnakeCase as KeysToSnakeCaseModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel, @@ -402,15 +402,15 @@ from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import ( KeysToSnakeCaseTransformation, ) -from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( - KeysReplaceTransformation, -) from airbyte_cdk.sources.message import ( InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator,