From a07b04a2fe61a0cdc362cb8540d85ce2dd024ab3 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:05:39 +0100 Subject: [PATCH 1/2] feat(low-code cdk): add component resolver and http component resolver (#88) Co-authored-by: octavia-squidington-iii --- .../concurrent_declarative_source.py | 10 +- .../declarative_component_schema.yaml | 102 +++++++++- .../manifest_declarative_source.py | 55 ++++- .../models/declarative_component_schema.py | 97 ++++++++- .../parsers/manifest_component_transformer.py | 6 + .../parsers/model_to_component_factory.py | 116 +++++++++-- .../declarative/partition_routers/__init__.py | 3 +- .../sources/declarative/resolvers/__init__.py | 13 ++ .../resolvers/components_resolver.py | 55 +++++ .../resolvers/http_components_resolver.py | 106 ++++++++++ .../sources/declarative/resolvers/__init__.py | 3 + .../test_http_components_resolver.py | 189 ++++++++++++++++++ .../test_manifest_declarative_source.py | 147 +++++++++++++- 13 files changed, 864 insertions(+), 38 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/resolvers/__init__.py create mode 100644 airbyte_cdk/sources/declarative/resolvers/components_resolver.py create mode 100644 airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py create mode 100644 unit_tests/sources/declarative/resolvers/__init__.py create mode 100644 unit_tests/sources/declarative/resolvers/test_http_components_resolver.py diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7ee4d287e..371e34a87 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -192,9 +192,13 @@ def _group_streams( state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later - name_to_stream_mapping = { - stream["name"]: stream for stream in self.resolved_manifest["streams"] - } + # Combine streams and dynamic_streams. Note: both cannot be empty at the same time, + # and this is validated during the initialization of the source. + streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs( + self._source_config, config + ) + + name_to_stream_mapping = {stream["name"]: stream for stream in streams} for declarative_stream in self.streams(config=config): # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 0dc0032dd..dff7abab6 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -7,8 +7,12 @@ version: 1.0.0 required: - type - check - - streams - version +anyOf: + - required: + - streams + - required: + - dynamic_streams properties: type: type: string @@ -19,6 +23,10 @@ properties: type: array items: "$ref": "#/definitions/DeclarativeStream" + dynamic_streams: + type: array + items: + "$ref": "#/definitions/DynamicDeclarativeStream" version: type: string description: The version of the Airbyte CDK used to build and test the source. @@ -1321,7 +1329,7 @@ definitions: type: array items: - type: string - interpolation_content: + interpolation_context: - config examples: - ["data"] @@ -2895,6 +2903,96 @@ definitions: $parameters: type: object additionalProperties: true + ComponentMappingDefinition: + title: Component Mapping Definition + description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. + type: object + required: + - type + - field_path + - value + properties: + type: + type: string + enum: [ComponentMappingDefinition] + field_path: + title: Field Path + description: A list of potentially nested fields indicating the full path where value will be added or updated. + type: array + items: + - type: string + interpolation_context: + - config + - components_values + - stream_template_config + examples: + - ["data"] + - ["data", "records"] + - ["data", "{{ parameters.name }}"] + - ["data", "*", "record"] + value: + title: Value + description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime. + type: string + interpolation_context: + - config + - stream_template_config + - components_values + examples: + - "{{ components_values['updates'] }}" + - "{{ components_values['MetaData']['LastUpdatedTime'] }}" + - "{{ config['segment_id'] }}" + value_type: + title: Value Type + description: The expected data type of the value. If omitted, the type will be inferred from the value provided. + "$ref": "#/definitions/ValueType" + $parameters: + type: object + additionalProperties: true + HttpComponentsResolver: + type: object + description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched via an HTTP retriever. + properties: + type: + type: string + enum: [HttpComponentsResolver] + retriever: + title: Retriever + description: Component used to coordinate how records are extracted across stream slices and request pages. + anyOf: + - "$ref": "#/definitions/AsyncRetriever" + - "$ref": "#/definitions/CustomRetriever" + - "$ref": "#/definitions/SimpleRetriever" + components_mapping: + type: array + items: + "$ref": "#/definitions/ComponentMappingDefinition" + $parameters: + type: object + additionalProperties: true + required: + - type + - retriever + - components_mapping + DynamicDeclarativeStream: + type: object + description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template. + properties: + type: + type: string + enum: [DynamicDeclarativeStream] + stream_template: + title: Stream Template + description: Reference to the stream template. + "$ref": "#/definitions/DeclarativeStream" + components_resolver: + title: Components Resolver + description: Component resolve and populates stream templates with components values. + "$ref": "#/definitions/HttpComponentsResolver" + required: + - type + - stream_template + - components_resolver interpolation: variables: - title: config diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 5346c2bcb..652da85c4 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -39,6 +39,7 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.types import ConnectionDefinition @@ -120,7 +121,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message( extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)} ) - stream_configs = self._stream_configs(self._source_config) + + stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs( + self._source_config, config + ) source_streams = [ self._constructor.create_component( @@ -234,7 +238,8 @@ def _validate_source(self) -> None: ) streams = self._source_config.get("streams") - if not streams: + dynamic_streams = self._source_config.get("dynamic_streams") + if not (streams or dynamic_streams): raise ValidationError( f"A valid manifest should have at least one stream defined. Got {streams}" ) @@ -303,5 +308,51 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]: s["type"] = "DeclarativeStream" return stream_configs + def _dynamic_stream_configs( + self, manifest: Mapping[str, Any], config: Mapping[str, Any] + ) -> List[Dict[str, Any]]: + dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) + dynamic_stream_configs: List[Dict[str, Any]] = [] + + for dynamic_definition in dynamic_stream_definitions: + components_resolver_config = dynamic_definition["components_resolver"] + + if not components_resolver_config: + raise ValueError( + f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}" + ) + + resolver_type = components_resolver_config.get("type") + if not resolver_type: + raise ValueError( + f"Missing 'type' in components resolver configuration: {components_resolver_config}" + ) + + if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING: + raise ValueError( + f"Invalid components resolver type '{resolver_type}'. " + f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." + ) + + if "retriever" in components_resolver_config: + components_resolver_config["retriever"]["requester"]["use_cache"] = True + + # Create a resolver for dynamic components based on type + components_resolver = self._constructor.create_component( + COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config + ) + + stream_template_config = dynamic_definition["stream_template"] + + for dynamic_stream in components_resolver.resolve_components( + stream_template_config=stream_template_config + ): + if "type" not in dynamic_stream: + dynamic_stream["type"] = "DeclarativeStream" + + dynamic_stream_configs.append(dynamic_stream) + + return dynamic_stream_configs + def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: self.logger.debug("declarative source created from manifest", extra=extra_args) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6b830949b..173e045a0 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1158,6 +1158,37 @@ class WaitUntilTimeFromHeader(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class ComponentMappingDefinition(BaseModel): + type: Literal["ComponentMappingDefinition"] + field_path: List[str] = Field( + ..., + description="A list of potentially nested fields indicating the full path where value will be added or updated.", + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + value: str = Field( + ..., + description="The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.", + examples=[ + "{{ components_values['updates'] }}", + "{{ components_values['MetaData']['LastUpdatedTime'] }}", + "{{ config['segment_id'] }}", + ], + title="Value", + ) + value_type: Optional[ValueType] = Field( + None, + description="The expected data type of the value. If omitted, the type will be inferred from the value provided.", + title="Value Type", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -1455,13 +1486,40 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class DeclarativeSource(BaseModel): +class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid type: Literal["DeclarativeSource"] check: CheckStream streams: List[DeclarativeStream] + dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None + version: str = Field( + ..., + description="The version of the Airbyte CDK used to build and test the source.", + ) + schemas: Optional[Schemas] = None + definitions: Optional[Dict[str, Any]] = None + spec: Optional[Spec] = None + concurrency_level: Optional[ConcurrencyLevel] = None + metadata: Optional[Dict[str, Any]] = Field( + None, + description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", + ) + description: Optional[str] = Field( + None, + description="A description of the connector. It will be presented on the Source documentation page.", + ) + + +class DeclarativeSource2(BaseModel): + class Config: + extra = Extra.forbid + + type: Literal["DeclarativeSource"] + check: CheckStream + streams: Optional[List[DeclarativeStream]] = None + dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., description="The version of the Airbyte CDK used to build and test the source.", @@ -1480,6 +1538,17 @@ class Config: ) +class DeclarativeSource(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[DeclarativeSource1, DeclarativeSource2] = Field( + ..., + description="An API source that extracts data according to its declarative components.", + title="DeclarativeSource", + ) + + class SelectiveAuthenticator(BaseModel): class Config: extra = Extra.allow @@ -1883,8 +1952,32 @@ class SubstreamPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class HttpComponentsResolver(BaseModel): + type: Literal["HttpComponentsResolver"] + retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( + ..., + description="Component used to coordinate how records are extracted across stream slices and request pages.", + title="Retriever", + ) + components_mapping: List[ComponentMappingDefinition] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class DynamicDeclarativeStream(BaseModel): + type: Literal["DynamicDeclarativeStream"] + stream_template: DeclarativeStream = Field( + ..., description="Reference to the stream template.", title="Stream Template" + ) + components_resolver: HttpComponentsResolver = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) + + CompositeErrorHandler.update_forward_refs() -DeclarativeSource.update_forward_refs() +DeclarativeSource1.update_forward_refs() +DeclarativeSource2.update_forward_refs() SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 8cacda3d7..ed05b8e52 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -31,6 +31,12 @@ # DeclarativeStream "DeclarativeStream.retriever": "SimpleRetriever", "DeclarativeStream.schema_loader": "JsonFileSchemaLoader", + # DynamicDeclarativeStream + "DynamicDeclarativeStream.stream_template": "DeclarativeStream", + "DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver", + # HttpComponentsResolver + "HttpComponentsResolver.retriever": "SimpleRetriever", + "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition", # DefaultErrorHandler "DefaultErrorHandler.response_filters": "HttpResponseFilter", # DefaultPaginator diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2da84ff68..9a405f8f2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -119,6 +119,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ComponentMappingDefinition as ComponentMappingDefinitionModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) @@ -191,6 +194,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpComponentsResolver as HttpComponentsResolverModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) @@ -298,6 +304,7 @@ from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, ListPartitionRouter, + PartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter, ) @@ -338,6 +345,10 @@ ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod +from airbyte_cdk.sources.declarative.resolvers import ( + ComponentMappingDefinition, + HttpComponentsResolver, +) from airbyte_cdk.sources.declarative.retrievers import ( AsyncRetriever, SimpleRetriever, @@ -467,6 +478,8 @@ def _init_mappings(self) -> None: WaitTimeFromHeaderModel: self.create_wait_time_from_header, WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header, AsyncRetrieverModel: self.create_async_retriever, + HttpComponentsResolverModel: self.create_http_components_resolver, + ComponentMappingDefinitionModel: self.create_components_mapping_definition, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -1281,19 +1294,20 @@ def create_declarative_stream( parameters=model.parameters or {}, ) - def _merge_stream_slicers( - self, model: DeclarativeStreamModel, config: Config - ) -> Optional[StreamSlicer]: - stream_slicer = None + def _build_stream_slicer_from_partition_router( + self, + model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel], + config: Config, + ) -> Optional[PartitionRouter]: if ( - hasattr(model.retriever, "partition_router") - and isinstance(model.retriever, SimpleRetrieverModel) - and model.retriever.partition_router + hasattr(model, "partition_router") + and isinstance(model, SimpleRetrieverModel) + and model.partition_router ): - stream_slicer_model = model.retriever.partition_router + stream_slicer_model = model.partition_router if isinstance(stream_slicer_model, list): - stream_slicer = CartesianProductStreamSlicer( + return CartesianProductStreamSlicer( [ self._create_component_from_model(model=slicer, config=config) for slicer in stream_slicer_model @@ -1301,9 +1315,24 @@ def _merge_stream_slicers( parameters={}, ) else: - stream_slicer = self._create_component_from_model( - model=stream_slicer_model, config=config - ) + return self._create_component_from_model(model=stream_slicer_model, config=config) # type: ignore[no-any-return] + # Will be created PartitionRouter as stream_slicer_model is model.partition_router + return None + + def _build_resumable_cursor_from_paginator( + self, + model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel], + stream_slicer: Optional[StreamSlicer], + ) -> Optional[StreamSlicer]: + if hasattr(model, "paginator") and model.paginator and not stream_slicer: + # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor` + return ResumableFullRefreshCursor(parameters={}) + return None + + def _merge_stream_slicers( + self, model: DeclarativeStreamModel, config: Config + ) -> Optional[StreamSlicer]: + stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) if model.incremental_sync and stream_slicer: incremental_sync_model = model.incremental_sync @@ -1346,15 +1375,7 @@ def _merge_stream_slicers( ), partition_router=stream_slicer, ) - elif ( - hasattr(model.retriever, "paginator") - and model.retriever.paginator - and not stream_slicer - ): - # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor` - return ResumableFullRefreshCursor(parameters={}) - else: - return None + return self._build_resumable_cursor_from_paginator(model.retriever, stream_slicer) def create_default_error_handler( self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any @@ -2218,3 +2239,56 @@ def get_message_repository(self) -> MessageRepository: def _evaluate_log_level(self, emit_connector_builder_messages: bool) -> Level: return Level.DEBUG if emit_connector_builder_messages else Level.INFO + + @staticmethod + def create_components_mapping_definition( + model: ComponentMappingDefinitionModel, config: Config, **kwargs: Any + ) -> ComponentMappingDefinition: + interpolated_value = InterpolatedString.create( + model.value, parameters=model.parameters or {} + ) + field_path = [ + InterpolatedString.create(path, parameters=model.parameters or {}) + for path in model.field_path + ] + return ComponentMappingDefinition( + field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString + value=interpolated_value, + value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type), + parameters=model.parameters or {}, + ) + + def create_http_components_resolver( + self, model: HttpComponentsResolverModel, config: Config + ) -> Any: + stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) + combined_slicers = self._build_resumable_cursor_from_paginator( + model.retriever, stream_slicer + ) + + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name="", + primary_key=None, + stream_slicer=combined_slicers, + transformations=[], + ) + + components_mapping = [ + self._create_component_from_model( + model=components_mapping_definition_model, + value_type=ModelToComponentFactory._json_schema_type_name_to_type( + components_mapping_definition_model.value_type + ), + config=config, + ) + for components_mapping_definition_model in model.components_mapping + ] + + return HttpComponentsResolver( + retriever=retriever, + config=config, + components_mapping=components_mapping, + parameters=model.parameters or {}, + ) diff --git a/airbyte_cdk/sources/declarative/partition_routers/__init__.py b/airbyte_cdk/sources/declarative/partition_routers/__init__.py index 86e472a42..9487f5e1d 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/__init__.py +++ b/airbyte_cdk/sources/declarative/partition_routers/__init__.py @@ -6,5 +6,6 @@ from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter -__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"] +__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter", "PartitionRouter"] diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py new file mode 100644 index 000000000..17a3b5d52 --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition +from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver +from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel + +COMPONENTS_RESOLVER_TYPE_MAPPING = { + "HttpComponentsResolver": HttpComponentsResolverModel +} + +__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"] diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py new file mode 100644 index 000000000..5975b3082 --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from dataclasses import InitVar, dataclass +from typing import Any, Dict, Iterable, List, Mapping, Optional, Type, Union + +from typing_extensions import deprecated + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.source import ExperimentalClassWarning + + +@dataclass(frozen=True) +class ComponentMappingDefinition: + """Defines the configuration for mapping a component in a stream. This class specifies + what field in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" + + field_path: List["InterpolatedString"] + value: Union["InterpolatedString", str] + value_type: Optional[Type[Any]] + parameters: InitVar[Mapping[str, Any]] + + +@dataclass(frozen=True) +class ResolvedComponentMappingDefinition: + """Defines resolved configuration for mapping a component in a stream. This class specifies + what field in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" + + field_path: List["InterpolatedString"] + value: "InterpolatedString" + value_type: Optional[Type[Any]] + parameters: InitVar[Mapping[str, Any]] + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class ComponentsResolver(ABC): + """ + Abstract base class for resolving components in a stream template. + """ + + @abstractmethod + def resolve_components( + self, stream_template_config: Dict[str, Any] + ) -> Iterable[Dict[str, Any]]: + """ + Maps and populates values into a stream template configuration. + :param stream_template_config: The stream template with placeholders for components. + :yields: The resolved stream config with populated values. + """ + pass diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py new file mode 100644 index 000000000..322b43683 --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from copy import deepcopy +from dataclasses import InitVar, dataclass, field +from typing import Any, Dict, Iterable, List, Mapping + +import dpath +from typing_extensions import deprecated + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( + ComponentMappingDefinition, + ComponentsResolver, + ResolvedComponentMappingDefinition, +) +from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.types import Config + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class HttpComponentsResolver(ComponentsResolver): + """ + Resolves and populates stream templates with components fetched via an HTTP retriever. + + Attributes: + retriever (Retriever): The retriever used to fetch data from an API. + config (Config): Configuration object for the resolver. + components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve. + parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation. + """ + + retriever: Retriever + config: Config + components_mapping: List[ComponentMappingDefinition] + parameters: InitVar[Mapping[str, Any]] + _resolved_components: List[ResolvedComponentMappingDefinition] = field( + init=False, repr=False, default_factory=list + ) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + """ + Initializes and parses component mappings, converting them to resolved definitions. + + Args: + parameters (Mapping[str, Any]): Parameters for interpolation. + """ + for component_mapping in self.components_mapping: + if isinstance(component_mapping.value, (str, InterpolatedString)): + interpolated_value = ( + InterpolatedString.create(component_mapping.value, parameters=parameters) + if isinstance(component_mapping.value, str) + else component_mapping.value + ) + + field_path = [ + InterpolatedString.create(path, parameters=parameters) + for path in component_mapping.field_path + ] + + self._resolved_components.append( + ResolvedComponentMappingDefinition( + field_path=field_path, + value=interpolated_value, + value_type=component_mapping.value_type, + parameters=parameters, + ) + ) + else: + raise ValueError( + f"Expected a string or InterpolatedString for value in mapping: {component_mapping}" + ) + + def resolve_components( + self, stream_template_config: Dict[str, Any] + ) -> Iterable[Dict[str, Any]]: + """ + Resolves components in the stream template configuration by populating values. + + Args: + stream_template_config (Dict[str, Any]): Stream template to populate. + + Yields: + Dict[str, Any]: Updated configurations with resolved components. + """ + kwargs = {"stream_template_config": stream_template_config} + + for components_values in self.retriever.read_records({}): + updated_config = deepcopy(stream_template_config) + kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] + + for resolved_component in self._resolved_components: + valid_types = ( + (resolved_component.value_type,) if resolved_component.value_type else None + ) + value = resolved_component.value.eval( + self.config, valid_types=valid_types, **kwargs + ) + + path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] + dpath.set(updated_config, path, value) + + yield updated_config diff --git a/unit_tests/sources/declarative/resolvers/__init__.py b/unit_tests/sources/declarative/resolvers/__init__.py new file mode 100644 index 000000000..66f6de8cb --- /dev/null +++ b/unit_tests/sources/declarative/resolvers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py new file mode 100644 index 000000000..9e9fe225a --- /dev/null +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -0,0 +1,189 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +from unittest.mock import MagicMock + +import pytest + +from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.resolvers import ( + ComponentMappingDefinition, + HttpComponentsResolver, +) +from airbyte_cdk.sources.embedded.catalog import ( + to_configured_catalog, + to_configured_stream, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} + +_MANIFEST = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} + + +@pytest.mark.parametrize( + "components_mapping, retriever_data, stream_template_config, expected_result", + [ + ( + [ + ComponentMappingDefinition( + field_path=[InterpolatedString.create("key1", parameters={})], + value="{{components_values['key1']}}", + value_type=str, + parameters={}, + ) + ], + [{"key1": "updated_value1", "key2": "updated_value2"}], + {"key1": None, "key2": None}, + [{"key1": "updated_value1", "key2": None}], + ) + ], +) +def test_http_components_resolver( + components_mapping, retriever_data, stream_template_config, expected_result +): + mock_retriever = MagicMock() + mock_retriever.read_records.return_value = retriever_data + config = {} + + resolver = HttpComponentsResolver( + retriever=mock_retriever, + config=config, + components_mapping=components_mapping, + parameters={}, + ) + + result = list(resolver.resolve_components(stream_template_config=stream_template_config)) + assert result == expected_result + + +def test_dynamic_streams_read(): + expected_stream_names = ["item_1", "item_2"] + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/1"), + HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/2"), + HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config={}) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + records = [ + message.record + for message in source.read(MagicMock(), {}, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == 2 + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 2 + assert [record.stream for record in records] == expected_stream_names diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index 1f4b6df56..ea92bac5e 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -71,6 +71,116 @@ def use_external_yaml_spec(self): yield os.remove(yaml_path) + @pytest.fixture + def _base_manifest(self): + """Base manifest without streams or dynamic streams.""" + return { + "version": "3.8.2", + "description": "This is a sample source connector that is very valid.", + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + + @pytest.fixture + def _declarative_stream(self): + def declarative_stream_config( + name="lists", requester_type="HttpRequester", custom_requester=None + ): + """Generates a DeclarativeStream configuration.""" + requester_config = { + "type": requester_type, + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + }, + "request_parameters": {"page_size": "{{ 10 }}"}, + } + if custom_requester: + requester_config.update(custom_requester) + + return { + "type": "DeclarativeStream", + "$parameters": { + "name": name, + "primary_key": "id", + "url_base": "https://api.sendgrid.com", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": f"./source_sendgrid/schemas/{{{{ parameters.name }}}}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": requester_config, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + } + + return declarative_stream_config + + @pytest.fixture + def _dynamic_declarative_stream(self, _declarative_stream): + """Generates a DynamicDeclarativeStream configuration.""" + return { + "type": "DynamicDeclarativeStream", + "stream_template": _declarative_stream(), + "components_resolver": { + "type": "HttpComponentsResolver", + "$parameters": { + "name": "lists", + "primary_key": "id", + "url_base": "https://api.sendgrid.com", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + }, + "request_parameters": {"page_size": "{{ 10 }}"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{ components_value['name'] }}", + } + ], + }, + } + def test_valid_manifest(self): manifest = { "version": "3.8.2", @@ -516,14 +626,37 @@ def test_source_missing_checker_fails_validation(self): with pytest.raises(ValidationError): ManifestDeclarativeSource(source_config=manifest) - def test_source_with_missing_streams_fails(self): - manifest = { - "version": "0.29.3", - "definitions": None, - "check": {"type": "CheckStream", "stream_names": ["lists"]}, - } + def test_source_with_missing_streams_and_dynamic_streams_fails( + self, _base_manifest, _dynamic_declarative_stream, _declarative_stream + ): + # test case for manifest without streams or dynamic streams + manifest_without_streams_and_dynamic_streams = _base_manifest with pytest.raises(ValidationError): - ManifestDeclarativeSource(source_config=manifest) + ManifestDeclarativeSource(source_config=manifest_without_streams_and_dynamic_streams) + + # test case for manifest with streams + manifest_with_streams = { + **manifest_without_streams_and_dynamic_streams, + "streams": [ + _declarative_stream(name="lists"), + _declarative_stream( + name="stream_with_custom_requester", + requester_type="CustomRequester", + custom_requester={ + "class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent", + "custom_request_parameters": {"page_size": 10}, + }, + ), + ], + } + ManifestDeclarativeSource(source_config=manifest_with_streams) + + # test case for manifest with dynamic streams + manifest_with_dynamic_streams = { + **manifest_without_streams_and_dynamic_streams, + "dynamic_streams": [_dynamic_declarative_stream], + } + ManifestDeclarativeSource(source_config=manifest_with_dynamic_streams) def test_source_with_missing_version_fails(self): manifest = { From 819555f1ae38201c11245667f3798b55e719361a Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 5 Dec 2024 10:05:15 -0800 Subject: [PATCH 2/2] chore: fix typing issues (#135) --- .../declarative/concurrent_declarative_source.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 371e34a87..0d3a176ce 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -206,7 +206,7 @@ def _group_streams( # so we need to treat them as synchronous if ( isinstance(declarative_stream, DeclarativeStream) - and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"] + and name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] == "SimpleRetriever" ): incremental_sync_component_definition = name_to_stream_mapping[ @@ -215,7 +215,7 @@ def _group_streams( partition_router_component_definition = ( name_to_stream_mapping[declarative_stream.name] - .get("retriever") + .get("retriever", {}) .get("partition_router") ) is_without_partition_router_or_cursor = not bool( @@ -237,7 +237,7 @@ def _group_streams( cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( state_manager=state_manager, model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, @@ -320,10 +320,11 @@ def _group_streams( def _is_datetime_incremental_without_partition_routing( self, declarative_stream: DeclarativeStream, - incremental_sync_component_definition: Mapping[str, Any], + incremental_sync_component_definition: Mapping[str, Any] | None, ) -> bool: return ( - bool(incremental_sync_component_definition) + incremental_sync_component_definition is not None + and bool(incremental_sync_component_definition) and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ and self._stream_supports_concurrent_partition_processing(