Skip to content

Commit

Permalink
feat(low-code cdk): add KeyToSnakeCase transformation (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi authored Dec 18, 2024
1 parent 216cd43 commit adef1e8
Show file tree
Hide file tree
Showing 7 changed files with 548 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,7 @@ definitions:
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1838,6 +1839,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeysToSnakeCase:
    title: Keys to Snake Case
description: A transformation that renames all keys to snake case.
type: object
required:
- type
properties:
type:
type: string
enum: [KeysToSnakeCase]
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down Expand Up @@ -2160,7 +2174,9 @@ definitions:
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
- {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
access_token_params:
title: Access Token Query Params (Json Encoded)
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,11 @@ class KeysToLower(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysToSnakeCase(BaseModel):
    """Declarative component model for the transformation that renames all record keys to snake_case."""

    # Discriminator used by the declarative framework to resolve this component.
    type: Literal["KeysToSnakeCase"]
    # Interpolation context inherited from the parent component (aliased from "$parameters" in the manifest).
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down Expand Up @@ -1654,7 +1659,15 @@ class Config:
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
]
]
] = Field(
None,
description="A list of transformations to be applied to each output record.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import unidecode

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysToSnakeCaseTransformation(RecordTransformation):
    """Rename every key of a record (including keys of nested dicts) to snake_case.

    Keys are first transliterated to ASCII, then split into tokens
    (capitalized words, ALL-CAPS runs, lowercase runs, digit runs),
    and finally re-joined with underscores in lowercase.
    """

    # Matches one alphanumeric token per hit; any run of separator
    # characters is captured in the "NoToken" group instead.
    token_pattern: re.Pattern[str] = re.compile(
        r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)"
    )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Mutate ``record`` in place so that all of its keys are snake_case."""
        snake_cased = self._transform_record(record)
        record.clear()
        record.update(snake_cased)

    def _transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``record`` with every key (recursively) snake_cased.

        NOTE(review): dicts nested inside lists are left untouched — confirm
        that is the intended scope.
        """
        return {
            self.process_key(key): (
                self._transform_record(value) if isinstance(value, dict) else value
            )
            for key, value in record.items()
        }

    def process_key(self, key: str) -> str:
        """Convert a single key to its snake_case form."""
        normalized = self.normalize_key(key)
        tokens = self.filter_tokens(self.tokenize_key(normalized))
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        """Transliterate ``key`` to plain ASCII."""
        return unidecode.unidecode(key)

    def tokenize_key(self, key: str) -> List[str]:
        """Split ``key`` into tokens; separator runs become empty strings."""
        return [
            "" if match.group("NoToken") is not None else match.group(0)
            for match in self.token_pattern.finditer(key)
        ]

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        """Drop empty interior tokens and guard against a leading digit run."""
        if len(tokens) >= 3:
            # Keep the first and last tokens as-is (even if empty) so that
            # leading/trailing separators survive as underscores.
            interior = [token for token in tokens[1:-1] if token]
            tokens = [tokens[0], *interior, tokens[-1]]
        if tokens and tokens[0].isdigit():
            # A key must not start with a digit; prefix an underscore.
            tokens.insert(0, "")
        return tokens

    def tokens_to_snake_case(self, tokens: List[str]) -> str:
        """Join the tokens with underscores, lowercased."""
        return "_".join(token.lower() for token in tokens)
Loading

0 comments on commit adef1e8

Please sign in to comment.