From 32abefe18870bca562204b02d4f101d2093bdb51 Mon Sep 17 00:00:00 2001 From: Charles Duffy Date: Tue, 20 Aug 2024 20:05:18 -0500 Subject: [PATCH 1/3] Allow an UpdateStatuses instance to be passed in to wrapper The `UpdateStatuses` class parameterizes lock duration; however, there is currently no way for this parameter to be set. Additionally, debugging or instrumentation may call for inspecting state of tasks responsible for updating cache keys; an externally-instantiated `UpdateStatuses` instance makes this more feasible. --- memoize/wrapper.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/memoize/wrapper.py b/memoize/wrapper.py index 4379e6b..3ac80bc 100644 --- a/memoize/wrapper.py +++ b/memoize/wrapper.py @@ -18,7 +18,7 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration = None, - invalidation: InvalidationSupport = None): + invalidation: InvalidationSupport = None, update_status_tracker: Optional[UpdateStatuses] = None): """Wraps function with memoization. If entry reaches time it should be updated, refresh is performed in background, @@ -41,6 +41,7 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration :param function method: function to be decorated :param CacheConfiguration configuration: cache configuration; default: DefaultInMemoryCacheConfiguration :param InvalidationSupport invalidation: pass created instance of InvalidationSupport to have it configured + :param UpdateStatuses update_status_tracker: optional precreated state tracker to allow observability of this state or non-default update lock timeout :raises: CachedMethodFailedException upon call: if cached method timed-out or thrown an exception :raises: NotConfiguredCacheCalledException upon call: if provided configuration is not ready @@ -57,10 +58,11 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration logger = logging.getLogger('{}@{}'.format(memoize.__name__, method.__name__)) logger.debug('wrapping %s with memoization - configuration: %s', method.__name__, configuration) - update_statuses = UpdateStatuses() + if update_status_tracker is None: + update_status_tracker = UpdateStatuses() async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) -> bool: - if update_statuses.is_being_updated(key): + if update_status_tracker.is_being_updated(key): return False try: await configuration_snapshot.storage().release(key) @@ -74,24 +76,24 @@ async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey, value_future_provider: Callable[[], asyncio.Future], configuration_snapshot: CacheConfiguration): - if actual_entry is None and update_statuses.is_being_updated(key): + if actual_entry is None and update_status_tracker.is_being_updated(key): logger.debug('As entry expired, waiting for results of concurrent refresh %s', key) - entry = await update_statuses.await_updated(key) + entry = await update_status_tracker.await_updated(key) if isinstance(entry, Exception): raise CachedMethodFailedException('Concurrent refresh failed to complete') from entry return entry - elif actual_entry is not None and update_statuses.is_being_updated(key): + elif actual_entry is not None and update_status_tracker.is_being_updated(key): logger.debug('As update point reached but concurrent update already in progress, ' 'relying on concurrent refresh to finish %s', key) return actual_entry - elif not update_statuses.is_being_updated(key): - update_statuses.mark_being_updated(key) + elif not update_status_tracker.is_being_updated(key): + update_status_tracker.mark_being_updated(key) try: value_future = value_future_provider() value = await value_future offered_entry = configuration_snapshot.entry_builder().build(key, value) await configuration_snapshot.storage().offer(key, offered_entry) - update_statuses.mark_updated(key, offered_entry) + update_status_tracker.mark_updated(key, offered_entry) logger.debug('Successfully refreshed cache for key %s', key) eviction_strategy = configuration_snapshot.eviction_strategy() @@ -106,11 +108,11 @@ async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey, return offered_entry except asyncio.TimeoutError as e: logger.debug('Timeout for %s: %s', key, e) - update_statuses.mark_update_aborted(key, e) + update_status_tracker.mark_update_aborted(key, e) raise CachedMethodFailedException('Refresh timed out') from e except Exception as e: logger.debug('Error while refreshing cache for %s: %s', key, e) - update_statuses.mark_update_aborted(key, e) + update_status_tracker.mark_update_aborted(key, e) raise CachedMethodFailedException('Refresh failed to complete') from e @functools.wraps(method) From cbbe4156a489a236c1a6db151809642349e1dcd0 Mon Sep 17 00:00:00 2001 From: Charles Duffy Date: Wed, 21 Aug 2024 15:44:31 -0500 Subject: [PATCH 2/3] Preserve update_status_tracker through partial call --- memoize/wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memoize/wrapper.py b/memoize/wrapper.py index 3ac80bc..a79b96e 100644 --- a/memoize/wrapper.py +++ b/memoize/wrapper.py @@ -50,7 +50,7 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration if method is None: if configuration is None: configuration = DefaultInMemoryCacheConfiguration() - return functools.partial(memoize, configuration=configuration, invalidation=invalidation) + return functools.partial(memoize, configuration=configuration, invalidation=invalidation, update_status_tracker=update_status_tracker) if invalidation is not None and not invalidation._initialized() and configuration is not None: invalidation._initialize(configuration.storage(), configuration.key_extractor(), method) From 8aa85730846ab03f7110de17a63d00ec900548d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=BBmuda?= Date: Mon, 30 Sep 2024 14:39:15 +0200 Subject: [PATCH 3/3] * Reworked ability to configure `UpdateStatuses` (for instance to update lock duration) * Split default implementation into an interface and a default implementation * Added docs & updated version --- CHANGELOG.rst | 6 +++ README.rst | 17 +++--- .../configuration/custom_configuration.py | 18 ++++--- memoize/statuses.py | 53 +++++++++++++------ memoize/wrapper.py | 36 +++++++------ setup.py | 2 +- tests/unit/test_statuses.py | 4 +- 7 files changed, 87 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d5b6529..8598c1f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,9 @@ +3.1.0 +----- + +* Added ability to configure `UpdateStatuses` (for instance to update lock duration) + * Split default implementation into an interface and a default implementation + 3.0.0 ----- diff --git a/README.rst b/README.rst index b7d1a4b..329c412 100644 --- a/README.rst +++ b/README.rst @@ -153,14 +153,15 @@ Example how to customize default config (everything gets overridden): @memoize( configuration=MutableCacheConfiguration - .initialized_with(DefaultInMemoryCacheConfiguration()) - .set_method_timeout(value=timedelta(minutes=2)) - .set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2), - expire_after=timedelta(minutes=5))) - .set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048)) - .set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False)) - .set_storage(LocalInMemoryCacheStorage()) - .set_postprocessing(DeepcopyPostprocessing()) + .initialized_with(DefaultInMemoryCacheConfiguration()) + .set_method_timeout(value=timedelta(minutes=2)) + .set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2), + expire_after=timedelta(minutes=5))) + .set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048)) + .set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False)) + .set_storage(LocalInMemoryCacheStorage()) + .set_postprocessing(DeepcopyPostprocessing()), + update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5)) ) async def cached(): return 'dummy' diff --git a/examples/configuration/custom_configuration.py b/examples/configuration/custom_configuration.py index 07e724a..59bca48 100644 --- a/examples/configuration/custom_configuration.py +++ b/examples/configuration/custom_configuration.py @@ -5,20 +5,22 @@ from memoize.eviction import LeastRecentlyUpdatedEvictionStrategy from memoize.key import EncodedMethodNameAndArgsKeyExtractor from memoize.postprocessing import DeepcopyPostprocessing +from memoize.statuses import InMemoryLocks from memoize.storage import LocalInMemoryCacheStorage from memoize.wrapper import memoize @memoize( configuration=MutableCacheConfiguration - .initialized_with(DefaultInMemoryCacheConfiguration()) - .set_method_timeout(value=timedelta(minutes=2)) - .set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2), - expire_after=timedelta(minutes=5))) - .set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048)) - .set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False)) - .set_storage(LocalInMemoryCacheStorage()) - .set_postprocessing(DeepcopyPostprocessing()) + .initialized_with(DefaultInMemoryCacheConfiguration()) + .set_method_timeout(value=timedelta(minutes=2)) + .set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2), + expire_after=timedelta(minutes=5))) + .set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048)) + .set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False)) + .set_storage(LocalInMemoryCacheStorage()) + .set_postprocessing(DeepcopyPostprocessing()), + update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5)) ) async def cached(): return 'dummy' diff --git a/memoize/statuses.py b/memoize/statuses.py index c91f64d..1f2dfd4 100644 --- a/memoize/statuses.py +++ b/memoize/statuses.py @@ -1,29 +1,61 @@ """ -[Internal use only] Encapsulates update state management. +[API] Encapsulates update state management. """ import asyncio import datetime import logging +from abc import ABCMeta, abstractmethod from asyncio import Future from typing import Dict, Awaitable, Union from memoize.entry import CacheKey, CacheEntry -class UpdateStatuses: +class UpdateStatuses(metaclass=ABCMeta): + @abstractmethod + def is_being_updated(self, key: CacheKey) -> bool: + """Check if update for given key is in progress. Obtained info is valid until control gets back to IO-loop.""" + raise NotImplementedError() + + @abstractmethod + def mark_being_updated(self, key: CacheKey) -> None: + """Inform that update has been started. + Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost).. + Calls to 'is_being_updated' will return True until 'mark_updated' will be called.""" + raise NotImplementedError() + + def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None: + """Inform that update has been finished. + Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.""" + raise NotImplementedError() + + @abstractmethod + def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None: + """Inform that update failed to complete. + Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called. + Accepts exception to propagate it across all clients awaiting an update.""" + raise NotImplementedError() + + @abstractmethod + def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]: + """Wait (asynchronously) until update in progress has benn finished. + Returns awaitable with the updated entry + (or awaitable with an exception if update failed/timed-out). + Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost).""" + raise NotImplementedError() + + +class InMemoryLocks(UpdateStatuses): + """Manages in-memory locks to prevent dog-piling. """ def __init__(self, update_lock_timeout: datetime.timedelta = datetime.timedelta(minutes=5)) -> None: self.logger = logging.getLogger(__name__) self._update_lock_timeout = update_lock_timeout self._updates_in_progress: Dict[CacheKey, Future] = {} def is_being_updated(self, key: CacheKey) -> bool: - """Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop.""" return key in self._updates_in_progress def mark_being_updated(self, key: CacheKey) -> None: - """Informs that update has been started. - Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost).. - Calls to 'is_being_updated' will return True until 'mark_updated' will be called.""" if key in self._updates_in_progress: raise ValueError('Key {} is already being updated'.format(key)) @@ -42,27 +74,18 @@ def complete_on_timeout_passed(): callback=complete_on_timeout_passed) def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None: - """Informs that update has been finished. - Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.""" if key not in self._updates_in_progress: raise ValueError('Key {} is not being updated'.format(key)) update = self._updates_in_progress.pop(key) update.set_result(entry) def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None: - """Informs that update failed to complete. - Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called. - Accepts exception to propagate it across all clients awaiting an update.""" if key not in self._updates_in_progress: raise ValueError('Key {} is not being updated'.format(key)) update = self._updates_in_progress.pop(key) update.set_result(exception) def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]: - """Waits (asynchronously) until update in progress has benn finished. - Returns awaitable with the updated entry - (or awaitable with an exception if update failed/timed-out). - Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost).""" if not self.is_being_updated(key): raise ValueError('Key {} is not being updated'.format(key)) return self._updates_in_progress[key] diff --git a/memoize/wrapper.py b/memoize/wrapper.py index a79b96e..9875072 100644 --- a/memoize/wrapper.py +++ b/memoize/wrapper.py @@ -14,11 +14,11 @@ from memoize.entry import CacheKey, CacheEntry from memoize.exceptions import CachedMethodFailedException from memoize.invalidation import InvalidationSupport -from memoize.statuses import UpdateStatuses +from memoize.statuses import UpdateStatuses, InMemoryLocks def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration = None, - invalidation: InvalidationSupport = None, update_status_tracker: Optional[UpdateStatuses] = None): + invalidation: InvalidationSupport = None, update_statuses: UpdateStatuses = None): """Wraps function with memoization. If entry reaches time it should be updated, refresh is performed in background, @@ -41,7 +41,8 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration :param function method: function to be decorated :param CacheConfiguration configuration: cache configuration; default: DefaultInMemoryCacheConfiguration :param InvalidationSupport invalidation: pass created instance of InvalidationSupport to have it configured - :param UpdateStatuses update_status_tracker: optional precreated state tracker to allow observability of this state or non-default update lock timeout + :param UpdateStatuses update_statuses: allows to override how cache updates are tracked (e.g. lock config); + default: InMemoryStatuses :raises: CachedMethodFailedException upon call: if cached method timed-out or thrown an exception :raises: NotConfiguredCacheCalledException upon call: if provided configuration is not ready @@ -50,7 +51,12 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration if method is None: if configuration is None: configuration = DefaultInMemoryCacheConfiguration() - return functools.partial(memoize, configuration=configuration, invalidation=invalidation, update_status_tracker=update_status_tracker) + return functools.partial( + memoize, + configuration=configuration, + invalidation=invalidation, + update_statuses=update_statuses + ) if invalidation is not None and not invalidation._initialized() and configuration is not None: invalidation._initialize(configuration.storage(), configuration.key_extractor(), method) @@ -58,11 +64,11 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration logger = logging.getLogger('{}@{}'.format(memoize.__name__, method.__name__)) logger.debug('wrapping %s with memoization - configuration: %s', method.__name__, configuration) - if update_status_tracker is None: - update_status_tracker = UpdateStatuses() + if update_statuses is None: + update_statuses = InMemoryLocks() async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) -> bool: - if update_status_tracker.is_being_updated(key): + if update_statuses.is_being_updated(key): return False try: await configuration_snapshot.storage().release(key) @@ -76,24 +82,24 @@ async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey, value_future_provider: Callable[[], asyncio.Future], configuration_snapshot: CacheConfiguration): - if actual_entry is None and update_status_tracker.is_being_updated(key): + if actual_entry is None and update_statuses.is_being_updated(key): logger.debug('As entry expired, waiting for results of concurrent refresh %s', key) - entry = await update_status_tracker.await_updated(key) + entry = await update_statuses.await_updated(key) if isinstance(entry, Exception): raise CachedMethodFailedException('Concurrent refresh failed to complete') from entry return entry - elif actual_entry is not None and update_status_tracker.is_being_updated(key): + elif actual_entry is not None and update_statuses.is_being_updated(key): logger.debug('As update point reached but concurrent update already in progress, ' 'relying on concurrent refresh to finish %s', key) return actual_entry - elif not update_status_tracker.is_being_updated(key): - update_status_tracker.mark_being_updated(key) + elif not update_statuses.is_being_updated(key): + update_statuses.mark_being_updated(key) try: value_future = value_future_provider() value = await value_future offered_entry = configuration_snapshot.entry_builder().build(key, value) await configuration_snapshot.storage().offer(key, offered_entry) - update_status_tracker.mark_updated(key, offered_entry) + update_statuses.mark_updated(key, offered_entry) logger.debug('Successfully refreshed cache for key %s', key) eviction_strategy = configuration_snapshot.eviction_strategy() @@ -108,11 +114,11 @@ async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey, return offered_entry except asyncio.TimeoutError as e: logger.debug('Timeout for %s: %s', key, e) - update_status_tracker.mark_update_aborted(key, e) + update_statuses.mark_update_aborted(key, e) raise CachedMethodFailedException('Refresh timed out') from e except Exception as e: logger.debug('Error while refreshing cache for %s: %s', key, e) - update_status_tracker.mark_update_aborted(key, e) + update_statuses.mark_update_aborted(key, e) raise CachedMethodFailedException('Refresh failed to complete') from e @functools.wraps(method) diff --git a/setup.py b/setup.py index a565190..74e4a2f 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ def prepare_description(): setup( name='py-memoize', - version='3.0.0', + version='3.1.0', author='Michal Zmuda', author_email='zmu.michal@gmail.com', url='https://github.com/DreamLab/memoize', diff --git a/tests/unit/test_statuses.py b/tests/unit/test_statuses.py index cac0ab0..35f36f6 100644 --- a/tests/unit/test_statuses.py +++ b/tests/unit/test_statuses.py @@ -6,14 +6,14 @@ from datetime import timedelta -from memoize.statuses import UpdateStatuses +from memoize.statuses import InMemoryLocks, UpdateStatuses @pytest.mark.asyncio(scope="class") class TestStatuses: def setup_method(self): - self.update_statuses = UpdateStatuses() + self.update_statuses: UpdateStatuses = InMemoryLocks() async def test_should_not_be_updating(self): # given/when/then