diff --git a/src/meta_memcache/__init__.py b/src/meta_memcache/__init__.py index 34234bc..cb8d14b 100644 --- a/src/meta_memcache/__init__.py +++ b/src/meta_memcache/__init__.py @@ -33,16 +33,15 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ServerVersion, + ResponseFlags, + RequestFlags, SetMode, Success, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter diff --git a/src/meta_memcache/cache_client.py b/src/meta_memcache/cache_client.py index b09f8ae..f6601f0 100644 --- a/src/meta_memcache/cache_client.py +++ b/src/meta_memcache/cache_client.py @@ -1,4 +1,4 @@ -from typing import Callable, Iterable, Optional, Tuple +from typing import Callable, Iterable, Optional from meta_memcache.base.base_cache_client import BaseCacheClient from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin @@ -25,7 +25,7 @@ def cache_client_from_servers( servers: Iterable[ServerAddress], connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -48,7 +48,7 @@ def cache_client_with_gutter_from_servers( gutter_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -76,7 +76,7 @@ def ephemeral_cache_client_from_servers( max_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( diff --git a/src/meta_memcache/commands/high_level_commands.py b/src/meta_memcache/commands/high_level_commands.py index 96a47ec..a9ec8a5 100644 --- a/src/meta_memcache/commands/high_level_commands.py +++ b/src/meta_memcache/commands/high_level_commands.py @@ -5,7 +5,6 @@ Iterable, Optional, Protocol, - Set, Tuple, Type, TypeVar, @@ -18,35 +17,33 @@ from meta_memcache.interfaces.meta_commands import MetaCommandsProtocol from meta_memcache.interfaces.router import FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, Miss, ReadResponse, + RequestFlags, SetMode, Success, - TokenFlag, Value, + MA_MODE_DEC, ) T = TypeVar("T") _REFILL_FAILURE_HANDLING = FailureHandling(track_write_failures=False) - -DEFAULT_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, -} -DEFAULT_CAS_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, - Flag.RETURN_CAS_TOKEN, -} +DEFAULT_GET_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, +) +DEFAULT_GET_CAS_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, + return_cas_token=True, +) class HighLevelCommandMixinWithMetaCommands( @@ -83,7 +80,7 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: + ) -> RequestFlags: ... # pragma: no cover @@ -99,28 +96,21 @@ def set( set_mode: SetMode = SetMode.SET, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - int_flags: Dict[IntFlag, int] = { - IntFlag.CACHE_TTL: ttl, - } + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_cas_mismatch: - flags.add(Flag.MARK_STALE) - if set_mode == SetMode.SET: - token_flags = None - else: - token_flags = {TokenFlag.MODE: set_mode.value} + flags.mark_stale = True + if set_mode != SetMode.SET: + flags.mode = set_mode.value result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return isinstance(result, Success) @@ -149,17 +139,18 @@ def refill( there is no need to track failures. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags( + cache_ttl=ttl, + mode=SetMode.ADD.value, + ) if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags={IntFlag.CACHE_TTL: ttl}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, failure_handling=_REFILL_FAILURE_HANDLING, ) @@ -179,21 +170,16 @@ def delete( it exists or not, use invalidate() instead. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, Success) @@ -208,21 +194,16 @@ def invalidate( Returns true of the key deleted or it didn't exist anyway """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, (Success, Miss)) @@ -233,11 +214,10 @@ def touch( no_reply: bool = False, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags = {IntFlag.CACHE_TTL: ttl} + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags.no_reply = True + result = self.meta_get(key, flags=flags) return isinstance(result, Success) @@ -289,23 +269,23 @@ def get_or_lease_cas( if isinstance(result, Value): # It is a hit. - if result.win: + if result.flags.win: # Win flag present, meaning we got the lease to # recache/cache the item. We need to mimic a miss. - return None, result.cas_token - if result.size == 0 and result.win is False: + return None, result.flags.cas_token + if result.size == 0 and result.flags.win is False: # The value is empty, this is a miss lease, # and we lost, so we must keep retrying and - # wait for the winner to populate the value. + # wait for the.flags.winner to populate the value. if i < lease_policy.miss_retries: continue else: # We run out of retries, behave as a miss - return None, result.cas_token + return None, result.flags.cas_token else: # There is data, either the is no lease or # we lost and should use the stale value. - return result.value, result.cas_token + return result.value, result.flags.cas_token else: # With MISS_LEASE_TTL we should always get a value # because on miss a lease empty value is generated @@ -346,22 +326,17 @@ def _multi_get( return_cas_token: bool = False, ) -> Dict[Key, Optional[Value]]: if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl + flags = DEFAULT_GET_FLAGS.copy() + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl results = self.meta_multiget( keys=[key if isinstance(key, Key) else Key(key) for key in keys], flags=flags, - int_flags=int_flags, ) return {k: self._process_get_result(k, v) for k, v in results.items()} @@ -380,7 +355,7 @@ def get_cas( if result is None: return None, None else: - return result.value, result.cas_token + return result.value, result.flags.cas_token def _get( self: HighLevelCommandMixinWithMetaCommands, @@ -392,21 +367,17 @@ def _get( ) -> Optional[Value]: key = key if isinstance(key, Key) else Key(key) if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if lease_policy is None and recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if lease_policy: - int_flags[IntFlag.MISS_LEASE_TTL] = lease_policy.ttl - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl - - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags = DEFAULT_GET_FLAGS.copy() + if lease_policy: + flags.vivify_on_miss_ttl = lease_policy.ttl + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl + + result = self.meta_get(key, flags=flags) return self._process_get_result(key, result) def _process_get_result( @@ -416,7 +387,7 @@ def _process_get_result( ) -> Optional[Value]: if isinstance(result, Value): # It is a hit - if result.win: + if result.flags.win: # Win flag present, meaning we got the lease to # recache the item. We need to mimic a miss, so # we set the value to None. @@ -477,25 +448,22 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = { - IntFlag.MA_DELTA_VALUE: abs(delta), - } - token_flags: Dict[TokenFlag, bytes] = {} - + ) -> RequestFlags: + flags = RequestFlags( + ma_delta_value=abs(delta), + ) if return_value: - flags.add(Flag.RETURN_VALUE) + flags.return_value = True if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if refresh_ttl is not None: - int_flags[IntFlag.CACHE_TTL] = refresh_ttl + flags.cache_ttl = refresh_ttl if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if delta < 0: - token_flags[TokenFlag.MODE] = b"-" + flags.mode = MA_MODE_DEC - return flags, int_flags, token_flags + return flags def delta( self: HighLevelCommandMixinWithMetaCommands, @@ -506,15 +474,13 @@ def delta( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_initialize( @@ -528,17 +494,15 @@ def delta_initialize( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_and_get( @@ -549,15 +513,13 @@ def delta_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) @@ -577,17 +539,15 @@ def delta_initialize_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) diff --git a/src/meta_memcache/commands/meta_commands.py b/src/meta_memcache/commands/meta_commands.py index 6d1901e..647f95e 100644 --- a/src/meta_memcache/commands/meta_commands.py +++ b/src/meta_memcache/commands/meta_commands.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.errors import MemcacheError from meta_memcache.interfaces.router import ( @@ -8,15 +8,13 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ReadResponse, + RequestFlags, Success, - TokenFlag, Value, ValueContainer, WriteResponse, @@ -27,9 +25,7 @@ class MetaCommandsMixin: def meta_multiget( self: HasRouter, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: results: Dict[Key, ReadResponse] = {} @@ -37,8 +33,6 @@ def meta_multiget( command=MetaCommand.META_GET, keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ).items(): if not isinstance(result, (Miss, Value, Success)): @@ -51,17 +45,13 @@ def meta_multiget( def meta_get( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: result = self.router.exec( command=MetaCommand.META_GET, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Miss, Value, Success)): @@ -73,9 +63,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( @@ -83,8 +71,6 @@ def meta_set( key=key, value=ValueContainer(value), flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -94,17 +80,13 @@ def meta_set( def meta_delete( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_DELETE, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -116,17 +98,13 @@ def meta_delete( def meta_arithmetic( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_ARITHMETIC, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss, Value)): diff --git a/src/meta_memcache/configuration.py b/src/meta_memcache/configuration.py index bc92ffa..ab2911e 100644 --- a/src/meta_memcache/configuration.py +++ b/src/meta_memcache/configuration.py @@ -1,8 +1,7 @@ -import base64 import hashlib import socket from enum import IntEnum -from typing import Callable, Dict, Iterable, NamedTuple, Optional, Tuple +from typing import Callable, Dict, Iterable, NamedTuple, Optional from meta_memcache.connection.pool import ConnectionPool from meta_memcache.protocol import Key, ServerVersion @@ -157,17 +156,18 @@ class StalePolicy(NamedTuple): mark_stale_on_cas_mismatch: bool = False -def default_key_encoder(key: Key) -> Tuple[bytes, bool]: +def default_key_encoder(key: Key) -> bytes: """ Generate valid memcache key as bytes for given Key. """ - if key.is_unicode or len(key.key) > MAX_KEY_SIZE: - key_hash = hashlib.blake2b(key.key.encode(), digest_size=18).digest() - return base64.b64encode(key_hash), True - elif " " in key.key: - raise ValueError(f"Invalid key {key}") - else: - return key.key.encode("ascii"), False + # TODO: Support + # if isinstance(key.key, bytes): + # data = key.key + # else: + data = key.key.encode() + if len(data) >= MAX_KEY_SIZE: + data = hashlib.blake2b(data, digest_size=18).digest() + return data class MigrationMode(IntEnum): diff --git a/src/meta_memcache/connection/memcache_socket.py b/src/meta_memcache/connection/memcache_socket.py index 4c1d967..73ad81f 100644 --- a/src/meta_memcache/connection/memcache_socket.py +++ b/src/meta_memcache/connection/memcache_socket.py @@ -1,6 +1,8 @@ import logging import socket -from typing import Union +from typing import Optional, Tuple, Union + +import meta_socket from meta_memcache.errors import MemcacheError from meta_memcache.protocol import ( @@ -12,6 +14,7 @@ NotStored, ServerVersion, Success, + ResponseFlags, Value, get_store_success_response_header, ) @@ -119,7 +122,9 @@ def _reset_buffer(self) -> None: self._pos = 0 self._read = remaining_data - def _get_single_header(self) -> memoryview: + def _get_single_header( + self, + ) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]: # Reset buffer for new data if self._read == self._pos: self._read = 0 @@ -127,22 +132,18 @@ def _get_single_header(self) -> memoryview: elif self._pos > self._reset_buffer_size: self._reset_buffer() - endl_pos = -1 while True: - if self._read - self._pos > ENDL_LEN: - endl_pos = self._buf.find(ENDL, self._pos, self._read) - if endl_pos >= 0: - break - # Missing data, but still space in buffer, so read more + if self._read != self._pos: + # We have data in the buffer: Try to find the header + if header_data := meta_socket.parse_header( + self._buf_view, self._pos, self._read + ): + self._pos = header_data[0] + return header_data if self._recv_info_buffer() <= 0: break - if endl_pos < 0: - raise MemcacheError("Bad response. Socket might have closed unexpectedly") - - pos = self._pos - self._pos = endl_pos + ENDL_LEN - return self._buf_view[pos:endl_pos] + raise MemcacheError("Bad response. Socket might have closed unexpectedly") def sendall(self, data: bytes, with_noop: bool = False) -> None: if with_noop: @@ -153,10 +154,12 @@ def sendall(self, data: bytes, with_noop: bool = False) -> None: def _read_until_noop_header(self) -> None: while self._noop_expected > 0: header = self._get_single_header() - if header[0:2] == b"MN": + if header[1] == meta_socket.RESPONSE_NOOP: self._noop_expected -= 1 - def _get_header(self) -> memoryview: + def _get_header( + self, + ) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]: try: if self._noop_expected > 0: self._read_until_noop_header() @@ -169,30 +172,34 @@ def _get_header(self) -> memoryview: def get_response( self, ) -> Union[Value, Success, NotStored, Conflict, Miss]: - header = self._get_header().tobytes() - response_code = header[0:2] + (_, response_code, size, flags) = self._get_header() result: Union[Value, Success, NotStored, Conflict, Miss] try: - if response_code == b"VA": + if response_code == meta_socket.RESPONSE_VALUE: # Value response - result = Value.from_header(header) - elif response_code == self._store_success_response_header: + assert size is not None and flags is not None + result = Value(size=size, flags=flags, value=None) + elif response_code == meta_socket.RESPONSE_SUCCESS: # Stored or no value, return Success - result = Success.from_header(header) - elif response_code == b"NS": + assert flags is not None + result = Success(flags=flags) + elif response_code == meta_socket.RESPONSE_NOT_STORED: # Value response, parse size and flags result = NOT_STORED - elif response_code == b"EX": + elif response_code == meta_socket.RESPONSE_CONFLICT: # Already exists, not changed, CAS conflict result = CONFLICT - elif response_code == b"EN" or response_code == b"NF": + elif response_code == meta_socket.RESPONSE_MISS: # Not Found, Miss. result = MISS else: - raise MemcacheError(f"Unknown response: {bytes(response_code)!r}") + raise MemcacheError(f"Unknown response: {response_code}") except Exception as e: - _log.warning(f"Error parsing response header in {self}: {header!r}") - raise MemcacheError(f"Error parsing response header {header!r}") from e + _log.warning( + f"Error parsing response header in {self}: " + f"Response: {response_code}, size {size}, flags: {flags}" + ) + raise MemcacheError("Error parsing response header") from e return result diff --git a/src/meta_memcache/executors/default.py b/src/meta_memcache/executors/default.py index 6e382ae..a0954bd 100644 --- a/src/meta_memcache/executors/default.py +++ b/src/meta_memcache/executors/default.py @@ -1,5 +1,7 @@ import logging -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Callable, Dict, List, Optional, Tuple + +from meta_socket import RequestFlags, build_cmd from meta_memcache.base.base_serializer import BaseSerializer from meta_memcache.configuration import default_key_encoder @@ -9,30 +11,30 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( ENDL, - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, MetaCommand, Miss, NotStored, + ResponseFlags, ServerVersion, Success, - TokenFlag, Value, ValueContainer, - encode_size, ) _log: logging.Logger = logging.getLogger(__name__) +meta_commands_values = {cmd: cmd.value for cmd in MetaCommand} + + class DefaultExecutor: def __init__( self, serializer: BaseSerializer, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, touch_ttl_to_consider_write_failure: Optional[int] = 50, ) -> None: @@ -47,41 +49,39 @@ def _build_cmd( command: MetaCommand, key: Key, size: Optional[int] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, version: ServerVersion = ServerVersion.STABLE, ) -> bytes: - encoded_key, is_binary = self._key_encoder_fn(key) - cmd = [command.value, encoded_key] - if size is not None: - cmd.append(encode_size(size, version=version)) - cmd_flags: List[bytes] = [] - if is_binary: - cmd_flags.append(Flag.BINARY.value) - if flags: - cmd_flags.extend(flag.value for flag in flags) - if int_flags: - for int_flag, int_value in int_flags.items(): - cmd_flags.append(int_flag.value + str(int_value).encode("ascii")) - if token_flags: - for token_flag, bytes_value in token_flags.items(): - cmd_flags.append(token_flag.value + bytes_value) - cmd.extend(cmd_flags) - return b" ".join(cmd) + ENDL + encoded_key = self._key_encoder_fn(key) + cmd = meta_commands_values[command] + if version == ServerVersion.STABLE: + return build_cmd( + cmd, + encoded_key, + size, + flags, + ) + else: + return build_cmd( + cmd, + encoded_key, + size, + flags, + legacy_size_format=True, + ) - def _prepare_serialized_value_and_int_flags( + def _prepare_serialized_value_and_flags( self, value: ValueContainer, - int_flags: Optional[Dict[IntFlag, int]], - ) -> Tuple[Optional[bytes], Optional[Dict[IntFlag, int]]]: + flags: Optional[RequestFlags], + ) -> Tuple[Optional[bytes], RequestFlags]: encoded_value = self._serializer.serialize(value.value) - int_flags = int_flags if int_flags is not None else {} - int_flags[IntFlag.SET_CLIENT_FLAG] = encoded_value.encoding_id - return encoded_value.data, int_flags + flags = flags if flags is not None else RequestFlags() + flags.client_flag = encoded_value.encoding_id + return encoded_value.data, flags def _is_a_write_failure( - self, command: MetaCommand, int_flags: Optional[Dict[IntFlag, int]] + self, command: MetaCommand, flags: Optional[RequestFlags] ) -> bool: if command in ( MetaCommand.META_DELETE, @@ -91,7 +91,7 @@ def _is_a_write_failure( if ( self._touch_ttl_to_consider_write_failure is not None and command == MetaCommand.META_GET - and (touch_ttl := (int_flags or {}).get(IntFlag.CACHE_TTL, None)) + and (touch_ttl := (flags.cache_ttl if flags else None)) and 0 < touch_ttl <= self._touch_ttl_to_consider_write_failure ): return True @@ -103,16 +103,14 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags(value, int_flags) + else self._prepare_serialized_value_and_flags(value, flags) ) try: conn = pool.pop_connection() @@ -124,8 +122,6 @@ def exec_on_pool( key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return self._conn_recv_response(conn, flags=flags) except Exception as e: @@ -134,7 +130,7 @@ def exec_on_pool( finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): self.on_write_failure(key) raise_on_server_error = ( raise_on_server_error @@ -153,9 +149,7 @@ def exec_multi_on_pool( # noqa: C901 pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: @@ -166,12 +160,10 @@ def exec_multi_on_pool( # noqa: C901 try: # with pool.get_connection() as conn: for key, value in key_values: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags( - value, int_flags - ) + else self._prepare_serialized_value_and_flags(value, flags) ) self._conn_send_cmd( @@ -180,8 +172,6 @@ def exec_multi_on_pool( # noqa: C901 key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) for key, _ in key_values: results[key] = self._conn_recv_response(conn, flags=flags) @@ -191,7 +181,7 @@ def exec_multi_on_pool( # noqa: C901 finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): for key, _ in key_values: self.on_write_failure(key) raise_on_server_error = ( @@ -214,9 +204,7 @@ def _conn_send_cmd( command: MetaCommand, key: Key, value: Optional[bytes] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> None: """ Execute command on a connection @@ -226,16 +214,12 @@ def _conn_send_cmd( key, size=len(value) if value is not None else None, flags=flags, - int_flags=int_flags, - token_flags=token_flags, version=conn.get_version(), ) # write meta commands with NOREPLY can potentially return errors # they are not fully silent, so we need to add a no-op to the wire. with_noop = ( - command != MetaCommand.META_GET - and flags is not None - and Flag.NOREPLY in flags + command != MetaCommand.META_GET and flags is not None and flags.no_reply ) if value: @@ -246,18 +230,18 @@ def _conn_send_cmd( def _conn_recv_response( self, conn: MemcacheSocket, - flags: Optional[Set[Flag]] = None, + flags: Optional[RequestFlags] = None, ) -> MemcacheResponse: """ Read response on a connection """ - if flags and Flag.NOREPLY in flags: - return Success() + if flags and flags.no_reply: + return Success(flags=ResponseFlags()) result = conn.get_response() if isinstance(result, Value): data = conn.get_value(result.size) if result.size > 0: - encoding_id = result.client_flag or 0 + encoding_id = result.flags.client_flag or 0 try: result.value = self._serializer.unserialize(data, encoding_id) except Exception: diff --git a/src/meta_memcache/extras/client_wrapper.py b/src/meta_memcache/extras/client_wrapper.py index 61f6678..6a1f6be 100644 --- a/src/meta_memcache/extras/client_wrapper.py +++ b/src/meta_memcache/extras/client_wrapper.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import ServerAddress @@ -6,11 +6,9 @@ from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, + RequestFlags, WriteResponse, ) @@ -33,32 +31,24 @@ def __init__( def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return self.client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: return self.client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) @@ -67,9 +57,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_set( @@ -77,40 +65,30 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) diff --git a/src/meta_memcache/extras/migrating_cache_client.py b/src/meta_memcache/extras/migrating_cache_client.py index b1ce0d1..9bb5386 100644 --- a/src/meta_memcache/extras/migrating_cache_client.py +++ b/src/meta_memcache/extras/migrating_cache_client.py @@ -1,6 +1,6 @@ import random import time -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Union from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import MigrationMode, ServerAddress @@ -8,12 +8,10 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, + RequestFlags, SetMode, - TokenFlag, Value, WriteResponse, ) @@ -75,7 +73,11 @@ def get_migration_mode(self) -> MigrationMode: return current_mode def _get_value_ttl(self, value: Value) -> int: - ttl = value.ttl if value.ttl is not None else self._default_read_backfill_ttl + ttl = ( + value.flags.ttl + if value.flags.ttl is not None + else self._default_read_backfill_ttl + ) if ttl < 0: # TTL for items marked to store forvered is returned as -1 ttl = 0 @@ -90,17 +92,13 @@ def _should_populate_read(self, migration_mode: MigrationMode) -> bool: def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> Dict[Key, ReadResponse]: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -109,8 +107,6 @@ def meta_multiget( results = self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if self._should_populate_read(migration_mode): for key, result in results.items(): @@ -127,24 +123,18 @@ def meta_multiget( return self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> ReadResponse: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -153,8 +143,6 @@ def meta_get( result = self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if isinstance(result, Value) and self._should_populate_read(migration_mode): self._destination_client.set( @@ -169,8 +157,6 @@ def meta_get( return self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_set( @@ -178,9 +164,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -190,8 +174,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_set( @@ -199,8 +181,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -212,9 +192,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -222,15 +200,11 @@ def meta_delete( origin_response = self._origin_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -242,9 +216,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: """ We can't reliably migrate cache data modified by meta-arithmetic, @@ -257,15 +229,11 @@ def meta_arithmetic( return self._destination_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) else: return self._origin_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def touch( diff --git a/src/meta_memcache/extras/probabilistic_hot_cache.py b/src/meta_memcache/extras/probabilistic_hot_cache.py index 1e69699..3766a85 100644 --- a/src/meta_memcache/extras/probabilistic_hot_cache.py +++ b/src/meta_memcache/extras/probabilistic_hot_cache.py @@ -124,10 +124,11 @@ def _store_in_hot_cache_if_necessary( allowed: bool, ) -> None: if not is_hot: - hit_after_write = value.fetched or 0 - last_read_age = value.last_access if value.last_access is not None else 9999 + last_read_age = ( + value.flags.last_access if value.flags.last_access is not None else 9999 + ) if ( - hit_after_write > 0 + value.flags.fetched and last_read_age <= self._max_last_access_age_seconds ): # Is detected as hot diff --git a/src/meta_memcache/interfaces/executor.py b/src/meta_memcache/interfaces/executor.py index f211b1a..c3ae584 100644 --- a/src/meta_memcache/interfaces/executor.py +++ b/src/meta_memcache/interfaces/executor.py @@ -1,15 +1,14 @@ -from typing import Dict, List, Optional, Protocol, Set, Tuple +from typing import Dict, List, Optional, Protocol, Tuple + from meta_memcache.connection.pool import ConnectionPool from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -20,9 +19,7 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: @@ -38,9 +35,7 @@ def exec_multi_on_pool( pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: diff --git a/src/meta_memcache/interfaces/meta_commands.py b/src/meta_memcache/interfaces/meta_commands.py index ed6fa23..8850394 100644 --- a/src/meta_memcache/interfaces/meta_commands.py +++ b/src/meta_memcache/interfaces/meta_commands.py @@ -1,13 +1,11 @@ -from typing import Any, Dict, List, Optional, Protocol, Set +from typing import Any, Dict, List, Optional, Protocol from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, WriteResponse, + RequestFlags, ) @@ -15,9 +13,7 @@ class MetaCommandsProtocol(Protocol): def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: ... # pragma: no cover @@ -25,9 +21,7 @@ def meta_multiget( def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: ... # pragma: no cover @@ -37,9 +31,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -47,9 +39,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -57,9 +47,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover diff --git a/src/meta_memcache/interfaces/router.py b/src/meta_memcache/interfaces/router.py index 2e212d4..4373d9b 100644 --- a/src/meta_memcache/interfaces/router.py +++ b/src/meta_memcache/interfaces/router.py @@ -1,17 +1,16 @@ -from typing import Dict, List, NamedTuple, Optional, Protocol, Set +from typing import Dict, List, NamedTuple, Optional, Protocol + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import PoolCounters from meta_memcache.interfaces.executor import Executor from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -34,9 +33,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -52,9 +49,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ diff --git a/src/meta_memcache/protocol.py b/src/meta_memcache/protocol.py index 95216d0..76cfa31 100644 --- a/src/meta_memcache/protocol.py +++ b/src/meta_memcache/protocol.py @@ -1,6 +1,19 @@ from dataclasses import dataclass from enum import Enum, IntEnum -from typing import Any, Dict, List, Optional, Union +from typing import Any, List, Optional, Union + +from meta_socket import ( # noqa: F401 + ResponseFlags, + RequestFlags as RequestFlags, + SET_MODE_ADD, + SET_MODE_APPEND, + SET_MODE_PREPEND, + SET_MODE_REPLACE, + SET_MODE_SET, + MA_MODE_INC as MA_MODE_INC, + MA_MODE_DEC as MA_MODE_DEC, +) + ENDL = b"\r\n" NOOP: bytes = b"mn" + ENDL @@ -37,57 +50,11 @@ class MetaCommand(Enum): class SetMode(Enum): - SET = b"S" # Default - ADD = b"E" # Add if item does NOT EXIST, else LRU bump and return NS - APPEND = b"A" # If item exists, append the new value to its data. - PREPEND = b"P" # If item exists, prepend the new value to its data. - REPLACE = b"R" # Set only if item already exists. - - -class Flag(Enum): - BINARY = b"b" - NOREPLY = b"q" - RETURN_CLIENT_FLAG = b"f" - RETURN_CAS_TOKEN = b"c" - RETURN_VALUE = b"v" - RETURN_TTL = b"t" - RETURN_SIZE = b"s" - RETURN_LAST_ACCESS = b"l" - RETURN_FETCHED = b"h" - RETURN_KEY = b"k" - NO_UPDATE_LRU = b"u" - MARK_STALE = b"I" - - -class IntFlag(Enum): - CACHE_TTL = b"T" - RECACHE_TTL = b"R" - MISS_LEASE_TTL = b"N" - SET_CLIENT_FLAG = b"F" - MA_INITIAL_VALUE = b"J" - MA_DELTA_VALUE = b"D" - CAS_TOKEN = b"C" - - -class TokenFlag(Enum): - OPAQUE = b"O" - # 'M' (mode switch): - # * Meta Arithmetic: - # - I or +: increment - # - D or -: decrement - # * Meta Set: See SetMode Enum above - # - E: "add" command. LRU bump and return NS if item exists. Else add. - # - A: "append" command. If item exists, append the new value to its data. - # - P: "prepend" command. If item exists, prepend the new value to its data. - # - R: "replace" command. Set only if item already exists. - # - S: "set" command. The default mode, added for completeness. - MODE = b"M" - - -# Store maps of byte values (int) to enum value -flag_values: Dict[int, Flag] = {f.value[0]: f for f in Flag} -int_flags_values: Dict[int, IntFlag] = {f.value[0]: f for f in IntFlag} -token_flags_values: Dict[int, TokenFlag] = {f.value[0]: f for f in TokenFlag} + SET = SET_MODE_SET # Default + ADD = SET_MODE_ADD # Add if item does NOT EXIST, else LRU bump and return NS + APPEND = SET_MODE_APPEND # If item exists, append the new value to its data. + PREPEND = SET_MODE_PREPEND # If item exists, prepend the new value to its data. + REPLACE = SET_MODE_REPLACE # Set only if item already exists. @dataclass @@ -103,168 +70,21 @@ class Miss(MemcacheResponse): # Response flags -TOKEN_FLAG_OPAQUE = ord("O") -INT_FLAG_CAS_TOKEN = ord("c") -INT_FLAG_FETCHED = ord("h") -INT_FLAG_LAST_ACCESS = ord("l") -INT_FLAG_TTL = ord("t") -INT_FLAG_CLIENT_FLAG = ord("f") -INT_FLAG_SIZE = ord("s") -FLAG_WIN = ord("W") -FLAG_LOST = ord("Z") -FLAG_STALE = ord("X") - - -# @dataclass(slots=True, init=False) +EMPTY_RESPONSE_FLAGS = ResponseFlags() + + @dataclass class Success(MemcacheResponse): - __slots__ = ( - "cas_token", - "fetched", - "last_access", - "ttl", - "client_flag", - "win", - "stale", - "real_size", - "opaque", - ) - cas_token: Optional[int] - fetched: Optional[int] - last_access: Optional[int] - ttl: Optional[int] - client_flag: Optional[int] - win: Optional[bool] - stale: bool - real_size: Optional[int] - opaque: Optional[bytes] + __slots__ = ("flags",) + flags: ResponseFlags + - def __init__( - self, - *, - cas_token: Optional[int] = None, - fetched: Optional[int] = None, - last_access: Optional[int] = None, - ttl: Optional[int] = None, - client_flag: Optional[int] = None, - win: Optional[bool] = None, - stale: bool = False, - real_size: Optional[int] = None, - opaque: Optional[bytes] = None, - ) -> None: - self.cas_token = cas_token - self.fetched = fetched - self.last_access = last_access - self.ttl = ttl - self.client_flag = client_flag - self.win = win - self.stale = stale - self.real_size = real_size - self.opaque = opaque - - @classmethod - def from_header(cls, header: "Blob") -> "Success": - result = cls() - result._set_flags(header) - return result - - def _set_flags(self, header: bytes, pos: int = 3) -> None: # noqa: C901 - header_size = len(header) - while pos < header_size: - flag = header[pos] - pos += 1 - if flag == SPACE: - continue - end = pos - while end < header_size: - if header[end] == SPACE: - break - end += 1 - - if flag == INT_FLAG_CAS_TOKEN: - self.cas_token = int(header[pos:end]) - elif flag == INT_FLAG_FETCHED: - self.fetched = int(header[pos:end]) - elif flag == INT_FLAG_LAST_ACCESS: - self.last_access = int(header[pos:end]) - elif flag == INT_FLAG_TTL: - self.ttl = int(header[pos:end]) - elif flag == INT_FLAG_CLIENT_FLAG: - self.client_flag = int(header[pos:end]) - elif flag == FLAG_WIN: - self.win = True - elif flag == FLAG_LOST: - self.win = False - elif flag == FLAG_STALE: - self.stale = True - elif flag == INT_FLAG_SIZE: - self.real_size = int(header[pos:end]) - elif flag == TOKEN_FLAG_OPAQUE: - self.opaque = header[pos:end] - pos = end + 1 - - -# @dataclass(slots=True, init=False) @dataclass class Value(Success): - __slots__ = ( - "cas_token", - "fetched", - "last_access", - "ttl", - "client_flag", - "win", - "stale", - "real_size", - "opaque", - "size", - "value", - ) + __slots__ = ("flags", "size", "value") size: int value: Optional[Any] - def __init__( - self, - *, - size: int, - value: Optional[Any] = None, - cas_token: Optional[int] = None, - fetched: Optional[int] = None, - last_access: Optional[int] = None, - ttl: Optional[int] = None, - client_flag: Optional[int] = None, - win: Optional[bool] = None, - stale: bool = False, - real_size: Optional[int] = None, - opaque: Optional[bytes] = None, - ) -> None: - self.size = size - self.value = value - self.cas_token = cas_token - self.fetched = fetched - self.last_access = last_access - self.ttl = ttl - self.client_flag = client_flag - self.win = win - self.stale = stale - self.real_size = real_size - self.opaque = opaque - - @classmethod - def from_header(cls, header: "Blob") -> "Value": - header_size = len(header) - if header_size < 4 or header[2] != SPACE: - raise ValueError(f"Invalid header {header!r}") - end = 4 - while end < header_size: - if header[end] == SPACE: - break - end += 1 - size = int(header[3:end]) - result = cls(size=size) - result._set_flags(header, pos=end + 1) - return result - @dataclass class ValueContainer: @@ -309,10 +129,3 @@ def get_store_success_response_header(version: ServerVersion) -> bytes: if version == ServerVersion.AWS_1_6_6: return b"OK" return b"HD" - - -def encode_size(size: int, version: ServerVersion) -> bytes: - if version == ServerVersion.AWS_1_6_6: - return b"S" + str(size).encode("ascii") - else: - return str(size).encode("ascii") diff --git a/src/meta_memcache/routers/default.py b/src/meta_memcache/routers/default.py index 37ad312..0ebcef2 100644 --- a/src/meta_memcache/routers/default.py +++ b/src/meta_memcache/routers/default.py @@ -1,5 +1,6 @@ from collections import defaultdict -from typing import Callable, DefaultDict, Dict, List, Optional, Set, Tuple +from typing import Callable, DefaultDict, Dict, List, Optional, Tuple + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import ConnectionPool, PoolCounters @@ -7,14 +8,12 @@ from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -32,9 +31,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -49,8 +46,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) @@ -60,9 +55,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -78,8 +71,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) diff --git a/src/meta_memcache/routers/ephemeral.py b/src/meta_memcache/routers/ephemeral.py index e13fde0..291ec4a 100644 --- a/src/meta_memcache/routers/ephemeral.py +++ b/src/meta_memcache/routers/ephemeral.py @@ -1,20 +1,18 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class EphemeralRouter(DefaultRouter): @@ -52,18 +50,14 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: return super().exec( command=command, key=key, value=value, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) @@ -72,17 +66,13 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: return super().exec_multi( command=command, keys=keys, values=values, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) diff --git a/src/meta_memcache/routers/gutter.py b/src/meta_memcache/routers/gutter.py index 91e8eb0..0f32511 100644 --- a/src/meta_memcache/routers/gutter.py +++ b/src/meta_memcache/routers/gutter.py @@ -1,21 +1,19 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.errors import MemcacheServerError from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class GutterRouter(DefaultRouter): @@ -38,9 +36,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -57,8 +53,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -67,15 +61,13 @@ def exec( ) except MemcacheServerError: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) return self.executor.exec_on_pool( pool=self.gutter_pool_provider.get_pool(key), command=command, key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, @@ -89,9 +81,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -110,8 +100,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -126,7 +114,7 @@ def exec_multi( gutter_values.append(value) if gutter_keys: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) for pool, key_values in self._exec_multi_prepare_pool_map( self.gutter_pool_provider.get_pool, gutter_keys, gutter_values ).items(): @@ -136,8 +124,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, diff --git a/src/meta_memcache/routers/helpers.py b/src/meta_memcache/routers/helpers.py index f8f1bf3..f9ebc29 100644 --- a/src/meta_memcache/routers/helpers.py +++ b/src/meta_memcache/routers/helpers.py @@ -1,23 +1,24 @@ -from typing import Dict, Optional +from typing import Optional -from meta_memcache.protocol import IntFlag +from meta_memcache.protocol import RequestFlags -def adjust_int_flags_for_max_ttl( - int_flags: Optional[Dict[IntFlag, int]], +def _adjust_ttl(ttl: Optional[int], max_ttl: int) -> Optional[int]: + if ttl is not None and (ttl == 0 or ttl > max_ttl): + return max_ttl + return ttl + + +def adjust_flags_for_max_ttl( + flags: Optional[RequestFlags], max_ttl: int, -) -> Optional[Dict[IntFlag, int]]: +) -> Optional[RequestFlags]: """ Override TTLs > than `max_ttl` """ - if int_flags: - for flag in ( - IntFlag.CACHE_TTL, - IntFlag.RECACHE_TTL, - IntFlag.MISS_LEASE_TTL, - ): - ttl = int_flags.get(flag) - if ttl is not None and (ttl == 0 or ttl > max_ttl): - int_flags[flag] = max_ttl + if flags: + flags.cache_ttl = _adjust_ttl(flags.cache_ttl, max_ttl) + flags.recache_ttl = _adjust_ttl(flags.recache_ttl, max_ttl) + flags.vivify_on_miss_ttl = _adjust_ttl(flags.vivify_on_miss_ttl, max_ttl) - return int_flags + return flags diff --git a/src/meta_memcache/settings.py b/src/meta_memcache/settings.py index 9e9a207..cecbbfd 100644 --- a/src/meta_memcache/settings.py +++ b/src/meta_memcache/settings.py @@ -5,4 +5,7 @@ DEFAULT_READ_BUFFER_SIZE = 4096 -MAX_KEY_SIZE = 250 +# Max key is 250, but when using binary keys will be b64 encoded +# so take more space. Keys longer than this will be hashed, so +# it's not a problem. +MAX_KEY_SIZE = 187 diff --git a/tests/cache_client_test.py b/tests/cache_client_test.py index 9e376e9..e224f6f 100644 --- a/tests/cache_client_test.py +++ b/tests/cache_client_test.py @@ -12,7 +12,7 @@ ) from meta_memcache.connection.pool import ConnectionPool, PoolCounters from meta_memcache.errors import MemcacheServerError -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from meta_memcache.settings import DEFAULT_MARK_DOWN_PERIOD_S @@ -167,10 +167,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -194,10 +194,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -220,10 +220,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_not_called() diff --git a/tests/commands_test.py b/tests/commands_test.py index c48d791..cafb033 100644 --- a/tests/commands_test.py +++ b/tests/commands_test.py @@ -27,10 +27,10 @@ from meta_memcache.protocol import ( Miss, NotStored, + ResponseFlags, + RequestFlags, ServerVersion, Success, - IntFlag, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter @@ -133,7 +133,7 @@ def test_set_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.set(key="foo", value="bar", ttl=300) memcache_socket.sendall.assert_called_once_with( @@ -242,7 +242,7 @@ def test_set_cmd( cache_client.set(key=Key("foo"), value=b"123", ttl=300, cas_token=666) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -252,7 +252,7 @@ def test_set_cmd( key=Key("foo"), value=b"123", ttl=300, cas_token=666, stale_policy=StalePolicy() ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -266,7 +266,7 @@ def test_set_cmd( stale_policy=StalePolicy(mark_stale_on_cas_mismatch=True), ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 I T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 I T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -277,7 +277,7 @@ def test_set_cmd_1_6_6( memcache_socket_1_6_6: MemcacheSocket, cache_client_1_6_6: CacheClient, ) -> None: - memcache_socket_1_6_6.get_response.return_value = Success() + memcache_socket_1_6_6.get_response.return_value = Success(flags=ResponseFlags()) cache_client_1_6_6.set(key="foo", value="bar", ttl=300) memcache_socket_1_6_6.sendall.assert_called_once_with( @@ -290,7 +290,7 @@ def test_set_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.set(key=Key("foo"), value="bar", ttl=300) assert result is True @@ -302,15 +302,13 @@ def test_set_success_fail( def test_refill( cache_client_with_mocked_meta_commands: CacheApi, meta_command_mock: MagicMock ) -> None: - meta_command_mock.meta_set.return_value = Success() + meta_command_mock.meta_set.return_value = Success(flags=ResponseFlags()) cache_client_with_mocked_meta_commands.refill(key="foo", value="bar", ttl=300) meta_command_mock.meta_set.assert_called_once_with( key=Key(key="foo"), value="bar", ttl=300, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 300}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, + flags=RequestFlags(cache_ttl=300, mode=SetMode.ADD.value), failure_handling=FailureHandling(track_write_failures=False), ) @@ -319,7 +317,7 @@ def test_delete_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.delete(key="foo") memcache_socket.sendall.assert_called_once_with(b"md foo\r\n", with_noop=False) @@ -367,7 +365,7 @@ def test_delete_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.delete(key=Key("foo")) assert result is True @@ -384,7 +382,7 @@ def test_invalidate_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.invalidate(key="foo") memcache_socket.sendall.assert_called_once_with(b"md foo\r\n", with_noop=False) @@ -432,7 +430,7 @@ def test_invalidate_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.invalidate(key=Key("foo")) assert result is True @@ -449,7 +447,7 @@ def test_touch_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.touch(key="foo", ttl=60) memcache_socket.sendall.assert_called_once_with(b"mg foo T60\r\n", with_noop=False) @@ -559,12 +557,14 @@ def test_get_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> ) assert_called_once_with_command( memcache_socket.sendall, - b"mg lCV3WxKxtWrdY4s1+R710+9J b t l v h f R30 T300\r\n", + b"mg w7puw63Dp29k4o23 b t l v h f R30 T300\r\n", with_noop=False, ) memcache_socket.sendall.reset_mock() - memcache_socket.get_response.return_value = Value(size=0, value=None) + memcache_socket.get_response.return_value = Value( + size=0, value=None, flags=ResponseFlags() + ) cache_client.get_or_lease( key=Key("foo"), lease_policy=LeasePolicy(), @@ -618,8 +618,10 @@ def test_get_value(memcache_socket: MemcacheSocket, cache_client: CacheClient) - memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -642,7 +644,7 @@ def test_get_value(memcache_socket: MemcacheSocket, cache_client: CacheClient) - def test_get_other(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) try: cache_client.get( key=Key("foo"), @@ -661,8 +663,10 @@ def test_value_wrong_type( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -695,8 +699,10 @@ def test_deserialization_error( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -720,10 +726,12 @@ def test_recache_win_returns_miss( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - win=True, - stale=True, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + win=True, + stale=True, + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -741,10 +749,12 @@ def test_recache_lost_returns_stale_value( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - win=False, - stale=True, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + win=False, + stale=True, + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -762,8 +772,10 @@ def test_get_or_lease_hit( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -786,8 +798,10 @@ def test_get_or_lease_miss_win( memcache_socket.get_response.return_value = Value( size=0, value=None, - win=True, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=True, + cas_token=expected_cas_token, + ), ) memcache_socket.get_value.return_value = b"" @@ -813,20 +827,26 @@ def test_get_or_lease_miss_lost_then_data( Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ), ] memcache_socket.get_value.side_effect = [b"", b"", encoded_value.data] @@ -863,20 +883,26 @@ def test_get_or_lease_miss_lost_then_win( Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=True, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=True, + cas_token=expected_cas_token, + ), ), ] memcache_socket.get_value.side_effect = [b"", b"", b""] @@ -912,8 +938,10 @@ def test_get_or_lease_miss_runs_out_of_retries( memcache_socket.get_response.return_value = Value( size=0, value=None, - win=False, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token, + ), ) memcache_socket.get_value.return_value = b"" @@ -1155,7 +1183,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cache_client.delta(key=Key("foo"), delta=1, refresh_ttl=60, no_reply=True) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 T60\r\n", with_noop=True + b"ma foo q T60 D1\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1174,7 +1202,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cas_token=123, ) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 C123 J10 N60\r\n", with_noop=True + b"ma foo q N60 J10 D1 C123\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1197,12 +1225,12 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result is None memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.delta(key=Key("foo"), delta=1) assert result is True @@ -1210,7 +1238,9 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() - memcache_socket.get_response.return_value = Value(size=2, value=None) + memcache_socket.get_response.return_value = Value( + size=2, value=None, flags=ResponseFlags() + ) memcache_socket.get_value.return_value = b"10" result = cache_client.delta_and_get(key=Key("foo"), delta=1) @@ -1225,7 +1255,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result == 10 memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() @@ -1234,8 +1264,16 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - def test_multi_get(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> None: memcache_socket.get_response.side_effect = [ Miss(), - Value(size=2, value=None, client_flag=MixedSerializer.BINARY), - Value(size=2, value=None, win=True), + Value( + size=2, + value=None, + flags=ResponseFlags(client_flag=MixedSerializer.BINARY), + ), + Value( + size=2, + value=None, + flags=ResponseFlags(win=True), + ), ] memcache_socket.get_value.return_value = b"OK" diff --git a/tests/ephemeral_cache_client_test.py b/tests/ephemeral_cache_client_test.py index ddc2a30..e1249a3 100644 --- a/tests/ephemeral_cache_client_test.py +++ b/tests/ephemeral_cache_client_test.py @@ -1,5 +1,5 @@ from unittest.mock import call -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from pytest_mock import MockerFixture @@ -29,10 +29,10 @@ def test_ephemeral_cache_client(mocker: MockerFixture) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) c.sendall.assert_has_calls([call(b"mg bar q T60\r\n"), call(b"mg foo q T60\r\n")]) c.sendall.reset_mock() diff --git a/tests/memcache_socket_test.py b/tests/memcache_socket_test.py index 796b21b..455c17d 100644 --- a/tests/memcache_socket_test.py +++ b/tests/memcache_socket_test.py @@ -67,11 +67,11 @@ def test_get_response( ms = MemcacheSocket(fake_socket) result = ms.get_response() assert isinstance(result, Success) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 @@ -84,11 +84,11 @@ def test_get_response_1_6_6( ms = MemcacheSocket(fake_socket, version=ServerVersion.AWS_1_6_6) result = ms.get_response() assert isinstance(result, Success) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 @@ -121,7 +121,7 @@ def test_get_value( ms = MemcacheSocket(fake_socket) result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 ms.get_value(2) @@ -135,9 +135,9 @@ def test_get_value_large( ms = MemcacheSocket(fake_socket, buffer_size=100) result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 - assert result.win is True - assert result.opaque == b"xxx" + assert result.flags.cas_token == 1 + assert result.flags.win is True + assert bytes(result.flags.opaque) == b"xxx" assert result.size == 200 value = ms.get_value(result.size) assert len(value) == result.size diff --git a/tests/migrating_cache_client_test.py b/tests/migrating_cache_client_test.py index 533a34c..0a96457 100644 --- a/tests/migrating_cache_client_test.py +++ b/tests/migrating_cache_client_test.py @@ -1,7 +1,15 @@ from unittest.mock import Mock import pytest -from meta_memcache import CacheClient, IntFlag, Key, SetMode, Value, WriteFailureEvent +from meta_memcache import ( + CacheClient, + Key, + SetMode, + Value, + WriteFailureEvent, + ResponseFlags, + RequestFlags, +) from meta_memcache.extras.migrating_cache_client import ( MigratingCacheClient, MigrationMode, @@ -74,13 +82,17 @@ def _set_cache_client_mock_get_return_values(client: Mock, ttl: int = 10) -> Non client.meta_get.return_value = Value( size=3, value="bar", - ttl=ttl, + flags=ResponseFlags( + ttl=ttl, + ), ) client.meta_multiget.return_value = { Key(key="foo", routing_key=None, is_unicode=False): Value( size=3, value="bar", - ttl=ttl, + flags=ResponseFlags( + ttl=ttl, + ), ) } @@ -155,9 +167,7 @@ def test_migration_mode_origin_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_not_called() @@ -165,9 +175,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_not_called() @@ -175,9 +183,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -215,9 +221,7 @@ def test_migration_mode_destination_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes @@ -225,9 +229,7 @@ def test_migration_mode_destination_only( origin_client.meta_delete.assert_not_called() destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -235,9 +237,7 @@ def test_migration_mode_destination_only( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch @@ -277,41 +277,31 @@ def test_migration_mode_populate_writes( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -439,41 +429,31 @@ def test_migration_mode_populate_writes_and_reads_1pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -554,41 +534,31 @@ def test_migration_mode_populate_writes_and_reads_10pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -632,32 +602,24 @@ def test_migration_mode_use_destination_update_origin( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -665,9 +627,7 @@ def test_migration_mode_use_destination_update_origin( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch diff --git a/tests/probabilistic_hot_cache_test.py b/tests/probabilistic_hot_cache_test.py index 728bdc5..afb5496 100644 --- a/tests/probabilistic_hot_cache_test.py +++ b/tests/probabilistic_hot_cache_test.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from unittest.mock import Mock from prometheus_client import CollectorRegistry @@ -6,31 +6,31 @@ import pytest -from meta_memcache import CacheClient, IntFlag, Key, Value +from meta_memcache import CacheClient, Key, Value from meta_memcache.errors import MemcacheError from meta_memcache.extras.probabilistic_hot_cache import ( CachedValue, ProbabilisticHotCache, ) from meta_memcache.metrics.prometheus import PrometheusMetricsCollector -from meta_memcache.protocol import Flag, Miss, ReadResponse, TokenFlag +from meta_memcache.protocol import Miss, ReadResponse, ResponseFlags, RequestFlags @pytest.fixture def client() -> Mock: def meta_get( key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: if key.key.endswith("hot"): return Value( size=1, value=1, - fetched=1, - last_access=1, + flags=ResponseFlags( + fetched=True, + last_access=1, + ), ) elif key.key.endswith("miss"): return Miss() @@ -38,15 +38,15 @@ def meta_get( return Value( size=1, value=1, - fetched=1, - last_access=9999, + flags=ResponseFlags( + fetched=True, + last_access=9999, + ), ) def meta_multiget( keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return {key: meta_get(key=key) for key in keys} @@ -74,15 +74,13 @@ def random(monkeypatch) -> Mock: DEFAULT_FLAGS = { - "flags": { - Flag.RETURN_TTL, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_VALUE, - Flag.RETURN_FETCHED, - Flag.RETURN_CLIENT_FLAG, - }, - "int_flags": None, - "token_flags": None, + "flags": RequestFlags( + return_ttl=True, + return_last_access=True, + return_value=True, + return_fetched=True, + return_client_flag=True, + ), "failure_handling": DEFAULT_FAILURE_HANDLING, } @@ -639,8 +637,10 @@ def test_stale_expires( client.meta_get.side_effect = lambda *args, **kwargs: Value( size=1, value=1, - fetched=1, - last_access=9999, + flags=ResponseFlags( + fetched=True, + last_access=9999, + ), ) # The item will no longer be in the hot cache