diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index fd8f6f22..74726aca 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2766,9 +2766,59 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" + - "$ref": "#/definitions/CompositeRawDecoder" $parameters: type: object additionalProperties: true + CompositeRawDecoder: + description: "(This is experimental, use at your own risk)" + type: object + required: + - type + - parser + properties: + type: + type: string + enum: [ CompositeRawDecoder ] + parser: + anyOf: + - "$ref": "#/definitions/GzipParser" + - "$ref": "#/definitions/JsonLineParser" + - "$ref": "#/definitions/CsvParser" +# PARSERS + GzipParser: + type: object + required: + - type + - inner_parser + properties: + type: + type: string + enum: [ GzipParser ] + inner_parser: + anyOf: + - "$ref": "#/definitions/JsonLineParser" + - "$ref": "#/definitions/CsvParser" + JsonLineParser: + type: object + properties: + encoding: + type: string + default: utf-8 + CsvParser: + type: object + required: + - type + properties: + type: + type: string + enum: [ CsvParser ] + encoding: + type: string + default: utf-8 + delimiter: + type: string + default: "," AsyncJobStatusMap: description: Matches the api job status to Async Job Status. type: object diff --git a/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte_cdk/sources/declarative/decoders/__init__.py index 2d91b893..9fbcb163 100644 --- a/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import ( GzipJsonDecoder, @@ -17,6 +18,7 @@ __all__ = [ "Decoder", + "CompositeRawDecoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", diff --git a/airbyte_cdk/sources/declarative/decoders/composite_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py new file mode 100644 index 00000000..ba4ff5f6 --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/composite_decoder.py @@ -0,0 +1,102 @@ +import gzip +import json +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from io import BufferedIOBase +from typing import Any, Generator, MutableMapping, Optional + +import pandas as pd +import requests +from numpy import nan + +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder + +logger = logging.getLogger("airbyte") + + +@dataclass +class Parser(ABC): + @abstractmethod + def parse( + self, + data: BufferedIOBase, + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Parse data and yield dictionaries. + """ + pass + + +@dataclass +class GzipParser(Parser): + inner_parser: Parser + + def parse( + self, + data: BufferedIOBase, + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Decompress gzipped bytes and pass decompressed data to the inner parser. 
+ """ + gzipobj = gzip.GzipFile(fileobj=data, mode="rb") + yield from self.inner_parser.parse(gzipobj) + + +@dataclass +class JsonLineParser(Parser): + encoding: Optional[str] = "utf-8" + + def parse( + self, + data: BufferedIOBase, + ) -> Generator[MutableMapping[str, Any], None, None]: + for line in data: + try: + yield json.loads(line.decode(encoding=self.encoding or "utf-8")) + except json.JSONDecodeError: + logger.warning(f"Cannot decode/parse line {line!r} as JSON") + + +@dataclass +class CsvParser(Parser): + # TODO: migrate implementation to re-use file-base classes + encoding: Optional[str] = "utf-8" + delimiter: Optional[str] = "," + + def parse( + self, + data: BufferedIOBase, + ) -> Generator[MutableMapping[str, Any], None, None]: + """ + Parse CSV data from decompressed bytes. + """ + reader = pd.read_csv( # type: ignore + data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding + ) + for chunk in reader: + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for row in chunk: + yield row + + +@dataclass +class CompositeRawDecoder(Decoder): + """ + Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] + passed response.raw to parser(s). + Note: response.raw is not decoded/decompressed by default. + parsers should be instantiated recursively. 
+ Example: + composite_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + """ + + parser: Parser + + def is_stream_response(self) -> bool: + return True + + def decode( + self, response: requests.Response + ) -> Generator[MutableMapping[str, Any], None, None]: + yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 66b90c5d..4c0d9f2c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1125,6 +1125,16 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class JsonLineParser(BaseModel): + encoding: Optional[str] = "utf-8" + + +class CsvParser(BaseModel): + type: Literal["CsvParser"] + encoding: Optional[str] = "utf-8" + delimiter: Optional[str] = "," + + class AsyncJobStatusMap(BaseModel): type: Optional[Literal["AsyncJobStatusMap"]] = None running: List[str] @@ -1504,6 +1514,11 @@ class RecordSelector(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class GzipParser(BaseModel): + type: Literal["GzipParser"] + inner_parser: Union[JsonLineParser, CsvParser] + + class Spec(BaseModel): type: Literal["Spec"] connection_specification: Dict[str, Any] = Field( @@ -1534,6 +1549,11 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class CompositeRawDecoder(BaseModel): + type: Literal["CompositeRawDecoder"] + parser: Union[GzipParser, JsonLineParser, CsvParser] + + class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid @@ -1936,6 +1956,7 @@ class SimpleRetriever(BaseModel): IterableDecoder, XmlDecoder, GzipJsonDecoder, + CompositeRawDecoder, ] ] = 
Field( None, diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index deebbe86..9fbfdec9 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -67,6 +67,12 @@ PaginationDecoderDecorator, XmlDecoder, ) +from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, @@ -125,6 +131,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CompositeRawDecoder as CompositeRawDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) @@ -134,6 +143,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CsvParser as CsvParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CursorPagination as CursorPaginationModel, ) @@ -203,6 +215,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GzipParser as GzipParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) @@ -227,6 +242,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as 
JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonLineParser as JsonLineParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -455,6 +473,7 @@ def _init_mappings(self) -> None: BearerAuthenticatorModel: self.create_bearer_authenticator, CheckStreamModel: self.create_check_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, + CompositeRawDecoderModel: self.create_composite_raw_decoder, ConcurrencyLevelModel: self.create_concurrency_level, ConstantBackoffStrategyModel: self.create_constant_backoff_strategy, CursorPaginationModel: self.create_cursor_pagination, @@ -485,7 +504,9 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonLineParserModel: self.create_jsonline_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, + GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, FlattenFieldsModel: self.create_flatten_fields, @@ -1701,6 +1722,12 @@ def create_jsonl_decoder( ) -> JsonlDecoder: return JsonlDecoder(parameters={}) + @staticmethod + def create_jsonline_parser( + model: JsonLineParserModel, config: Config, **kwargs: Any + ) -> JsonLineParser: + return JsonLineParser(encoding=model.encoding) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any @@ -1717,6 +1744,22 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) + def create_gzip_parser( + self, model: GzipParserModel, config: Config, **kwargs: Any + ) -> GzipParser: + inner_parser = self._create_component_from_model(model=model.inner_parser, config=config) + return 
GzipParser(inner_parser=inner_parser) + + @staticmethod + def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: + return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + + def create_composite_raw_decoder( + self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any + ) -> CompositeRawDecoder: + parser = self._create_component_from_model(model=model.parser, config=config) + return CompositeRawDecoder(parser=parser) + @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py new file mode 100644 index 00000000..ce5f58b6 --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -0,0 +1,119 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# +import csv +import gzip +import json +from io import BytesIO, StringIO + +import pytest +import requests + +from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) + + +def compress_with_gzip(data: str, encoding: str = "utf-8"): + """ + Compress the data using Gzip. + """ + buf = BytesIO() + with gzip.GzipFile(fileobj=buf, mode="wb") as f: + f.write(data.encode(encoding)) + return buf.getvalue() + + +def generate_csv(encoding: str) -> bytes: + """ + Generate CSV data with tab-separated values (\t). 
+ """ + data = [ + {"id": 1, "name": "John", "age": 28}, + {"id": 2, "name": "Alice", "age": 34}, + {"id": 3, "name": "Bob", "age": 25}, + ] + + output = StringIO() + writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter="\t") + writer.writeheader() + for row in data: + writer.writerow(row) + + # Ensure the pointer is at the beginning of the buffer before compressing + output.seek(0) + + # Compress the CSV data with Gzip + compressed_data = compress_with_gzip(output.read(), encoding=encoding) + + return compressed_data + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\t")) + composite_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def generate_jsonlines(): + """ + Generator function to yield data in JSON Lines format. + This is useful for streaming large datasets. + """ + data = [ + {"id": 1, "message": "Hello, World!"}, + {"id": 2, "message": "Welcome to JSON Lines"}, + {"id": 3, "message": "Streaming data is fun!"}, + ] + for item in data: + yield json.dumps(item) + "\n" # Serialize as JSON Lines + + +def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: + """ + Generator to compress the entire response content with Gzip and encode it. 
+ """ + json_lines_content = "".join(generate_jsonlines()) + compressed_data = compress_with_gzip(json_lines_content, encoding=encoding) + return compressed_data + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) + composite_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 + + +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): + response_content = "".join(generate_jsonlines()) + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=response_content.encode(encoding=encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + composite_decoder = CompositeRawDecoder(parser=JsonLineParser(encoding=encoding)) + counter = 0 + for _ in composite_decoder.decode(response): + counter += 1 + assert counter == 3 diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py new file mode 100644 index 00000000..e0cbe4c8 --- /dev/null +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# +import gzip +import json +import os + +import pytest +import requests + +from airbyte_cdk import YamlDeclarativeSource +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder +from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory, +) + + +@pytest.fixture(name="large_events_response") +def large_event_response_fixture(): + data = {"email": "email1@example.com"} + jsonl_string = f"{json.dumps(data)}\n" + lines_in_response = 2_000_000 # ≈ 58 MB of response + dir_path = os.path.dirname(os.path.realpath(__file__)) + file_path = f"{dir_path}/test_response.txt" + with open(file_path, "w") as file: + for _ in range(lines_in_response): + file.write(jsonl_string) + yield (lines_in_response, file_path) + os.remove(file_path) + + +@pytest.mark.slow +@pytest.mark.limit_memory("20 MB") +@pytest.mark.parametrize( + "decoder_yaml_definition", + [ + "type: JsonlDecoder", + """type: CompositeRawDecoder + parser: + type: JsonLineParser + """, + ], +) +def test_jsonl_decoder_memory_usage( + requests_mock, large_events_response, decoder_yaml_definition: str +): + # + lines_in_response, file_path = large_events_response + content = f""" + name: users + type: DeclarativeStream + retriever: + type: SimpleRetriever + decoder: + {decoder_yaml_definition} + paginator: + type: "NoPagination" + requester: + path: "users/{{{{ stream_slice.slice }}}}" + type: HttpRequester + url_base: "https://for-all-mankind.nasa.com/api/v1" + http_method: GET + authenticator: + type: NoAuth + request_headers: {{}} + request_body_json: {{}} + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + partition_router: + type: ListPartitionRouter + cursor_field: "slice" + values: + 
- users1 + - users2 + - users3 + - users4 + primary_key: [] + """ + + factory = ModelToComponentFactory() + stream_manifest = YamlDeclarativeSource._parse(content) + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} + ) + + def get_body(): + return open(file_path, "rb", buffering=30) + + counter = 0 + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) + requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) + + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + for stream_slice in stream_slices: + for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): + counter += 1 + + assert counter == lines_in_response * len(stream_slices) diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py index bb2dd0c9..087619dc 100644 --- a/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -8,14 +8,8 @@ import pytest import requests -from airbyte_cdk import YamlDeclarativeSource -from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder -from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel -from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( - ModelToComponentFactory, -) @pytest.mark.parametrize( @@ -64,67 +58,6 @@ def large_event_response_fixture(): os.remove(file_path) -@pytest.mark.slow -@pytest.mark.limit_memory("20 MB") -def 
test_jsonl_decoder_memory_usage(requests_mock, large_events_response): - lines_in_response, file_path = large_events_response - content = """ - name: users - type: DeclarativeStream - retriever: - type: SimpleRetriever - decoder: - type: JsonlDecoder - paginator: - type: "NoPagination" - requester: - path: "users/{{ stream_slice.slice }}" - type: HttpRequester - url_base: "https://for-all-mankind.nasa.com/api/v1" - http_method: GET - authenticator: - type: NoAuth - request_headers: {} - request_body_json: {} - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - partition_router: - type: ListPartitionRouter - cursor_field: "slice" - values: - - users1 - - users2 - - users3 - - users4 - primary_key: [] - """ - - factory = ModelToComponentFactory() - stream_manifest = YamlDeclarativeSource._parse(content) - stream = factory.create_component( - model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={} - ) - - def get_body(): - return open(file_path, "rb", buffering=30) - - counter = 0 - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body()) - requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body()) - - stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) - for stream_slice in stream_slices: - for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice): - counter += 1 - - assert counter == lines_in_response * len(stream_slices) - - @pytest.mark.parametrize( "encoding", [