fix(concurrency): support failed on http cache write #115

Open · wants to merge 11 commits into base: main

Conversation

@maxi297 (Contributor) commented Dec 4, 2024

What

This PR has been split into different issues to scope the problems found:

How

Summary by CodeRabbit

  • New Features

    • Enhanced caching mechanism for the HTTP client, improving robustness and error handling during cache operations.
    • Improved security in Jinja interpolation by managing restricted extensions and built-in functions.
    • Increased default concurrency level for the ConcurrentDeclarativeSource, enhancing stability during stream processing.
  • Bug Fixes

    • Improved handling of cache save failures to ensure HTTP requests continue processing even when cache writes encounter issues.
    • Enhanced error handling in Jinja interpolation for better clarity and functionality.
  • Tests

    • Added tests to verify that the HTTP client gracefully handles cache save failures, ensuring valid responses are returned.

@maxi297 changed the title from "support failed on http cache write" to "bug(concurrency): support failed on http cache write" on Dec 4, 2024
@coderabbitai bot (Contributor) commented Dec 4, 2024

📝 Walkthrough

The changes in this pull request enhance the HttpClient class in the http_client.py file by introducing a new caching mechanism using the SkipFailureSQLiteCache class. This implementation improves error handling during cache operations, allowing the client to function even when cache writes fail. Additionally, the test suite for the HttpClient is updated to ensure that these new caching behaviors are properly validated, including a test for handling cache save failures gracefully. The jinja.py file is also modified to improve security and error handling in the Jinja interpolation logic.
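The skip-on-failure mechanism described above can be sketched independently of requests_cache. Everything below (FlakyBackend, SkipFailureCache) is an illustrative stand-in for the PR's SkipFailureSQLiteDict pattern, not the actual implementation:

```python
import logging

logger = logging.getLogger("airbyte")


class FlakyBackend(dict):
    """Stand-in for a cache backend whose writes may fail (e.g. 'database is locked')."""

    fail_writes = False

    def __setitem__(self, key, value):
        if self.fail_writes:
            raise OSError("database is locked")
        super().__setitem__(key, value)


class SkipFailureCache:
    """Wraps a backend so cache failures are logged, never raised."""

    def __init__(self, backend):
        self._backend = backend

    def get(self, key, default=None):
        try:
            return self._backend[key]
        except KeyError:
            return default  # a miss is normal control flow, not an error
        except Exception as exception:
            logger.warning(f"Error while retrieving item from cache: {exception}")
            return default

    def set(self, key, value):
        try:
            self._backend[key] = value
        except Exception as exception:
            logger.warning(f"Error while saving item to cache: {exception}")


cache = SkipFailureCache(FlakyBackend())
cache.set("a", 1)
FlakyBackend.fail_writes = True
cache.set("b", 2)  # failure is swallowed and logged, the caller keeps going
```

The key design point is that a cache miss (KeyError) stays a normal control-flow event, while any backend failure degrades to a logged warning instead of breaking the HTTP request.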

Changes

  • airbyte_cdk/sources/streams/http/http_client.py: added SkipFailureSQLiteDict and SkipFailureSQLiteCache classes for improved caching and error handling; modified _request_session to use SkipFailureSQLiteCache.
  • unit_tests/sources/streams/http/test_http_client.py: added a RaiseOnInsertConnection class to simulate database errors; introduced test_given_cache_save_failure_then_do_not_break to validate error handling during cache operations; updated existing tests to accommodate the new error handling logic.
  • airbyte_cdk/sources/declarative/interpolation/jinja.py: added private variables for managing aliases and restricted functions; refactored Jinja environment initialization and modified method signatures for better error handling.


Suggested labels

enhancement

Suggested reviewers

  • aldogonzalez8

Wdyt about the changes? Do you think the new error handling will cover all edge cases?



@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/streams/http/http_client.py (3)

146-148: Question about initializing CachedLimiterSession with backend

Should we consider passing only the backend parameter when initializing CachedLimiterSession, instead of both sqlite_path and backend, to avoid potential redundancy or conflicts? Wdyt?


525-529: Should we handle specific exceptions in _write method?

In the _write method of SkipFailureSQLiteDict, we're catching all exceptions and logging a warning. Would it be better to catch specific exceptions (e.g., OperationalError) that we expect during cache writes, to avoid suppressing unexpected errors? Wdyt?


529-529: Consider adding more context to the logged warning

When logging the exception in _write, should we include additional context like the key being written to aid in debugging? Wdyt?

unit_tests/sources/streams/http/test_http_client.py (2)

749-751: Use a more robust check for detecting INSERT operations

In RaiseOnInsertConnection.execute, we're checking if "INSERT" is in str(args). Would it be more reliable to inspect the SQL query directly or use parameterized queries to determine if it's an INSERT operation? Wdyt?
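A hedged sketch of the stricter check, assuming the raw SQL string is available as the first positional argument (as it is for sqlite3's Connection.execute): compare the statement's leading keyword rather than substring-matching the whole argument tuple.

```python
def is_insert(sql: str) -> bool:
    # Compare the leading keyword instead of searching the whole string,
    # so an INSERT embedded in a string literal does not trigger a false positive.
    return sql.lstrip().upper().startswith("INSERT")


print(is_insert("  insert into responses values (?, ?)"))         # True
print(is_insert("SELECT * FROM responses WHERE key = 'INSERT'"))  # False
```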


754-764: Should we assert that a warning is logged during cache failure

In the test test_given_cache_save_failure_then_do_not_break, would it be beneficial to assert that a warning was logged when the cache save fails, to ensure that the failure is handled and logged as expected? Wdyt?
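In the real suite this would most likely use pytest's caplog fixture; as a self-contained sketch (the write_to_cache helper below is hypothetical), a recording logging.Handler can assert the same thing:

```python
import logging

logger = logging.getLogger("airbyte")


class RecordingHandler(logging.Handler):
    """Collects log records so a test can assert on them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = RecordingHandler()
logger.addHandler(handler)
logger.setLevel(logging.WARNING)


def write_to_cache(key, value):
    # Hypothetical stand-in for the cache-write path that logs instead of raising.
    try:
        raise OSError("database is locked")  # simulated cache failure
    except Exception as exception:
        logger.warning(f"Error while saving item to cache: {exception}")


write_to_cache("k", "v")

warnings = [r for r in handler.records if r.levelno == logging.WARNING]
assert len(warnings) == 1
assert "saving item to cache" in warnings[0].getMessage()
```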

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between ac6cf92 and 4a02e00.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
  • unit_tests/sources/streams/http/test_http_client.py (2 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

57-57: LGTM!

The logger is appropriately instantiated for module-level use.

@maxi297 changed the title from "bug(concurrency): support failed on http cache write" to "fix(concurrency): support failed on http cache write" on Dec 4, 2024
@github-actions bot added the "bug: Something isn't working" label on Dec 4, 2024
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (5)
airbyte_cdk/sources/streams/http/http_client.py (3)

146-148: Consider adding a comment explaining the cache backend choice?

The switch to SkipFailureSQLiteCache is a good improvement for handling cache failures gracefully. Would you consider adding a brief comment explaining why this specific backend was chosen? Something like:

+# Use SkipFailureSQLiteCache to prevent cache write failures from breaking the HTTP client
 backend = SkipFailureSQLiteCache(sqlite_path)

wdyt?


524-536: Consider enhancing error handling and type hints?

The implementation looks good! A few suggestions to make it even better:

  1. Would you consider adding more specific error messages? For example:
-            logger.warning(f"Error while retrieving item from cache: {exception}")
+            logger.warning(f"Cache retrieval failed for key '{key}': {exception}")
  1. How about adding type hints to match the parent class?
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> Any:

539-557: Add class documentation?

The implementation looks solid! Would you consider adding a docstring to explain the purpose and behavior of this class? Something like:

 class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache):
+    """SQLite cache implementation that gracefully handles cache operation failures.
+    
+    This cache implementation logs warnings instead of raising exceptions when cache
+    operations fail, ensuring that the HTTP client continues to function even when
+    caching is not possible.
+    """
unit_tests/sources/streams/http/test_http_client.py (2)

748-752: Add test helper documentation?

The test helper looks good! Would you consider adding a docstring to explain its purpose? Something like:

 class RaiseOnInsertConnection:
+    """Test helper that simulates SQLite database lock errors during INSERT operations.
+    Used to verify that the HTTP client handles cache write failures gracefully.
+    """

754-769: Enhance test coverage with additional assertions?

The test looks good! Would you consider adding a few more assertions to make it even more thorough? For example:

 assert response.json()
+# Verify that the request was made despite cache failure
+assert requests_mock.called
+# Verify that the response matches expected data
+assert response.json() == {"test": "response"}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4a02e00 and 6dc1e79.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
  • unit_tests/sources/streams/http/test_http_client.py (2 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

57-57: LGTM: Logger initialization looks good!

The logger namespace "airbyte" aligns well with the codebase structure.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

542-560: Add documentation and improve typing

The implementation looks solid! Would you consider adding:

  1. A docstring explaining the purpose and behavior of this cache implementation?
  2. Explicit return type annotation for the __init__ method?

Here's a suggestion:

+from typing import Optional, Any
+
 class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache):
+    """
+    A SQLiteCache implementation that gracefully handles cache operation failures.
+    
+    This cache implementation logs warnings instead of raising exceptions when cache
+    operations fail, ensuring that the application continues to function even when
+    cache writes encounter issues.
+    """
+
     def __init__(  # type: ignore  # ignoring as lib is not typed
         self,
-        db_path="http_cache",
-        serializer=None,
+        db_path: str = "http_cache",
+        serializer: Optional[Any] = None,
         **kwargs,
     ) -> None:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 6dc1e79 and 2f53e37.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/streams/http/http_client.py (1)

146-148: Consider potential concurrent access issues

The sqlite_path is used both for backend creation and session creation. Have you verified how this handles concurrent access scenarios? wdyt about adding a test case for concurrent access to ensure it behaves as expected?

✅ Verification successful

SQLite concurrent access appears to be handled by the underlying library

I see that requests_cache already handles concurrent access through its implementation - there's a lock parameter being used in the codebase. The SQLite backend from requests_cache is designed to handle concurrent access scenarios. The existing tests also cover SQLite error cases, though they focus on write errors rather than concurrency.

Since the concurrent access is handled by the underlying library and we're using it as intended, there's no immediate concern here. However, wdyt about adding a note in the docstring to make it explicit that it's safe for concurrent use? This would help future maintainers understand the design choice better.
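A minimal sketch of what such a concurrent-access test could look like, using plain sqlite3 against a shared in-memory database rather than the requests_cache backend from the PR (the table name and URI here are made up for the demo):

```python
import sqlite3
import threading

URI = "file:shared_cache_demo?mode=memory&cache=shared"

# Keep one connection open so the shared in-memory database stays alive.
anchor = sqlite3.connect(URI, uri=True)
anchor.execute("CREATE TABLE IF NOT EXISTS kv (key TEXT, value TEXT)")
anchor.commit()

errors = []


def writer(n: int) -> None:
    try:
        # timeout=5 makes SQLite retry for up to 5 seconds on "database is locked"
        conn = sqlite3.connect(URI, uri=True, timeout=5)
        conn.execute("INSERT INTO kv VALUES (?, ?)", (f"key-{n}", "value"))
        conn.commit()
        conn.close()
    except sqlite3.Error as exc:
        errors.append(exc)


threads = [threading.Thread(target=writer, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

rows = anchor.execute("SELECT COUNT(*) FROM kv").fetchone()[0]
# every write either succeeded or surfaced as a caught sqlite3.Error
assert rows + len(errors) == 8
```

In shared-cache mode some writers may still see table-lock errors that the busy timeout does not retry, which is why the sketch counts both outcomes rather than asserting that all eight writes land.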

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for existing concurrent access tests
rg -A 5 "concurrent|parallel|threading|multiprocess" "unit_tests/sources/streams/http/test_http_client.py"

Length of output: 106


Script:

#!/bin/bash
# Let's check if there are any tests related to SQLite caching
rg -A 5 "SQLite|sqlite" "unit_tests/sources/streams/http/test_http_client.py"

# Also check the implementation for any concurrent access handling
rg -A 5 "lock|Lock|concurrent|thread" "airbyte_cdk/sources/streams/http/http_client.py"

# Let's also check if SQLiteDict itself has any concurrent access handling
ast-grep --pattern 'class SkipFailureSQLiteDict($$$)'

Length of output: 1554

Comment on lines +524 to +539
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
    def __getitem__(self, key):  # type: ignore  # lib is not typed
        try:
            return super().__getitem__(key)  # type: ignore  # lib is not typed
        except Exception as exception:
            if not isinstance(exception, KeyError):
                logger.warning(f"Error while retrieving item from cache: {exception}")
            else:
                raise exception

    def _write(self, key: str, value: str) -> None:
        try:
            super()._write(key, value)  # type: ignore  # lib is not typed
        except Exception as exception:
            logger.warning(f"Error while saving item to cache: {exception}")


🛠️ Refactor suggestion

Enhance error handling and type safety

A few suggestions to make this more robust:

  1. The broad exception handling could mask critical issues. What do you think about catching specific exceptions?
  2. The warning messages could be more descriptive by including the key. wdyt?
  3. Consider adding type hints for better maintainability?

Here's a potential improvement:

-class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+    """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
-    def __getitem__(self, key):  # type: ignore  # lib is not typed
+    def __getitem__(self, key: str) -> Any:  # type: ignore  # return type from parent
         try:
             return super().__getitem__(key)  # type: ignore  # lib is not typed
-        except Exception as exception:
+        except (sqlite3.Error, IOError) as exception:
             if not isinstance(exception, KeyError):
-                logger.warning(f"Error while retrieving item from cache: {exception}")
+                logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
             else:
                 raise exception

     def _write(self, key: str, value: str) -> None:
         try:
             super()._write(key, value)  # type: ignore  # lib is not typed
-        except Exception as exception:
-            logger.warning(f"Error while saving item to cache: {exception}")
+        except (sqlite3.Error, IOError) as exception:
+            logger.warning(f"Error while saving key '{key}' to cache: {exception}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Before:

class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
    def __getitem__(self, key):  # type: ignore  # lib is not typed
        try:
            return super().__getitem__(key)  # type: ignore  # lib is not typed
        except Exception as exception:
            if not isinstance(exception, KeyError):
                logger.warning(f"Error while retrieving item from cache: {exception}")
            else:
                raise exception

    def _write(self, key: str, value: str) -> None:
        try:
            super()._write(key, value)  # type: ignore  # lib is not typed
        except Exception as exception:
            logger.warning(f"Error while saving item to cache: {exception}")

After:

from typing import Any
import sqlite3


class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
    """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""

    def __getitem__(self, key: str) -> Any:  # type: ignore  # return type from parent
        try:
            return super().__getitem__(key)  # type: ignore  # lib is not typed
        except (sqlite3.Error, IOError) as exception:
            if not isinstance(exception, KeyError):
                logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
            else:
                raise exception

    def _write(self, key: str, value: str) -> None:
        try:
            super()._write(key, value)  # type: ignore  # lib is not typed
        except (sqlite3.Error, IOError) as exception:
            logger.warning(f"Error while saving key '{key}' to cache: {exception}")

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)

112-112: Typo in exception message

There's a typo in the exception message: "Cound not interpolate" should be "Could not interpolate". Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2f53e37 and c7302d9.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/interpolation/jinja.py (5 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)

51-59: Could making _ENVIRONMENT a global variable introduce threading issues?

By changing _ENVIRONMENT to a global variable, there might be potential threading concerns in multi-threaded contexts. Should we ensure that the environment is thread-safe or consider using instance variables instead? Wdyt?
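If shared mutable state did turn out to be a problem, one common mitigation is one environment per thread via threading.local. The Environment class below is a generic stand-in for illustration, not the actual Jinja environment from the PR:

```python
import threading


class Environment:
    """Stand-in for an interpolation environment with mutable state."""

    def __init__(self):
        self.filters = {}  # mutating this across threads would be unsafe


_local = threading.local()


def get_environment() -> Environment:
    # Lazily create one environment per thread instead of sharing one global.
    if not hasattr(_local, "env"):
        _local.env = Environment()
    return _local.env


seen = []


def worker() -> None:
    seen.append(get_environment())


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# each thread observed a distinct instance
assert len({id(env) for env in seen}) == 3
```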

airbyte_cdk/sources/streams/http/http_client.py (1)

524-539: Could we catch specific exceptions and include the key in warning messages?

Catching broad exceptions like Exception might mask critical issues. What do you think about catching specific exceptions like sqlite3.Error or IOError instead? Also, including the key in the warning messages could improve debuggability. Wdyt?

@maxi297 force-pushed the maxi297/support-failure-on-http-cache-write branch from c7302d9 to 50d6f21 on December 5, 2024 02:00
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (1)

33-59: LGTM! Great security improvements.

The addition of restricted extensions and builtin functions is a solid security practice. The documentation clearly explains the security considerations. What do you think about adding a link to Jinja's security documentation in the comments for future reference? wdyt?

airbyte_cdk/sources/streams/http/http_client.py (1)

541-559: Consider adding class documentation.

The implementation looks good, especially sharing the lock between tables. What do you think about adding a docstring explaining the purpose of this class and its error-handling behavior? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c7302d9 and 50d6f21.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/interpolation/jinja.py (5 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (3 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/interpolation/jinja.py (3)

Line range hint 89-108: LGTM! Robust alias handling and error management.

The validation of reserved keywords and clear error messages will help catch issues early. The default value handling is well implemented.


136-141: LGTM! Good performance optimization.

The addition of caching and more specific type annotations improves both performance and type safety.


144-148: LGTM! Good type safety improvement.

The removal of Optional[str] makes the method contract clearer and safer. The caching implementation is well documented.

airbyte_cdk/sources/streams/http/http_client.py (2)

57-57: LGTM! Good logging practice.

Using a specific logger namespace helps with log filtering and organization.


524-539: Consider the previous review suggestions for robustness.

The previous review raised valid points about exception handling and logging that are still applicable.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

59-60: Fix typos in comments for clarity

There are minor typos in the comments:

  • Line 59: "A value lower than than could cause..." (duplicate "than")
  • Line 60: "...but not partition reader is consuming them." (Perhaps you meant "but no partition reader is consuming them"?)

Would you consider correcting these for better readability? Wdyt?

airbyte_cdk/sources/streams/http/http_client.py (1)

57-57: Reuse the existing logger instance

Since self._logger is already defined in the class, could we use it instead of creating a new module-level logger? This would keep logging consistent within the class. Wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 50d6f21 and d357f67.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (4 hunks)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

125-125: Confirm calculation of initial partitions

The updated calculation uses integer division:

initial_number_of_partitions_to_generate = self.SINGLE_THREADED_CONCURRENCY_LEVEL // 2

Given SINGLE_THREADED_CONCURRENCY_LEVEL is 2, this results in initial_number_of_partitions_to_generate being 1. Just to confirm, is this the intended behavior to prevent deadlocks by ensuring there is at least one partition? Wdyt?

airbyte_cdk/sources/streams/http/http_client.py (1)

526-541: 🛠️ Refactor suggestion

Catch specific exceptions and enhance logging

Would it be better to catch specific exceptions, such as sqlite3.Error or IOError, instead of a broad Exception? This can prevent masking other unexpected issues. Additionally, including the key in the warning messages might provide more context for troubleshooting. What do you think about adding type hints for better clarity and maintainability? Wdyt?

Here's a potential improvement:

+import sqlite3

 class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
-    def __getitem__(self, key):  # type: ignore  # lib is not typed
+    def __getitem__(self, key: str) -> Any:
         try:
             return super().__getitem__(key)  # type: ignore  # lib is not typed
-        except Exception as exception:
+        except (sqlite3.Error, IOError) as exception:
             if not isinstance(exception, KeyError):
-                logger.warning(f"Error while retrieving item from cache: {exception}")
+                logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
             else:
                 raise

     def _write(self, key: str, value: str) -> None:
         try:
             super()._write(key, value)  # type: ignore  # lib is not typed
-        except Exception as exception:
-            logger.warning(f"Error while saving item to cache: {exception}")
+        except (sqlite3.Error, IOError) as exception:
+            logger.warning(f"Error while saving key '{key}' to cache: {exception}")

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (4)
airbyte_cdk/sources/streams/http/http_client.py (1)

145-146: Consider removing the temporary logging comment.

The TODO comment suggests this is temporary logging. Should we remove it or make it more permanent? wdyt?

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)

123-135: The method looks good, but the name could be more descriptive.

The implementation is solid with good error handling and documentation. However, would you consider renaming _actually_group to something more descriptive like _initialize_stream_groups or _setup_stream_categories? The current name doesn't fully convey its purpose. wdyt?


145-147: Consider extracting the initialization check.

The null check and initialization pattern is duplicated in both read() and discover(). Would you consider extracting this into a private method like _ensure_groups_initialized()? This would reduce duplication and make the code more maintainable. wdyt?

+    def _ensure_groups_initialized(self) -> None:
+        if self._concurrent_streams is None:
+            self._actually_group()
+
     def read(self, ...):
-        if self._concurrent_streams is None:
-            self._actually_group()
+        self._ensure_groups_initialized()
         ...

     def discover(self, ...):
-        if self._concurrent_streams is None:
-            self._actually_group()
+        self._ensure_groups_initialized()
         ...

Also applies to: 173-175


113-113: Consider adding a comment explaining the partition calculation.

The calculation SINGLE_THREADED_CONCURRENCY_LEVEL // 2 looks correct, but would you consider adding a comment explaining why half the concurrency level is chosen? This would help future maintainers understand the reasoning. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between d357f67 and 494f5c6.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (4 hunks)
🔇 Additional comments (5)
airbyte_cdk/sources/streams/http/http_client.py (4)

57-57: LGTM! Logger initialization looks good.

The logger instance is properly initialized for the "airbyte" namespace.


148-150: LGTM! Cache configuration looks good.

The cache configuration properly initializes the new SkipFailureSQLiteCache backend with appropriate settings.


525-540: Consider enhancing error handling in SkipFailureSQLiteDict.

I notice this was mentioned in a previous review, but it's worth revisiting. What do you think about:

  1. Catching specific SQLite exceptions instead of generic Exception?
  2. Including the key in warning messages for better debugging?
  3. Adding type hints for better maintainability?

Here's a potential improvement:

 class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+    """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
-    def __getitem__(self, key):  # type: ignore  # lib is not typed
+    def __getitem__(self, key: str) -> Any:  # type: ignore  # return type from parent
         try:
             return super().__getitem__(key)  # type: ignore  # lib is not typed
-        except Exception as exception:
+        except (sqlite3.Error, IOError) as exception:
             if not isinstance(exception, KeyError):
-                logger.warning(f"Error while retrieving item from cache: {exception}")
+                logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
             else:
                 raise exception

     def _write(self, key: str, value: str) -> None:
         try:
             super()._write(key, value)  # type: ignore  # lib is not typed
-        except Exception as exception:
-            logger.warning(f"Error while saving item to cache: {exception}")
+        except (sqlite3.Error, IOError) as exception:
+            logger.warning(f"Error while saving key '{key}' to cache: {exception}")

542-563: LGTM! SkipFailureSQLiteCache implementation looks good.

The implementation properly initializes both responses and redirects tables with appropriate settings. The use of WAL mode and fast_save is a good choice for performance.

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

59-61: LGTM! The concurrency level change makes sense.

The increase to 2 should prevent the PartitionEnqueuer deadlock by ensuring there's always room for both producing and consuming futures. The new private attributes also improve state management.

Would you consider adding a test to verify that the PartitionEnqueuer doesn't deadlock with this new configuration? wdyt?

Let's check if there are any existing deadlock-related tests:

Also applies to: 82-84

✅ Verification successful

The concurrency level change is well-tested and safe!

Looking at the test files, I can see that:

  1. The PartitionEnqueuer has dedicated tests for handling futures limits and waiting behavior
  2. The ConcurrentDeclarativeSource tests verify proper handling of concurrent streams with different concurrency levels
  3. The code ensures a minimum concurrency level of 2 to prevent deadlocks

No additional tests are needed as the current test coverage already validates the deadlock prevention mechanism. The change from 1 to 2 is well-justified and properly tested.
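
The "minimum of 2" invariant those tests rely on amounts to a one-line clamp. The function name below is hypothetical — the CDK enforces this internally rather than exposing such a helper:

```python
from typing import Optional


def effective_concurrency(configured: Optional[int], default: int = 2) -> int:
    # One slot for the PartitionEnqueuer producing futures and one for the
    # partition reader consuming them; below 2, the producer can hit the
    # futures limit with nothing draining the queue, deadlocking the read.
    return max(2, configured if configured is not None else default)
```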

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for deadlock-related tests
rg -i "deadlock|partition.*enqueuer.*test" --type python

Length of output: 89


Script:

#!/bin/bash
# Search for test files related to partition or enqueuer
fd "test.*\.py$" | xargs rg -i "partition.*enqueuer|deadlock"

# Also search for any test files that might test the concurrent source
fd "test.*\.py$" | xargs rg -i "concurrent.*declarative.*source"

# Look for the actual test directory structure
fd "test" --type d

Length of output: 7244


Script:

#!/bin/bash
# Let's check the actual test content for PartitionEnqueuer to see if it covers concurrency levels
cat unit_tests/sources/streams/concurrent/test_partition_enqueuer.py

# Also check the concurrent declarative source tests for any concurrency level testing
cat unit_tests/sources/declarative/test_concurrent_declarative_source.py

Length of output: 70413

sqlite_path = "file::memory:?cache=shared"
backend = SkipFailureSQLiteCache(self._name, sqlite_path) # TODO maybe add a busy timeout

💡 Codebase verification

Would you consider adding a busy timeout to prevent SQLite lock errors? Here's a suggestion 🤔

I noticed we have a test case simulating a "database table is locked" error, and the current implementation inherits from requests_cache.backends.sqlite.SQLiteCache without any timeout configuration. What if we modified the __init__ to include a busy timeout? Something like:

def __init__(self, table_name="response", db_path="http_cache", serializer=None):
    super().__init__(table_name, db_path, serializer)
    self._db.execute("PRAGMA busy_timeout = 30000")  # 30 seconds timeout, wdyt?

This would help prevent those pesky lock errors in concurrent scenarios while keeping the implementation simple. What do you think about this approach? 🤔
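
With the standard `sqlite3` module the same pragma looks like the sketch below (file path arbitrary). Note that `sqlite3.connect` also accepts a `timeout` parameter, 5 seconds by default, which serves the same purpose at the driver level; the pragma makes the choice explicit and visible:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "cache.sqlite")
conn = sqlite3.connect(path)

# Ask SQLite to retry for up to 30 s when another connection holds the lock,
# instead of failing immediately with "database is locked".
conn.execute("PRAGMA busy_timeout = 30000")
timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
print(timeout_ms)  # prints 30000
conn.close()
```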

🔗 Analysis chain

Consider implementing busy timeout for SQLite.

The TODO comment suggests adding a busy timeout. This could help prevent "database is locked" errors in concurrent scenarios. Would you like me to provide an example implementation?

Let's check if there are any existing busy timeout implementations or issues:


Let me check the implementation of the SQLite cache to understand the context better.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing busy timeout implementations
rg -A 2 "busy_timeout|pragma busy_timeout"

# Search for database locked errors in tests
rg "database.*locked|sqlite.*locked"

Length of output: 198


Script:

#!/bin/bash
# Find the SQLite cache implementation
ast-grep --pattern 'class SkipFailureSQLiteCache'

# Also check for any test cases related to concurrent access
rg -B 2 -A 5 "SkipFailureSQLiteCache"

Length of output: 1631

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

59-60: Typographical corrections in comments

I noticed some typos in the comments that might need correction. Specifically, "lower than than" could be "lower than that," and the sentence in line 60 could be rephrased for clarity. Wdyt?

Here's the suggested fix:

-        # By default, we defer to a value of 2. A value lower than than could cause a PartitionEnqueuer to be stuck in a state of deadlock
-        # because it has hit the limit of futures but not partition reader is consuming them.
+        # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a deadlock state
+        # because it has hit the limit of futures but no partition reader is consuming them.

145-147: Consider refactoring repeated code into a helper method

I noticed that the check if self._concurrent_streams is None: self._actually_group() is repeated in both the read and discover methods (lines 145-147 and 173-175). Perhaps we could extract this logic into a helper method to adhere to the DRY principle and improve maintainability. Wdyt?

Proposed addition:

Add a helper method:

def _ensure_streams_grouped(self):
    if self._concurrent_streams is None:
        self._actually_group()

Then, in the read method (lines 145-147):

-            if self._concurrent_streams is None:
-                self._actually_group()
+            self._ensure_streams_grouped()

And in the discover method (lines 173-175):

-        if self._concurrent_streams is None:
-            self._actually_group()
+        self._ensure_streams_grouped()

Also applies to: 173-175

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 494f5c6 and 3098764.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (7 hunks)

@natikgadzhi
Contributor

@maxi297 are there any pieces in here that we still want to merge?

Labels
bug Something isn't working

2 participants