Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency): support failed on http cache write #115

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
31 changes: 30 additions & 1 deletion airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")
logger = logging.getLogger("airbyte")


class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
Expand Down Expand Up @@ -142,8 +143,9 @@ def _request_session(self) -> requests.Session:
sqlite_path = str(Path(cache_dir) / self.cache_filename)
else:
sqlite_path = "file::memory:?cache=shared"
backend = SkipFailureSQLiteCache(sqlite_path)
return CachedLimiterSession(
sqlite_path, backend="sqlite", api_budget=self._api_budget, match_headers=True
sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
) # type: ignore # there are no typeshed stubs for requests_cache
else:
return LimiterSession(api_budget=self._api_budget)
Expand Down Expand Up @@ -517,3 +519,30 @@ def send_request(
)

return request, response


class SkipFailureSQLiteDict(requests_cache.backends.sqlite.SQLiteDict):
def _write(self, key, value):
try:
super()._write(key, value)
except Exception as exception:
logger.warning(exception)


class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache):
def __init__(
self,
db_path = 'http_cache',
serializer = None,
**kwargs,
) -> None:
super().__init__(db_path, serializer, **kwargs)
skwargs = {'serializer': serializer, **kwargs} if serializer else kwargs
self.responses: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(db_path, table_name='responses', **skwargs)
self.redirects: requests_cache.backends.sqlite.SQLiteDict = SkipFailureSQLiteDict(
db_path,
table_name='redirects',
lock=self.responses._lock,
serializer=None,
**kwargs,
)
23 changes: 22 additions & 1 deletion unit_tests/sources/streams/http/test_http_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import contextlib
import logging
from datetime import timedelta
from sqlite3 import OperationalError
from unittest.mock import MagicMock, patch

import pytest
import requests
import requests_cache
from pympler import asizeof
from requests_cache import CachedRequest

Expand Down Expand Up @@ -741,3 +743,22 @@ def test_given_different_headers_then_response_is_not_cached(requests_mock):
)

assert second_response.json()["test"] == "second response"


class RaiseOnInsertConnection:
def execute(*args, **kwargs) -> None:
if "INSERT" in str(args):
raise OperationalError("database table is locked")


def test_given_cache_save_failure_then_do_not_break(requests_mock, monkeypatch):
@contextlib.contextmanager
def _create_sqlite_write_error_connection(*args, **kwargs):
yield RaiseOnInsertConnection()
monkeypatch.setattr(requests_cache.backends.sqlite.SQLiteDict, "connection", _create_sqlite_write_error_connection)
http_client = HttpClient(name="test", logger=MagicMock(), use_cache=True)
requests_mock.register_uri("GET", "https://google.com/", json={"test": "response"})

request, response = http_client.send_request("GET", "https://google.com/", request_kwargs={})

assert response.json()
Loading