-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add Composite Decoder #179
base: main
Are you sure you want to change the base?
Changes from 17 commits
5b7724d
990584e
50782f7
17add9e
655ce35
513aa43
eada5bf
1984ef1
8d3f82a
f2c9b0a
961356c
5fa937c
1b85c26
a181608
5276ed1
adf9d3d
7604b99
bbd5e3a
f9a97db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2719,9 +2719,59 @@ definitions: | |
- "$ref": "#/definitions/IterableDecoder" | ||
- "$ref": "#/definitions/XmlDecoder" | ||
- "$ref": "#/definitions/GzipJsonDecoder" | ||
- "$ref": "#/definitions/CompositeRawDecoder" | ||
$parameters: | ||
type: object | ||
additionalProperties: true | ||
CompositeRawDecoder:
  # Decoder whose work is delegated to a (possibly nested) parser component.
  description: "(This is experimental, use at your own risk)"
  type: object
  required:
    - type
    - parser
  properties:
    type:
      type: string
      enum: [CompositeRawDecoder]
    # Outermost parser; a GzipParser may itself wrap another parser.
    parser:
      anyOf:
        - "$ref": "#/definitions/GzipParser"
        - "$ref": "#/definitions/JsonLineParser"
        - "$ref": "#/definitions/CsvParser"
# PARSERS | ||
GzipParser:
  # Parser that wraps another parser; `inner_parser` is mandatory.
  type: object
  required:
    - type
    - inner_parser
  properties:
    type:
      type: string
      enum: [GzipParser]
    # Only line-oriented / tabular parsers may be nested here.
    inner_parser:
      anyOf:
        - "$ref": "#/definitions/JsonLineParser"
        - "$ref": "#/definitions/CsvParser"
JsonLineParser:
  # Like the sibling parser definitions, this component must be explicitly
  # typed so it can be told apart inside the `anyOf` lists that reference it.
  type: object
  required:
    - type
  properties:
    type:
      type: string
      enum: [JsonLineParser]
    encoding:
      type: string
      default: utf-8
CsvParser:
  # Parser for delimited text; both options are optional and have defaults.
  type: object
  required:
    - type
  properties:
    type:
      type: string
      enum: [CsvParser]
    encoding:
      type: string
      default: utf-8
    delimiter:
      type: string
      default: ","
AsyncJobStatusMap: | ||
description: Matches the api job status to Async Job Status. | ||
type: object | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import gzip | ||
import json | ||
import logging | ||
from abc import ABC, abstractmethod | ||
from dataclasses import dataclass | ||
from io import BufferedIOBase | ||
from typing import Any, Generator, MutableMapping, Optional | ||
|
||
import pandas as pd | ||
import requests | ||
from numpy import nan | ||
|
||
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder | ||
|
||
logger = logging.getLogger("airbyte") | ||
|
||
|
||
@dataclass
class Parser(ABC):
    """Abstract base for components that turn a binary stream into records."""

    @abstractmethod
    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Parse data and yield dictionaries.

        :param data: binary stream to read the payload from
        :return: generator yielding one mapping per parsed record
        """
|
||
|
||
@dataclass
class GzipParser(Parser):
    """Parser that gunzips the incoming stream and delegates to ``inner_parser``."""

    # Parser applied to the decompressed stream.
    inner_parser: Parser

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Decompress gzipped bytes and pass decompressed data to the inner parser.

        :param data: binary stream containing gzip-compressed content
        :return: generator of the records produced by ``inner_parser``
        """
        # The context manager releases the GzipFile once the inner parser is
        # exhausted or the generator is closed early.
        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
            yield from self.inner_parser.parse(gzipobj)
|
||
|
||
@dataclass
class JsonLineParser(Parser):
    """Parser for line-delimited JSON: every line of the stream is handled on its own."""

    # Text encoding of each line; a None value falls back to utf-8 in parse().
    encoding: Optional[str] = "utf-8"

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Decode each line of ``data`` and yield the JSON value it contains.

        Lines that cannot be decoded with the configured encoding or that are
        not valid JSON are logged as warnings and skipped.

        :param data: binary stream yielding one JSON document per line
        :return: generator of the parsed JSON values
        """
        for line in data:
            try:
                yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Best effort: one bad line (bad bytes or bad JSON) must not
                # abort the rest of the stream.
                logger.warning(f"Cannot decode/parse line {line!r} as JSON")
|
||
|
||
@dataclass
class CsvParser(Parser):
    """Parser that reads delimited text from a binary stream and yields one record per row."""

    # TODO: migrate implementation to re-use file-base classes
    # NOTE(review): a reviewer asked why this library was chosen here; if the
    # implementation is migrated to the file-based classes it would probably
    # make sense to use the same library they use.
    encoding: Optional[str] = "utf-8"
    delimiter: Optional[str] = ","

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Parse CSV data from decompressed bytes.

        :param data: binary stream containing the delimited text
        :return: generator yielding one mapping per row, with missing cells as None
        """
        # dtype=object asks pandas not to infer column dtypes. The reader is
        # used as a context manager so it is released when iteration ends or
        # the generator is closed early.
        with pd.read_csv(  # type: ignore
            data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding
        ) as reader:
            for chunk in reader:
                # Missing cells are NaN in the frame; expose them as None.
                yield from chunk.replace({nan: None}).to_dict(orient="records")
