Merge branch 'main' into daryna/add-stream-slicer-to-http-component-resolver
darynaishchenko authored Dec 19, 2024
2 parents ec9e480 + 57e1b52 commit 8fb4d88
Showing 21 changed files with 1,162 additions and 442 deletions.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1234,6 +1234,8 @@ definitions:
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
@@ -1766,6 +1768,18 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
@@ -1838,6 +1852,32 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeysToSnakeCase:
title: Keys to Snake Case
description: A transformation that renames all keys to snake case.
type: object
required:
- type
properties:
type:
type: string
enum: [KeysToSnakeCase]
$parameters:
type: object
additionalProperties: true
FlattenFields:
title: Flatten Fields
description: A transformation that flattens records into a single-level format.
type: object
required:
- type
properties:
type:
type: string
enum: [FlattenFields]
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
@@ -2160,7 +2200,9 @@ definitions:
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
- {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
access_token_params:
title: Access Token Query Params (Json Encoded)
type: object
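For orientation, here is a small, purely illustrative sketch of what the two new transformations are intended to do to a record. The concrete conversion and flattening rules live in the CDK implementations referenced later in this diff, so the outputs in the comments below are assumptions, not excerpts from the source.

# Illustrative only: each transformation is shown applied independently to the same
# sample record; the exact output is an assumption about the CDK implementations.
record = {"UserInfo": {"FirstName": "Jane", "Address": {"ZipCode": "10001"}}}

# KeysToSnakeCase is expected to rename every key (recursively) to snake_case:
# {"user_info": {"first_name": "Jane", "address": {"zip_code": "10001"}}}

# FlattenFields is expected to collapse the nested structure into a single level:
# {"first_name": "Jane", "zip_code": "10001"}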
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -710,6 +710,16 @@ class KeysToLower(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysToSnakeCase(BaseModel):
type: Literal["KeysToSnakeCase"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

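Because the generated classes are plain Pydantic models, a minimal instantiation only needs the literal type discriminator; $parameters stays optional and is populated by its alias. The lines below are hypothetical usage, not taken from the diff.

# Hypothetical usage of the newly generated models.
keys_to_snake = KeysToSnakeCase(type="KeysToSnakeCase")
flatten = FlattenFields.parse_obj({"type": "FlattenFields", "$parameters": {}})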
@@ -1654,7 +1664,16 @@ class Config:
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
]
]
] = Field(
None,
description="A list of transformations to be applied to each output record.",
@@ -1818,6 +1837,22 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_transformations: Optional[
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
]
]
] = Field(
None,
description="A list of transformations to be applied to the schema.",
title="Schema Transformations",
)
schema_type_identifier: SchemaTypeIdentifier
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

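To make the new field concrete, a hypothetical DynamicSchemaLoader definition (written here as a Python dict rather than manifest YAML) would carry schema_transformations alongside the existing fields; the retriever and schema_type_identifier shapes are unchanged by this commit and are elided with placeholders.

# Hypothetical manifest fragment; `...` marks parts this commit does not touch.
dynamic_schema_loader_definition = {
    "type": "DynamicSchemaLoader",
    "retriever": ...,                # unchanged, omitted here
    "schema_type_identifier": ...,   # unchanged, omitted here
    "schema_transformations": [
        {"type": "KeysToSnakeCase"},
        {"type": "FlattenFields"},
    ],
}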
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -197,6 +197,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
@@ -236,6 +239,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToSnakeCase as KeysToSnakeCaseModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
)
@@ -323,6 +329,9 @@
SinglePartitionRouter,
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
AsyncJobPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
ParentStreamConfig,
)
@@ -387,9 +396,15 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
KeysToSnakeCaseTransformation,
)
from airbyte_cdk.sources.message import (
InMemoryMessageRepository,
LogAppenderMessageRepositoryDecorator,
@@ -472,6 +487,8 @@ def _init_mappings(self) -> None:
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
@@ -587,6 +604,16 @@ def create_keys_to_lower_transformation
) -> KeysToLowerTransformation:
return KeysToLowerTransformation()

def create_keys_to_snake_transformation(
self, model: KeysToSnakeCaseModel, config: Config, **kwargs: Any
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
return FlattenFields()

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
@@ -1638,6 +1665,13 @@ def create_dynamic_schema_loader(
model.retriever, stream_slicer
)

schema_transformations = []
if model.schema_transformations:
for transformation_model in model.schema_transformations:
schema_transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
@@ -1652,6 +1686,7 @@ def create_dynamic_schema_loader(
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_transformations=schema_transformations,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)
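A sketch of the assumed effect downstream (not shown in this diff): each schema transformation follows the same RecordTransformation interface as record transformations, so the DynamicSchemaLoader can apply them in place to the raw schema entries it retrieves, for example:

from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
    KeysToSnakeCaseTransformation,
)

# Assumed behavior: transform() mutates the mapping in place, as with record transformations.
schema_entry = {"FieldName": "ExampleField", "FieldType": "string"}
KeysToSnakeCaseTransformation().transform(schema_entry)
# schema_entry is now expected to be {"field_name": "ExampleField", "field_type": "string"}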
@@ -2228,22 +2263,28 @@ def create_async_retriever(
urls_extractor=urls_extractor,
)

return AsyncRetriever(
async_job_partition_router = AsyncJobPartitionRouter(
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(
1
), # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._message_repository,
has_bulk_parent=False, # FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
),
record_selector=record_selector,
stream_slicer=stream_slicer,
config=config,
parameters=model.parameters or {},
)

return AsyncRetriever(
record_selector=record_selector,
stream_slicer=async_job_partition_router,
config=config,
parameters=model.parameters or {},
)

@staticmethod
def create_spec(model: SpecModel, config: Config, **kwargs: Any) -> Spec:
return Spec(
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/declarative/partition_routers/__init__.py
@@ -2,10 +2,18 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter

__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter", "PartitionRouter"]
__all__ = [
"AsyncJobPartitionRouter",
"CartesianProductStreamSlicer",
"ListPartitionRouter",
"SinglePartitionRouter",
"SubstreamPartitionRouter",
"PartitionRouter"
]
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
@@ -0,0 +1,65 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import InitVar, dataclass, field
from typing import Any, Callable, Iterable, Mapping, Optional

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import (
AsyncJobOrchestrator,
AsyncPartition,
)
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
SinglePartitionRouter,
)
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import Config, StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
"""
Partition router that creates async jobs in a source API, periodically polls for job
completion, and supplies the completed job URL locations as stream slices so that
records can be extracted.
"""

config: Config
parameters: InitVar[Mapping[str, Any]]
job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
stream_slicer: StreamSlicer = field(
default_factory=lambda: SinglePartitionRouter(parameters={})
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._job_orchestrator_factory = self.job_orchestrator_factory
self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
self._parameters = parameters

def stream_slices(self) -> Iterable[StreamSlice]:
slices = self.stream_slicer.stream_slices()
self._job_orchestrator = self._job_orchestrator_factory(slices)

for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
yield StreamSlice(
partition=dict(completed_partition.stream_slice.partition)
| {"partition": completed_partition},
cursor_slice=completed_partition.stream_slice.cursor_slice,
)

def fetch_records(self, partition: AsyncPartition) -> Iterable[Mapping[str, Any]]:
"""
This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
be responsible for. However, it was added because the JobOrchestrator is required to
retrieve records, and without defining fetch_records() on this class we would have to either
pass the JobOrchestrator to the AsyncRetriever or store it on multiple classes.
"""

if not self._job_orchestrator:
raise AirbyteTracedException(
message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
failure_type=FailureType.system_error,
)

return self._job_orchestrator.fetch_records(partition=partition)
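Finally, a hedged end-to-end sketch of how the new router is meant to be wired and consumed, mirroring the factory change earlier in this diff; job_repository is a placeholder for a real AsyncJobRepository, and the orchestrator arguments follow the create_async_retriever call shown above.

from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.message import InMemoryMessageRepository

job_repository = ...  # placeholder: an AsyncJobRepository implementation for the source API
config = {}

router = AsyncJobPartitionRouter(
    job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
        job_repository,
        stream_slices,
        JobTracker(1),
        InMemoryMessageRepository(),
        has_bulk_parent=False,
    ),
    config=config,
    parameters={},
)

for stream_slice in router.stream_slices():
    # Each emitted slice carries the completed AsyncPartition under the "partition" key,
    # which is what fetch_records() expects.
    records = router.fetch_records(partition=stream_slice.partition["partition"])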
