Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow an UpdateStatuses instance to be passed in to wrapper #34

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
3.1.0
-----

* Added ability to configure `UpdateStatuses` (for instance to update lock duration)
* Split default implementation into an interface and a default implementation

3.0.0
-----

Expand Down
17 changes: 9 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ Example how to customize default config (everything gets overridden):

@memoize(
configuration=MutableCacheConfiguration
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing())
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing()),
update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5))
)
async def cached():
return 'dummy'
Expand Down
18 changes: 10 additions & 8 deletions examples/configuration/custom_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
from memoize.eviction import LeastRecentlyUpdatedEvictionStrategy
from memoize.key import EncodedMethodNameAndArgsKeyExtractor
from memoize.postprocessing import DeepcopyPostprocessing
from memoize.statuses import InMemoryLocks
from memoize.storage import LocalInMemoryCacheStorage
from memoize.wrapper import memoize


@memoize(
configuration=MutableCacheConfiguration
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing())
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing()),
update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5))
)
async def cached():
return 'dummy'
53 changes: 38 additions & 15 deletions memoize/statuses.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,61 @@
"""
[Internal use only] Encapsulates update state management.
[API] Encapsulates update state management.
"""
import asyncio
import datetime
import logging
from abc import ABCMeta, abstractmethod
from asyncio import Future
from typing import Dict, Awaitable, Union

from memoize.entry import CacheKey, CacheEntry


class UpdateStatuses:
class UpdateStatuses(metaclass=ABCMeta):
@abstractmethod
def is_being_updated(self, key: CacheKey) -> bool:
"""Check if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
raise NotImplementedError()

@abstractmethod
def mark_being_updated(self, key: CacheKey) -> None:
"""Inform that update has been started.
Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
raise NotImplementedError()

def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None:
"""Inform that update has been finished.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
raise NotImplementedError()

@abstractmethod
def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None:
"""Inform that update failed to complete.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
Accepts exception to propagate it across all clients awaiting an update."""
raise NotImplementedError()

@abstractmethod
def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]:
"""Wait (asynchronously) until update in progress has benn finished.
Returns awaitable with the updated entry
(or awaitable with an exception if update failed/timed-out).
Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
raise NotImplementedError()


class InMemoryLocks(UpdateStatuses):
"""Manages in-memory locks to prevent dog-piling. """
def __init__(self, update_lock_timeout: datetime.timedelta = datetime.timedelta(minutes=5)) -> None:
self.logger = logging.getLogger(__name__)
self._update_lock_timeout = update_lock_timeout
self._updates_in_progress: Dict[CacheKey, Future] = {}

def is_being_updated(self, key: CacheKey) -> bool:
"""Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
return key in self._updates_in_progress

def mark_being_updated(self, key: CacheKey) -> None:
"""Informs that update has been started.
Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
if key in self._updates_in_progress:
raise ValueError('Key {} is already being updated'.format(key))

Expand All @@ -42,27 +74,18 @@ def complete_on_timeout_passed():
callback=complete_on_timeout_passed)

def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None:
"""Informs that update has been finished.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
if key not in self._updates_in_progress:
raise ValueError('Key {} is not being updated'.format(key))
update = self._updates_in_progress.pop(key)
update.set_result(entry)

def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None:
"""Informs that update failed to complete.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
Accepts exception to propagate it across all clients awaiting an update."""
if key not in self._updates_in_progress:
raise ValueError('Key {} is not being updated'.format(key))
update = self._updates_in_progress.pop(key)
update.set_result(exception)

def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]:
"""Waits (asynchronously) until update in progress has benn finished.
Returns awaitable with the updated entry
(or awaitable with an exception if update failed/timed-out).
Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
if not self.is_being_updated(key):
raise ValueError('Key {} is not being updated'.format(key))
return self._updates_in_progress[key]
16 changes: 12 additions & 4 deletions memoize/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from memoize.entry import CacheKey, CacheEntry
from memoize.exceptions import CachedMethodFailedException
from memoize.invalidation import InvalidationSupport
from memoize.statuses import UpdateStatuses
from memoize.statuses import UpdateStatuses, InMemoryLocks


def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration = None,
invalidation: InvalidationSupport = None):
invalidation: InvalidationSupport = None, update_statuses: UpdateStatuses = None):
"""Wraps function with memoization.

If entry reaches time it should be updated, refresh is performed in background,
Expand All @@ -41,6 +41,8 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
:param function method: function to be decorated
:param CacheConfiguration configuration: cache configuration; default: DefaultInMemoryCacheConfiguration
:param InvalidationSupport invalidation: pass created instance of InvalidationSupport to have it configured
:param UpdateStatuses update_statuses: allows to override how cache updates are tracked (e.g. lock config);
default: InMemoryStatuses

:raises: CachedMethodFailedException upon call: if cached method timed-out or thrown an exception
:raises: NotConfiguredCacheCalledException upon call: if provided configuration is not ready
Expand All @@ -49,15 +51,21 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
if method is None:
if configuration is None:
configuration = DefaultInMemoryCacheConfiguration()
return functools.partial(memoize, configuration=configuration, invalidation=invalidation)
return functools.partial(
memoize,
configuration=configuration,
invalidation=invalidation,
update_statuses=update_statuses
)

if invalidation is not None and not invalidation._initialized() and configuration is not None:
invalidation._initialize(configuration.storage(), configuration.key_extractor(), method)

logger = logging.getLogger('{}@{}'.format(memoize.__name__, method.__name__))
logger.debug('wrapping %s with memoization - configuration: %s', method.__name__, configuration)

update_statuses = UpdateStatuses()
if update_statuses is None:
update_statuses = InMemoryLocks()

async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) -> bool:
if update_statuses.is_being_updated(key):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def prepare_description():

setup(
name='py-memoize',
version='3.0.0',
version='3.1.0',
author='Michal Zmuda',
author_email='[email protected]',
url='https://github.com/DreamLab/memoize',
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from datetime import timedelta

from memoize.statuses import UpdateStatuses
from memoize.statuses import InMemoryLocks, UpdateStatuses


@pytest.mark.asyncio(scope="class")
class TestStatuses:

def setup_method(self):
self.update_statuses = UpdateStatuses()
self.update_statuses: UpdateStatuses = InMemoryLocks()

async def test_should_not_be_updating(self):
# given/when/then
Expand Down