Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add download_decoder + download_extractor #50

Merged
merged 13 commits into from
Dec 3, 2024
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
ResponseToFileExtractor:
  # NOTE(review): the title reads "CSV To File Extractor" while the component is
  # named ResponseToFileExtractor — confirm whether the title should be aligned
  # with the component name before release.
  title: CSV To File Extractor
  description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed.
  type: object
  required:
    - type
  properties:
    # Discriminator; must be exactly "ResponseToFileExtractor".
    type:
      type: string
      enum: [ResponseToFileExtractor]
    # Free-form interpolation parameters forwarded to the runtime component.
    $parameters:
      type: object
      additionalProperties: true
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
Expand Down Expand Up @@ -2513,6 +2526,12 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
download_extractor:
description: Responsible for fetching the records from provided urls.
anyOf:
- "$ref": "#/definitions/CustomRecordExtractor"
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/ResponseToFileExtractor"
creation_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.
anyOf:
Expand Down Expand Up @@ -2567,6 +2586,16 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
download_decoder:
title: Download Decoder
description: Component decoding the download response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import uuid
import zlib
from contextlib import closing
from dataclasses import InitVar, dataclass
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import pandas as pd
Expand All @@ -19,6 +20,7 @@
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10


@dataclass
class ResponseToFileExtractor(RecordExtractor):
"""
This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
Expand All @@ -28,7 +30,9 @@ class ResponseToFileExtractor(RecordExtractor):
a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
"""

def __init__(self) -> None:
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    """Finish dataclass initialization.

    Args:
        parameters: InitVar mapping of declarative $parameters; accepted for
            interface consistency with other declarative components but not
            stored on the instance here.
    """
    # Shared Airbyte logger used by the extractor's download/parse steps.
    self.logger = logging.getLogger("airbyte")

def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,11 @@ class DpathExtractor(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ResponseToFileExtractor(BaseModel):
    """Declarative-schema model for the ResponseToFileExtractor component.

    Mirrors the `ResponseToFileExtractor` definition in
    declarative_component_schema.yaml; carries no configuration beyond the
    type discriminator and optional $parameters.
    """

    # Discriminator used by the factory to dispatch to create_response_to_file_extractor.
    type: Literal["ResponseToFileExtractor"]
    # YAML key is "$parameters"; exposed in Python as `parameters` via the alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

class ExponentialBackoffStrategy(BaseModel):
type: Literal["ExponentialBackoffStrategy"]
factor: Optional[Union[float, str]] = Field(
Expand Down Expand Up @@ -1676,6 +1681,9 @@ class AsyncRetriever(BaseModel):
...,
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
)
download_extractor: Optional[
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down Expand Up @@ -1726,6 +1734,20 @@ class AsyncRetriever(BaseModel):
description="Component decoding the response so records can be extracted.",
title="Decoder",
)
download_decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description="Component decoding the download response so records can be extracted.",
title="Download Decoder",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
RequestPath as RequestPathModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SelectiveAuthenticator as SelectiveAuthenticatorModel,
)
Expand Down Expand Up @@ -427,6 +430,7 @@ def _init_mappings(self) -> None:
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
DpathExtractorModel: self.create_dpath_extractor,
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
HttpRequesterModel: self.create_http_requester,
Expand Down Expand Up @@ -1447,6 +1451,13 @@ def create_dpath_extractor(
parameters=model.parameters or {},
)

def create_response_to_file_extractor(
    self,
    model: ResponseToFileExtractorModel,
    **kwargs: Any,
) -> ResponseToFileExtractor:
    """Build the runtime ResponseToFileExtractor from its declarative model.

    Args:
        model: Parsed declarative-schema model for the extractor.
        **kwargs: Ignored; accepted for factory-dispatch signature uniformity.

    Returns:
        A ResponseToFileExtractor initialized with the model's $parameters
        (an empty mapping when none were declared).
    """
    resolved_parameters = model.parameters if model.parameters else {}
    return ResponseToFileExtractor(parameters=resolved_parameters)

@staticmethod
def create_exponential_backoff_strategy(
model: ExponentialBackoffStrategyModel, config: Config
Expand Down Expand Up @@ -2011,6 +2022,7 @@ def create_async_retriever(
model=model.record_selector,
config=config,
decoder=decoder,
name=name,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)
Expand All @@ -2028,16 +2040,36 @@ def create_async_retriever(
name=f"job polling - {name}",
)
job_download_components_name = f"job download - {name}"
download_decoder = (
self._create_component_from_model(model=model.download_decoder, config=config)
if model.download_decoder
else JsonDecoder(parameters={})
)
download_extractor = (
self._create_component_from_model(
model=model.download_extractor,
config=config,
decoder=download_decoder,
parameters=model.parameters,
)
if model.download_extractor
else DpathExtractor(
[],
config=config,
decoder=download_decoder,
parameters=model.parameters or {},
)
)
download_requester = self._create_component_from_model(
model=model.download_requester,
decoder=decoder,
decoder=download_decoder,
config=config,
name=job_download_components_name,
)
download_retriever = SimpleRetriever(
requester=download_requester,
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
extractor=download_extractor,
name=name,
record_filter=None,
transformations=[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):

job_timeout: Optional[timedelta] = None
record_extractor: RecordExtractor = field(
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor()
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
)

def __post_init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class ResponseToFileExtractorTest(TestCase):
def setUp(self) -> None:
    """Create a fresh extractor and activate HTTP mocking before each test."""
    # ResponseToFileExtractor now requires a $parameters mapping; tests pass {}.
    self._extractor = ResponseToFileExtractor({})
    # Entered manually rather than via `with`; presumably __exit__ is called
    # in tearDown — confirm the matching teardown exists.
    self._http_mocker = requests_mock.Mocker()
    self._http_mocker.__enter__()

Expand Down Expand Up @@ -76,7 +76,7 @@ def large_event_response_fixture():
@pytest.mark.limit_memory("20 MB")
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
lines_in_response, file_path = large_events_response
extractor = ResponseToFileExtractor()
extractor = ResponseToFileExtractor({})

url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
requests_mock.get(url, body=open(file_path, "rb"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

# mypy: ignore-errors
import datetime
from typing import Any, Mapping
from typing import Any, Iterable, Mapping

import freezegun
import pendulum
import pytest
import requests

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import FailureType, Level
Expand All @@ -27,6 +28,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
Expand All @@ -47,6 +49,9 @@
from airbyte_cdk.sources.declarative.models import (
CustomPartitionRouter as CustomPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models import (
CustomRecordExtractor as CustomRecordExtractorModel,
)
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
Expand Down Expand Up @@ -3271,3 +3276,20 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
"state_type": "date-range",
"legacy": {},
}


class CustomRecordExtractor(RecordExtractor):
    """Minimal extractor used to verify custom-component instantiation.

    Emits each element of the response's JSON body as a record.
    """

    def extract_records(
        self,
        response: requests.Response,
    ) -> Iterable[Mapping[str, Any]]:
        for record in response.json():
            yield record


def test_create_custom_record_extractor():
    """Factory should instantiate a user-supplied RecordExtractor by class_name."""
    manifest = {
        "type": "CustomRecordExtractor",
        "class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.CustomRecordExtractor",
    }
    created = factory.create_component(CustomRecordExtractorModel, manifest, {})
    assert isinstance(created, CustomRecordExtractor)
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def setUp(self) -> None:
stream_response=True,
),
record_selector=RecordSelector(
extractor=ResponseToFileExtractor(),
extractor=ResponseToFileExtractor({}),
record_filter=None,
transformations=[],
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
Expand Down
Loading