Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: add download_decoder + download_extractor #50

Merged
merged 13 commits into from
Dec 3, 2024
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2513,6 +2513,11 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
download_extractor:
description: Responsible for fetching the records from provided urls.
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
creation_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.
anyOf:
Expand Down Expand Up @@ -2567,6 +2572,16 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,9 @@ class AsyncRetriever(BaseModel):
...,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
)
download_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down Expand Up @@ -1726,6 +1729,20 @@ class AsyncRetriever(BaseModel):
description="Component decoding the response so records can be extracted.",
title="Decoder",
)
download_decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description="Component decoding the download response so records can be extracted.",
title="Download Decoder",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
DpathExtractor,
RecordFilter,
RecordSelector,
ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
Expand Down Expand Up @@ -2024,16 +2023,34 @@ def create_async_retriever(
name=f"job polling - {name}",
)
job_download_components_name = f"job download - {name}"
download_decoder = (
self._create_component_from_model(model=model.download_decoder, config=config)
if model.download_decoder
else JsonDecoder(parameters={})
)
download_extractor = (
self._create_component_from_model(
model=model.download_extractor,
config=config,
decoder=download_decoder,
parameters=model.parameters,
)
if model.download_extractor
else DpathExtractor(
[], config=config, decoder=download_decoder, parameters=model.parameters
)
)
download_requester = self._create_component_from_model(
model=model.download_requester,
decoder=decoder,
decoder=download_decoder,
config=config,
name=job_download_components_name,
)
download_retriever = SimpleRetriever(
requester=download_requester,
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
# extractor=ResponseToFileExtractor(),# old one
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
extractor=download_extractor,
record_filter=None,
transformations=[],
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
Expand Down
Loading