From ed9a5e7aafb1ba4b9be04db70003e29d4c765de9 Mon Sep 17 00:00:00 2001 From: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:16:40 -0800 Subject: [PATCH 1/2] feat: add unit test fixtures for manifest-only connectors to CDK (#121) Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../test/utils/manifest_only_fixtures.py | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 airbyte_cdk/test/utils/manifest_only_fixtures.py diff --git a/airbyte_cdk/test/utils/manifest_only_fixtures.py b/airbyte_cdk/test/utils/manifest_only_fixtures.py new file mode 100644 index 000000000..47620e7c1 --- /dev/null +++ b/airbyte_cdk/test/utils/manifest_only_fixtures.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + + +import importlib.util +from pathlib import Path +from types import ModuleType +from typing import Optional + +import pytest + +# The following fixtures are used to load a manifest-only connector's components module and manifest file. +# They can be accessed from any test file in the connector's unit_tests directory by importing them as follows: + +# from airbyte_cdk.test.utils.manifest_only_fixtures import components_module, connector_dir, manifest_path + +# individual components can then be referenced as: components_module. + + +@pytest.fixture(scope="session") +def connector_dir(request: pytest.FixtureRequest) -> Path: + """Return the connector's root directory.""" + + current_dir = Path(request.config.invocation_params.dir) + + # If the tests are run locally from the connector's unit_tests directory, return the parent (connector) directory + if current_dir.name == "unit_tests": + return current_dir.parent + # In CI, the tests are run from the connector directory itself + return current_dir + + +@pytest.fixture(scope="session") +def components_module(connector_dir: Path) -> Optional[ModuleType]: + """Load and return the components module from the connector directory. + + This assumes the components module is located at /components.py. + """ + components_path = connector_dir / "components.py" + if not components_path.exists(): + return None + + components_spec = importlib.util.spec_from_file_location("components", components_path) + if components_spec is None: + return None + + components_module = importlib.util.module_from_spec(components_spec) + if components_spec.loader is None: + return None + + components_spec.loader.exec_module(components_module) + return components_module + + +@pytest.fixture(scope="session") +def manifest_path(connector_dir: Path) -> Path: + """Return the path to the connector's manifest file.""" + path = connector_dir / "manifest.yaml" + if not path.exists(): + raise FileNotFoundError(f"Manifest file not found at {path}") + return path From 5801cd82eaa9e8fe9ece6baa4b8641f0934ec3e7 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Thu, 12 Dec 2024 16:38:37 +0100 Subject: [PATCH 2/2] feat(low-code cdk): add dynamic schema loader (#104) Co-authored-by: octavia-squidington-iii --- .../declarative_component_schema.yaml | 87 ++++++ .../models/declarative_component_schema.py | 63 +++- .../parsers/manifest_component_transformer.py | 4 + .../parsers/model_to_component_factory.py | 72 +++++ .../sources/declarative/schema/__init__.py | 3 +- .../schema/dynamic_schema_loader.py | 219 ++++++++++++++ .../schema/test_dynamic_schema_loader.py | 272 ++++++++++++++++++ 7 files changed, 709 insertions(+), 11 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py create mode 100644 unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 48c8f8c05..ab8bfa0d8 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1218,6 +1218,7 @@ definitions: title: Schema Loader description: Component used to retrieve the schema for the current stream. anyOf: + - "$ref": "#/definitions/DynamicSchemaLoader" - "$ref": "#/definitions/InlineSchemaLoader" - "$ref": "#/definitions/JsonFileSchemaLoader" - "$ref": "#/definitions/CustomSchemaLoader" @@ -1684,6 +1685,92 @@ definitions: $parameters: type: object additionalProperties: true + TypesMap: + title: Types Map + description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type. + type: object + required: + - target_type + - current_type + properties: + target_type: + anyOf: + - type: string + - type: array + items: + type: string + current_type: + anyOf: + - type: string + - type: array + items: + type: string + SchemaTypeIdentifier: + title: Schema Type Identifier + description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing. + type: object + required: + - key_pointer + properties: + type: + type: string + enum: [SchemaTypeIdentifier] + schema_pointer: + title: Schema Path + description: List of nested fields defining the schema field path to extract. Defaults to []. + type: array + default: [] + items: + - type: string + interpolation_context: + - config + key_pointer: + title: Key Path + description: List of potentially nested fields describing the full path of the field key to extract. + type: array + items: + - type: string + interpolation_context: + - config + type_pointer: + title: Type Path + description: List of potentially nested fields describing the full path of the field type to extract. + type: array + items: + - type: string + interpolation_context: + - config + types_mapping: + type: array + items: + - "$ref": "#/definitions/TypesMap" + $parameters: + type: object + additionalProperties: true + DynamicSchemaLoader: + title: Dynamic Schema Loader + description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records. + type: object + required: + - type + - retriever + - schema_type_identifier + properties: + type: + type: string + enum: [DynamicSchemaLoader] + retriever: + title: Retriever + description: Component used to coordinate how records are extracted across stream slices and request pages. + anyOf: + - "$ref": "#/definitions/AsyncRetriever" + - "$ref": "#/definitions/CustomRetriever" + - "$ref": "#/definitions/SimpleRetriever" + schema_type_identifier: + "$ref": "#/definitions/SchemaTypeIdentifier" + $parameters: + type: object + additionalProperties: true InlineSchemaLoader: title: Inline Schema Loader description: Loads a schema that is defined directly in the manifest file. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 173e045a0..8f940fe5c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -650,6 +650,32 @@ class HttpResponseFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class TypesMap(BaseModel): + target_type: Union[str, List[str]] + current_type: Union[str, List[str]] + + +class SchemaTypeIdentifier(BaseModel): + type: Optional[Literal["SchemaTypeIdentifier"]] = None + schema_pointer: Optional[List[str]] = Field( + [], + description="List of nested fields defining the schema field path to extract. Defaults to [].", + title="Schema Path", + ) + key_pointer: List[str] = Field( + ..., + description="List of potentially nested fields describing the full path of the field key to extract.", + title="Key Path", + ) + type_pointer: Optional[List[str]] = Field( + None, + description="List of potentially nested fields describing the full path of the field type to extract.", + title="Type Path", + ) + types_mapping: Optional[List[TypesMap]] = None + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class InlineSchemaLoader(BaseModel): type: Literal["InlineSchemaLoader"] schema_: Optional[Dict[str, Any]] = Field( @@ -822,13 +848,13 @@ class Config: ) extract_output: List[str] = Field( ..., - description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ", + description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.", examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}], title="DeclarativeOAuth Extract Output", ) state: Optional[State] = Field( None, - description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ", + description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.", examples=[{"state": {"min": 7, "max": 128}}], title="(Optional) DeclarativeOAuth Configurable State Query Param", ) @@ -852,13 +878,13 @@ class Config: ) state_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ", + description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.", examples=[{"state_key": "my_custom_state_key_key_name"}], title="(Optional) DeclarativeOAuth State Key Override", ) auth_code_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ", + description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.", examples=[{"auth_code_key": "my_custom_auth_code_key_name"}], title="(Optional) DeclarativeOAuth Auth Code Key Override", ) @@ -1609,12 +1635,17 @@ class Config: primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[ + DynamicSchemaLoader, + InlineSchemaLoader, + JsonFileSchemaLoader, + CustomSchemaLoader, + ] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1774,6 +1805,17 @@ class HttpRequester(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class DynamicSchemaLoader(BaseModel): + type: Literal["DynamicSchemaLoader"] + retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( + ..., + description="Component used to coordinate how records are extracted across stream slices and request pages.", + title="Retriever", + ) + schema_type_identifier: SchemaTypeIdentifier + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ParentStreamConfig(BaseModel): type: Literal["ParentStreamConfig"] parent_key: str = Field( @@ -1981,5 +2023,6 @@ class DynamicDeclarativeStream(BaseModel): SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() +DynamicSchemaLoader.update_forward_refs() SimpleRetriever.update_forward_refs() AsyncRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index ed05b8e52..473e78fc6 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -64,6 +64,10 @@ "AddFields.fields": "AddedFieldDefinition", # CustomPartitionRouter "CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig", + # DynamicSchemaLoader + "DynamicSchemaLoader.retriever": "SimpleRetriever", + # SchemaTypeIdentifier + "SchemaTypeIdentifier.types_map": "TypesMap", } # We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9a405f8f2..6594f33aa 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -188,6 +188,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DpathExtractor as DpathExtractorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DynamicSchemaLoader as DynamicSchemaLoaderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) @@ -278,6 +281,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ResponseToFileExtractor as ResponseToFileExtractorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + SchemaTypeIdentifier as SchemaTypeIdentifierModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SelectiveAuthenticator as SelectiveAuthenticatorModel, ) @@ -291,6 +297,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + TypesMap as TypesMapModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -356,8 +365,11 @@ ) from airbyte_cdk.sources.declarative.schema import ( DefaultSchemaLoader, + DynamicSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader, + SchemaTypeIdentifier, + TypesMap, ) from airbyte_cdk.sources.declarative.spec import Spec from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer @@ -455,6 +467,9 @@ def _init_mappings(self) -> None: IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, + DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, + SchemaTypeIdentifierModel: self.create_schema_type_identifier, + TypesMapModel: self.create_types_map, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, ListPartitionRouterModel: self.create_list_partition_router, @@ -1574,6 +1589,63 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) + @staticmethod + def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap: + return TypesMap(target_type=model.target_type, current_type=model.current_type) + + def create_schema_type_identifier( + self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any + ) -> SchemaTypeIdentifier: + types_mapping = [] + if model.types_mapping: + types_mapping.extend( + [ + self._create_component_from_model(types_map, config=config) + for types_map in model.types_mapping + ] + ) + model_schema_pointer: List[Union[InterpolatedString, str]] = ( + [x for x in model.schema_pointer] if model.schema_pointer else [] + ) + model_key_pointer: List[Union[InterpolatedString, str]] = [x for x in model.key_pointer] + model_type_pointer: Optional[List[Union[InterpolatedString, str]]] = ( + [x for x in model.type_pointer] if model.type_pointer else None + ) + + return SchemaTypeIdentifier( + schema_pointer=model_schema_pointer, + key_pointer=model_key_pointer, + type_pointer=model_type_pointer, + types_mapping=types_mapping, + parameters=model.parameters or {}, + ) + + def create_dynamic_schema_loader( + self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any + ) -> DynamicSchemaLoader: + stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) + combined_slicers = self._build_resumable_cursor_from_paginator( + model.retriever, stream_slicer + ) + + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name="", + primary_key=None, + stream_slicer=combined_slicers, + transformations=[], + ) + schema_type_identifier = self._create_component_from_model( + model.schema_type_identifier, config=config, parameters=model.parameters or {} + ) + return DynamicSchemaLoader( + retriever=retriever, + config=config, + schema_type_identifier=schema_type_identifier, + parameters=model.parameters or {}, + ) + @staticmethod def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder: return JsonDecoder(parameters={}) diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index fee72f44f..5d2aed60e 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -6,5 +6,6 @@ from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader +from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesMap, SchemaTypeIdentifier -__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader"] +__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", "SchemaTypeIdentifier"] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py new file mode 100644 index 000000000..95b5bf0ae --- /dev/null +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -0,0 +1,219 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + + +from copy import deepcopy +from dataclasses import InitVar, dataclass +from typing import Any, List, Mapping, MutableMapping, Optional, Union + +import dpath +from typing_extensions import deprecated + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader +from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.types import Config + +AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { + "string": {"type": ["null", "string"]}, + "boolean": {"type": ["null", "boolean"]}, + "date": {"type": ["null", "string"], "format": "date"}, + "timestamp_without_timezone": { + "type": ["null", "string"], + "format": "date-time", + "airbyte_type": "timestamp_without_timezone", + }, + "timestamp_with_timezone": {"type": ["null", "string"], "format": "date-time"}, + "time_without_timezone": { + "type": ["null", "string"], + "format": "time", + "airbyte_type": "time_without_timezone", + }, + "time_with_timezone": { + "type": ["null", "string"], + "format": "time", + "airbyte_type": "time_with_timezone", + }, + "integer": {"type": ["null", "integer"]}, + "number": {"type": ["null", "number"]}, + "array": {"type": ["null", "array"]}, + "object": {"type": ["null", "object"]}, +} + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass(frozen=True) +class TypesMap: + """ + Represents a mapping between a current type and its corresponding target type. + """ + + target_type: Union[List[str], str] + current_type: Union[List[str], str] + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class SchemaTypeIdentifier: + """ + Identifies schema details for dynamic schema extraction and processing. + """ + + key_pointer: List[Union[InterpolatedString, str]] + parameters: InitVar[Mapping[str, Any]] + type_pointer: Optional[List[Union[InterpolatedString, str]]] = None + types_mapping: Optional[List[TypesMap]] = None + schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self.schema_pointer = ( + self._update_pointer(self.schema_pointer, parameters) if self.schema_pointer else [] + ) # type: ignore[assignment] # This is reqired field in model + self.key_pointer = self._update_pointer(self.key_pointer, parameters) # type: ignore[assignment] # This is reqired field in model + self.type_pointer = ( + self._update_pointer(self.type_pointer, parameters) if self.type_pointer else None + ) + + @staticmethod + def _update_pointer( + pointer: Optional[List[Union[InterpolatedString, str]]], parameters: Mapping[str, Any] + ) -> Optional[List[Union[InterpolatedString, str]]]: + return ( + [ + InterpolatedString.create(path, parameters=parameters) + if isinstance(path, str) + else path + for path in pointer + ] + if pointer + else None + ) + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class DynamicSchemaLoader(SchemaLoader): + """ + Dynamically loads a JSON Schema by extracting data from retrieved records. + """ + + retriever: Retriever + config: Config + parameters: InitVar[Mapping[str, Any]] + schema_type_identifier: SchemaTypeIdentifier + + def get_json_schema(self) -> Mapping[str, Any]: + """ + Constructs a JSON Schema based on retrieved data. + """ + properties = {} + retrieved_record = next(self.retriever.read_records({}), None) # type: ignore[call-overload] # read_records return Iterable data type + + raw_schema = ( + self._extract_data( + retrieved_record, # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any] + self.schema_type_identifier.schema_pointer, + ) + if retrieved_record + else [] + ) + + for property_definition in raw_schema: + key = self._get_key(property_definition, self.schema_type_identifier.key_pointer) + value = self._get_type( + property_definition, + self.schema_type_identifier.type_pointer, + ) + properties[key] = value + + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": properties, + } + + def _get_key( + self, + raw_schema: MutableMapping[str, Any], + field_key_path: List[Union[InterpolatedString, str]], + ) -> str: + """ + Extracts the key field from the schema using the specified path. + """ + field_key = self._extract_data(raw_schema, field_key_path) + if not isinstance(field_key, str): + raise ValueError(f"Expected key to be a string. Got {field_key}") + return field_key + + def _get_type( + self, + raw_schema: MutableMapping[str, Any], + field_type_path: Optional[List[Union[InterpolatedString, str]]], + ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]: + """ + Determines the JSON Schema type for a field, supporting nullable and combined types. + """ + raw_field_type = ( + self._extract_data(raw_schema, field_type_path, default="string") + if field_type_path + else "string" + ) + mapped_field_type = self._replace_type_if_not_valid(raw_field_type) + if ( + isinstance(mapped_field_type, list) + and len(mapped_field_type) == 2 + and all(isinstance(item, str) for item in mapped_field_type) + ): + first_type = self._get_airbyte_type(mapped_field_type[0]) + second_type = self._get_airbyte_type(mapped_field_type[1]) + return {"oneOf": [first_type, second_type]} + elif isinstance(mapped_field_type, str): + return self._get_airbyte_type(mapped_field_type) + else: + raise ValueError( + f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}." + ) + + def _replace_type_if_not_valid( + self, field_type: Union[List[str], str] + ) -> Union[List[str], str]: + """ + Replaces a field type if it matches a type mapping in `types_map`. + """ + if self.schema_type_identifier.types_mapping: + for types_map in self.schema_type_identifier.types_mapping: + if field_type == types_map.current_type: + return types_map.target_type + return field_type + + @staticmethod + def _get_airbyte_type(field_type: str) -> Mapping[str, Any]: + """ + Maps a field type to its corresponding Airbyte type definition. + """ + if field_type not in AIRBYTE_DATA_TYPES: + raise ValueError(f"Invalid Airbyte data type: {field_type}") + + return deepcopy(AIRBYTE_DATA_TYPES[field_type]) + + def _extract_data( + self, + body: Mapping[str, Any], + extraction_path: Optional[List[Union[InterpolatedString, str]]] = None, + default: Any = None, + ) -> Any: + """ + Extracts data from the body based on the provided extraction path. + """ + + if not extraction_path: + return body + + path = [ + node.eval(self.config) if not isinstance(node, str) else node + for node in extraction_path + ] + + return dpath.get(body, path, default=default) # type: ignore # extracted will be a MutableMapping, given input data structure diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py new file mode 100644 index 000000000..c35f917cd --- /dev/null +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -0,0 +1,272 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +from unittest.mock import MagicMock + +import pytest + +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.sources.declarative.schema import DynamicSchemaLoader, SchemaTypeIdentifier +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", +} + +_MANIFEST = { + "version": "6.7.0", + "definitions": { + "party_members_stream": { + "type": "DeclarativeStream", + "name": "party_members", + "primary_key": [], + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/party_members", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "schema_loader": { + "type": "DynamicSchemaLoader", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/party_members/schema", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "schema_type_identifier": { + "schema_pointer": ["fields"], + "key_pointer": ["name"], + "type_pointer": ["type"], + "types_mapping": [{"target_type": "string", "current_type": "singleLineText"}], + }, + }, + }, + }, + "streams": [ + "#/definitions/party_members_stream", + ], + "check": {"stream_names": ["party_members"]}, +} + + +@pytest.fixture +def mock_retriever(): + retriever = MagicMock() + retriever.read_records.return_value = [ + { + "schema": [ + {"field1": {"key": "name", "type": "string"}}, + {"field2": {"key": "age", "type": "integer"}}, + {"field3": {"key": "active", "type": "boolean"}}, + ] + } + ] + return retriever + + +@pytest.fixture +def mock_schema_type_identifier(): + return SchemaTypeIdentifier( + schema_pointer=["schema"], + key_pointer=["key"], + type_pointer=["type"], + types_mapping=[], + parameters={}, + ) + + +@pytest.fixture +def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): + config = MagicMock() + parameters = {} + return DynamicSchemaLoader( + retriever=mock_retriever, + config=config, + parameters=parameters, + schema_type_identifier=mock_schema_type_identifier, + ) + + +@pytest.mark.parametrize( + "retriever_data, expected_schema", + [ + ( + # Test case: All fields with valid types + iter( + [ + { + "schema": [ + {"key": "name", "type": "string"}, + {"key": "age", "type": "integer"}, + ] + } + ] + ), + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "name": {"type": ["null", "string"]}, + "age": {"type": ["null", "integer"]}, + }, + }, + ), + ( + # Test case: Fields with missing type default to "string" + iter( + [ + { + "schema": [ + {"key": "name"}, + {"key": "email", "type": "string"}, + ] + } + ] + ), + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "name": {"type": ["null", "string"]}, + "email": {"type": ["null", "string"]}, + }, + }, + ), + ( + # Test case: Fields with nested types + iter( + [ + { + "schema": [ + {"key": "address", "type": ["string", "integer"]}, + ] + } + ] + ), + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "address": { + "oneOf": [{"type": ["null", "string"]}, {"type": ["null", "integer"]}] + }, + }, + }, + ), + ( + # Test case: Empty record set + iter([]), + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {}, + }, + ), + ], +) +def test_dynamic_schema_loader(dynamic_schema_loader, retriever_data, expected_schema): + dynamic_schema_loader.retriever.read_records = MagicMock(return_value=retriever_data) + + schema = dynamic_schema_loader.get_json_schema() + + # Validate the generated schema + assert schema == expected_schema + + +def test_dynamic_schema_loader_invalid_key(dynamic_schema_loader): + # Test case: Invalid key type + dynamic_schema_loader.retriever.read_records.return_value = iter( + [{"schema": [{"field1": {"key": 123, "type": "string"}}]}] + ) + + with pytest.raises(ValueError, match="Expected key to be a string"): + dynamic_schema_loader.get_json_schema() + + +def test_dynamic_schema_loader_invalid_type(dynamic_schema_loader): + # Test case: Invalid type + dynamic_schema_loader.retriever.read_records.return_value = iter( + [{"schema": [{"field1": {"key": "name", "type": "invalid_type"}}]}] + ) + + with pytest.raises(ValueError, match="Expected key to be a string. Got None"): + dynamic_schema_loader.get_json_schema() + + +def test_dynamic_schema_loader_manifest_flow(): + expected_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": ["null", "integer"]}, + "name": {"type": ["null", "string"]}, + "description": {"type": ["null", "string"]}, + }, + } + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/party_members"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "member_1", "description": "First member"}, + {"id": 2, "name": "member_2", "description": "Second member"}, + ] + ) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/party_members/schema"), + HttpResponse( + body=json.dumps( + { + "fields": [ + {"name": "id", "type": "integer"}, + {"name": "name", "type": "string"}, + {"name": "description", "type": "singleLineText"}, + ] + } + ) + ), + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + assert len(actual_catalog.streams) == 1 + assert actual_catalog.streams[0].json_schema == expected_schema