Skip to content

Commit

Permalink
Merge branch 'main' into lazebnyi/add-config-components-resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi authored Dec 12, 2024
2 parents 4dd1a0a + 5801cd8 commit f118a88
Show file tree
Hide file tree
Showing 8 changed files with 765 additions and 7 deletions.
87 changes: 87 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ definitions:
title: Schema Loader
description: Component used to retrieve the schema for the current stream.
anyOf:
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
Expand Down Expand Up @@ -1684,6 +1685,92 @@ definitions:
$parameters:
type: object
additionalProperties: true
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
type: object
required:
- target_type
- current_type
properties:
target_type:
anyOf:
- type: string
- type: array
items:
type: string
current_type:
anyOf:
- type: string
- type: array
items:
type: string
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
type: object
required:
- key_pointer
properties:
type:
type: string
enum: [SchemaTypeIdentifier]
schema_pointer:
title: Schema Path
description: List of nested fields defining the schema field path to extract. Defaults to [].
type: array
default: []
items:
- type: string
interpolation_context:
- config
key_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the field key to extract.
type: array
items:
- type: string
interpolation_context:
- config
type_pointer:
title: Type Path
description: List of potentially nested fields describing the full path of the field type to extract.
type: array
items:
- type: string
interpolation_context:
- config
types_mapping:
type: array
items:
- "$ref": "#/definitions/TypesMap"
$parameters:
type: object
additionalProperties: true
DynamicSchemaLoader:
title: Dynamic Schema Loader
description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
type: object
required:
- type
- retriever
- schema_type_identifier
properties:
type:
type: string
enum: [DynamicSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,32 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]


class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
schema_pointer: Optional[List[str]] = Field(
[],
description="List of nested fields defining the schema field path to extract. Defaults to [].",
title="Schema Path",
)
key_pointer: List[str] = Field(
...,
description="List of potentially nested fields describing the full path of the field key to extract.",
title="Key Path",
)
type_pointer: Optional[List[str]] = Field(
None,
description="List of potentially nested fields describing the full path of the field type to extract.",
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InlineSchemaLoader(BaseModel):
type: Literal["InlineSchemaLoader"]
schema_: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1629,12 +1655,17 @@ class Config:
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = (
Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
schema_loader: Optional[
Union[
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
Expand Down Expand Up @@ -1794,6 +1825,17 @@ class HttpRequester(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_type_identifier: SchemaTypeIdentifier
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
parent_key: str = Field(
Expand Down Expand Up @@ -2001,5 +2043,6 @@ class DynamicDeclarativeStream(BaseModel):
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
"AddFields.fields": "AddedFieldDefinition",
# CustomPartitionRouter
"CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
# DynamicSchemaLoader
"DynamicSchemaLoader.retriever": "SimpleRetriever",
# SchemaTypeIdentifier
"SchemaTypeIdentifier.types_map": "TypesMap",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathExtractor as DpathExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
Expand Down Expand Up @@ -281,6 +284,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SelectiveAuthenticator as SelectiveAuthenticatorModel,
)
Expand All @@ -297,6 +303,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
TypesMap as TypesMapModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
WaitTimeFromHeader as WaitTimeFromHeaderModel,
Expand Down Expand Up @@ -364,8 +373,11 @@
)
from airbyte_cdk.sources.declarative.schema import (
DefaultSchemaLoader,
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
Expand Down Expand Up @@ -463,6 +475,9 @@ def _init_mappings(self) -> None:
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
TypesMapModel: self.create_types_map,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
ListPartitionRouterModel: self.create_list_partition_router,
Expand Down Expand Up @@ -1584,6 +1599,63 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
return TypesMap(target_type=model.target_type, current_type=model.current_type)

def create_schema_type_identifier(
self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any
) -> SchemaTypeIdentifier:
types_mapping = []
if model.types_mapping:
types_mapping.extend(
[
self._create_component_from_model(types_map, config=config)
for types_map in model.types_mapping
]
)
model_schema_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.schema_pointer] if model.schema_pointer else []
)
model_key_pointer: List[Union[InterpolatedString, str]] = [x for x in model.key_pointer]
model_type_pointer: Optional[List[Union[InterpolatedString, str]]] = (
[x for x in model.type_pointer] if model.type_pointer else None
)

return SchemaTypeIdentifier(
schema_pointer=model_schema_pointer,
key_pointer=model_key_pointer,
type_pointer=model_type_pointer,
types_mapping=types_mapping,
parameters=model.parameters or {},
)

def create_dynamic_schema_loader(
self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
) -> DynamicSchemaLoader:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor_from_paginator(
model.retriever, stream_slicer
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name="",
primary_key=None,
stream_slicer=combined_slicers,
transformations=[],
)
schema_type_identifier = self._create_component_from_model(
model.schema_type_identifier, config=config, parameters=model.parameters or {}
)
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)

@staticmethod
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
return JsonDecoder(parameters={})
Expand Down
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesMap, SchemaTypeIdentifier

__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader"]
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", "SchemaTypeIdentifier"]
Loading

0 comments on commit f118a88

Please sign in to comment.