Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency): support failed on http cache write #115

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
43 changes: 42 additions & 1 deletion airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")
logger = logging.getLogger("airbyte")


class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
Expand Down Expand Up @@ -142,8 +143,9 @@ def _request_session(self) -> requests.Session:
sqlite_path = str(Path(cache_dir) / self.cache_filename)
else:
sqlite_path = "file::memory:?cache=shared"
backend = SkipFailureSQLiteCache(sqlite_path)
return CachedLimiterSession(
sqlite_path, backend="sqlite", api_budget=self._api_budget, match_headers=True
sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
) # type: ignore # there are no typeshed stubs for requests_cache
else:
return LimiterSession(api_budget=self._api_budget)
Expand Down Expand Up @@ -517,3 +519,42 @@ def send_request(
)

return request, response


class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
    """SQLite-backed cache table that logs storage failures instead of raising them.

    A ``KeyError`` raised by the parent lookup still propagates; any other
    exception raised while reading or writing is reported as a warning on the
    module logger and swallowed.
    """

    def __getitem__(self, key):  # type: ignore  # lib is not typed
        """Return the value stored under ``key``.

        Raises:
            KeyError: re-raised unchanged when the parent lookup raises it.

        Any other exception is logged as a warning and ``None`` is returned.
        """
        try:
            cached = super().__getitem__(key)  # type: ignore  # lib is not typed
        except KeyError:
            # A KeyError from the parent lookup must reach the caller untouched.
            raise
        except Exception as exception:
            logger.warning(f"Error while retrieving item from cache: {exception}")
            return None
        return cached

    def _write(self, key: str, value: str) -> None:
        """Store ``value`` under ``key``; on any failure, log a warning and continue."""
        try:
            super()._write(key, value)  # type: ignore  # lib is not typed
        except Exception as exception:
            logger.warning(f"Error while saving item to cache: {exception}")

Comment on lines +525 to +540
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling and type safety

A few suggestions to make this more robust:

  1. The broad exception handling could mask critical issues. What do you think about catching specific exceptions?
  2. The warning messages could be more descriptive by including the key. wdyt?
  3. Consider adding type hints for better maintainability?

Here's a potential improvement:

-class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
+    """A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
+
-    def __getitem__(self, key):  # type: ignore  # lib is not typed
+    def __getitem__(self, key: str) -> Any:  # type: ignore  # return type from parent
         try:
             return super().__getitem__(key)  # type: ignore  # lib is not typed
-        except Exception as exception:
+        except (sqlite3.Error, IOError) as exception:
             if not isinstance(exception, KeyError):
-                logger.warning(f"Error while retrieving item from cache: {exception}")
+                logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
             else:
                 raise exception

     def _write(self, key: str, value: str) -> None:
         try:
             super()._write(key, value)  # type: ignore  # lib is not typed
-        except Exception as exception:
-            logger.warning(f"Error while saving item to cache: {exception}")
+        except (sqlite3.Error, IOError) as exception:
+            logger.warning(f"Error while saving key '{key}' to cache: {exception}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
def __getitem__(self, key): # type: ignore # lib is not typed
try:
return super().__getitem__(key) # type: ignore # lib is not typed
except Exception as exception:
if not isinstance(exception, KeyError):
logger.warning(f"Error while retrieving item from cache: {exception}")
else:
raise exception
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
except Exception as exception:
logger.warning(f"Error while saving item to cache: {exception}")
from typing import Any
import sqlite3
class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
"""A SQLiteDict that logs warnings instead of raising exceptions on cache operations."""
def __getitem__(self, key: str) -> Any: # type: ignore # return type from parent
try:
return super().__getitem__(key) # type: ignore # lib is not typed
except (sqlite3.Error, IOError) as exception:
if not isinstance(exception, KeyError):
logger.warning(f"Error while retrieving key '{key}' from cache: {exception}")
else:
raise exception
def _write(self, key: str, value: str) -> None:
try:
super()._write(key, value) # type: ignore # lib is not typed
except (sqlite3.Error, IOError) as exception:
logger.warning(f"Error while saving key '{key}' to cache: {exception}")



class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache):
    """SQLite cache backend whose tables are ``SkipFailureSQLiteDict`` instances."""

    def __init__(  # type: ignore  # ignoring as lib is not typed
        self,
        db_path="http_cache",
        serializer=None,
        **kwargs,
    ) -> None:
        """Run the parent initializer, then replace ``responses`` and ``redirects``.

        Args:
            db_path: forwarded to the parent initializer and to both tables.
            serializer: forwarded to the parent initializer; handed to the
                ``responses`` table only when it is truthy.
            **kwargs: forwarded to the parent initializer and to both tables.
        """
        super().__init__(db_path, serializer, **kwargs)

        # Only pass a serializer to the responses table when one was supplied.
        if serializer:
            responses_kwargs = {"serializer": serializer, **kwargs}
        else:
            responses_kwargs = kwargs

        self.responses: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(
            db_path,
            table_name="responses",
            **responses_kwargs,
        )
        # The redirects table reuses the responses table's lock and is always
        # created with serializer=None.
        self.redirects: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(
            db_path,
            table_name="redirects",
            lock=self.responses._lock,
            serializer=None,
            **kwargs,
        )
28 changes: 27 additions & 1 deletion unit_tests/sources/streams/http/test_http_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import contextlib
import logging
from datetime import timedelta
from sqlite3 import OperationalError
from unittest.mock import MagicMock, patch

import pytest
import requests
import requests_cache
from pympler import asizeof
from requests_cache import CachedRequest

Expand Down Expand Up @@ -741,3 +743,27 @@ def test_given_different_headers_then_response_is_not_cached(requests_mock):
)

assert second_response.json()["test"] == "second response"


class RaiseOnInsertConnection:
    """Test double for a SQLite connection whose INSERT statements always fail."""

    def execute(self, *args, **kwargs) -> None:
        """Raise ``OperationalError`` when the positional arguments mention ``INSERT``.

        ``self`` is declared explicitly so that only the call's own arguments
        (not the instance's repr) are inspected. Any other call is a no-op that
        returns ``None``.

        Raises:
            OperationalError: if ``"INSERT"`` appears in ``str(args)``.
        """
        if "INSERT" in str(args):
            raise OperationalError("database table is locked")


def test_given_cache_save_failure_then_do_not_break(requests_mock, monkeypatch):
    """A request must still return its response when the cache write fails."""

    @contextlib.contextmanager
    def _failing_write_connection(*args, **kwargs):
        # Stand-in for SQLiteDict.connection: yields a connection that raises on INSERT.
        yield RaiseOnInsertConnection()

    sqlite_dict_cls = requests_cache.backends.sqlite.SQLiteDict
    monkeypatch.setattr(sqlite_dict_cls, "connection", _failing_write_connection)

    client = HttpClient(name="test", logger=MagicMock(), use_cache=True)
    url = "https://google.com/"
    requests_mock.register_uri("GET", url, json={"test": "response"})

    _, response = client.send_request("GET", url, request_kwargs={})

    assert response.json()
Loading