Skip to content

Commit

Permalink
feat(airbyte-cdk): add gzipjson decoder (#20)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <[email protected]>
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
artem1205 and octavia-squidington-iii authored Nov 14, 2024
1 parent 310d26d commit 39786d2
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 22 deletions.
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,45 @@ definitions:
type:
type: string
enum: [XmlDecoder]
CustomDecoder:
title: Custom Decoder
description: Use this to implement custom decoder logic.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomDecoder]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_amazon_ads.components.GzipJsonlDecoder"
$parameters:
type: object
additionalProperties: true
GzipJsonDecoder:
title: GzipJson Decoder
description: Use this if the response is Gzip compressed Json.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [GzipJsonDecoder]
encoding:
type: string
default: utf-8
$parameters:
type: object
additionalProperties: true
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -2404,10 +2443,12 @@ definitions:
title: Decoder
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2520,10 +2561,12 @@ definitions:
title: Decoder
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/declarative/decoders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder, GzipJsonDecoder
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder

__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
57 changes: 45 additions & 12 deletions airbyte_cdk/sources/declarative/decoders/json_decoder.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import codecs
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Generator, Mapping
from gzip import decompress
from typing import Any, Generator, Mapping, MutableMapping, List, Optional

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from orjson import orjson
import orjson

logger = logging.getLogger("airbyte")

Expand All @@ -24,24 +25,32 @@ class JsonDecoder(Decoder):
def is_stream_response(self) -> bool:
return False

def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping.
"""
try:
body_json = response.json()
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
)
yield {}

@staticmethod
def parse_body_json(
body_json: MutableMapping[str, Any] | List[MutableMapping[str, Any]],
) -> Generator[MutableMapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json


@dataclass
class IterableDecoder(Decoder):
Expand All @@ -54,7 +63,9 @@ class IterableDecoder(Decoder):
def is_stream_response(self) -> bool:
return True

def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
for line in response.iter_lines():
yield {"record": line.decode()}

Expand All @@ -70,8 +81,30 @@ class JsonlDecoder(Decoder):
def is_stream_response(self) -> bool:
return True

def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
# TODO???: set delimiter? usually it is `\n` but maybe it would be useful to set optional?
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
for record in response.iter_lines():
yield orjson.loads(record)


@dataclass
class GzipJsonDecoder(JsonDecoder):
encoding: Optional[str]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if self.encoding:
try:
codecs.lookup(self.encoding)
except LookupError:
raise ValueError(
f"Invalid encoding '{self.encoding}'. Please check provided encoding"
)

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
raw_string = decompress(response.content).decode(encoding=self.encoding or "utf-8")
yield from self.parse_body_json(orjson.loads(raw_string))
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.models import (
DatetimeBasedCursor,
SubstreamPartitionRouter,
CustomIncrementalSync,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig


Expand Down Expand Up @@ -32,7 +36,7 @@ class LegacyToPerPartitionStateMigration(StateMigration):
def __init__(
self,
partition_router: SubstreamPartitionRouter,
cursor: DatetimeBasedCursor,
cursor: CustomIncrementalSync | DatetimeBasedCursor,
config: Mapping[str, Any],
parameters: Mapping[str, Any],
):
Expand Down Expand Up @@ -64,7 +68,7 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return False

# There is exactly one parent stream
number_of_parent_streams = len(self._partition_router.parent_stream_configs)
number_of_parent_streams = len(self._partition_router.parent_stream_configs) # type: ignore # custom partition will introduce this attribute if needed
if number_of_parent_streams != 1:
# There should be exactly one parent stream
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,29 @@ class XmlDecoder(BaseModel):
type: Literal["XmlDecoder"]


class CustomDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomDecoder"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.",
examples=["source_amazon_ads.components.GzipJsonlDecoder"],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GzipJsonDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal["GzipJsonDecoder"]
encoding: Optional[str] = "utf-8"
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class MinMaxDatetime(BaseModel):
type: Literal["MinMaxDatetime"]
datetime: str = Field(
Expand Down Expand Up @@ -1620,7 +1643,16 @@ class SimpleRetriever(BaseModel):
description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
title="Partition Router",
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field(
decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description="Component decoding the response so records can be extracted.",
title="Decoder",
Expand Down Expand Up @@ -1680,7 +1712,16 @@ class AsyncRetriever(BaseModel):
description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
title="Partition Router",
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = Field(
decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description="Component decoding the response so records can be extracted.",
title="Decoder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import (
Decoder,
GzipJsonDecoder,
IterableDecoder,
JsonDecoder,
JsonlDecoder,
Expand Down Expand Up @@ -134,6 +135,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomBackoffStrategy as CustomBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomDecoder as CustomDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomErrorHandler as CustomErrorHandlerModel,
)
Expand Down Expand Up @@ -182,6 +186,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpRequester as HttpRequesterModel,
)
Expand Down Expand Up @@ -402,6 +409,7 @@ def _init_mappings(self) -> None:
CursorPaginationModel: self.create_cursor_pagination,
CustomAuthenticatorModel: self.create_custom_component,
CustomBackoffStrategyModel: self.create_custom_component,
CustomDecoderModel: self.create_custom_component,
CustomErrorHandlerModel: self.create_custom_component,
CustomIncrementalSyncModel: self.create_custom_component,
CustomRecordExtractorModel: self.create_custom_component,
Expand All @@ -425,6 +433,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -619,11 +628,16 @@ def create_legacy_to_per_partition_state_migration(
"LegacyToPerPartitionStateMigrations can only be applied with a parent stream configuration."
)

if not hasattr(declarative_stream, "incremental_sync"):
raise ValueError(
"LegacyToPerPartitionStateMigrations can only be applied with an incremental_sync configuration."
)

return LegacyToPerPartitionStateMigration(
declarative_stream.retriever.partition_router,
declarative_stream.incremental_sync,
partition_router, # type: ignore # was already checked above
declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams.
config,
declarative_stream.parameters,
declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any]
) # type: ignore # The retriever type was already checked

def create_session_token_authenticator(
Expand Down Expand Up @@ -1548,6 +1562,12 @@ def create_iterable_decoder(
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
return XmlDecoder(parameters={})

@staticmethod
def create_gzipjson_decoder(
model: GzipJsonDecoderModel, config: Config, **kwargs: Any
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

@staticmethod
def create_json_file_schema_loader(
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
Expand Down
Loading

0 comments on commit 39786d2

Please sign in to comment.