From 9563c3399e4b6a21babffe220e66506a72cfa3ef Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:52:14 +0100 Subject: [PATCH] feat(low-code cdk): add transformation to dynamic schema loader (#176) --- .../declarative_component_schema.yaml | 13 +++ .../models/declarative_component_schema.py | 17 ++++ .../parsers/model_to_component_factory.py | 20 +++++ .../schema/dynamic_schema_loader.py | 23 ++++- .../schema/test_dynamic_schema_loader.py | 28 +++++-- .../test_manifest_declarative_source.py | 84 +++++++++++++++++++ 6 files changed, 176 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ff8ee683..662dce34 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1235,6 +1235,7 @@ definitions: - "$ref": "#/definitions/RemoveFields" - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" + - "$ref": "#/definitions/FlattenFields" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1767,6 +1768,18 @@ definitions: - "$ref": "#/definitions/AsyncRetriever" - "$ref": "#/definitions/CustomRetriever" - "$ref": "#/definitions/SimpleRetriever" + schema_transformations: + title: Schema Transformations + description: A list of transformations to be applied to the schema. + type: array + items: + anyOf: + - "$ref": "#/definitions/AddFields" + - "$ref": "#/definitions/CustomTransformation" + - "$ref": "#/definitions/RemoveFields" + - "$ref": "#/definitions/KeysToLower" + - "$ref": "#/definitions/KeysToSnakeCase" + - "$ref": "#/definitions/FlattenFields" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" $parameters: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8bd29fdb..1d980ca6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1671,6 +1671,7 @@ class Config: RemoveFields, KeysToLower, KeysToSnakeCase, + FlattenFields, ] ] ] = Field( @@ -1836,6 +1837,22 @@ class DynamicSchemaLoader(BaseModel): description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) + schema_transformations: Optional[ + List[ + Union[ + AddFields, + CustomTransformation, + RemoveFields, + KeysToLower, + KeysToSnakeCase, + FlattenFields, + ] + ] + ] = Field( + None, + description="A list of transformations to be applied to the schema.", + title="Schema Transformations", + ) schema_type_identifier: SchemaTypeIdentifier parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3ff6a871..5d62a15d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -239,6 +239,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToLower as KeysToLowerModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysToSnakeCase as KeysToSnakeCaseModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel, ) @@ -396,6 +399,9 @@ from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) +from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import ( + KeysToSnakeCaseTransformation, +) from airbyte_cdk.sources.message import ( InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, @@ -478,6 +484,7 @@ def _init_mappings(self) -> None: JsonlDecoderModel: self.create_jsonl_decoder, GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, + KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -594,6 +601,11 @@ def create_keys_to_lower_transformation( ) -> KeysToLowerTransformation: return KeysToLowerTransformation() + def create_keys_to_snake_transformation( + self, model: KeysToSnakeCaseModel, config: Config, **kwargs: Any + ) -> KeysToSnakeCaseTransformation: + return KeysToSnakeCaseTransformation() + def create_flatten_fields( self, model: FlattenFieldsModel, config: Config, **kwargs: Any ) -> FlattenFields: @@ -1650,6 +1662,13 @@ def create_dynamic_schema_loader( model.retriever, stream_slicer ) + schema_transformations = [] + if model.schema_transformations: + for transformation_model in model.schema_transformations: + schema_transformations.append( + self._create_component_from_model(model=transformation_model, config=config) + ) + retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -1664,6 +1683,7 @@ def create_dynamic_schema_loader( return DynamicSchemaLoader( retriever=retriever, config=config, + schema_transformations=schema_transformations, schema_type_identifier=schema_type_identifier, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index 95b5bf0a..16347a43 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -4,7 +4,7 @@ from copy import deepcopy -from dataclasses import InitVar, dataclass +from dataclasses import InitVar, dataclass, field from typing import Any, List, Mapping, MutableMapping, Optional, Union import dpath @@ -13,8 +13,9 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader +from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.types import Config +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { "string": {"type": ["null", "string"]}, @@ -103,6 +104,7 @@ class DynamicSchemaLoader(SchemaLoader): config: Config parameters: InitVar[Mapping[str, Any]] schema_type_identifier: SchemaTypeIdentifier + schema_transformations: List[RecordTransformation] = field(default_factory=lambda: []) def get_json_schema(self) -> Mapping[str, Any]: """ @@ -128,12 +130,27 @@ def get_json_schema(self) -> Mapping[str, Any]: ) properties[key] = value + transformed_properties = self._transform(properties, {}) + return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": properties, + "properties": transformed_properties, } + def _transform( + self, + properties: Mapping[str, Any], + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + ) -> Mapping[str, Any]: + for transformation in self.schema_transformations: + transformation.transform( + properties, # type: ignore # properties has type Mapping[str, Any], but Dict[str, Any] expected + config=self.config, + ) + return properties + def _get_key( self, raw_schema: MutableMapping[str, Any], diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index c35f917c..a042cd23 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -64,6 +64,21 @@ }, "paginator": {"type": "NoPagination"}, }, + "schema_transformations": [ + { + "type": "AddFields", + "fields": [ + { + "type": "AddedFieldDefinition", + "path": ["StaticField"], + "value": "{{ {'type': ['null', 'string']} }}", + } + ], + }, + { + "type": "KeysToSnakeCase", + }, + ], "schema_type_identifier": { "schema_pointer": ["fields"], "key_pointer": ["name"], @@ -230,8 +245,9 @@ def test_dynamic_schema_loader_manifest_flow(): "type": "object", "properties": { "id": {"type": ["null", "integer"]}, - "name": {"type": ["null", "string"]}, + "first_name": {"type": ["null", "string"]}, "description": {"type": ["null", "string"]}, + "static_field": {"type": ["null", "string"]}, }, } @@ -245,8 +261,8 @@ def test_dynamic_schema_loader_manifest_flow(): HttpResponse( body=json.dumps( [ - {"id": 1, "name": "member_1", "description": "First member"}, - {"id": 2, "name": "member_2", "description": "Second member"}, + {"id": 1, "first_name": "member_1", "description": "First member"}, + {"id": 2, "first_name": "member_2", "description": "Second member"}, ] ) ), @@ -257,9 +273,9 @@ def test_dynamic_schema_loader_manifest_flow(): body=json.dumps( { "fields": [ - {"name": "id", "type": "integer"}, - {"name": "name", "type": "string"}, - {"name": "description", "type": "singleLineText"}, + {"name": "Id", "type": "integer"}, + {"name": "FirstName", "type": "string"}, + {"name": "Description", "type": "singleLineText"}, ] } ) diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index ea92bac5..a132757a 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -1367,6 +1367,90 @@ def _create_page(response_body): ], [call({}, {})], ), + ( + "test_read_manifest_with_flatten_fields", + { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Rates", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "transformations": [{"type": "FlattenFields"}], + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.apilayer.com", + "path": "/exchangerates_data/latest", + "http_method": "GET", + "request_parameters": {}, + "request_headers": {}, + "request_body_json": {}, + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, + }, + "paginator": {"type": "NoPagination"}, + }, + } + ], + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "title": "API Key", + "airbyte_secret": True, + } + }, + "additionalProperties": True, + }, + "documentation_url": "https://example.org", + "type": "Spec", + }, + }, + ( + _create_page( + { + "rates": [ + {"nested_fields": {"ABC": 0}, "id": 1}, + {"nested_fields": {"AED": 1}, "id": 2}, + ], + "_metadata": {"next": "next"}, + } + ), + _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), + ) + * 10, + [ + {"ABC": 0, "id": 1}, + {"AED": 1, "id": 2}, + ], + [call({}, {})], + ), ( "test_read_with_pagination_no_partitions", {