diff --git a/src/meta_memcache/extras/migrating_cache_client.py b/src/meta_memcache/extras/migrating_cache_client.py index f62fcb1..d36df8e 100644 --- a/src/meta_memcache/extras/migrating_cache_client.py +++ b/src/meta_memcache/extras/migrating_cache_client.py @@ -12,11 +12,14 @@ IntFlag, Key, ReadResponse, - SetMode, TokenFlag, Value, WriteResponse, ) +from meta_memcache.interfaces.router import ( + DEFAULT_FAILURE_HANDLING, + FailureHandling, +) class MigratingCacheClient(HighLevelCommandsMixin): @@ -93,6 +96,7 @@ def meta_multiget( flags: Optional[Set[Flag]] = None, int_flags: Optional[Dict[IntFlag, int]] = None, token_flags: Optional[Dict[TokenFlag, bytes]] = None, + failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: @@ -101,6 +105,7 @@ def meta_multiget( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -111,16 +116,16 @@ def meta_multiget( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if self._should_populate_read(migration_mode): for key, result in results.items(): if isinstance(result, Value): - self._destination_client.set( + self._destination_client.refill( key=key, value=result.value, ttl=self._get_value_ttl(result), no_reply=True, - set_mode=SetMode.ADD, ) return results else: @@ -129,6 +134,7 @@ def meta_multiget( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) def meta_get( @@ -137,6 +143,7 @@ def meta_get( flags: Optional[Set[Flag]] = None, int_flags: Optional[Dict[IntFlag, int]] = None, token_flags: Optional[Dict[TokenFlag, bytes]] = None, + failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: @@ -145,6 +152,7 @@ def meta_get( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -155,14 +163,14 @@ def meta_get( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if isinstance(result, Value) and self._should_populate_read(migration_mode): - self._destination_client.set( + self._destination_client.refill( key=key, value=result.value, ttl=self._get_value_ttl(result), no_reply=True, - set_mode=SetMode.ADD, ) return result else: @@ -171,6 +179,7 @@ def meta_get( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) def meta_set( @@ -181,6 +190,7 @@ def meta_set( flags: Optional[Set[Flag]] = None, int_flags: Optional[Dict[IntFlag, int]] = None, token_flags: Optional[Dict[TokenFlag, bytes]] = None, + failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -192,6 +202,7 @@ def meta_set( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_set( @@ -201,6 +212,7 @@ def meta_set( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -215,6 +227,7 @@ def meta_delete( flags: Optional[Set[Flag]] = None, int_flags: Optional[Dict[IntFlag, int]] = None, token_flags: Optional[Dict[TokenFlag, bytes]] = None, + failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -224,6 +237,7 @@ def meta_delete( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_delete( @@ -231,6 +245,7 @@ def meta_delete( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -245,6 +260,7 @@ def meta_arithmetic( flags: Optional[Set[Flag]] = None, int_flags: Optional[Dict[IntFlag, int]] = None, token_flags: Optional[Dict[TokenFlag, bytes]] = None, + failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: """ We can't reliably migrate cache data modified by meta-arithmetic, @@ -259,6 +275,7 @@ def meta_arithmetic( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) else: return self._origin_client.meta_arithmetic( @@ -266,6 +283,7 @@ def meta_arithmetic( flags=flags, int_flags=int_flags, token_flags=token_flags, + failure_handling=failure_handling, ) def touch( diff --git a/tests/migrating_cache_client_test.py b/tests/migrating_cache_client_test.py index 9131d0e..3fc227b 100644 --- a/tests/migrating_cache_client_test.py +++ b/tests/migrating_cache_client_test.py @@ -1,7 +1,8 @@ from unittest.mock import Mock +from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING import pytest -from meta_memcache import CacheClient, IntFlag, Key, SetMode, Value, WriteFailureEvent +from meta_memcache import CacheClient, IntFlag, Key, Value, WriteFailureEvent from meta_memcache.extras.migrating_cache_client import ( MigratingCacheClient, MigrationMode, @@ -162,6 +163,7 @@ def test_migration_mode_origin_only( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_set.assert_not_called() @@ -172,6 +174,7 @@ def test_migration_mode_origin_only( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_delete.assert_not_called() @@ -182,6 +185,7 @@ def test_migration_mode_origin_only( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_arithmetic.assert_not_called() @@ -222,6 +226,7 @@ def test_migration_mode_destination_only( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Deletes @@ -232,6 +237,7 @@ def test_migration_mode_destination_only( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Arithmetic @@ -242,6 +248,7 @@ def test_migration_mode_destination_only( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Touch @@ -284,6 +291,7 @@ def test_migration_mode_populate_writes( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), @@ -292,6 +300,7 @@ def test_migration_mode_populate_writes( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Deletes (both receive writes) @@ -301,12 +310,14 @@ def test_migration_mode_populate_writes( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Arithmetic @@ -316,6 +327,7 @@ def test_migration_mode_populate_writes( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_arithmetic.assert_not_called() @@ -349,26 +361,24 @@ def test_migration_mode_populate_reads_handles_non_expiring_keys( origin_client.meta_get.assert_called_once() destination_client.meta_get.assert_not_called() origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=0, no_reply=True, - set_mode=SetMode.ADD, ) - destination_client.set.reset_mock() + destination_client.refill.reset_mock() # Multi-get migration_client.multi_get(keys=["foo"]) origin_client.meta_multiget.assert_called_once() destination_client.meta_multiget.assert_not_called() origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=0, no_reply=True, - set_mode=SetMode.ADD, ) @@ -394,15 +404,15 @@ def test_migration_mode_populate_writes_and_reads_1pct( migration_client.get(key="foo") origin_client.meta_get.assert_called_once() destination_client.meta_get.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_not_called() + origin_client.refill.assert_not_called() + destination_client.refill.assert_not_called() # Multi-get migration_client.multi_get(keys=["foo"]) origin_client.meta_multiget.assert_called_once() destination_client.meta_multiget.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_not_called() + origin_client.refill.assert_not_called() + destination_client.refill.assert_not_called() origin_client.meta_get.reset_mock() origin_client.meta_multiget.reset_mock() @@ -414,27 +424,25 @@ def test_migration_mode_populate_writes_and_reads_1pct( migration_client.get(key="foo") origin_client.meta_get.assert_called_once() destination_client.meta_get.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + origin_client.refill.assert_not_called() + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, no_reply=True, - set_mode=SetMode.ADD, ) - destination_client.set.reset_mock() + destination_client.refill.reset_mock() # Multi-get migration_client.multi_get(keys=["foo"]) origin_client.meta_multiget.assert_called_once() destination_client.meta_multiget.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + origin_client.refill.assert_not_called() + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, no_reply=True, - set_mode=SetMode.ADD, ) # Sets (both receive writes) @@ -446,6 +454,7 @@ def test_migration_mode_populate_writes_and_reads_1pct( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), @@ -454,6 +463,7 @@ def test_migration_mode_populate_writes_and_reads_1pct( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Deletes (both receive writes) @@ -463,12 +473,14 @@ def test_migration_mode_populate_writes_and_reads_1pct( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Arithmetic @@ -478,6 +490,7 @@ def test_migration_mode_populate_writes_and_reads_1pct( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_arithmetic.assert_not_called() @@ -509,15 +522,15 @@ def test_migration_mode_populate_writes_and_reads_10pct( migration_client.get(key="foo") origin_client.meta_get.assert_called_once() destination_client.meta_get.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_not_called() + origin_client.refill.assert_not_called() + destination_client.refill.assert_not_called() # Multi-get migration_client.multi_get(keys=["foo"]) origin_client.meta_multiget.assert_called_once() destination_client.meta_multiget.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_not_called() + origin_client.refill.assert_not_called() + destination_client.refill.assert_not_called() origin_client.meta_get.reset_mock() origin_client.meta_multiget.reset_mock() @@ -529,27 +542,25 @@ def test_migration_mode_populate_writes_and_reads_10pct( migration_client.get(key="foo") origin_client.meta_get.assert_called_once() destination_client.meta_get.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + origin_client.refill.assert_not_called() + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, no_reply=True, - set_mode=SetMode.ADD, ) - destination_client.set.reset_mock() + destination_client.refill.reset_mock() # Multi-get migration_client.multi_get(keys=["foo"]) origin_client.meta_multiget.assert_called_once() destination_client.meta_multiget.assert_not_called() - origin_client.set.assert_not_called() - destination_client.set.assert_called_once_with( + origin_client.refill.assert_not_called() + destination_client.refill.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, no_reply=True, - set_mode=SetMode.ADD, ) # Sets (both receive writes) @@ -561,6 +572,7 @@ def test_migration_mode_populate_writes_and_reads_10pct( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), @@ -569,6 +581,7 @@ def test_migration_mode_populate_writes_and_reads_10pct( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Deletes (both receive writes) @@ -578,12 +591,14 @@ def test_migration_mode_populate_writes_and_reads_10pct( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Arithmetic @@ -593,6 +608,7 @@ def test_migration_mode_populate_writes_and_reads_10pct( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_arithmetic.assert_not_called() @@ -639,6 +655,7 @@ def test_migration_mode_use_destination_update_origin( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), @@ -647,6 +664,7 @@ def test_migration_mode_use_destination_update_origin( flags=set(), int_flags={IntFlag.CACHE_TTL: 10}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Deletes (both receive writes) @@ -656,12 +674,14 @@ def test_migration_mode_use_destination_update_origin( flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), flags=set(), int_flags={}, token_flags=None, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Arithmetic @@ -672,6 +692,7 @@ def test_migration_mode_use_destination_update_origin( flags=set(), int_flags={IntFlag.MA_DELTA_VALUE: 1}, token_flags={}, + failure_handling=DEFAULT_FAILURE_HANDLING, ) # Touch