feat(low-code cdk): add KeyToSnakeCase transformation #178

Merged (2 commits), Dec 18, 2024
@@ -1234,6 +1234,7 @@ definitions:
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
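
For orientation (not part of the diff), a minimal sketch of the stream-level transformations list that the extended anyOf now accepts, written as the Python structure a manifest fragment parses into; the RemoveFields entry is included only to show mixing with existing transformation types:

# Hedged example of a declarative stream fragment: the "transformations"
# array may now combine existing transformation types with KeysToSnakeCase.
stream_fragment = {
    "transformations": [
        {"type": "KeysToSnakeCase"},
        {"type": "RemoveFields", "field_pointers": [["_internal_field"]]},
    ]
}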
@@ -1838,6 +1839,19 @@ definitions:
      $parameters:
        type: object
        additionalProperties: true
  KeysToSnakeCase:
    title: Key to Snake Case
    description: A transformation that renames all keys to snake case.
    type: object
    required:
      - type
    properties:
      type:
        type: string
        enum: [KeysToSnakeCase]
      $parameters:
        type: object
        additionalProperties: true
  IterableDecoder:
    title: Iterable Decoder
    description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
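
As a side note (not part of the diff), the KeysToSnakeCase definition added above corresponds to a small JSON-Schema fragment. A hedged sketch using the third-party jsonschema package shows what it accepts; the schema dict below simply mirrors the YAML definition:

# Hedged sketch: validate a manifest entry against the new definition using
# the `jsonschema` package (not part of this PR).
import jsonschema

keys_to_snake_case_schema = {
    "type": "object",
    "required": ["type"],
    "properties": {
        "type": {"type": "string", "enum": ["KeysToSnakeCase"]},
        "$parameters": {"type": "object", "additionalProperties": True},
    },
}

jsonschema.validate({"type": "KeysToSnakeCase"}, keys_to_snake_case_schema)  # passes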
@@ -2160,7 +2174,9 @@
        description: |-
          The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
        examples:
          - {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
          - {
              "Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
            }
      access_token_params:
        title: Access Token Query Params (Json Encoded)
        type: object
@@ -710,6 +710,11 @@ class KeysToLower(BaseModel):
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysToSnakeCase(BaseModel):
    type: Literal["KeysToSnakeCase"]
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
    type: Literal["IterableDecoder"]

@@ -1654,7 +1659,15 @@ class Config:
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
]
]
] = Field(
None,
description="A list of transformations to be applied to each output record.",
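
A short sketch (not part of the diff) showing that the regenerated Pydantic model accepts a manifest entry of the new type; the models module path and the pydantic-v1 style API are assumptions based on how the other generated models in this file are used:

# Hedged sketch; import path and parse_obj usage are assumed.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    KeysToSnakeCase,
)

model = KeysToSnakeCase.parse_obj({"type": "KeysToSnakeCase", "$parameters": {}})
assert model.type == "KeysToSnakeCase"
assert model.parameters == {}  # populated via the "$parameters" alias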
@@ -0,0 +1,68 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import unidecode

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysToSnakeCaseTransformation(RecordTransformation):
    # Splits a key into tokens: an uppercase run followed by lowercase letters,
    # a lowercase run, a digit run, or (via the "NoToken" group) a run of
    # non-alphanumeric separator characters.
    token_pattern: re.Pattern[str] = re.compile(
        r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)"
    )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        # The record is rewritten in place so downstream consumers see the
        # snake_case keys.
        transformed_record = self._transform_record(record)
        record.clear()
        record.update(transformed_record)

    def _transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        transformed_record = {}
        for key, value in record.items():
            transformed_key = self.process_key(key)
            transformed_value = value

            if isinstance(value, dict):
                # Recurse into nested objects so their keys are renamed as well.
                transformed_value = self._transform_record(value)

            transformed_record[transformed_key] = transformed_value
        return transformed_record

    def process_key(self, key: str) -> str:
        key = self.normalize_key(key)
        tokens = self.tokenize_key(key)
        tokens = self.filter_tokens(tokens)
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        # Transliterate non-ASCII characters to their closest ASCII equivalents.
        return unidecode.unidecode(key)

    def tokenize_key(self, key: str) -> List[str]:
        tokens = []
        for match in self.token_pattern.finditer(key):
            # Separator matches ("NoToken") contribute empty strings, which are
            # dropped later by filter_tokens.
            token = match.group(0) if match.group("NoToken") is None else ""
            tokens.append(token)
        return tokens

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        # Drop empty separator tokens in the middle (first and last positions are
        # kept) and prepend an empty token when the key would start with a digit.
        if len(tokens) >= 3:
            tokens = tokens[:1] + [t for t in tokens[1:-1] if t] + tokens[-1:]
        if tokens and tokens[0].isdigit():
            tokens.insert(0, "")
        return tokens

    def tokens_to_snake_case(self, tokens: List[str]) -> str:
        return "_".join(token.lower() for token in tokens)