Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add dynamic schema loader #104

Merged
merged 25 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
be478ae
Add dynamic schema loader
lazebnyi Dec 3, 2024
520998a
Revert imports
lazebnyi Dec 3, 2024
edb52e7
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
7387131
Auto-fix lint and format issues
Dec 3, 2024
807d23e
Fix edge case validation
lazebnyi Dec 3, 2024
59c5c7f
Fix mypy
lazebnyi Dec 3, 2024
ffee00f
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
13441ca
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
227325f
Update after review
lazebnyi Dec 3, 2024
fd44be1
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
05e4f74
Add default value for schema pointer
lazebnyi Dec 3, 2024
d41f54c
Changed helpers var name in _extract_data and typo in schema loader
lazebnyi Dec 5, 2024
4097480
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 5, 2024
f843c3f
Replace deprecated import
lazebnyi Dec 5, 2024
6bfdff3
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 5, 2024
42357e7
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 6, 2024
dcc6233
Chenged interpolation_content to interpolation_context
lazebnyi Dec 6, 2024
1b7c63f
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 6, 2024
9b87978
Formated with ruff
lazebnyi Dec 6, 2024
3da287a
Add DynamicSchemaLoader to DeclarativeStream schema
lazebnyi Dec 12, 2024
507afb6
Add test_dynamic_schema_loader_manifest_flow
lazebnyi Dec 12, 2024
d7e4873
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 12, 2024
9c29cee
Fix typo
lazebnyi Dec 12, 2024
1edb25e
Updated imports
lazebnyi Dec 12, 2024
0dbea5d
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ definitions:
title: Schema Loader
description: Component used to retrieve the schema for the current stream.
anyOf:
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
Expand Down Expand Up @@ -1684,6 +1685,92 @@ definitions:
$parameters:
type: object
additionalProperties: true
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
type: object
required:
- target_type
- current_type
properties:
target_type:
anyOf:
- type: string
- type: array
items:
type: string
current_type:
anyOf:
- type: string
- type: array
items:
type: string
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
type: object
required:
- key_pointer
properties:
type:
type: string
enum: [SchemaTypeIdentifier]
schema_pointer:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Schema Path
description: List of nested fields defining the schema field path to extract. Defaults to [].
type: array
default: []
items:
- type: string
interpolation_context:
- config
key_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the field key to extract.
type: array
items:
- type: string
interpolation_context:
- config
type_pointer:
title: Type Path
description: List of potentially nested fields describing the full path of the field type to extract.
type: array
items:
- type: string
interpolation_context:
- config
types_mapping:
type: array
items:
- "$ref": "#/definitions/TypesMap"
$parameters:
type: object
additionalProperties: true
DynamicSchemaLoader:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Dynamic Schema Loader
description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
type: object
required:
- type
- retriever
- schema_type_identifier
properties:
type:
type: string
enum: [DynamicSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,32 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]


class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InlineSchemaLoader(BaseModel):
type: Literal["InlineSchemaLoader"]
schema_: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -822,13 +848,13 @@ class Config:
)
extract_output: List[str] = Field(
...,
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ",
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.",
examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}],
title="DeclarativeOAuth Extract Output",
)
state: Optional[State] = Field(
None,
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ",
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.",
examples=[{"state": {"min": 7, "max": 128}}],
title="(Optional) DeclarativeOAuth Configurable State Query Param",
)
Expand All @@ -852,13 +878,13 @@ class Config:
)
state_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.",
examples=[{"state_key": "my_custom_state_key_key_name"}],
title="(Optional) DeclarativeOAuth State Key Override",
)
auth_code_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.",
examples=[{"auth_code_key": "my_custom_auth_code_key_name"}],
title="(Optional) DeclarativeOAuth Auth Code Key Override",
)
Expand Down Expand Up @@ -1609,12 +1635,17 @@ class Config:
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = (
Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
schema_loader: Optional[
Union[
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
Expand Down Expand Up @@ -1774,6 +1805,17 @@ class HttpRequester(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_type_identifier: SchemaTypeIdentifier
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
parent_key: str = Field(
Expand Down Expand Up @@ -1981,5 +2023,6 @@ class DynamicDeclarativeStream(BaseModel):
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
"AddFields.fields": "AddedFieldDefinition",
# CustomPartitionRouter
"CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
# DynamicSchemaLoader
"DynamicSchemaLoader.retriever": "SimpleRetriever",
# SchemaTypeIdentifier
"SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathExtractor as DpathExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
Expand Down Expand Up @@ -278,6 +281,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SelectiveAuthenticator as SelectiveAuthenticatorModel,
)
Expand All @@ -291,6 +297,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
TypesMap as TypesMapModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
WaitTimeFromHeader as WaitTimeFromHeaderModel,
Expand Down Expand Up @@ -356,8 +365,11 @@
)
from airbyte_cdk.sources.declarative.schema import (
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
Expand Down Expand Up @@ -455,6 +467,9 @@ def _init_mappings(self) -> None:
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1574,6 +1589,63 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
return TypesMap(target_type=model.target_type, current_type=model.current_type)

def create_schema_type_identifier(
self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any
) -> SchemaTypeIdentifier:
types_mapping = []
if model.types_mapping:
types_mapping.extend(
[
self._create_component_from_model(types_map, config=config)
for types_map in model.types_mapping
]
)
model_schema_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.schema_pointer] if model.schema_pointer else []
)
model_key_pointer: List[Union[InterpolatedString, str]] = [x for x in model.key_pointer]
model_type_pointer: Optional[List[Union[InterpolatedString, str]]] = (
[x for x in model.type_pointer] if model.type_pointer else None
)

return SchemaTypeIdentifier(
schema_pointer=model_schema_pointer,
key_pointer=model_key_pointer,
type_pointer=model_type_pointer,
types_mapping=types_mapping,
parameters=model.parameters or {},
)

def create_dynamic_schema_loader(
self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
) -> DynamicSchemaLoader:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor_from_paginator(
model.retriever, stream_slicer
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name="",
primary_key=None,
stream_slicer=combined_slicers,
transformations=[],
)
schema_type_identifier = self._create_component_from_model(
model.schema_type_identifier, config=config, parameters=model.parameters or {}
)
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)

@staticmethod
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
return JsonDecoder(parameters={})
Expand Down
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesMap, SchemaTypeIdentifier

__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader"]
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", "SchemaTypeIdentifier"]
Loading
Loading