feat: add Composite Decoder #179

Open: wants to merge 16 commits into base: main
50 changes: 50 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -2719,9 +2719,59 @@ definitions:
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
- "$ref": "#/definitions/CompositeRawDecoder"
$parameters:
type: object
additionalProperties: true
CompositeRawDecoder:
description: "(This is experimental, use at your own risk)"
type: object
required:
- type
- parser
properties:
type:
type: string
enum: [ CompositeRawDecoder ]
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
GzipParser:
type: object
required:
- type
- inner_parser
properties:
type:
type: string
enum: [ GzipParser ]
inner_parser:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
JsonLineParser:
Comment on lines +2742 to +2755 (Contributor):
💡 Codebase verification

The GzipParser implementation looks good, but could benefit from explicit error handling.

Looking at the implementation and tests, I notice that while it handles normal cases well (including different encodings), it might be worth adding explicit error handling for:

  • Empty or zero-sized inputs
  • Corrupted gzip data
  • Memory constraints with large files

Would you consider adding a try/except block around the GzipFile operations to gracefully handle these edge cases? This would make the parser more robust in production scenarios. wdyt?
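A sketch of what that guard could look like (illustrative only: `parse_gzip_defensively`, its error policy, and the plain-callable inner parser are assumptions, not the PR's code):

```python
import gzip
import logging
from io import BufferedIOBase
from typing import Any, Callable, Generator, Iterator, MutableMapping

logger = logging.getLogger("airbyte")


def parse_gzip_defensively(
    data: BufferedIOBase,
    inner_parse: Callable[[BufferedIOBase], Iterator[MutableMapping[str, Any]]],
) -> Generator[MutableMapping[str, Any], None, None]:
    """Decompress gzipped bytes, logging and skipping corrupted input instead of raising."""
    try:
        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
            yield from inner_parse(gzipobj)
    except (OSError, EOFError) as exc:
        # gzip raises BadGzipFile (an OSError subclass) for corrupted headers
        # and EOFError for truncated streams; empty input simply yields nothing.
        logger.warning("Skipping unparseable gzip payload: %s", exc)
```

With this in place, a zero-byte response or a corrupted payload yields no records rather than crashing the stream.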

🔗 Analysis chain

GzipParser schema definition.
Is it worth clarifying how GzipParser interacts with various compression levels or edge cases, e.g., 0-sized input? wdyt?


Let me search for more context about the GzipParser implementation and its handling of compression levels and edge cases.

This will help us understand:

  1. The actual implementation details of GzipParser
  2. Any test cases that might reveal edge case handling
  3. Documentation that might explain compression level handling

Let me analyze the results to provide a more informed response.


Let me try a different search strategy to find the implementation and related files.

This should help us find:

  1. All files mentioning GzipParser
  2. The actual implementation with some context
  3. Test files that might show how edge cases are handled

Let me check the test file and implementation for more details about edge case handling.

This will help us understand:

  1. How the GzipParser is tested, including any edge case scenarios
  2. Complete implementation details including error handling
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for GzipParser implementation
ast-grep --pattern 'class GzipParser'

# Search for any tests related to GzipParser
rg -l "GzipParser" -t py

# Look for documentation about GzipParser
fd -e md -e rst | xargs rg "GzipParser"

Length of output: 143


Script:

#!/bin/bash
# Search for files containing GzipParser
rg -l "GzipParser"

# Search for implementation details with context
rg -A 10 "class GzipParser"

# Look for test files
rg -l "test.*GzipParser"

Length of output: 2579


Script:

#!/bin/bash
# Check the test file content
cat unit_tests/sources/declarative/decoders/test_composite_decoder.py

# Get more context around the GzipParser implementation
rg -B 5 -A 15 "class GzipParser" airbyte_cdk/sources/declarative/decoders/composite_decoder.py

Length of output: 4399

type: object
properties:
encoding:
type: string
default: utf-8
CsvParser:
type: object
required:
- type
properties:
type:
type: string
enum: [ CsvParser ]
encoding:
type: string
default: utf-8
delimiter:
type: string
default: ","
AsyncJobStatusMap:
description: Matches the api job status to Async Job Status.
type: object
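As a usage sketch, a manifest `decoder` block matching the schema above might look like this (the surrounding stream context is hypothetical; only the fields shown in the schema are used):

```yaml
decoder:
  type: CompositeRawDecoder
  parser:
    type: GzipParser
    inner_parser:
      type: CsvParser
      encoding: utf-8
      delimiter: ","
```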
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/decoders/__init__.py
@@ -7,5 +7,6 @@
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.composite_decoder import CompositeRawDecoder

__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder", "CompositeRawDecoder"]
104 changes: 104 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_decoder.py
@@ -0,0 +1,104 @@
import gzip
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase
from typing import Any, Generator, MutableMapping, Optional

import pandas as pd
import requests
from numpy import nan

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

logger = logging.getLogger("airbyte")


@dataclass
class Parser(ABC):
@abstractmethod
def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse data and yield dictionaries.
"""
pass


@dataclass
class GzipParser(Parser):
inner_parser: Parser

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.
"""
gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
yield from self.inner_parser.parse(gzipobj)


@dataclass
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
for line in data:
try:
yield json.loads(line.decode(encoding=self.encoding))
except json.JSONDecodeError:
logger.warning(f"Cannot decode/parse line {line!r} as JSON")
# Handle invalid JSON lines gracefully (e.g., log and skip)
pass
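The warn-and-skip behavior above can be exercised standalone (a stdlib-only restatement of the same logic, not an import from the CDK):

```python
import io
import json


def parse_json_lines(data, encoding="utf-8"):
    """Yield one dict per JSON line; malformed lines are skipped, mirroring JsonLineParser."""
    for line in data:
        try:
            yield json.loads(line.decode(encoding))
        except json.JSONDecodeError:
            continue  # warn-and-skip: a bad line never aborts the stream


payload = io.BytesIO(b'{"id": 1}\nnot json\n{"id": 2}\n')
records = list(parse_json_lines(payload))
# the malformed middle line is dropped; valid lines survive
```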


@dataclass
class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse CSV data from decompressed bytes.
"""
reader = pd.read_csv(
data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding
)
for chunk in reader:
chunk = chunk.replace({nan: None}).to_dict(orient="records")
for row in chunk:
yield row
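The pandas path above (iterator chunks, `dtype=object`, `nan` replaced with `None`) keeps every value a string and makes nulls JSON-friendly. A stdlib `csv` stand-in showing the same record shape (the function name is illustrative, not the PR's API):

```python
import csv
import io


def parse_csv_rows(data, encoding="utf-8", delimiter=","):
    """Yield one dict per CSV row; empty cells become None, like the nan -> None replace above."""
    text = io.TextIOWrapper(data, encoding=encoding)
    for row in csv.DictReader(text, delimiter=delimiter):
        yield {key: (value if value != "" else None) for key, value in row.items()}


payload = io.BytesIO(b"id,name\n1,alice\n2,\n")
records = list(parse_csv_rows(payload))
```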


@dataclass
class CompositeRawDecoder(Decoder):
"""
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
by passing response.raw to the parser(s).
Note: response.raw is not decoded/decompressed by default, so parsers should be instantiated recursively as needed.
Example:
composite_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
"""

parser: Parser

def is_stream_response(self) -> bool:
return True

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
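End-to-end, the composition shown in the docstring behaves like this sketch (stdlib stand-ins so it runs without the CDK or a live requests.Response; the function name is illustrative):

```python
import gzip
import io
import json


def decode_gzipped_jsonl(raw, encoding="utf-8"):
    """Equivalent of CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(...)))."""
    with gzip.GzipFile(fileobj=raw, mode="rb") as decompressed:
        for line in decompressed:
            yield json.loads(line.decode(encoding))


# simulate a raw (still-compressed) HTTP body, as response.raw would carry it
raw_body = io.BytesIO(gzip.compress('{"v": "café"}\n'.encode("iso-8859-1")))
records = list(decode_gzipped_jsonl(raw_body, encoding="iso-8859-1"))
```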
@@ -1109,6 +1109,16 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonLineParser(BaseModel):
encoding: Optional[str] = "utf-8"


class CsvParser(BaseModel):
type: Literal["CsvParser"]
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","


class AsyncJobStatusMap(BaseModel):
type: Optional[Literal["AsyncJobStatusMap"]] = None
running: List[str]
@@ -1488,6 +1498,11 @@ class RecordSelector(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GzipParser(BaseModel):
type: Literal["GzipParser"]
inner_parser: Union[JsonLineParser, CsvParser]


class Spec(BaseModel):
type: Literal["Spec"]
connection_specification: Dict[str, Any] = Field(
@@ -1518,6 +1533,11 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CompositeRawDecoder(BaseModel):
type: Literal["CompositeRawDecoder"]
parser: Union[GzipParser, JsonLineParser, CsvParser]


class DeclarativeSource1(BaseModel):
class Config:
extra = Extra.forbid
@@ -1895,6 +1915,7 @@ class SimpleRetriever(BaseModel):
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
CompositeRawDecoder,
]
] = Field(
None,
@@ -67,6 +67,12 @@
PaginationDecoderDecorator,
XmlDecoder,
)
from airbyte_cdk.sources.declarative.decoders.composite_decoder import (
CompositeRawDecoder,
CsvParser,
GzipParser,
JsonLineParser,
)
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
RecordFilter,
@@ -125,6 +131,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CompositeErrorHandler as CompositeErrorHandlerModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CompositeRawDecoder as CompositeRawDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
)
@@ -134,6 +143,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CsvParser as CsvParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CursorPagination as CursorPaginationModel,
)
@@ -200,6 +212,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipParser as GzipParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpComponentsResolver as HttpComponentsResolverModel,
)
@@ -224,6 +239,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonLineParser as JsonLineParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
@@ -440,6 +458,7 @@ def _init_mappings(self) -> None:
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
CompositeRawDecoderModel: self.create_composite_raw_decoder,
ConcurrencyLevelModel: self.create_concurrency_level,
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
CursorPaginationModel: self.create_cursor_pagination,
@@ -470,7 +489,9 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonLineParserModel: self.create_jsonline_parser,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
@@ -1666,6 +1687,12 @@ def create_jsonl_decoder(
) -> JsonlDecoder:
return JsonlDecoder(parameters={})

@staticmethod
def create_jsonline_parser(
model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
return JsonLineParser(encoding=model.encoding)

@staticmethod
def create_iterable_decoder(
model: IterableDecoderModel, config: Config, **kwargs: Any
@@ -1682,6 +1709,22 @@ def create_gzipjson_decoder(
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

def create_gzip_parser(
self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
return GzipParser(inner_parser=inner_parser)

@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)

def create_composite_raw_decoder(
self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
parser = self._create_component_from_model(model=model.parser, config=config)
return CompositeRawDecoder(parser=parser)

@staticmethod
def create_json_file_schema_loader(
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
Expand Down
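The recursive wiring in `create_composite_raw_decoder` (decoder model, then parser model, then inner parser model, each resolved via `_create_component_from_model`) can be pictured with a toy dispatch; the dict-based `build` below is a stand-in, not the factory's real API:

```python
def build(component: dict):
    """Toy recursive dispatch mirroring how the factory resolves nested parser models."""
    kind = component["type"]
    if kind == "CompositeRawDecoder":
        return ("CompositeRawDecoder", build(component["parser"]))
    if kind == "GzipParser":
        return ("GzipParser", build(component["inner_parser"]))
    if kind == "JsonLineParser":
        return ("JsonLineParser", component.get("encoding", "utf-8"))
    if kind == "CsvParser":
        return ("CsvParser", component.get("delimiter", ","))
    raise ValueError(f"unknown component type: {kind}")


model = {
    "type": "CompositeRawDecoder",
    "parser": {"type": "GzipParser", "inner_parser": {"type": "JsonLineParser"}},
}
built = build(model)
# resolves inner-most first, then wraps outward, just like the factory
```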