-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(concurrency): support failed on http cache write #115
base: main
Are you sure you want to change the base?
Changes from 9 commits
4a02e00
4072af7
e8f57a4
3171b61
6dc1e79
3fdbfa3
2f53e37
50d6f21
d357f67
494f5c6
3098764
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -54,6 +54,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger = logging.getLogger("airbyte") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -94,6 +95,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._name = name | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._api_budget: APIBudget = api_budget or APIBudget(policies=[]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._logger = logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if session: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._session = session | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -107,7 +109,6 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if isinstance(authenticator, AuthBase): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._session.auth = authenticator | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._logger = logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._error_handler = error_handler or HttpStatusErrorHandler(self._logger) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if backoff_strategy is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if isinstance(backoff_strategy, list): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -139,11 +140,14 @@ def _request_session(self) -> requests.Session: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# Use in-memory cache if cache_dir is not set | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# This is a non-obvious interface, but it ensures we don't write sql files when running unit tests | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if cache_dir: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._logger.info(f"Using path {cache_dir} for HTTP cache") # TODO: remove | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sqlite_path = str(Path(cache_dir) / self.cache_filename) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._logger.info("Using memory for cache") # TODO: remove | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sqlite_path = "file::memory:?cache=shared" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
backend = SkipFailureSQLiteCache(self._name, sqlite_path) # TODO maybe add a busy timeout | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return CachedLimiterSession( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sqlite_path, backend="sqlite", api_budget=self._api_budget, match_headers=True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) # type: ignore # there are no typeshed stubs for requests_cache | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return LimiterSession(api_budget=self._api_budget) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -517,3 +521,44 @@ def send_request( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return request, response | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __getitem__(self, key): # type: ignore # lib is not typed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return super().__getitem__(key) # type: ignore # lib is not typed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception as exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if not isinstance(exception, KeyError): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.warning(f"Error while retrieving item from cache: {exception}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise exception | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def _write(self, key: str, value: str) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super()._write(key, value) # type: ignore # lib is not typed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
except Exception as exception: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.warning(f"Error while saving item to cache: {exception}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+525
to
+540
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance error handling and type safety A few suggestions to make this more robust:
Here's a potential improvement: -class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+ """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
- def __getitem__(self, key): # type: ignore # lib is not typed
+ def __getitem__(self, key: str) -> Any: # type: ignore # return type from parent
try:
return super().__getitem__(key) # type: ignore # lib is not typed
- except Exception as exception:
+ except (sqlite3.Error, IOError) as exception:
if not isinstance(exception, KeyError):
- logger.warning(f"Error while retrieving item from cache: {exception}")
+ logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
else:
raise exception
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
- except Exception as exception:
- logger.warning(f"Error while saving item to cache: {exception}")
+ except (sqlite3.Error, IOError) as exception:
+ logger.warning(f"Error while saving key '{key}' to cache: {exception}") 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __init__( # type: ignore # ignoring as lib is not typed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
table_name="response", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
db_path="http_cache", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serializer=None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**kwargs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super().__init__(db_path, serializer, **kwargs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
skwargs = {"serializer": serializer, **kwargs} if serializer else kwargs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.responses: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
db_path, table_name=table_name, fast_save=True, wal=True, **skwargs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.redirects: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
db_path, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
table_name=f"redirects_{table_name}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fast_save=True, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
wal=True, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
lock=self.responses._lock, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
serializer=None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**kwargs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Would you consider adding a busy timeout to prevent SQLite lock errors? Here's a suggestion 🤔
I noticed we have a test case simulating a "database table is locked" error, and the current implementation inherits from
requests_cache.backends.sqlite.SQLiteCache
without any timeout configuration. What if we modified the__init__
to include a busy timeout? Something like:This would help prevent those pesky lock errors in concurrent scenarios while keeping the implementation simple. What do you think about this approach? 🤔
🔗 Analysis chain
Consider implementing busy timeout for SQLite.
The TODO comment suggests adding a busy timeout. This could help prevent "database is locked" errors in concurrent scenarios. Would you like me to provide an example implementation?
Let's check if there are any existing busy timeout implementations or issues:
Let me check the implementation of the SQLite cache to understand the context better.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 198
Script:
Length of output: 1631