Skip to content

Commit

Permalink
Allow an UpdateStatuses instance to be passed in to wrapper
Browse files Browse the repository at this point in the history
The `UpdateStatuses` class parameterizes lock duration; however, there is currently no way for this parameter to be set.

Additionally, debugging or instrumentation may call for inspecting state of tasks responsible for updating cache keys; an externally-instantiated `UpdateStatuses` instance makes this more feasible.
  • Loading branch information
charles-dyfis-net committed Aug 21, 2024
1 parent 3ccc9bb commit a4b4f49
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions memoize/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration = None,
invalidation: InvalidationSupport = None):
invalidation: InvalidationSupport = None, update_status_tracker: Optional[UpdateStatuses] = None):
"""Wraps function with memoization.
If entry reaches time it should be updated, refresh is performed in background,
Expand All @@ -41,6 +41,7 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
:param function method: function to be decorated
:param CacheConfiguration configuration: cache configuration; default: DefaultInMemoryCacheConfiguration
:param InvalidationSupport invalidation: pass created instance of InvalidationSupport to have it configured
:param UpdateStatuses update_status_tracker: optional precreated state tracker to allow observability of this state or non-default update lock timeout
:raises: CachedMethodFailedException upon call: if cached method timed-out or thrown an exception
:raises: NotConfiguredCacheCalledException upon call: if provided configuration is not ready
Expand All @@ -49,18 +50,19 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
if method is None:
if configuration is None:
configuration = DefaultInMemoryCacheConfiguration()
return functools.partial(memoize, configuration=configuration, invalidation=invalidation)
return functools.partial(memoize, configuration=configuration, invalidation=invalidation, update_status_tracker=update_status_tracker)

if invalidation is not None and not invalidation._initialized() and configuration is not None:
invalidation._initialize(configuration.storage(), configuration.key_extractor(), method)

logger = logging.getLogger('{}@{}'.format(memoize.__name__, method.__name__))
logger.debug('wrapping %s with memoization - configuration: %s', method.__name__, configuration)

update_statuses = UpdateStatuses()
if update_status_tracker is None:
update_status_tracker = UpdateStatuses()

async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) -> bool:
if update_statuses.is_being_updated(key):
if update_status_tracker.is_being_updated(key):
return False
try:
await configuration_snapshot.storage().release(key)
Expand All @@ -74,24 +76,24 @@ async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration)
async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey,
value_future_provider: Callable[[], asyncio.Future],
configuration_snapshot: CacheConfiguration):
if actual_entry is None and update_statuses.is_being_updated(key):
if actual_entry is None and update_status_tracker.is_being_updated(key):
logger.debug('As entry expired, waiting for results of concurrent refresh %s', key)
entry = await update_statuses.await_updated(key)
entry = await update_status_tracker.await_updated(key)
if isinstance(entry, Exception):
raise CachedMethodFailedException('Concurrent refresh failed to complete') from entry
return entry
elif actual_entry is not None and update_statuses.is_being_updated(key):
elif actual_entry is not None and update_status_tracker.is_being_updated(key):
logger.debug('As update point reached but concurrent update already in progress, '
'relying on concurrent refresh to finish %s', key)
return actual_entry
elif not update_statuses.is_being_updated(key):
update_statuses.mark_being_updated(key)
elif not update_status_tracker.is_being_updated(key):
update_status_tracker.mark_being_updated(key)
try:
value_future = value_future_provider()
value = await value_future
offered_entry = configuration_snapshot.entry_builder().build(key, value)
await configuration_snapshot.storage().offer(key, offered_entry)
update_statuses.mark_updated(key, offered_entry)
update_status_tracker.mark_updated(key, offered_entry)
logger.debug('Successfully refreshed cache for key %s', key)

eviction_strategy = configuration_snapshot.eviction_strategy()
Expand All @@ -106,11 +108,11 @@ async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey,
return offered_entry
except asyncio.TimeoutError as e:
logger.debug('Timeout for %s: %s', key, e)
update_statuses.mark_update_aborted(key, e)
update_status_tracker.mark_update_aborted(key, e)
raise CachedMethodFailedException('Refresh timed out') from e
except Exception as e:
logger.debug('Error while refreshing cache for %s: %s', key, e)
update_statuses.mark_update_aborted(key, e)
update_status_tracker.mark_update_aborted(key, e)
raise CachedMethodFailedException('Refresh failed to complete') from e

@functools.wraps(method)
Expand Down

0 comments on commit a4b4f49

Please sign in to comment.