|
||
|
||
@dataclass
class CompositeRawDecoder(Decoder):
    """
    Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
    by passing response.raw to the parser(s).
    Note: response.raw is not decoded/decompressed by default.
    Parsers should be instantiated recursively.
    Example:
    composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
    """

    # NOTE(review): this class is imported from the `composite_decoder` module
    # while the class itself is named CompositeRawDecoder; a reviewer asked
    # whether the file should be renamed for consistency.
    parser: Parser

    def is_stream_response(self) -> bool:
        """
        Always return True.

        NOTE(review): presumably signals that the response must be consumed as
        a stream, since decode() reads response.raw — confirm against Decoder.
        """
        return True

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield the records the configured parser extracts from ``response.raw``.

        :param response: HTTP response whose raw stream is handed to the parser
        :return: generator of parsed records
        """
        yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,12 @@ | |
PaginationDecoderDecorator, | ||
XmlDecoder, | ||
) | ||
from airbyte_cdk.sources.declarative.decoders.composite_decoder import ( | ||
CompositeRawDecoder, | ||
CsvParser, | ||
GzipParser, | ||
JsonLineParser, | ||
) | ||
from airbyte_cdk.sources.declarative.extractors import ( | ||
DpathExtractor, | ||
RecordFilter, | ||
|
@@ -125,6 +131,9 @@ | |
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
CompositeErrorHandler as CompositeErrorHandlerModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
CompositeRawDecoder as CompositeRawDecoderModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
ConcurrencyLevel as ConcurrencyLevelModel, | ||
) | ||
|
@@ -134,6 +143,9 @@ | |
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
ConstantBackoffStrategy as ConstantBackoffStrategyModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
CsvParser as CsvParserModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
CursorPagination as CursorPaginationModel, | ||
) | ||
|
@@ -200,6 +212,9 @@ | |
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
GzipJsonDecoder as GzipJsonDecoderModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
GzipParser as GzipParserModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
HttpComponentsResolver as HttpComponentsResolverModel, | ||
) | ||
|
@@ -224,6 +239,9 @@ | |
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
JsonlDecoder as JsonlDecoderModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
JsonLineParser as JsonLineParserModel, | ||
) | ||
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( | ||
JwtAuthenticator as JwtAuthenticatorModel, | ||
) | ||
|
@@ -440,6 +458,7 @@ def _init_mappings(self) -> None: | |
BearerAuthenticatorModel: self.create_bearer_authenticator, | ||
CheckStreamModel: self.create_check_stream, | ||
CompositeErrorHandlerModel: self.create_composite_error_handler, | ||
CompositeRawDecoderModel: self.create_composite_raw_decoder, | ||
ConcurrencyLevelModel: self.create_concurrency_level, | ||
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy, | ||
CursorPaginationModel: self.create_cursor_pagination, | ||
|
@@ -470,7 +489,9 @@ def _init_mappings(self) -> None: | |
InlineSchemaLoaderModel: self.create_inline_schema_loader, | ||
JsonDecoderModel: self.create_json_decoder, | ||
JsonlDecoderModel: self.create_jsonl_decoder, | ||
JsonLineParserModel: self.create_jsonline_parser, | ||
GzipJsonDecoderModel: self.create_gzipjson_decoder, | ||
GzipParserModel: self.create_gzip_parser, | ||
KeysToLowerModel: self.create_keys_to_lower_transformation, | ||
IterableDecoderModel: self.create_iterable_decoder, | ||
XmlDecoderModel: self.create_xml_decoder, | ||
|
@@ -1666,6 +1687,12 @@ def create_jsonl_decoder( | |
) -> JsonlDecoder: | ||
return JsonlDecoder(parameters={}) | ||
|
||
@staticmethod
def create_jsonline_parser(
    model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
    """Build a JsonLineParser that uses the encoding declared on the model."""
    # NOTE(review): a reviewer suggested a different name for this method; it
    # is kept as-is because the factory's model-to-constructor mapping refers
    # to `create_jsonline_parser`.
    line_encoding = model.encoding
    return JsonLineParser(encoding=line_encoding)
|
||
@staticmethod | ||
def create_iterable_decoder( | ||
model: IterableDecoderModel, config: Config, **kwargs: Any | ||
|
@@ -1682,6 +1709,22 @@ def create_gzipjson_decoder( | |
) -> GzipJsonDecoder: | ||
return GzipJsonDecoder(parameters={}, encoding=model.encoding) | ||
|
||
def create_gzip_parser(
    self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
    """Build a GzipParser, first building the parser it wraps from ``model.inner_parser``."""
    wrapped_parser = self._create_component_from_model(
        model=model.inner_parser,
        config=config,
    )
    return GzipParser(inner_parser=wrapped_parser)
|
||
@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
    """Build a CsvParser from the encoding and delimiter declared on the model."""
    parser_options = {"encoding": model.encoding, "delimiter": model.delimiter}
    return CsvParser(**parser_options)
|
||
def create_composite_raw_decoder(
    self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
    """Build a CompositeRawDecoder around the parser built from ``model.parser``."""
    outer_parser = self._create_component_from_model(
        model=model.parser,
        config=config,
    )
    return CompositeRawDecoder(parser=outer_parser)
|
||
@staticmethod | ||
def create_json_file_schema_loader( | ||
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very small nit: for consistency, let's remove the space between `[` and the name.