Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add transformation to dynamic schema loader #176

Merged
merged 10 commits into from
Dec 18, 2024
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@ definitions:
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1767,6 +1768,18 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
aldogonzalez8 marked this conversation as resolved.
Show resolved Hide resolved
- "$ref": "#/definitions/FlattenFields"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,7 @@ class Config:
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
]
]
] = Field(
Expand Down Expand Up @@ -1836,6 +1837,22 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_transformations: Optional[
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
FlattenFields,
]
]
] = Field(
None,
description="A list of transformations to be applied to the schema.",
title="Schema Transformations",
)
schema_type_identifier: SchemaTypeIdentifier
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToSnakeCase as KeysToSnakeCaseModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
)
Expand Down Expand Up @@ -396,6 +399,9 @@
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
KeysToSnakeCaseTransformation,
)
from airbyte_cdk.sources.message import (
InMemoryMessageRepository,
LogAppenderMessageRepositoryDecorator,
Expand Down Expand Up @@ -478,6 +484,7 @@ def _init_mappings(self) -> None:
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -594,6 +601,11 @@ def create_keys_to_lower_transformation(
) -> KeysToLowerTransformation:
return KeysToLowerTransformation()

def create_keys_to_snake_transformation(
    self, model: KeysToSnakeCaseModel, config: Config, **kwargs: Any
) -> KeysToSnakeCaseTransformation:
    """
    Build the runtime transformation for a ``KeysToSnakeCase`` manifest model.

    The transformation is constructed without arguments, so ``model``,
    ``config`` and ``kwargs`` are not read here; they are accepted so this
    method has the same call shape as the other ``create_*`` factory methods
    registered in ``_init_mappings``.

    :return: a new ``KeysToSnakeCaseTransformation`` instance
    """
    snake_case_transformation = KeysToSnakeCaseTransformation()
    return snake_case_transformation

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down Expand Up @@ -1650,6 +1662,13 @@ def create_dynamic_schema_loader(
model.retriever, stream_slicer
)

schema_transformations = []
if model.schema_transformations:
for transformation_model in model.schema_transformations:
schema_transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
Expand All @@ -1664,6 +1683,7 @@ def create_dynamic_schema_loader(
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_transformations=schema_transformations,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)
Expand Down
23 changes: 20 additions & 3 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from copy import deepcopy
from dataclasses import InitVar, dataclass
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional, Union

import dpath
Expand All @@ -13,8 +13,9 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
"string": {"type": ["null", "string"]},
Expand Down Expand Up @@ -103,6 +104,7 @@ class DynamicSchemaLoader(SchemaLoader):
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])

def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand All @@ -128,12 +130,27 @@ def get_json_schema(self) -> Mapping[str, Any]:
)
properties[key] = value

transformed_properties = self._transform(properties, {})

return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": properties,
"properties": transformed_properties,
}

def _transform(
    self,
    properties: Mapping[str, Any],
    stream_state: StreamState,
    stream_slice: Optional[StreamSlice] = None,
) -> Mapping[str, Any]:
    """
    Run every configured schema transformation over ``properties``.

    Transformations are applied in the order they appear in
    ``self.schema_transformations``. The return value of each ``transform``
    call is ignored, so transformations are expected to modify ``properties``
    in place; the same object is handed back to the caller.

    :param properties: schema properties keyed by field name; passed to each transformation as the record
    :param stream_state: stream state forwarded to each transformation
    :param stream_slice: optional stream slice forwarded to each transformation
    :return: the ``properties`` object that was passed in
    """
    for schema_transformation in self.schema_transformations:
        schema_transformation.transform(
            properties,  # type: ignore  # properties has type Mapping[str, Any], but Dict[str, Any] expected
            config=self.config,
            # Forward the caller-supplied context instead of dropping it, so
            # state- or slice-aware transformations see the same values the
            # caller passed to this method.
            stream_state=stream_state,
            stream_slice=stream_slice,
        )
    return properties

def _get_key(
self,
raw_schema: MutableMapping[str, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@
},
"paginator": {"type": "NoPagination"},
},
"schema_transformations": [
{
"type": "AddFields",
"fields": [
{
"type": "AddedFieldDefinition",
"path": ["StaticField"],
"value": "{{ {'type': ['null', 'string']} }}",
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
}
],
},
{
"type": "KeysToSnakeCase",
},
],
"schema_type_identifier": {
"schema_pointer": ["fields"],
"key_pointer": ["name"],
Expand Down Expand Up @@ -230,8 +245,9 @@ def test_dynamic_schema_loader_manifest_flow():
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
"name": {"type": ["null", "string"]},
"first_name": {"type": ["null", "string"]},
"description": {"type": ["null", "string"]},
"static_field": {"type": ["null", "string"]},
},
}

Expand All @@ -245,8 +261,8 @@ def test_dynamic_schema_loader_manifest_flow():
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "member_1", "description": "First member"},
{"id": 2, "name": "member_2", "description": "Second member"},
{"id": 1, "first_name": "member_1", "description": "First member"},
{"id": 2, "first_name": "member_2", "description": "Second member"},
]
)
),
Expand All @@ -257,9 +273,9 @@ def test_dynamic_schema_loader_manifest_flow():
body=json.dumps(
{
"fields": [
{"name": "id", "type": "integer"},
{"name": "name", "type": "string"},
{"name": "description", "type": "singleLineText"},
{"name": "Id", "type": "integer"},
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
]
}
)
Expand Down
84 changes: 84 additions & 0 deletions unit_tests/sources/declarative/test_manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,90 @@ def _create_page(response_body):
],
[call({}, {})],
),
(
"test_read_manifest_with_flatten_fields",
{
"version": "0.34.2",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"streams": [
{
"type": "DeclarativeStream",
"name": "Rates",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"ABC": {"type": "number"},
"AED": {"type": "number"},
},
"type": "object",
},
},
"transformations": [{"type": "FlattenFields"}],
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.apilayer.com",
"path": "/exchangerates_data/latest",
"http_method": "GET",
"request_parameters": {},
"request_headers": {},
"request_body_json": {},
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": ["rates"]},
},
"paginator": {"type": "NoPagination"},
},
}
],
"spec": {
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["api_key"],
"properties": {
"api_key": {
"type": "string",
"title": "API Key",
"airbyte_secret": True,
}
},
"additionalProperties": True,
},
"documentation_url": "https://example.org",
"type": "Spec",
},
},
(
_create_page(
{
"rates": [
{"nested_fields": {"ABC": 0}, "id": 1},
{"nested_fields": {"AED": 1}, "id": 2},
],
"_metadata": {"next": "next"},
}
),
_create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}),
)
* 10,
[
{"ABC": 0, "id": 1},
{"AED": 1, "id": 2},
],
[call({}, {})],
),
(
"test_read_with_pagination_no_partitions",
{
Expand Down
Loading