feat: add Composite Decoder #179
base: main
Conversation
Signed-off-by: Artem Inzhyyants <[email protected]>
I added a couple of questions to improve my understanding
```python
@dataclass
class Parser(ABC):
    inner_parser: Optional["Parser"] = None
```
It seems weird to me that the interface exposes `inner_parser` as a public field. Should this be removed from the interface so that the parser implementations decide what they use internally?
> Should this be removed from the interface and the parser implementations will decide whatever they use internally?
This was added to the interface to support a declarative approach and wrap parser-in-parser-in-parser as many times as needed, e.g.:

```yaml
decoder:
  type: CompositeRawDecoder
  parser:
    type: GzipParser
    inner_parser:
      type: parser1
      inner_parser:
        type: parser2
        ...
```

In my understanding, parent parsers do not know about the inner_parser implementation; all they do is pass the transformed object down through the pipeline.
My understanding is that it'll be part of the implementations but not the interface. So, for example, in declarative_component_schema.yaml we will have:
```yaml
DefaultDecoder:
  title: Decoder with a parser
  type: object
  required:
    - type
    - parser
  properties:
    type:
      type: string
      enum: [DefaultDecoder]
    parser:
      anyOf:
        - "$ref": "#/definitions/CsvParser"
        - "$ref": "#/definitions/JsonlParser"
        <...>

CsvParser:
  title: CSV Parser
  type: object
  required:
    - type
  properties:
    type:
      type: string
      enum: [CsvParser]
    <probably some CSV parsing options that we have for file based>

GzipParser:
  title: GZIP Parser
  type: object
  required:
    - type
    - underlying_parser
  properties:
    type:
      type: string
      enum: [GzipParser]
    underlying_parser:
      anyOf:
        - "$ref": "#/definitions/CsvParser"
        - "$ref": "#/definitions/JsonlParser"
        <...>
```
Each of these parser components will have its instantiation method in the model_to_component_factory and, if it requires an inner parser (like the GzipParser), we will instantiate that parser at that point. So the fact that a parser uses a parser underneath does not need to be exposed as part of the Python interface.
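The factory dispatch being described can be sketched as follows. The dict-based models and the `create_parser` name are illustrative stand-ins, not the actual model_to_component_factory API; the point is that nesting is resolved at construction time, so the Python `Parser` interface never needs an `inner_parser` field.

```python
class JsonLineParser:
    """Stand-in leaf parser (illustrative only)."""


class GzipParser:
    """Stand-in wrapping parser (illustrative only)."""

    def __init__(self, inner_parser):
        self.inner_parser = inner_parser


def create_parser(model: dict):
    """Recursively build a parser tree from a config mapping."""
    if model["type"] == "JsonLineParser":
        return JsonLineParser()
    if model["type"] == "GzipParser":
        # The inner parser is resolved here, at construction time, so
        # nesting stays a construction detail of the factory rather
        # than part of the parser interface.
        return GzipParser(inner_parser=create_parser(model["inner_parser"]))
    raise ValueError(f"Unknown parser type: {model['type']}")
```

Calling `create_parser({"type": "GzipParser", "inner_parser": {"type": "JsonLineParser"}})` yields a `GzipParser` whose inner parser is already wired up.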
agreed, refactored
```python
@abstractmethod
def parse(
    self, data: BufferedDataInput | Iterable, *args, **kwargs
```
I'm a bit confused by this interface. Does that mean that all the users will have to check the `data` parameter as such:

```python
def parse(
    self, data: BufferedDataInput | Iterable, *args, **kwargs
) -> Generator[MutableMapping[str, Any], None, None]:
    if isinstance(data, BufferedDataInput):
        < do X >
    elif isinstance(data, Iterable):
        < do Y >
    else:
        raise ValueError(f"Unexpected type {type(data)}")
```

I also don't see the usage of `BufferedDataInput` in the sample parsers you provided below; maybe I missed it. Can you show me how this would be used?
> Does that mean that all the users will have to check the data parameter as such?

Usually not, because I expect the user to choose a valid parser sequence.

> I also don't see the usage of BufferedDataInput in the sample parsers you provided below, maybe I missed it. Can you show me how this would be used?

We start processing by passing `response.raw` to `self.parser.parse(data=response.raw)`, which is in fact a readable (buffered) object. Any decompressor, like gzip or zip, returns a decompressor object, which in turn is also readable (buffered).
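The claim that a decompressor object is itself a readable (buffered) stream can be checked with the standard library alone:

```python
import gzip
import io

# gzip.GzipFile wraps any readable file-like object and is itself a
# readable stream, so it can be handed straight to the next parser in
# the chain without materializing the decompressed payload.
compressed = io.BytesIO(gzip.compress(b"hello\nworld\n"))
decompressor = gzip.GzipFile(fileobj=compressed, mode="rb")

print(decompressor.readable())  # True
print(decompressor.readline())  # b'hello\n'
```

The same holds for `response.raw` (a urllib3 response object), which is why the pipeline can start directly from the raw HTTP body.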
Ok! I really like the idea of having a buffered reader as part of the interface. Maybe it should even be a bit more than that, meaning that the file-based CsvParser relies on more methods than just `read`. Therefore, should it be `IOBase` instead of `BufferedDataInput`?

So I think my question now is: can we remove `Iterable` from the interface?
I guess yes; I refactored the interface to `BufferedIOBase`.
Signed-off-by: Artem Inzhyyants <[email protected]>
That's cool! I think this is a good improvement
```python
class Parser(ABC):
    @abstractmethod
    def parse(
        self, data: BufferedIOBase, *args, **kwargs
```
Just to make sure I understand: why do we have `*args, **kwargs` in the interface?
No reason (or real use case) for now; it can be deleted.
Signed-off-by: Artem Inzhyyants <[email protected]>
📝 Walkthrough

This pull request introduces a new CompositeRawDecoder.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Response as HTTP Response
    participant Decoder as CompositeRawDecoder
    participant Parser as Selected Parser
    participant Result as Parsed Records
    Response->>Decoder: Receive raw response
    Decoder->>Parser: Select appropriate parser
    Parser->>Parser: Parse data (Gzip/JSON/CSV)
    Parser-->>Result: Generate record dictionaries
```
Hey there! 👋 I noticed you've added some really cool parsing capabilities. Would you be interested in discussing how these new parsers might handle edge cases? Wdyt about adding some additional error handling or logging for malformed data? 🤔
Actionable comments posted: 2
🧹 Nitpick comments (20)

airbyte_cdk/sources/declarative/decoders/__init__.py (1)

- 10-10: Nice import of CompositeRawDecoder. This addition expands the module's offerings. Perhaps we might include a docstring in the composite_decoder.py file to explain its use cases for future readers, wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (4)

- 18-19: Interface design looks clear. Introducing Parser as an ABC is a neat approach to unify custom parsers. Perhaps we could add a short docstring on the intended usage or common parameters, so new contributors can adopt the pattern seamlessly, wdyt?
- 55-58: Potential optimization for error handling. When JSON decoding fails, we log a warning and skip. This is good for resilience, but might it be helpful to have a configurable option to raise an exception if unexpected lines are encountered, wdyt?
- 76-83: Potential multi-chunk reading concern. Reading large CSVs in chunks is effective. Could we consider memory usage limits or advanced chunk sizing if extremely large files are processed, wdyt?
- 95-104: CompositeRawDecoder usage clarity. Implementation looks straightforward, but could we include an inline doc or example clarifying what happens if the underlying parser fails on a line (e.g., how are partial results handled), wdyt?

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (2)

- 25-31: Pragmatic approach for test data generation. Generating 2 million lines is a hefty load. Should we consider adding an environment-based toggle or a smaller default for local testing, wdyt?
- 35-36: Tip on skipping slow tests. We decorate this with @pytest.mark.slow. Maybe we can also provide a quick way to skip slow tests locally unless explicitly enabled, wdyt?

unit_tests/sources/declarative/decoders/test_composite_decoder.py (2)

- 55-67: Gzip CSV decoding test coverage is good! Testing with multiple encodings ensures robust coverage. Could we also add a test scenario for missing headers or partial CSV lines, wdyt?
- 93-106: Gzip JSON lines test scenario is comprehensive. We check repeated iteration logic and assert record counts. Might it be useful to test a scenario containing at least one invalid line within a gzipped bundle, wdyt?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)

- 1112-1114: Consider clarifying the class docstring. Would you consider adding a short docstring to explain the JsonLineParser's usage? This can help future maintainers easily identify its purpose. wdyt?
- 1116-1121: Check consistency with other parsers. You might consider adding docstrings or class-level comments like with JsonLineParser for uniformity. wdyt?
- 1536-1540: Allowing flexible parser composition. Might it be helpful to add a small example or usage note on how the CompositeRawDecoder is configured, especially with nested formats? This could reduce confusion for new developers. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

- 146-148: CsvParserModel import. Nice addition. Could we also provide a quick usage example in the docstring for create_csv_parser to guide developers? wdyt?
- 242-244: JsonLineParserModel import. Everything looks aligned. Possibly add a note or reference in docstrings to ensure clarity for new maintainers. wdyt?
- 461-461: Adding CompositeRawDecoder constructor mapping. This is a straightforward approach. Perhaps confirm we log or handle the scenario if a user tries to combine multiple decoders? wdyt?
- 1690-1695: Clean approach for JsonLineParser creation. Consider adding input validation for cases where encoding is None to avoid runtime errors if a user sets it unexpectedly. wdyt?
- 1720-1725: CompositeRawDecoder creation method. Perhaps log the selected parser type at runtime to aid debugging. wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)

- 2726-2728: Experimental annotation in CompositeRawDecoder. Would you consider a short note specifying potential changes to the interface, to caution users about relying on it immediately? wdyt?
- 2756-2761: JsonLineParser YAML entry. Everything here is straightforward. Maybe add a quick usage snippet explaining how the encoding field is applied. wdyt?
- 2762-2774: CsvParser defaults. Would you consider adding an example for unusual delimiters like "\t" or ";" to the documentation to make it more explicit? wdyt?
📒 Files selected for processing (8)

- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/decoders/__init__.py (1 hunks)
- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
- unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_json_decoder.py (0 hunks)

💤 Files with no reviewable changes (1)

- unit_tests/sources/declarative/decoders/test_json_decoder.py
🔇 Additional comments (9)

airbyte_cdk/sources/declarative/decoders/__init__.py (1)

- 12-12: Exporting CompositeRawDecoder via `__all__` is a good step. This ensures that the decoder is publicly accessible when importing from the package. No issues found here!

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)

- 33-33: Question about layering multiple GzipParsers. Even though GzipParser can wrap another parser, might we want to add a quick guard that ensures we're not repeatedly decompressing if the file is already decompressed, wdyt?

unit_tests/sources/declarative/decoders/test_composite_decoder.py (1)

- 108-119: Nice variety of encodings for JSON lines. Everything looks smooth here. No further suggestions.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

- 1501-1505: Fallback strategy for inner_parser. Could it be valuable to define a fallback parser if none is specified? This might guard against unexpected null references if inner_parser is optional in future. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

- 70-75: Imports appear correct. Everything looks consistent with the newly introduced classes from composite_decoder. Great job!
- 1716-1719: CSV parser constructor. This is good. Could setting a default delimiter to "," be validated against potential multi-character delimiters or non-standard uses? wdyt?
- 134-136: Ensure alignment with newly added model. Would you consider verifying that CompositeRawDecoderModel is properly referenced in all relevant factory methods? wdyt?
✅ Verification successful

Let me gather information about the usage of CompositeRawDecoderModel in the codebase. This will help us understand:

- Where CompositeRawDecoderModel is used in factory methods
- All references to CompositeRawDecoderModel in the codebase
- All factory-related files that might need to reference this model

Let me check the test file to ensure proper test coverage for the CompositeRawDecoderModel.

Let me try one more search to look for any composite decoder related tests with a broader pattern.

CompositeRawDecoderModel is properly integrated in the factory. Based on the codebase analysis, the CompositeRawDecoderModel is correctly referenced in all necessary places within model_to_component_factory.py:

- It's properly imported from the schema
- Registered in the factory mapping to its creation method
- Has a dedicated factory method create_composite_raw_decoder with correct type hints

However, I notice there might be missing test coverage. Would you consider adding tests for the composite raw decoder factory method to ensure its reliability? wdyt?

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found

Script:

```shell
#!/bin/bash
# Search for CompositeRawDecoderModel usage in factory methods
ast-grep --pattern 'def $_(self, $_, CompositeRawDecoderModel, $_) { $$$ }'

# Search for any references to CompositeRawDecoderModel
rg "CompositeRawDecoderModel" -A 2

# Look for factory-related files that might need to reference this model
fd "factory" --type f
```

Length of output: 1464

Script:

```shell
#!/bin/bash
# Check test coverage for CompositeRawDecoderModel
rg "CompositeRawDecoderModel" unit_tests/sources/declarative/parsers/test_model_to_component_factory.py -A 3

# Look for test cases related to composite raw decoder
ast-grep --pattern 'def test_$_composite_raw_decoder$_($$$) { $$$ }' unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
```

Length of output: 253

Script:

```shell
#!/bin/bash
# Search for any composite decoder related tests with a broader pattern
rg "composite.*decoder" -i unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

# Also check the actual implementation details of the factory method
rg "create_composite_raw_decoder" -A 5 airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
```

Length of output: 920
- 215-217: Confirm references to GzipParserModel. Just to be sure: do we need specialized error handling or fallback for mismatched references to GzipParserModel? wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

- 2722-2722: Reference to CompositeRawDecoder. Looks good for including CompositeRawDecoder in the SimpleRetriever's decoder list. Have you verified any potential conflicts with existing decoders? wdyt?
✅ Verification successful

Let me verify the potential conflicts with existing decoders by searching through the codebase.

Let me search for more specific information about the CompositeRawDecoder and its parsers.

CompositeRawDecoder looks compatible with existing decoders. Based on the schema definitions, the CompositeRawDecoder is designed to work with a specific set of parsers (GzipParser, JsonLineParser, CsvParser) in a composable way. There are no conflicts visible as:

- Each parser has well-defined, non-overlapping responsibilities
- GzipParser is explicitly designed to work with inner parsers (JsonLineParser or CsvParser)
- The schema enforces these relationships through proper type definitions

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found

Script:

```shell
#!/bin/bash
# Search for decoder definitions in the schema file
rg -A 5 "CompositeRawDecoder|RawDecoder" airbyte_cdk/sources/declarative/declarative_component_schema.yaml

# Search for any decoder implementations or references
ast-grep --pattern 'class $_ implements RawDecoder'
ast-grep --pattern 'class $_ extends RawDecoder'
ast-grep --pattern 'class CompositeRawDecoder'

# Look for any usage patterns of decoders
rg -A 3 "decoder.*:" airbyte_cdk/sources/declarative/
```

Length of output: 17586

Script:

```shell
#!/bin/bash
# Search for parser definitions that might be used with CompositeRawDecoder
rg -A 10 "GzipParser|JsonLineParser|CsvParser" airbyte_cdk/sources/declarative/declarative_component_schema.yaml

# Look for any implementations or usages of these parsers
ast-grep --pattern 'class $_(GzipParser|JsonLineParser|CsvParser)'
rg "type.*: .*Parser" airbyte_cdk/sources/declarative/
```

Length of output: 1372
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

- 1690-1695: Consider handling None encoding? Would you like to ensure there's a safe default or check when model.encoding is None, so we avoid unexpected decoding errors? wdyt?
- 1718-1721: Handle defaults for CSV parser options? Would you like to set a fallback if model.delimiter or model.encoding is None, or raise a clearer error? wdyt?
- 1722-1727: Validate parser configuration? Would you be open to adding a quick check if model.parser is None or invalid? This may prevent unexpected runtime errors. wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)

- 18-29: Naming consideration for Parser? Would you consider something more descriptive like "DataParser"? Or are you comfortable with the generic name? wdyt?
- 47-61: Graceful handling of malformed JSON lines. You currently skip lines that aren't valid JSON. If you anticipate a high failure rate, would it be helpful to optionally stop or store such lines? wdyt?
- 63-83: About the TODO note. There's a TODO: "migrate implementation to re-use file-base classes." Would you like help creating a GitHub issue or add the changes in this PR? wdyt?
📒 Files selected for processing (2)

- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🔇 Additional comments (12)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)

- 70-75: Looks good! The imports from "composite_decoder" appear consistent with the rest of the codebase, nicely grouped, and concise.
- 134-136: No issues spotted. Adding "CompositeRawDecoderModel" to the imports for usage in the mappings is straightforward.
- 146-148: All set here. The "CsvParserModel" import nicely aligns with the rest of the decoders.
- 215-217: Imports look appropriate. "GzipParserModel" is properly added for new parser creation.
- 242-244: No concerns here. "JsonLineParserModel" import is consistent with the design.
- 461-461: Clean addition. The "CompositeRawDecoderModel" mapped to "self.create_composite_raw_decoder" in "PYDANTIC_MODEL_TO_CONSTRUCTOR" ensures correct wiring.
- 492-492: Well-integrated. "JsonLineParserModel" is mapped via the new create method.
- 494-494: All good. "GzipParserModel" is properly linked to the factory method.
- 1712-1717: Raise an error for missing inner_parser? Would it help to log or raise an error if the inner_parser is missing or invalid, so the user knows immediately? wdyt?

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)

- 1-12: Imports appear concise and purposeful. The dependencies (gzip, json, logging, pandas, requests, etc.) match the intended functionality here.
- 31-44: Potential memory usage check for GzipParser? Would you like to confirm that the GzipFile approach handles large files gracefully, or possibly add chunk-based reading if needed? wdyt?
- 85-104: CompositeRawDecoder implementation appears solid. The streaming approach to parse data from response.raw looks good. No major issues spotted.
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (5)

- 18-29: How about enhancing the docstring for better clarity? wdyt?

  The docstring could provide more details about:
  - Expected format/structure of the yielded dictionaries
  - Any specific error handling expectations
  - Whether the parser should close the input stream

  Example:

```diff
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
     """
-    Parse data and yield dictionaries.
+    Parse data from a buffered stream and yield dictionaries.
+
+    Args:
+        data: A buffered stream containing the data to parse
+
+    Returns:
+        A generator yielding parsed records as dictionaries
+
+    Raises:
+        ValueError: If the data cannot be parsed according to the expected format
     """
     pass
```

- 35-43: Should we add error handling for corrupted gzip data? wdyt?

  The current implementation might raise uncaught exceptions for corrupted gzip data. Consider wrapping the gzip operations in a try-except block:

```diff
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
-    yield from self.inner_parser.parse(gzipobj)
+    try:
+        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+            yield from self.inner_parser.parse(gzipobj)
+    except gzip.BadGzipFile as e:
+        raise ValueError(f"Failed to decompress gzip data: {e}")
```

- 50-59: How about enhancing the error logging with more context? wdyt?

  The current warning message could be more helpful by including line numbers and truncated content:

```diff
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    for line in data:
+    for line_num, line in enumerate(data, 1):
         try:
             yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
         except json.JSONDecodeError:
-            logger.warning(f"Cannot decode/parse line {line!r} as JSON")
+            decoded_line = line.decode(encoding=self.encoding or "utf-8")
+            preview = decoded_line[:100] + "..." if len(decoded_line) > 100 else decoded_line
+            logger.warning(
+                f"Cannot parse line {line_num} as JSON. Preview: {preview!r}"
+            )
```

- 67-80: Consider using csv.DictReader instead of pandas for simpler CSV parsing? wdyt?

  Using pandas for basic CSV parsing might be overkill, especially since we're only using basic features. The standard library's csv.DictReader could be more lightweight:

```diff
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    reader = pd.read_csv(  # type: ignore
-        data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding
-    )
-    for chunk in reader:
-        chunk = chunk.replace({nan: None}).to_dict(orient="records")
-        for row in chunk:
-            yield row
+    text_data = data.read().decode(encoding=self.encoding or "utf-8")
+    csv_reader = csv.DictReader(
+        text_data.splitlines(),
+        delimiter=self.delimiter
+    )
+    yield from csv_reader
```

  This would:
  - Remove the pandas dependency
  - Simplify the code
  - Handle the None/NaN conversion automatically

- 83-102: Should we improve type safety and documentation for response.raw handling? wdyt?

  The `# type: ignore[arg-type]` suggests a type mismatch between `response.raw` and `BufferedIOBase`. Consider:

  1. Adding type hints to clarify the expected type:

```diff
 def decode(
     self, response: requests.Response
 ) -> Generator[MutableMapping[str, Any], None, None]:
+    # response.raw is a urllib3.response.HTTPResponse which is compatible with BufferedIOBase
     yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
```

  2. Enhancing the class docstring with details about response.raw handling:

```diff
 """
 Decoder strategy to transform a requests.Response into a
 Generator[MutableMapping[str, Any], None, None]
 passed response.raw to parser(s).
 Note: response.raw is not decoded/decompressed by default.
 parsers should be instantiated recursively.
+
+Important:
+    - The response.raw attribute is a raw HTTP response from urllib3
+    - It must be read in binary mode to properly handle compression
+    - The stream is automatically closed when fully consumed
+
 Example:
     composite_decoder = CompositeDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
 """
```
📒 Files selected for processing (1)

- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
🔇 Additional comments (1)

airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)

- 1-16: LGTM! Clean and well-organized imports. The imports are logically grouped and all seem necessary for the implementation.
…te-decoder-with-parsers

# Conflicts:
#	airbyte_cdk/sources/declarative/decoders/__init__.py
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
2773-2788
: Would you consider enhancing the documentation for CompositeRawDecoder?The experimental component could benefit from:
- A more detailed description explaining its purpose and use cases
- Example configurations showing how to chain parsers
- Clear warnings about potential breaking changes since it's experimental
wdyt?
2809-2821
: Would you consider adding more CSV configuration options?The CsvParser could benefit from additional configuration options commonly needed when parsing CSV files:
quote_char
: Character used for quoting fieldsescape_char
: Character used for escaping special charactershas_header
: Boolean indicating if the first row is a headerskip_rows
: Number of rows to skip from the beginningThese options would make the parser more flexible for various CSV formats. wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
1128-1130
: Consider adding encoding validation.Would it help to validate the encoding parameter against Python's standard encodings to prevent runtime errors? For example, using
codecs.lookup()
to verify the encoding exists. wdyt?from codecs import lookup class JsonLineParser(BaseModel): encoding: Optional[str] = "utf-8" @validator("encoding") def validate_encoding(cls, v): try: if v is not None: lookup(v) return v except LookupError: raise ValueError(f"Unknown encoding: {v}")
1132-1135
: Consider adding parameter validation.Would it be helpful to add validation for both parameters? For example:
- Validate encoding like in JsonLineParser
- Ensure delimiter is exactly one character long
class CsvParser(BaseModel): type: Literal["CsvParser"] encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," @validator("encoding") def validate_encoding(cls, v): try: if v is not None: lookup(v) return v except LookupError: raise ValueError(f"Unknown encoding: {v}") @validator("delimiter") def validate_delimiter(cls, v): if v is not None and len(v) != 1: raise ValueError("Delimiter must be exactly one character") return v
1517-1520
: Consider validating inner_parser.Would it be helpful to ensure inner_parser is always provided since it's required for the composite pattern to work? We could add a validator to raise a descriptive error if it's missing. wdyt?
class GzipParser(BaseModel): type: Literal["GzipParser"] inner_parser: Union[JsonLineParser, CsvParser] @validator("inner_parser") def validate_inner_parser(cls, v): if v is None: raise ValueError("inner_parser is required for GzipParser") return v
1552-1555
: Consider validating parser.Similar to GzipParser, would it be helpful to validate that parser is always provided? We could add a validator to raise a descriptive error if it's missing. wdyt?
class CompositeRawDecoder(BaseModel): type: Literal["CompositeRawDecoder"] parser: Union[GzipParser, JsonLineParser, CsvParser] @validator("parser") def validate_parser(cls, v): if v is None: raise ValueError("parser is required for CompositeRawDecoder") return vairbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
1725-1729: Consider adding error handling.

Would it be helpful to add a try-except block to handle potential encoding errors early? This could provide better error messages during parser creation rather than at runtime. wdyt?

```python
@staticmethod
def create_jsonline_parser(
    model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        return JsonLineParser(encoding=model.encoding)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for JsonLineParser")
```
1747-1751: Consider adding error handling for inner parser creation.

Would it be helpful to add a try-except block to handle potential errors during inner parser creation? This could provide better error messages about which part of the composite failed. wdyt?

```python
def create_gzip_parser(
    self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
    try:
        inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
        return GzipParser(inner_parser=inner_parser)
    except Exception as e:
        raise ValueError(f"Failed to create inner parser for GzipParser: {str(e)}")
```
1754-1755: Consider adding error handling.

Would it be helpful to add validation for both encoding and delimiter during parser creation? This could catch configuration issues early. wdyt?

```python
@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        if model.delimiter and len(model.delimiter) != 1:
            raise ValueError("Delimiter must be exactly one character")
        return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for CsvParser")
```
1757-1761: Consider adding error handling for parser creation.

Would it be helpful to add a try-except block to handle potential errors during parser creation? This could provide better error messages about which parser failed to be created. wdyt?

```python
def create_composite_raw_decoder(
    self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
    try:
        parser = self._create_component_from_model(model=model.parser, config=config)
        return CompositeRawDecoder(parser=parser)
    except Exception as e:
        raise ValueError(f"Failed to create parser for CompositeRawDecoder: {str(e)}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/decoders/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/decoders/__init__.py
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
2789-2802: The GzipParser implementation looks good, but could benefit from explicit error handling.
Looking at the implementation and tests, I notice that while it handles normal cases well (including different encodings), it might be worth adding explicit error handling for:
- Empty or zero-sized inputs
- Corrupted gzip data
- Memory constraints with large files
Would you consider adding these error cases to the schema? wdyt?
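To make the suggestion concrete, here is a standalone sketch of how those error cases (empty input, corrupted data) could be surfaced with descriptive errors using only the standard library — the helper name is hypothetical, not part of the CDK:

```python
import gzip

def decompress_or_raise(raw: bytes) -> bytes:
    """Decompress a gzip payload, turning opaque failures into clear errors."""
    # An empty payload would otherwise fail with a bare EOFError deep inside gzip.
    if not raw:
        raise ValueError("empty gzip payload")
    try:
        return gzip.decompress(raw)
    except (gzip.BadGzipFile, EOFError) as exc:
        raise ValueError(f"corrupted gzip data: {exc}") from exc

print(decompress_or_raise(gzip.compress(b"hello")))  # b'hello'
try:
    decompress_or_raise(b"not gzip at all")
except ValueError as exc:
    print(exc)
```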
2803-2808: Should we add the required type field to JsonLineParser?

I notice that JsonLineParser is missing the required `type` field in its schema, unlike the other parsers. This could lead to inconsistency in validation. Would you consider adding it? wdyt?
```diff
 JsonLineParser:
   type: object
+  required:
+    - type
   properties:
+    type:
+      type: string
+      enum: [ JsonLineParser ]
     encoding:
       type: string
       default: utf-8
```
Likely invalid or redundant comment.
2769-2772: Verify the impact on existing decoder implementations.

The addition of `CompositeRawDecoder` to SimpleRetriever's decoder options looks good. However, let's verify that this doesn't affect existing decoder implementations. I'll help check for any potential conflicts:
✅ Verification successful
The addition of CompositeRawDecoder looks safe and well-integrated
Based on the codebase analysis, the CompositeRawDecoder is properly integrated into the existing decoder hierarchy and doesn't conflict with other decoders. Here's what I found:
- It's properly defined in the schema alongside other decoders like JsonDecoder, XmlDecoder, etc.
- The implementation follows the same pattern as other decoders
- The factory code (ModelToComponentFactory) has proper handling for CompositeRawDecoder
- No existing decoders are modified or impacted by this addition
The decoder is properly isolated and follows the existing patterns, making it a safe addition to the SimpleRetriever's decoder options.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```shell
#!/bin/bash
# Search for existing decoder implementations that might be affected
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):" --type python

# Search for decoder usage in existing manifests
echo "Searching for decoder usage in manifests..."
rg -l "decoder:" --type yaml
```

Length of output: 361
Script:

```shell
#!/bin/bash
# Let's try searching without file type restrictions since python extension wasn't recognized
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):"

# Let's also look for specific decoder usage patterns
echo "Searching for decoder references in code..."
rg "Decoder" -A 3

# Check for any existing composite decoder patterns
echo "Searching for composite decoder patterns..."
rg "CompositeRawDecoder|composite.*decoder" -i
```

Length of output: 77271
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

476-476: LGTM!

The mapping of CompositeRawDecoderModel to its factory method is correct.
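Stepping back from the individual comments, the composite pattern this PR introduces — a decoder that delegates byte-stream parsing to a chain of nested parsers — can be sketched in isolation. Everything below (class shapes, `parse`/`decode` signatures, the `mode="rb"` handling) is an illustrative stand-in, not the CDK's actual interfaces:

```python
import gzip
import io
import json
from dataclasses import dataclass
from typing import Any, Iterator

@dataclass
class JsonLineParser:
    """Leaf parser: reads a byte stream line by line as JSON records."""
    encoding: str = "utf-8"

    def parse(self, data: io.IOBase) -> Iterator[dict]:
        for line in data:
            if line.strip():
                yield json.loads(line.decode(self.encoding))

@dataclass
class GzipParser:
    """Wrapping parser: decompresses, then hands the stream to inner_parser."""
    inner_parser: Any

    def parse(self, data: io.IOBase) -> Iterator[dict]:
        with gzip.GzipFile(fileobj=data, mode="rb") as gz:
            yield from self.inner_parser.parse(gz)

@dataclass
class CompositeRawDecoder:
    """Decoder facade: feeds the raw bytes into whatever parser chain it holds."""
    parser: Any

    def decode(self, raw: bytes) -> Iterator[dict]:
        yield from self.parser.parse(io.BytesIO(raw))

payload = gzip.compress(b'{"id": 1}\n{"id": 2}\n')
decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser()))
records = list(decoder.decode(payload))
print(records)  # [{'id': 1}, {'id': 2}]
```

This is the same parser-in-parser nesting the declarative YAML in the PR description expresses: each outer parser transforms the stream and passes it down without knowing anything about the inner parser's implementation.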
A few comments and small change requests. Overall good though.
```yaml
properties:
  type:
    type: string
    enum: [ CompositeRawDecoder ]
```
Very small nit, for consistency: let's remove the space between `[` and the name.
```python
@dataclass
class CompositeRawDecoder(Decoder):
```
For consistency w/ the file name, should the file be renamed `composite_raw_decoder`?
```diff
@@ -1701,6 +1722,12 @@ def create_jsonl_decoder(
     ) -> JsonlDecoder:
         return JsonlDecoder(parameters={})

+    @staticmethod
+    def create_jsonline_parser(
```
Nit: should be `create_json_line_parser`.
```python
"""
Parse CSV data from decompressed bytes.
"""
reader = pd.read_csv(  # type: ignore
```
Was there a reason why we're opting for `pandas`? I think this discussion was brought up while we were implementing the file-based CDK, and ultimately we decided to use the `csv` library for reading from CSVs. The interfaces are similar but differ in a few ways.

The main reason I ask is that if we plan to migrate this implementation to file-based, it would probably make sense to use the same library. How much lift do you estimate it would be to use `csv` instead? For an example of how it's used, you can see `csv_parser.py`. If the lift is reasonable I think we should use it, but if the effort is high, I would be okay leaving this as is.
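As a rough sketch of what the stdlib-based alternative could look like — the function name and signature here are illustrative, not the file-based CDK's actual `csv_parser.py` interface:

```python
import csv
import io
from typing import Dict, Iterator

def parse_csv(data: io.IOBase, encoding: str = "utf-8", delimiter: str = ",") -> Iterator[Dict[str, str]]:
    # csv.DictReader consumes the stream lazily, row by row, whereas
    # pd.read_csv (without chunksize) materializes the whole file at once.
    text = io.TextIOWrapper(data, encoding=encoding)
    yield from csv.DictReader(text, delimiter=delimiter)

rows = list(parse_csv(io.BytesIO(b"id,name\n1,foo\n2,bar\n")))
print(rows)  # [{'id': '1', 'name': 'foo'}, {'id': '2', 'name': 'bar'}]
```

One behavioral difference worth noting if the swap happens: `csv.DictReader` yields every field as a string, while pandas infers dtypes, so any downstream type coercion would need to move elsewhere.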
```yaml
JsonLineParser:
  type: object
  properties:
    encoding:
```
I think this definition is missing the requirement that every component be explicitly typed:

```yaml
required:
  - type
properties:
  type:
    type: string
    enum: [JsonLineParser]
```
What

Resolving Source Amazon Seller Partner CDK migration:

- add composite decoder to apply decompressors/decoders sequentially

Note:

- memory usage test moved to `unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py`.
- `JsonlDecoder` (as well as other streamable decoders) should be deleted/replaced with `CompositeRawDecoder` + `parser`.
Summary by CodeRabbit

New Features
- New parsers: `GzipParser`, `JsonLineParser`, and `CsvParser`.
- New `CompositeRawDecoder` for enhanced data decoding capabilities.

Bug Fixes
- Updated the decoder options of `SimpleRetriever` to include the new `CompositeRawDecoder`.

Tests
- Added tests for the `CompositeRawDecoder` and its parsers.

Chores
- Removed `JsonlDecoder`.