diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index c47cffa9..c262deb6 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1236,6 +1236,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1780,6 +1781,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" $parameters: @@ -1878,6 +1880,35 @@ definitions: $parameters: type: object additionalProperties: true + KeysReplace: + title: Keys Replace + description: A transformation that replaces symbols in keys. + type: object + required: + - type + - old + - new + properties: + type: + type: string + enum: [KeysReplace] + old: + type: string + title: Old value + description: Old value to replace. + examples: + - " " + - "_" + new: + type: string + title: New value + description: New value to set. + examples: + - "_" + - " " + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 1d980ca6..ba4944e8 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -715,6 +715,23 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeysReplace(BaseModel): + type: Literal["KeysReplace"] + old: str = Field( + ..., + description="Old value to replace.", + examples=[" ", "_"], + title="Old value", + ) + new: str = Field( + ..., + description="New value to set.", + examples=["_", " "], + title="New value", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class FlattenFields(BaseModel): type: Literal["FlattenFields"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1672,6 +1689,7 @@ class Config: KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( @@ -1846,6 +1864,7 @@ class DynamicSchemaLoader(BaseModel): KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ad25c250..f52d740d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -236,6 +236,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtPayload as JwtPayloadModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysReplace as KeysReplaceModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToLower as KeysToLowerModel, ) @@ -399,6 +402,9 @@ from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) @@ -488,6 +494,7 @@ def _init_mappings(self) -> None: GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, + KeysReplaceModel: self.create_keys_replace_transformation, FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -609,6 +616,11 @@ def create_keys_to_snake_transformation( ) -> KeysToSnakeCaseTransformation: return KeysToSnakeCaseTransformation() + def create_keys_replace_transformation( + self, model: KeysReplaceModel, config: Config, **kwargs: Any + ) -> KeysReplaceTransformation: + return KeysReplaceTransformation(old=model.old, new=model.new) + def create_flatten_fields( self, model: FlattenFieldsModel, config: Config, **kwargs: Any ) -> FlattenFields: diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py new file mode 100644 index 00000000..d648d3bc --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class KeysReplaceTransformation(RecordTransformation): + old: str + new: str + + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + for key in set(record.keys()): + record[key.replace(self.old, self.new)] = record.pop(key) diff --git a/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py new file mode 100644 index 00000000..700bae5d --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) + +_ANY_VALUE = -1 + + +def test_transform(): + record = {"date time": _ANY_VALUE, "customer id": _ANY_VALUE} + KeysReplaceTransformation(old=" ", new="_").transform(record) + assert record == {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE}