Skip to content

Commit

Permalink
Add failure handling to MigratingCacheClient
Browse files Browse the repository at this point in the history
## Motivation / Description
I forgot to add this file to the failure handling changes.
  • Loading branch information
bisho committed Nov 10, 2023
1 parent d1162e5 commit 549d2ca
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 33 deletions.
28 changes: 23 additions & 5 deletions src/meta_memcache/extras/migrating_cache_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
IntFlag,
Key,
ReadResponse,
SetMode,
TokenFlag,
Value,
WriteResponse,
)
from meta_memcache.interfaces.router import (
DEFAULT_FAILURE_HANDLING,
FailureHandling,
)


class MigratingCacheClient(HighLevelCommandsMixin):
Expand Down Expand Up @@ -93,6 +96,7 @@ def meta_multiget(
flags: Optional[Set[Flag]] = None,
int_flags: Optional[Dict[IntFlag, int]] = None,
token_flags: Optional[Dict[TokenFlag, bytes]] = None,
failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING,
) -> Dict[Key, ReadResponse]:
migration_mode = self.get_migration_mode()
if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN:
Expand All @@ -101,6 +105,7 @@ def meta_multiget(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
elif migration_mode in (
MigrationMode.POPULATE_WRITES_AND_READS_1PCT,
Expand All @@ -111,16 +116,16 @@ def meta_multiget(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if self._should_populate_read(migration_mode):
for key, result in results.items():
if isinstance(result, Value):
self._destination_client.set(
self._destination_client.refill(
key=key,
value=result.value,
ttl=self._get_value_ttl(result),
no_reply=True,
set_mode=SetMode.ADD,
)
return results
else:
Expand All @@ -129,6 +134,7 @@ def meta_multiget(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)

def meta_get(
Expand All @@ -137,6 +143,7 @@ def meta_get(
flags: Optional[Set[Flag]] = None,
int_flags: Optional[Dict[IntFlag, int]] = None,
token_flags: Optional[Dict[TokenFlag, bytes]] = None,
failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING,
) -> ReadResponse:
migration_mode = self.get_migration_mode()
if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN:
Expand All @@ -145,6 +152,7 @@ def meta_get(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
elif migration_mode in (
MigrationMode.POPULATE_WRITES_AND_READS_1PCT,
Expand All @@ -155,14 +163,14 @@ def meta_get(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if isinstance(result, Value) and self._should_populate_read(migration_mode):
self._destination_client.set(
self._destination_client.refill(
key=key,
value=result.value,
ttl=self._get_value_ttl(result),
no_reply=True,
set_mode=SetMode.ADD,
)
return result
else:
Expand All @@ -171,6 +179,7 @@ def meta_get(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)

def meta_set(
Expand All @@ -181,6 +190,7 @@ def meta_set(
flags: Optional[Set[Flag]] = None,
int_flags: Optional[Dict[IntFlag, int]] = None,
token_flags: Optional[Dict[TokenFlag, bytes]] = None,
failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING,
) -> WriteResponse:
origin_response = destination_response = None
migration_mode = self.get_migration_mode()
Expand All @@ -192,6 +202,7 @@ def meta_set(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if migration_mode > MigrationMode.ONLY_ORIGIN:
destination_response = self._destination_client.meta_set(
Expand All @@ -201,6 +212,7 @@ def meta_set(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN:
assert destination_response is not None # noqa: S101
Expand All @@ -215,6 +227,7 @@ def meta_delete(
flags: Optional[Set[Flag]] = None,
int_flags: Optional[Dict[IntFlag, int]] = None,
token_flags: Optional[Dict[TokenFlag, bytes]] = None,
failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING,
) -> WriteResponse:
origin_response = destination_response = None
migration_mode = self.get_migration_mode()
Expand All @@ -224,13 +237,15 @@ def meta_delete(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if migration_mode > MigrationMode.ONLY_ORIGIN:
destination_response = self._destination_client.meta_delete(
key=key,
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN:
assert destination_response is not None # noqa: S101
Expand All @@ -245,6 +260,7 @@ def meta_arithmetic(
flags: Optional[Set[Flag]] = None,
int_flags: Optional[Dict[IntFlag, int]] = None,
token_flags: Optional[Dict[TokenFlag, bytes]] = None,
failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING,
) -> WriteResponse:
"""
We can't reliably migrate cache data modified by meta-arithmetic,
Expand All @@ -259,13 +275,15 @@ def meta_arithmetic(
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)
else:
return self._origin_client.meta_arithmetic(
key=key,
flags=flags,
int_flags=int_flags,
token_flags=token_flags,
failure_handling=failure_handling,
)

def touch(
Expand Down
Loading

0 comments on commit 549d2ca

Please sign in to comment.