Skip to content

Commit

Permalink
feat(low-code cdk): add transformation to dynamic schema loader (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi authored Dec 18, 2024
1 parent f222fcc commit 9563c33
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 9 deletions.
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@ definitions:
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1767,6 +1768,18 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,7 @@ class Config:
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
]
]
] = Field(
Expand Down Expand Up @@ -1836,6 +1837,22 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_transformations: Optional[
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
]
]
] = Field(
None,
description="A list of transformations to be applied to the schema.",
title="Schema Transformations",
)
schema_type_identifier: SchemaTypeIdentifier
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToSnakeCase as KeysToSnakeCaseModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
)
Expand Down Expand Up @@ -396,6 +399,9 @@
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
KeysToSnakeCaseTransformation,
)
from airbyte_cdk.sources.message import (
InMemoryMessageRepository,
LogAppenderMessageRepositoryDecorator,
Expand Down Expand Up @@ -478,6 +484,7 @@ def _init_mappings(self) -> None:
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -594,6 +601,11 @@ def create_keys_to_lower_transformation(
) -> KeysToLowerTransformation:
return KeysToLowerTransformation()

def create_keys_to_snake_transformation(
self, model: KeysToSnakeCaseModel, config: Config, **kwargs: Any
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down Expand Up @@ -1650,6 +1662,13 @@ def create_dynamic_schema_loader(
model.retriever, stream_slicer
)

schema_transformations = []
if model.schema_transformations:
for transformation_model in model.schema_transformations:
schema_transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
Expand All @@ -1664,6 +1683,7 @@ def create_dynamic_schema_loader(
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_transformations=schema_transformations,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)
Expand Down
23 changes: 20 additions & 3 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from copy import deepcopy
from dataclasses import InitVar, dataclass
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional, Union

import dpath
Expand All @@ -13,8 +13,9 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
"string": {"type": ["null", "string"]},
Expand Down Expand Up @@ -103,6 +104,7 @@ class DynamicSchemaLoader(SchemaLoader):
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])

def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand All @@ -128,12 +130,27 @@ def get_json_schema(self) -> Mapping[str, Any]:
)
properties[key] = value

transformed_properties = self._transform(properties, {})

return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": properties,
"properties": transformed_properties,
}

def _transform(
self,
properties: Mapping[str, Any],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> Mapping[str, Any]:
for transformation in self.schema_transformations:
transformation.transform(
properties, # type: ignore # properties has type Mapping[str, Any], but Dict[str, Any] expected
config=self.config,
)
return properties

def _get_key(
self,
raw_schema: MutableMapping[str, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@
},
"paginator": {"type": "NoPagination"},
},
"schema_transformations": [
{
"type": "AddFields",
"fields": [
{
"type": "AddedFieldDefinition",
"path": ["StaticField"],
"value": "{{ {'type': ['null', 'string']} }}",
}
],
},
{
"type": "KeysToSnakeCase",
},
],
"schema_type_identifier": {
"schema_pointer": ["fields"],
"key_pointer": ["name"],
Expand Down Expand Up @@ -230,8 +245,9 @@ def test_dynamic_schema_loader_manifest_flow():
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
"name": {"type": ["null", "string"]},
"first_name": {"type": ["null", "string"]},
"description": {"type": ["null", "string"]},
"static_field": {"type": ["null", "string"]},
},
}

Expand All @@ -245,8 +261,8 @@ def test_dynamic_schema_loader_manifest_flow():
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "member_1", "description": "First member"},
{"id": 2, "name": "member_2", "description": "Second member"},
{"id": 1, "first_name": "member_1", "description": "First member"},
{"id": 2, "first_name": "member_2", "description": "Second member"},
]
)
),
Expand All @@ -257,9 +273,9 @@ def test_dynamic_schema_loader_manifest_flow():
body=json.dumps(
{
"fields": [
{"name": "id", "type": "integer"},
{"name": "name", "type": "string"},
{"name": "description", "type": "singleLineText"},
{"name": "Id", "type": "integer"},
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
]
}
)
Expand Down
84 changes: 84 additions & 0 deletions unit_tests/sources/declarative/test_manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,90 @@ def _create_page(response_body):
],
[call({}, {})],
),
(
"test_read_manifest_with_flatten_fields",
{
"version": "0.34.2",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"streams": [
{
"type": "DeclarativeStream",
"name": "Rates",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"ABC": {"type": "number"},
"AED": {"type": "number"},
},
"type": "object",
},
},
"transformations": [{"type": "FlattenFields"}],
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.apilayer.com",
"path": "/exchangerates_data/latest",
"http_method": "GET",
"request_parameters": {},
"request_headers": {},
"request_body_json": {},
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": ["rates"]},
},
"paginator": {"type": "NoPagination"},
},
}
],
"spec": {
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["api_key"],
"properties": {
"api_key": {
"type": "string",
"title": "API Key",
"airbyte_secret": True,
}
},
"additionalProperties": True,
},
"documentation_url": "https://example.org",
"type": "Spec",
},
},
(
_create_page(
{
"rates": [
{"nested_fields": {"ABC": 0}, "id": 1},
{"nested_fields": {"AED": 1}, "id": 2},
],
"_metadata": {"next": "next"},
}
),
_create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}),
)
* 10,
[
{"ABC": 0, "id": 1},
{"AED": 1, "id": 2},
],
[call({}, {})],
),
(
"test_read_with_pagination_no_partitions",
{
Expand Down

0 comments on commit 9563c33

Please sign in to comment.