From 309bcc3375eed287ee354f5e4fece0e97fc39dc2 Mon Sep 17 00:00:00 2001 From: Guillermo Perez Date: Tue, 7 Nov 2023 10:34:03 +0100 Subject: [PATCH 1/2] Implement response heading parsing in rust ## Motivation / Description There was nothing else to optimize in python, but Rust can parse the string and build a ResponseFlags object in 22-50ns depending on the number of flags. While the interface between rust and python will never reach ns performance, it helps indeed finding the header in the response buffer. Initial (before all optimizations): multithreaded: Overall: 110779.55 RPS / 9.03 us/req singlethreaded: Overall: 111545.63 RPS / 8.96 us/req Python optimized: multithreaded: Overall: 193340.40 RPS / 5.17 us/req singlethreaded: Overall: 193036.56 RPS / 5.18 us/req Using rust for header parsing: multithreaded: Overall: 245898.34 RPS / 4.07 us/req singlethreaded: Overall: 246165.19 RPS / 4.06 us/req --- src/meta_memcache/__init__.py | 1 + .../commands/high_level_commands.py | 16 +- .../connection/memcache_socket.py | 65 ++++--- src/meta_memcache/executors/default.py | 5 +- .../extras/migrating_cache_client.py | 6 +- .../extras/probabilistic_hot_cache.py | 7 +- src/meta_memcache/protocol.py | 165 ++---------------- tests/commands_test.py | 135 +++++++++----- tests/memcache_socket_test.py | 16 +- tests/migrating_cache_client_test.py | 18 +- tests/probabilistic_hot_cache_test.py | 20 ++- 11 files changed, 195 insertions(+), 259 deletions(-) diff --git a/src/meta_memcache/__init__.py b/src/meta_memcache/__init__.py index 34234bc..e2609f5 100644 --- a/src/meta_memcache/__init__.py +++ b/src/meta_memcache/__init__.py @@ -40,6 +40,7 @@ Miss, NotStored, ServerVersion, + ResponseFlags, SetMode, Success, TokenFlag, diff --git a/src/meta_memcache/commands/high_level_commands.py b/src/meta_memcache/commands/high_level_commands.py index 96a47ec..dfac617 100644 --- a/src/meta_memcache/commands/high_level_commands.py +++ b/src/meta_memcache/commands/high_level_commands.py @@ -289,23 +289,23 @@ def get_or_lease_cas( if isinstance(result, Value): # It is a hit. - if result.win: + if result.flags.win: # Win flag present, meaning we got the lease to # recache/cache the item. We need to mimic a miss. - return None, result.cas_token - if result.size == 0 and result.win is False: + return None, result.flags.cas_token + if result.size == 0 and result.flags.win is False: # The value is empty, this is a miss lease, # and we lost, so we must keep retrying and - # wait for the winner to populate the value. + # wait for the.flags.winner to populate the value. if i < lease_policy.miss_retries: continue else: # We run out of retries, behave as a miss - return None, result.cas_token + return None, result.flags.cas_token else: # There is data, either the is no lease or # we lost and should use the stale value. - return result.value, result.cas_token + return result.value, result.flags.cas_token else: # With MISS_LEASE_TTL we should always get a value # because on miss a lease empty value is generated @@ -380,7 +380,7 @@ def get_cas( if result is None: return None, None else: - return result.value, result.cas_token + return result.value, result.flags.cas_token def _get( self: HighLevelCommandMixinWithMetaCommands, @@ -416,7 +416,7 @@ def _process_get_result( ) -> Optional[Value]: if isinstance(result, Value): # It is a hit - if result.win: + if result.flags.win: # Win flag present, meaning we got the lease to # recache the item. We need to mimic a miss, so # we set the value to None. diff --git a/src/meta_memcache/connection/memcache_socket.py b/src/meta_memcache/connection/memcache_socket.py index 4c1d967..66505c1 100644 --- a/src/meta_memcache/connection/memcache_socket.py +++ b/src/meta_memcache/connection/memcache_socket.py @@ -1,17 +1,21 @@ import logging import socket -from typing import Union +from typing import Optional, Tuple, Union + +import meta_socket from meta_memcache.errors import MemcacheError from meta_memcache.protocol import ( ENDL, ENDL_LEN, + EMPTY_RESPONSE_FLAGS, NOOP, Conflict, Miss, NotStored, ServerVersion, Success, + ResponseFlags, Value, get_store_success_response_header, ) @@ -119,7 +123,9 @@ def _reset_buffer(self) -> None: self._pos = 0 self._read = remaining_data - def _get_single_header(self) -> memoryview: + def _get_single_header( + self, + ) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]: # Reset buffer for new data if self._read == self._pos: self._read = 0 @@ -127,22 +133,19 @@ def _get_single_header(self) -> memoryview: elif self._pos > self._reset_buffer_size: self._reset_buffer() - endl_pos = -1 while True: - if self._read - self._pos > ENDL_LEN: - endl_pos = self._buf.find(ENDL, self._pos, self._read) - if endl_pos >= 0: - break + if self._read != self._pos: + # We have data in the buffer: find the header + if header_data := meta_socket.parse_header( + self._buf_view, self._pos, self._read + ): + self._pos = header_data[0] + return header_data # Missing data, but still space in buffer, so read more if self._recv_info_buffer() <= 0: break - if endl_pos < 0: - raise MemcacheError("Bad response. Socket might have closed unexpectedly") - - pos = self._pos - self._pos = endl_pos + ENDL_LEN - return self._buf_view[pos:endl_pos] + raise MemcacheError("Bad response. Socket might have closed unexpectedly") def sendall(self, data: bytes, with_noop: bool = False) -> None: if with_noop: @@ -153,10 +156,12 @@ def sendall(self, data: bytes, with_noop: bool = False) -> None: def _read_until_noop_header(self) -> None: while self._noop_expected > 0: header = self._get_single_header() - if header[0:2] == b"MN": + if header[1] == meta_socket.RESPONSE_NOOP: self._noop_expected -= 1 - def _get_header(self) -> memoryview: + def _get_header( + self, + ) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]: try: if self._noop_expected > 0: self._read_until_noop_header() @@ -169,30 +174,36 @@ def _get_header(self) -> memoryview: def get_response( self, ) -> Union[Value, Success, NotStored, Conflict, Miss]: - header = self._get_header().tobytes() - response_code = header[0:2] + (_, response_code, size, flags) = self._get_header() result: Union[Value, Success, NotStored, Conflict, Miss] try: - if response_code == b"VA": + if response_code == meta_socket.RESPONSE_VALUE: + if size is None: + raise MemcacheError("Bad value response. Missing size") # Value response - result = Value.from_header(header) - elif response_code == self._store_success_response_header: + result = Value( + size=size, flags=flags or EMPTY_RESPONSE_FLAGS, value=None + ) + elif response_code == meta_socket.RESPONSE_SUCCESS: # Stored or no value, return Success - result = Success.from_header(header) - elif response_code == b"NS": + result = Success(flags=flags or EMPTY_RESPONSE_FLAGS) + elif response_code == meta_socket.RESPONSE_NOT_STORED: # Value response, parse size and flags result = NOT_STORED - elif response_code == b"EX": + elif response_code == meta_socket.RESPONSE_CONFLICT: # Already exists, not changed, CAS conflict result = CONFLICT - elif response_code == b"EN" or response_code == b"NF": + elif response_code == meta_socket.RESPONSE_MISS: # Not Found, Miss. result = MISS else: - raise MemcacheError(f"Unknown response: {bytes(response_code)!r}") + raise MemcacheError(f"Unknown response: {response_code}") except Exception as e: - _log.warning(f"Error parsing response header in {self}: {header!r}") - raise MemcacheError(f"Error parsing response header {header!r}") from e + _log.warning( + f"Error parsing response header in {self}: " + f"Response: {response_code}, size {size}, flags: {flags}" + ) + raise MemcacheError("Error parsing response header") from e return result diff --git a/src/meta_memcache/executors/default.py b/src/meta_memcache/executors/default.py index 6e382ae..0383aa6 100644 --- a/src/meta_memcache/executors/default.py +++ b/src/meta_memcache/executors/default.py @@ -17,6 +17,7 @@ MetaCommand, Miss, NotStored, + ResponseFlags, ServerVersion, Success, TokenFlag, @@ -252,12 +253,12 @@ def _conn_recv_response( Read response on a connection """ if flags and Flag.NOREPLY in flags: - return Success() + return Success(flags=ResponseFlags()) result = conn.get_response() if isinstance(result, Value): data = conn.get_value(result.size) if result.size > 0: - encoding_id = result.client_flag or 0 + encoding_id = result.flags.client_flag or 0 try: result.value = self._serializer.unserialize(data, encoding_id) except Exception: diff --git a/src/meta_memcache/extras/migrating_cache_client.py b/src/meta_memcache/extras/migrating_cache_client.py index b1ce0d1..9d6c442 100644 --- a/src/meta_memcache/extras/migrating_cache_client.py +++ b/src/meta_memcache/extras/migrating_cache_client.py @@ -75,7 +75,11 @@ def get_migration_mode(self) -> MigrationMode: return current_mode def _get_value_ttl(self, value: Value) -> int: - ttl = value.ttl if value.ttl is not None else self._default_read_backfill_ttl + ttl = ( + value.flags.ttl + if value.flags.ttl is not None + else self._default_read_backfill_ttl + ) if ttl < 0: # TTL for items marked to store forvered is returned as -1 ttl = 0 diff --git a/src/meta_memcache/extras/probabilistic_hot_cache.py b/src/meta_memcache/extras/probabilistic_hot_cache.py index 1e69699..3766a85 100644 --- a/src/meta_memcache/extras/probabilistic_hot_cache.py +++ b/src/meta_memcache/extras/probabilistic_hot_cache.py @@ -124,10 +124,11 @@ def _store_in_hot_cache_if_necessary( allowed: bool, ) -> None: if not is_hot: - hit_after_write = value.fetched or 0 - last_read_age = value.last_access if value.last_access is not None else 9999 + last_read_age = ( + value.flags.last_access if value.flags.last_access is not None else 9999 + ) if ( - hit_after_write > 0 + value.flags.fetched and last_read_age <= self._max_last_access_age_seconds ): # Is detected as hot diff --git a/src/meta_memcache/protocol.py b/src/meta_memcache/protocol.py index 95216d0..fe76688 100644 --- a/src/meta_memcache/protocol.py +++ b/src/meta_memcache/protocol.py @@ -2,6 +2,8 @@ from enum import Enum, IntEnum from typing import Any, Dict, List, Optional, Union +from meta_socket import ResponseFlags + ENDL = b"\r\n" NOOP: bytes = b"mn" + ENDL ENDL_LEN = 2 @@ -103,168 +105,25 @@ class Miss(MemcacheResponse): # Response flags -TOKEN_FLAG_OPAQUE = ord("O") -INT_FLAG_CAS_TOKEN = ord("c") -INT_FLAG_FETCHED = ord("h") -INT_FLAG_LAST_ACCESS = ord("l") -INT_FLAG_TTL = ord("t") -INT_FLAG_CLIENT_FLAG = ord("f") -INT_FLAG_SIZE = ord("s") -FLAG_WIN = ord("W") -FLAG_LOST = ord("Z") -FLAG_STALE = ord("X") - - -# @dataclass(slots=True, init=False) +EMPTY_RESPONSE_FLAGS = ResponseFlags() + + @dataclass class Success(MemcacheResponse): - __slots__ = ( - "cas_token", - "fetched", - "last_access", - "ttl", - "client_flag", - "win", - "stale", - "real_size", - "opaque", - ) - cas_token: Optional[int] - fetched: Optional[int] - last_access: Optional[int] - ttl: Optional[int] - client_flag: Optional[int] - win: Optional[bool] - stale: bool - real_size: Optional[int] - opaque: Optional[bytes] - - def __init__( - self, - *, - cas_token: Optional[int] = None, - fetched: Optional[int] = None, - last_access: Optional[int] = None, - ttl: Optional[int] = None, - client_flag: Optional[int] = None, - win: Optional[bool] = None, - stale: bool = False, - real_size: Optional[int] = None, - opaque: Optional[bytes] = None, - ) -> None: - self.cas_token = cas_token - self.fetched = fetched - self.last_access = last_access - self.ttl = ttl - self.client_flag = client_flag - self.win = win - self.stale = stale - self.real_size = real_size - self.opaque = opaque + __slots__ = ("flags",) + flags: ResponseFlags @classmethod - def from_header(cls, header: "Blob") -> "Success": - result = cls() - result._set_flags(header) - return result - - def _set_flags(self, header: bytes, pos: int = 3) -> None: # noqa: C901 - header_size = len(header) - while pos < header_size: - flag = header[pos] - pos += 1 - if flag == SPACE: - continue - end = pos - while end < header_size: - if header[end] == SPACE: - break - end += 1 - - if flag == INT_FLAG_CAS_TOKEN: - self.cas_token = int(header[pos:end]) - elif flag == INT_FLAG_FETCHED: - self.fetched = int(header[pos:end]) - elif flag == INT_FLAG_LAST_ACCESS: - self.last_access = int(header[pos:end]) - elif flag == INT_FLAG_TTL: - self.ttl = int(header[pos:end]) - elif flag == INT_FLAG_CLIENT_FLAG: - self.client_flag = int(header[pos:end]) - elif flag == FLAG_WIN: - self.win = True - elif flag == FLAG_LOST: - self.win = False - elif flag == FLAG_STALE: - self.stale = True - elif flag == INT_FLAG_SIZE: - self.real_size = int(header[pos:end]) - elif flag == TOKEN_FLAG_OPAQUE: - self.opaque = header[pos:end] - pos = end + 1 - - -# @dataclass(slots=True, init=False) + def default(cls) -> "Success": + return cls(flags=ResponseFlags()) + + @dataclass class Value(Success): - __slots__ = ( - "cas_token", - "fetched", - "last_access", - "ttl", - "client_flag", - "win", - "stale", - "real_size", - "opaque", - "size", - "value", - ) + __slots__ = ("flags", "size", "value") size: int value: Optional[Any] - def __init__( - self, - *, - size: int, - value: Optional[Any] = None, - cas_token: Optional[int] = None, - fetched: Optional[int] = None, - last_access: Optional[int] = None, - ttl: Optional[int] = None, - client_flag: Optional[int] = None, - win: Optional[bool] = None, - stale: bool = False, - real_size: Optional[int] = None, - opaque: Optional[bytes] = None, - ) -> None: - self.size = size - self.value = value - self.cas_token = cas_token - self.fetched = fetched - self.last_access = last_access - self.ttl = ttl - self.client_flag = client_flag - self.win = win - self.stale = stale - self.real_size = real_size - self.opaque = opaque - - @classmethod - def from_header(cls, header: "Blob") -> "Value": - header_size = len(header) - if header_size < 4 or header[2] != SPACE: - raise ValueError(f"Invalid header {header!r}") - end = 4 - while end < header_size: - if header[end] == SPACE: - break - end += 1 - size = int(header[3:end]) - result = cls(size=size) - result._set_flags(header, pos=end + 1) - return result - @dataclass class ValueContainer: diff --git a/tests/commands_test.py b/tests/commands_test.py index c48d791..3bd2241 100644 --- a/tests/commands_test.py +++ b/tests/commands_test.py @@ -27,6 +27,7 @@ from meta_memcache.protocol import ( Miss, NotStored, + ResponseFlags, ServerVersion, Success, IntFlag, @@ -133,7 +134,7 @@ def test_set_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.set(key="foo", value="bar", ttl=300) memcache_socket.sendall.assert_called_once_with( @@ -277,7 +278,7 @@ def test_set_cmd_1_6_6( memcache_socket_1_6_6: MemcacheSocket, cache_client_1_6_6: CacheClient, ) -> None: - memcache_socket_1_6_6.get_response.return_value = Success() + memcache_socket_1_6_6.get_response.return_value = Success(flags=ResponseFlags()) cache_client_1_6_6.set(key="foo", value="bar", ttl=300) memcache_socket_1_6_6.sendall.assert_called_once_with( @@ -290,7 +291,7 @@ def test_set_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.set(key=Key("foo"), value="bar", ttl=300) assert result is True @@ -302,7 +303,7 @@ def test_set_success_fail( def test_refill( cache_client_with_mocked_meta_commands: CacheApi, meta_command_mock: MagicMock ) -> None: - meta_command_mock.meta_set.return_value = Success() + meta_command_mock.meta_set.return_value = Success(flags=ResponseFlags()) cache_client_with_mocked_meta_commands.refill(key="foo", value="bar", ttl=300) meta_command_mock.meta_set.assert_called_once_with( key=Key(key="foo"), @@ -319,7 +320,7 @@ def test_delete_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.delete(key="foo") memcache_socket.sendall.assert_called_once_with(b"md foo\r\n", with_noop=False) @@ -367,7 +368,7 @@ def test_delete_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.delete(key=Key("foo")) assert result is True @@ -384,7 +385,7 @@ def test_invalidate_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.invalidate(key="foo") memcache_socket.sendall.assert_called_once_with(b"md foo\r\n", with_noop=False) @@ -432,7 +433,7 @@ def test_invalidate_success_fail( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.invalidate(key=Key("foo")) assert result is True @@ -449,7 +450,7 @@ def test_touch_cmd( memcache_socket: MemcacheSocket, cache_client: CacheClient, ) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) cache_client.touch(key="foo", ttl=60) memcache_socket.sendall.assert_called_once_with(b"mg foo T60\r\n", with_noop=False) @@ -564,7 +565,9 @@ def test_get_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> ) memcache_socket.sendall.reset_mock() - memcache_socket.get_response.return_value = Value(size=0, value=None) + memcache_socket.get_response.return_value = Value( + size=0, value=None, flags=ResponseFlags() + ) cache_client.get_or_lease( key=Key("foo"), lease_policy=LeasePolicy(), @@ -618,8 +621,10 @@ def test_get_value(memcache_socket: MemcacheSocket, cache_client: CacheClient) - memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -642,7 +647,7 @@ def test_get_value(memcache_socket: MemcacheSocket, cache_client: CacheClient) - def test_get_other(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> None: - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) try: cache_client.get( key=Key("foo"), @@ -661,8 +666,10 @@ def test_value_wrong_type( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -695,8 +702,10 @@ def test_deserialization_error( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -720,10 +729,12 @@ def test_recache_win_returns_miss( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - win=True, - stale=True, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + win=True, + stale=True, + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -741,10 +752,12 @@ def test_recache_lost_returns_stale_value( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - win=False, - stale=True, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + win=False, + stale=True, + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -762,8 +775,10 @@ def test_get_or_lease_hit( memcache_socket.get_response.return_value = Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ) memcache_socket.get_value.return_value = encoded_value.data @@ -786,8 +801,10 @@ def test_get_or_lease_miss_win( memcache_socket.get_response.return_value = Value( size=0, value=None, - win=True, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=True, + cas_token=expected_cas_token, + ), ) memcache_socket.get_value.return_value = b"" @@ -813,20 +830,26 @@ def test_get_or_lease_miss_lost_then_data( Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=len(encoded_value.data), value=None, - cas_token=expected_cas_token, - client_flag=encoded_value.encoding_id, + flags=ResponseFlags( + cas_token=expected_cas_token, + client_flag=encoded_value.encoding_id, + ), ), ] memcache_socket.get_value.side_effect = [b"", b"", encoded_value.data] @@ -863,20 +886,26 @@ def test_get_or_lease_miss_lost_then_win( Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=False, - cas_token=expected_cas_token - 1, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token - 1, + ), ), Value( size=0, value=None, - win=True, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=True, + cas_token=expected_cas_token, + ), ), ] memcache_socket.get_value.side_effect = [b"", b"", b""] @@ -912,8 +941,10 @@ def test_get_or_lease_miss_runs_out_of_retries( memcache_socket.get_response.return_value = Value( size=0, value=None, - win=False, - cas_token=expected_cas_token, + flags=ResponseFlags( + win=False, + cas_token=expected_cas_token, + ), ) memcache_socket.get_value.return_value = b"" @@ -1202,7 +1233,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() - memcache_socket.get_response.return_value = Success() + memcache_socket.get_response.return_value = Success(flags=ResponseFlags()) result = cache_client.delta(key=Key("foo"), delta=1) assert result is True @@ -1210,7 +1241,9 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() - memcache_socket.get_response.return_value = Value(size=2, value=None) + memcache_socket.get_response.return_value = Value( + size=2, value=None, flags=ResponseFlags() + ) memcache_socket.get_value.return_value = b"10" result = cache_client.delta_and_get(key=Key("foo"), delta=1) @@ -1234,8 +1267,16 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - def test_multi_get(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> None: memcache_socket.get_response.side_effect = [ Miss(), - Value(size=2, value=None, client_flag=MixedSerializer.BINARY), - Value(size=2, value=None, win=True), + Value( + size=2, + value=None, + flags=ResponseFlags(client_flag=MixedSerializer.BINARY), + ), + Value( + size=2, + value=None, + flags=ResponseFlags(win=True), + ), ] memcache_socket.get_value.return_value = b"OK" diff --git a/tests/memcache_socket_test.py b/tests/memcache_socket_test.py index 796b21b..455c17d 100644 --- a/tests/memcache_socket_test.py +++ b/tests/memcache_socket_test.py @@ -67,11 +67,11 @@ def test_get_response( ms = MemcacheSocket(fake_socket) result = ms.get_response() assert isinstance(result, Success) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 @@ -84,11 +84,11 @@ def test_get_response_1_6_6( ms = MemcacheSocket(fake_socket, version=ServerVersion.AWS_1_6_6) result = ms.get_response() assert isinstance(result, Success) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 @@ -121,7 +121,7 @@ def test_get_value( ms = MemcacheSocket(fake_socket) result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 + assert result.flags.cas_token == 1 assert result.size == 2 ms.get_value(2) @@ -135,9 +135,9 @@ def test_get_value_large( ms = MemcacheSocket(fake_socket, buffer_size=100) result = ms.get_response() assert isinstance(result, Value) - assert result.cas_token == 1 - assert result.win is True - assert result.opaque == b"xxx" + assert result.flags.cas_token == 1 + assert result.flags.win is True + assert bytes(result.flags.opaque) == b"xxx" assert result.size == 200 value = ms.get_value(result.size) assert len(value) == result.size diff --git a/tests/migrating_cache_client_test.py b/tests/migrating_cache_client_test.py index 533a34c..ddede09 100644 --- a/tests/migrating_cache_client_test.py +++ b/tests/migrating_cache_client_test.py @@ -1,7 +1,15 @@ from unittest.mock import Mock import pytest -from meta_memcache import CacheClient, IntFlag, Key, SetMode, Value, WriteFailureEvent +from meta_memcache import ( + CacheClient, + IntFlag, + Key, + SetMode, + Value, + WriteFailureEvent, + ResponseFlags, +) from meta_memcache.extras.migrating_cache_client import ( MigratingCacheClient, MigrationMode, @@ -74,13 +82,17 @@ def _set_cache_client_mock_get_return_values(client: Mock, ttl: int = 10) -> Non client.meta_get.return_value = Value( size=3, value="bar", - ttl=ttl, + flags=ResponseFlags( + ttl=ttl, + ), ) client.meta_multiget.return_value = { Key(key="foo", routing_key=None, is_unicode=False): Value( size=3, value="bar", - ttl=ttl, + flags=ResponseFlags( + ttl=ttl, + ), ) } diff --git a/tests/probabilistic_hot_cache_test.py b/tests/probabilistic_hot_cache_test.py index 728bdc5..4e02fcf 100644 --- a/tests/probabilistic_hot_cache_test.py +++ b/tests/probabilistic_hot_cache_test.py @@ -13,7 +13,7 @@ ProbabilisticHotCache, ) from meta_memcache.metrics.prometheus import PrometheusMetricsCollector -from meta_memcache.protocol import Flag, Miss, ReadResponse, TokenFlag +from meta_memcache.protocol import Flag, Miss, ReadResponse, TokenFlag, ResponseFlags @pytest.fixture @@ -29,8 +29,10 @@ def meta_get( return Value( size=1, value=1, - fetched=1, - last_access=1, + flags=ResponseFlags( + fetched=True, + last_access=1, + ), ) elif key.key.endswith("miss"): return Miss() @@ -38,8 +40,10 @@ def meta_get( return Value( size=1, value=1, - fetched=1, - last_access=9999, + flags=ResponseFlags( + fetched=True, + last_access=9999, + ), ) def meta_multiget( @@ -639,8 +643,10 @@ def test_stale_expires( client.meta_get.side_effect = lambda *args, **kwargs: Value( size=1, value=1, - fetched=1, - last_access=9999, + flags=ResponseFlags( + fetched=True, + last_access=9999, + ), ) # The item will no longer be in the hot cache From 5490922051899b83664cee0e32dfe434cf87c9cf Mon Sep 17 00:00:00 2001 From: Guillermo Perez Date: Tue, 7 Nov 2023 22:58:19 +0100 Subject: [PATCH 2/2] Build cmd in rust ## Motivation / Description The second cpu-intensive part of the request processing is building the cmd. Also instead of building dicts of flags we can use a single flags object, which also simplifies the API of the lower commands. I chose to still built a single flags object, but we could explore building one flags object per meta-command, as the flags that they support differ, and it could lead to a more type-safe low-level implementation. ## Performance: * Initial: multithreaded: Overall: 110779.55 RPS / 9.03 us/req singlethreaded: Overall: 111545.63 RPS / 8.96 us/req * Rust only for response parsing multithreaded: Overall: 245898.34 RPS / 4.07 us/req singlethreaded: Overall: 246165.19 RPS / 4.06 us/req * Now (rust also for build_cmd) multithreaded: Overall: 319587.03 RPS / 3.13 us/req singlethreaded: Overall: 323101.77 RPS / 3.10 us/req --- src/meta_memcache/__init__.py | 4 +- src/meta_memcache/cache_client.py | 8 +- .../commands/high_level_commands.py | 204 +++++++----------- src/meta_memcache/commands/meta_commands.py | 36 +--- src/meta_memcache/configuration.py | 20 +- .../connection/memcache_socket.py | 14 +- src/meta_memcache/executors/default.py | 111 ++++------ src/meta_memcache/extras/client_wrapper.py | 36 +--- .../extras/migrating_cache_client.py | 50 +---- src/meta_memcache/interfaces/executor.py | 15 +- src/meta_memcache/interfaces/meta_commands.py | 26 +-- src/meta_memcache/interfaces/router.py | 15 +- src/meta_memcache/protocol.py | 82 ++----- src/meta_memcache/routers/default.py | 19 +- src/meta_memcache/routers/ephemeral.py | 24 +-- src/meta_memcache/routers/gutter.py | 28 +-- src/meta_memcache/routers/helpers.py | 31 +-- src/meta_memcache/settings.py | 5 +- tests/cache_client_test.py | 26 +-- tests/commands_test.py | 23 +- tests/ephemeral_cache_client_test.py | 10 +- tests/migrating_cache_client_test.py | 106 +++------ tests/probabilistic_hot_cache_test.py | 30 ++- 23 files changed, 311 insertions(+), 612 deletions(-) diff --git a/src/meta_memcache/__init__.py b/src/meta_memcache/__init__.py index e2609f5..cb8d14b 100644 --- a/src/meta_memcache/__init__.py +++ b/src/meta_memcache/__init__.py @@ -33,17 +33,15 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ServerVersion, ResponseFlags, + RequestFlags, SetMode, Success, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter diff --git a/src/meta_memcache/cache_client.py b/src/meta_memcache/cache_client.py index b09f8ae..f6601f0 100644 --- a/src/meta_memcache/cache_client.py +++ b/src/meta_memcache/cache_client.py @@ -1,4 +1,4 @@ -from typing import Callable, Iterable, Optional, Tuple +from typing import Callable, Iterable, Optional from meta_memcache.base.base_cache_client import BaseCacheClient from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin @@ -25,7 +25,7 @@ def cache_client_from_servers( servers: Iterable[ServerAddress], connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -48,7 +48,7 @@ def cache_client_with_gutter_from_servers( gutter_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -76,7 +76,7 @@ def ephemeral_cache_client_from_servers( max_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( diff --git a/src/meta_memcache/commands/high_level_commands.py b/src/meta_memcache/commands/high_level_commands.py index dfac617..a9ec8a5 100644 --- a/src/meta_memcache/commands/high_level_commands.py +++ b/src/meta_memcache/commands/high_level_commands.py @@ -5,7 +5,6 @@ Iterable, Optional, Protocol, - Set, Tuple, Type, TypeVar, @@ -18,35 +17,33 @@ from meta_memcache.interfaces.meta_commands import MetaCommandsProtocol from meta_memcache.interfaces.router import FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, Miss, ReadResponse, + RequestFlags, SetMode, Success, - TokenFlag, Value, + MA_MODE_DEC, ) T = TypeVar("T") _REFILL_FAILURE_HANDLING = FailureHandling(track_write_failures=False) - -DEFAULT_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, -} -DEFAULT_CAS_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, - Flag.RETURN_CAS_TOKEN, -} +DEFAULT_GET_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, +) +DEFAULT_GET_CAS_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, + return_cas_token=True, +) class HighLevelCommandMixinWithMetaCommands( @@ -83,7 +80,7 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: + ) -> RequestFlags: ... # pragma: no cover @@ -99,28 +96,21 @@ def set( set_mode: SetMode = SetMode.SET, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - int_flags: Dict[IntFlag, int] = { - IntFlag.CACHE_TTL: ttl, - } + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_cas_mismatch: - flags.add(Flag.MARK_STALE) - if set_mode == SetMode.SET: - token_flags = None - else: - token_flags = {TokenFlag.MODE: set_mode.value} + flags.mark_stale = True + if set_mode != SetMode.SET: + flags.mode = set_mode.value result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return isinstance(result, Success) @@ -149,17 +139,18 @@ def refill( there is no need to track failures. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags( + cache_ttl=ttl, + mode=SetMode.ADD.value, + ) if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags={IntFlag.CACHE_TTL: ttl}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, failure_handling=_REFILL_FAILURE_HANDLING, ) @@ -179,21 +170,16 @@ def delete( it exists or not, use invalidate() instead. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, Success) @@ -208,21 +194,16 @@ def invalidate( Returns true of the key deleted or it didn't exist anyway """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, (Success, Miss)) @@ -233,11 +214,10 @@ def touch( no_reply: bool = False, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags = {IntFlag.CACHE_TTL: ttl} + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags.no_reply = True + result = self.meta_get(key, flags=flags) return isinstance(result, Success) @@ -346,22 +326,17 @@ def _multi_get( return_cas_token: bool = False, ) -> Dict[Key, Optional[Value]]: if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl + flags = DEFAULT_GET_FLAGS.copy() + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl results = self.meta_multiget( keys=[key if isinstance(key, Key) else Key(key) for key in keys], flags=flags, - int_flags=int_flags, ) return {k: self._process_get_result(k, v) for k, v in results.items()} @@ -392,21 +367,17 @@ def _get( ) -> Optional[Value]: key = key if isinstance(key, Key) else Key(key) if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if lease_policy is None and recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if lease_policy: - int_flags[IntFlag.MISS_LEASE_TTL] = lease_policy.ttl - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl - - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags = DEFAULT_GET_FLAGS.copy() + if lease_policy: + flags.vivify_on_miss_ttl = lease_policy.ttl + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl + + result = self.meta_get(key, flags=flags) return self._process_get_result(key, result) def _process_get_result( @@ -477,25 +448,22 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = { - IntFlag.MA_DELTA_VALUE: abs(delta), - } - token_flags: Dict[TokenFlag, bytes] = {} - + ) -> RequestFlags: + flags = RequestFlags( + ma_delta_value=abs(delta), + ) if return_value: - flags.add(Flag.RETURN_VALUE) + flags.return_value = True if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if refresh_ttl is not None: - int_flags[IntFlag.CACHE_TTL] = refresh_ttl + flags.cache_ttl = refresh_ttl if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if delta < 0: - token_flags[TokenFlag.MODE] = b"-" + flags.mode = MA_MODE_DEC - return flags, int_flags, token_flags + return flags def delta( self: HighLevelCommandMixinWithMetaCommands, @@ -506,15 +474,13 @@ def delta( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_initialize( @@ -528,17 +494,15 @@ def delta_initialize( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_and_get( @@ -549,15 +513,13 @@ def delta_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) @@ -577,17 +539,15 @@ def delta_initialize_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) diff --git a/src/meta_memcache/commands/meta_commands.py b/src/meta_memcache/commands/meta_commands.py index 6d1901e..647f95e 100644 --- a/src/meta_memcache/commands/meta_commands.py +++ b/src/meta_memcache/commands/meta_commands.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.errors import MemcacheError from meta_memcache.interfaces.router import ( @@ -8,15 +8,13 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ReadResponse, + RequestFlags, Success, - TokenFlag, Value, ValueContainer, WriteResponse, @@ -27,9 +25,7 @@ class MetaCommandsMixin: def meta_multiget( self: HasRouter, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: results: Dict[Key, ReadResponse] = {} @@ -37,8 +33,6 @@ def meta_multiget( command=MetaCommand.META_GET, keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ).items(): if not isinstance(result, (Miss, Value, Success)): @@ -51,17 +45,13 @@ def meta_multiget( def meta_get( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: result = self.router.exec( command=MetaCommand.META_GET, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Miss, Value, Success)): @@ -73,9 +63,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( @@ -83,8 +71,6 @@ def meta_set( key=key, value=ValueContainer(value), flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -94,17 +80,13 @@ def meta_set( def meta_delete( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_DELETE, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -116,17 +98,13 @@ def meta_delete( def meta_arithmetic( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_ARITHMETIC, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss, Value)): diff --git a/src/meta_memcache/configuration.py b/src/meta_memcache/configuration.py index bc92ffa..ab2911e 100644 --- a/src/meta_memcache/configuration.py +++ b/src/meta_memcache/configuration.py @@ -1,8 +1,7 @@ -import base64 import hashlib import socket from enum import IntEnum -from typing import Callable, Dict, Iterable, NamedTuple, Optional, Tuple +from typing import Callable, Dict, Iterable, NamedTuple, Optional from meta_memcache.connection.pool import ConnectionPool from meta_memcache.protocol import Key, ServerVersion @@ -157,17 +156,18 @@ class StalePolicy(NamedTuple): mark_stale_on_cas_mismatch: bool = False -def default_key_encoder(key: Key) -> Tuple[bytes, bool]: +def default_key_encoder(key: Key) -> bytes: """ Generate valid memcache key as bytes for given Key. """ - if key.is_unicode or len(key.key) > MAX_KEY_SIZE: - key_hash = hashlib.blake2b(key.key.encode(), digest_size=18).digest() - return base64.b64encode(key_hash), True - elif " " in key.key: - raise ValueError(f"Invalid key {key}") - else: - return key.key.encode("ascii"), False + # TODO: Support + # if isinstance(key.key, bytes): + # data = key.key + # else: + data = key.key.encode() + if len(data) >= MAX_KEY_SIZE: + data = hashlib.blake2b(data, digest_size=18).digest() + return data class MigrationMode(IntEnum): diff --git a/src/meta_memcache/connection/memcache_socket.py b/src/meta_memcache/connection/memcache_socket.py index 66505c1..73ad81f 100644 --- a/src/meta_memcache/connection/memcache_socket.py +++ b/src/meta_memcache/connection/memcache_socket.py @@ -8,7 +8,6 @@ from meta_memcache.protocol import ( ENDL, ENDL_LEN, - EMPTY_RESPONSE_FLAGS, NOOP, Conflict, Miss, @@ -135,13 +134,12 @@ def _get_single_header( while True: if self._read != self._pos: - # We have data in the buffer: find the header + # We have data in the buffer: Try to find the header if header_data := meta_socket.parse_header( self._buf_view, self._pos, self._read ): self._pos = header_data[0] return header_data - # Missing data, but still space in buffer, so read more if self._recv_info_buffer() <= 0: break @@ -178,15 +176,13 @@ def get_response( result: Union[Value, Success, NotStored, Conflict, Miss] try: if response_code == meta_socket.RESPONSE_VALUE: - if size is None: - raise MemcacheError("Bad value response. Missing size") # Value response - result = Value( - size=size, flags=flags or EMPTY_RESPONSE_FLAGS, value=None - ) + assert size is not None and flags is not None + result = Value(size=size, flags=flags, value=None) elif response_code == meta_socket.RESPONSE_SUCCESS: # Stored or no value, return Success - result = Success(flags=flags or EMPTY_RESPONSE_FLAGS) + assert flags is not None + result = Success(flags=flags) elif response_code == meta_socket.RESPONSE_NOT_STORED: # Value response, parse size and flags result = NOT_STORED diff --git a/src/meta_memcache/executors/default.py b/src/meta_memcache/executors/default.py index 0383aa6..a0954bd 100644 --- a/src/meta_memcache/executors/default.py +++ b/src/meta_memcache/executors/default.py @@ -1,5 +1,7 @@ import logging -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Callable, Dict, List, Optional, Tuple + +from meta_socket import RequestFlags, build_cmd from meta_memcache.base.base_serializer import BaseSerializer from meta_memcache.configuration import default_key_encoder @@ -9,8 +11,6 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( ENDL, - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, @@ -20,20 +20,21 @@ ResponseFlags, ServerVersion, Success, - TokenFlag, Value, ValueContainer, - encode_size, ) _log: logging.Logger = logging.getLogger(__name__) +meta_commands_values = {cmd: cmd.value for cmd in MetaCommand} + + class DefaultExecutor: def __init__( self, serializer: BaseSerializer, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, touch_ttl_to_consider_write_failure: Optional[int] = 50, ) -> None: @@ -48,41 +49,39 @@ def _build_cmd( command: MetaCommand, key: Key, size: Optional[int] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, version: ServerVersion = ServerVersion.STABLE, ) -> bytes: - encoded_key, is_binary = self._key_encoder_fn(key) - cmd = [command.value, encoded_key] - if size is not None: - cmd.append(encode_size(size, version=version)) - cmd_flags: List[bytes] = [] - if is_binary: - cmd_flags.append(Flag.BINARY.value) - if flags: - cmd_flags.extend(flag.value for flag in flags) - if int_flags: - for int_flag, int_value in int_flags.items(): - cmd_flags.append(int_flag.value + str(int_value).encode("ascii")) - if token_flags: - for token_flag, bytes_value in token_flags.items(): - cmd_flags.append(token_flag.value + bytes_value) - cmd.extend(cmd_flags) - return b" ".join(cmd) + ENDL + encoded_key = self._key_encoder_fn(key) + cmd = meta_commands_values[command] + if version == ServerVersion.STABLE: + return build_cmd( + cmd, + encoded_key, + size, + flags, + ) + else: + return build_cmd( + cmd, + encoded_key, + size, + flags, + legacy_size_format=True, + ) - def _prepare_serialized_value_and_int_flags( + def _prepare_serialized_value_and_flags( self, value: ValueContainer, - int_flags: Optional[Dict[IntFlag, int]], - ) -> Tuple[Optional[bytes], Optional[Dict[IntFlag, int]]]: + flags: Optional[RequestFlags], + ) -> Tuple[Optional[bytes], RequestFlags]: encoded_value = self._serializer.serialize(value.value) - int_flags = int_flags if int_flags is not None else {} - int_flags[IntFlag.SET_CLIENT_FLAG] = encoded_value.encoding_id - return encoded_value.data, int_flags + flags = flags if flags is not None else RequestFlags() + flags.client_flag = encoded_value.encoding_id + return encoded_value.data, flags def _is_a_write_failure( - self, command: MetaCommand, int_flags: Optional[Dict[IntFlag, int]] + self, command: MetaCommand, flags: Optional[RequestFlags] ) -> bool: if command in ( MetaCommand.META_DELETE, @@ -92,7 +91,7 @@ def _is_a_write_failure( if ( self._touch_ttl_to_consider_write_failure is not None and command == MetaCommand.META_GET - and (touch_ttl := (int_flags or {}).get(IntFlag.CACHE_TTL, None)) + and (touch_ttl := (flags.cache_ttl if flags else None)) and 0 < touch_ttl <= self._touch_ttl_to_consider_write_failure ): return True @@ -104,16 +103,14 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags(value, int_flags) + else self._prepare_serialized_value_and_flags(value, flags) ) try: conn = pool.pop_connection() @@ -125,8 +122,6 @@ def exec_on_pool( key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return self._conn_recv_response(conn, flags=flags) except Exception as e: @@ -135,7 +130,7 @@ def exec_on_pool( finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): self.on_write_failure(key) raise_on_server_error = ( raise_on_server_error @@ -154,9 +149,7 @@ def exec_multi_on_pool( # noqa: C901 pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: @@ -167,12 +160,10 @@ def exec_multi_on_pool( # noqa: C901 try: # with pool.get_connection() as conn: for key, value in key_values: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags( - value, int_flags - ) + else self._prepare_serialized_value_and_flags(value, flags) ) self._conn_send_cmd( @@ -181,8 +172,6 @@ def exec_multi_on_pool( # noqa: C901 key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) for key, _ in key_values: results[key] = self._conn_recv_response(conn, flags=flags) @@ -192,7 +181,7 @@ def exec_multi_on_pool( # noqa: C901 finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): for key, _ in key_values: self.on_write_failure(key) raise_on_server_error = ( @@ -215,9 +204,7 @@ def _conn_send_cmd( command: MetaCommand, key: Key, value: Optional[bytes] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> None: """ Execute command on a connection @@ -227,16 +214,12 @@ def _conn_send_cmd( key, size=len(value) if value is not None else None, flags=flags, - int_flags=int_flags, - token_flags=token_flags, version=conn.get_version(), ) # write meta commands with NOREPLY can potentially return errors # they are not fully silent, so we need to add a no-op to the wire. with_noop = ( - command != MetaCommand.META_GET - and flags is not None - and Flag.NOREPLY in flags + command != MetaCommand.META_GET and flags is not None and flags.no_reply ) if value: @@ -247,12 +230,12 @@ def _conn_send_cmd( def _conn_recv_response( self, conn: MemcacheSocket, - flags: Optional[Set[Flag]] = None, + flags: Optional[RequestFlags] = None, ) -> MemcacheResponse: """ Read response on a connection """ - if flags and Flag.NOREPLY in flags: + if flags and flags.no_reply: return Success(flags=ResponseFlags()) result = conn.get_response() if isinstance(result, Value): diff --git a/src/meta_memcache/extras/client_wrapper.py b/src/meta_memcache/extras/client_wrapper.py index 61f6678..6a1f6be 100644 --- a/src/meta_memcache/extras/client_wrapper.py +++ b/src/meta_memcache/extras/client_wrapper.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import ServerAddress @@ -6,11 +6,9 @@ from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, + RequestFlags, WriteResponse, ) @@ -33,32 +31,24 @@ def __init__( def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return self.client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: return self.client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) @@ -67,9 +57,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_set( @@ -77,40 +65,30 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) diff --git a/src/meta_memcache/extras/migrating_cache_client.py b/src/meta_memcache/extras/migrating_cache_client.py index 9d6c442..9bb5386 100644 --- a/src/meta_memcache/extras/migrating_cache_client.py +++ b/src/meta_memcache/extras/migrating_cache_client.py @@ -1,6 +1,6 @@ import random import time -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Union from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import MigrationMode, ServerAddress @@ -8,12 +8,10 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, + RequestFlags, SetMode, - TokenFlag, Value, WriteResponse, ) @@ -94,17 +92,13 @@ def _should_populate_read(self, migration_mode: MigrationMode) -> bool: def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> Dict[Key, ReadResponse]: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -113,8 +107,6 @@ def meta_multiget( results = self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if self._should_populate_read(migration_mode): for key, result in results.items(): @@ -131,24 +123,18 @@ def meta_multiget( return self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> ReadResponse: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -157,8 +143,6 @@ def meta_get( result = self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if isinstance(result, Value) and self._should_populate_read(migration_mode): self._destination_client.set( @@ -173,8 +157,6 @@ def meta_get( return self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_set( @@ -182,9 +164,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -194,8 +174,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_set( @@ -203,8 +181,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -216,9 +192,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -226,15 +200,11 @@ def meta_delete( origin_response = self._origin_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -246,9 +216,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: """ We can't reliably migrate cache data modified by meta-arithmetic, @@ -261,15 +229,11 @@ def meta_arithmetic( return self._destination_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) else: return self._origin_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def touch( diff --git a/src/meta_memcache/interfaces/executor.py b/src/meta_memcache/interfaces/executor.py index f211b1a..c3ae584 100644 --- a/src/meta_memcache/interfaces/executor.py +++ b/src/meta_memcache/interfaces/executor.py @@ -1,15 +1,14 @@ -from typing import Dict, List, Optional, Protocol, Set, Tuple +from typing import Dict, List, Optional, Protocol, Tuple + from meta_memcache.connection.pool import ConnectionPool from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -20,9 +19,7 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: @@ -38,9 +35,7 @@ def exec_multi_on_pool( pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: diff --git a/src/meta_memcache/interfaces/meta_commands.py b/src/meta_memcache/interfaces/meta_commands.py index ed6fa23..8850394 100644 --- a/src/meta_memcache/interfaces/meta_commands.py +++ b/src/meta_memcache/interfaces/meta_commands.py @@ -1,13 +1,11 @@ -from typing import Any, Dict, List, Optional, Protocol, Set +from typing import Any, Dict, List, Optional, Protocol from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, WriteResponse, + RequestFlags, ) @@ -15,9 +13,7 @@ class MetaCommandsProtocol(Protocol): def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: ... # pragma: no cover @@ -25,9 +21,7 @@ def meta_multiget( def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: ... # pragma: no cover @@ -37,9 +31,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -47,9 +39,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -57,9 +47,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover diff --git a/src/meta_memcache/interfaces/router.py b/src/meta_memcache/interfaces/router.py index 2e212d4..4373d9b 100644 --- a/src/meta_memcache/interfaces/router.py +++ b/src/meta_memcache/interfaces/router.py @@ -1,17 +1,16 @@ -from typing import Dict, List, NamedTuple, Optional, Protocol, Set +from typing import Dict, List, NamedTuple, Optional, Protocol + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import PoolCounters from meta_memcache.interfaces.executor import Executor from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -34,9 +33,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -52,9 +49,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ diff --git a/src/meta_memcache/protocol.py b/src/meta_memcache/protocol.py index fe76688..76cfa31 100644 --- a/src/meta_memcache/protocol.py +++ b/src/meta_memcache/protocol.py @@ -1,8 +1,19 @@ from dataclasses import dataclass from enum import Enum, IntEnum -from typing import Any, Dict, List, Optional, Union +from typing import Any, List, Optional, Union + +from meta_socket import ( # noqa: F401 + ResponseFlags, + RequestFlags as RequestFlags, + SET_MODE_ADD, + SET_MODE_APPEND, + SET_MODE_PREPEND, + SET_MODE_REPLACE, + SET_MODE_SET, + MA_MODE_INC as MA_MODE_INC, + MA_MODE_DEC as MA_MODE_DEC, +) -from meta_socket import ResponseFlags ENDL = b"\r\n" NOOP: bytes = b"mn" + ENDL @@ -39,57 +50,11 @@ class MetaCommand(Enum): class SetMode(Enum): - SET = b"S" # Default - ADD = b"E" # Add if item does NOT EXIST, else LRU bump and return NS - APPEND = b"A" # If item exists, append the new value to its data. - PREPEND = b"P" # If item exists, prepend the new value to its data. - REPLACE = b"R" # Set only if item already exists. - - -class Flag(Enum): - BINARY = b"b" - NOREPLY = b"q" - RETURN_CLIENT_FLAG = b"f" - RETURN_CAS_TOKEN = b"c" - RETURN_VALUE = b"v" - RETURN_TTL = b"t" - RETURN_SIZE = b"s" - RETURN_LAST_ACCESS = b"l" - RETURN_FETCHED = b"h" - RETURN_KEY = b"k" - NO_UPDATE_LRU = b"u" - MARK_STALE = b"I" - - -class IntFlag(Enum): - CACHE_TTL = b"T" - RECACHE_TTL = b"R" - MISS_LEASE_TTL = b"N" - SET_CLIENT_FLAG = b"F" - MA_INITIAL_VALUE = b"J" - MA_DELTA_VALUE = b"D" - CAS_TOKEN = b"C" - - -class TokenFlag(Enum): - OPAQUE = b"O" - # 'M' (mode switch): - # * Meta Arithmetic: - # - I or +: increment - # - D or -: decrement - # * Meta Set: See SetMode Enum above - # - E: "add" command. LRU bump and return NS if item exists. Else add. - # - A: "append" command. If item exists, append the new value to its data. - # - P: "prepend" command. If item exists, prepend the new value to its data. - # - R: "replace" command. Set only if item already exists. - # - S: "set" command. The default mode, added for completeness. - MODE = b"M" - - -# Store maps of byte values (int) to enum value -flag_values: Dict[int, Flag] = {f.value[0]: f for f in Flag} -int_flags_values: Dict[int, IntFlag] = {f.value[0]: f for f in IntFlag} -token_flags_values: Dict[int, TokenFlag] = {f.value[0]: f for f in TokenFlag} + SET = SET_MODE_SET # Default + ADD = SET_MODE_ADD # Add if item does NOT EXIST, else LRU bump and return NS + APPEND = SET_MODE_APPEND # If item exists, append the new value to its data. + PREPEND = SET_MODE_PREPEND # If item exists, prepend the new value to its data. + REPLACE = SET_MODE_REPLACE # Set only if item already exists. @dataclass @@ -113,10 +78,6 @@ class Success(MemcacheResponse): __slots__ = ("flags",) flags: ResponseFlags - @classmethod - def default(cls) -> "Success": - return cls(flags=ResponseFlags()) - @dataclass class Value(Success): @@ -168,10 +129,3 @@ def get_store_success_response_header(version: ServerVersion) -> bytes: if version == ServerVersion.AWS_1_6_6: return b"OK" return b"HD" - - -def encode_size(size: int, version: ServerVersion) -> bytes: - if version == ServerVersion.AWS_1_6_6: - return b"S" + str(size).encode("ascii") - else: - return str(size).encode("ascii") diff --git a/src/meta_memcache/routers/default.py b/src/meta_memcache/routers/default.py index 37ad312..0ebcef2 100644 --- a/src/meta_memcache/routers/default.py +++ b/src/meta_memcache/routers/default.py @@ -1,5 +1,6 @@ from collections import defaultdict -from typing import Callable, DefaultDict, Dict, List, Optional, Set, Tuple +from typing import Callable, DefaultDict, Dict, List, Optional, Tuple + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import ConnectionPool, PoolCounters @@ -7,14 +8,12 @@ from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -32,9 +31,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -49,8 +46,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) @@ -60,9 +55,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -78,8 +71,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) diff --git a/src/meta_memcache/routers/ephemeral.py b/src/meta_memcache/routers/ephemeral.py index e13fde0..291ec4a 100644 --- a/src/meta_memcache/routers/ephemeral.py +++ b/src/meta_memcache/routers/ephemeral.py @@ -1,20 +1,18 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class EphemeralRouter(DefaultRouter): @@ -52,18 +50,14 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: return super().exec( command=command, key=key, value=value, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) @@ -72,17 +66,13 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: return super().exec_multi( command=command, keys=keys, values=values, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) diff --git a/src/meta_memcache/routers/gutter.py b/src/meta_memcache/routers/gutter.py index 91e8eb0..0f32511 100644 --- a/src/meta_memcache/routers/gutter.py +++ b/src/meta_memcache/routers/gutter.py @@ -1,21 +1,19 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.errors import MemcacheServerError from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class GutterRouter(DefaultRouter): @@ -38,9 +36,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -57,8 +53,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -67,15 +61,13 @@ def exec( ) except MemcacheServerError: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) return self.executor.exec_on_pool( pool=self.gutter_pool_provider.get_pool(key), command=command, key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, @@ -89,9 +81,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -110,8 +100,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -126,7 +114,7 @@ def exec_multi( gutter_values.append(value) if gutter_keys: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) for pool, key_values in self._exec_multi_prepare_pool_map( self.gutter_pool_provider.get_pool, gutter_keys, gutter_values ).items(): @@ -136,8 +124,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, diff --git a/src/meta_memcache/routers/helpers.py b/src/meta_memcache/routers/helpers.py index f8f1bf3..f9ebc29 100644 --- a/src/meta_memcache/routers/helpers.py +++ b/src/meta_memcache/routers/helpers.py @@ -1,23 +1,24 @@ -from typing import Dict, Optional +from typing import Optional -from meta_memcache.protocol import IntFlag +from meta_memcache.protocol import RequestFlags -def adjust_int_flags_for_max_ttl( - int_flags: Optional[Dict[IntFlag, int]], +def _adjust_ttl(ttl: Optional[int], max_ttl: int) -> Optional[int]: + if ttl is not None and (ttl == 0 or ttl > max_ttl): + return max_ttl + return ttl + + +def adjust_flags_for_max_ttl( + flags: Optional[RequestFlags], max_ttl: int, -) -> Optional[Dict[IntFlag, int]]: +) -> Optional[RequestFlags]: """ Override TTLs > than `max_ttl` """ - if int_flags: - for flag in ( - IntFlag.CACHE_TTL, - IntFlag.RECACHE_TTL, - IntFlag.MISS_LEASE_TTL, - ): - ttl = int_flags.get(flag) - if ttl is not None and (ttl == 0 or ttl > max_ttl): - int_flags[flag] = max_ttl + if flags: + flags.cache_ttl = _adjust_ttl(flags.cache_ttl, max_ttl) + flags.recache_ttl = _adjust_ttl(flags.recache_ttl, max_ttl) + flags.vivify_on_miss_ttl = _adjust_ttl(flags.vivify_on_miss_ttl, max_ttl) - return int_flags + return flags diff --git a/src/meta_memcache/settings.py b/src/meta_memcache/settings.py index 9e9a207..cecbbfd 100644 --- a/src/meta_memcache/settings.py +++ b/src/meta_memcache/settings.py @@ -5,4 +5,7 @@ DEFAULT_READ_BUFFER_SIZE = 4096 -MAX_KEY_SIZE = 250 +# Max key is 250, but when using binary keys will be b64 encoded +# so take more space. Keys longer than this will be hashed, so +# it's not a problem. +MAX_KEY_SIZE = 187 diff --git a/tests/cache_client_test.py b/tests/cache_client_test.py index 9e376e9..e224f6f 100644 --- a/tests/cache_client_test.py +++ b/tests/cache_client_test.py @@ -12,7 +12,7 @@ ) from meta_memcache.connection.pool import ConnectionPool, PoolCounters from meta_memcache.errors import MemcacheServerError -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from meta_memcache.settings import DEFAULT_MARK_DOWN_PERIOD_S @@ -167,10 +167,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -194,10 +194,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -220,10 +220,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_not_called() diff --git a/tests/commands_test.py b/tests/commands_test.py index 3bd2241..cafb033 100644 --- a/tests/commands_test.py +++ b/tests/commands_test.py @@ -28,10 +28,9 @@ Miss, NotStored, ResponseFlags, + RequestFlags, ServerVersion, Success, - IntFlag, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter @@ -243,7 +242,7 @@ def test_set_cmd( cache_client.set(key=Key("foo"), value=b"123", ttl=300, cas_token=666) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -253,7 +252,7 @@ def test_set_cmd( key=Key("foo"), value=b"123", ttl=300, cas_token=666, stale_policy=StalePolicy() ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -267,7 +266,7 @@ def test_set_cmd( stale_policy=StalePolicy(mark_stale_on_cas_mismatch=True), ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 I T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 I T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -309,9 +308,7 @@ def test_refill( key=Key(key="foo"), value="bar", ttl=300, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 300}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, + flags=RequestFlags(cache_ttl=300, mode=SetMode.ADD.value), failure_handling=FailureHandling(track_write_failures=False), ) @@ -560,7 +557,7 @@ def test_get_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> ) assert_called_once_with_command( memcache_socket.sendall, - b"mg lCV3WxKxtWrdY4s1+R710+9J b t l v h f R30 T300\r\n", + b"mg w7puw63Dp29k4o23 b t l v h f R30 T300\r\n", with_noop=False, ) memcache_socket.sendall.reset_mock() @@ -1186,7 +1183,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cache_client.delta(key=Key("foo"), delta=1, refresh_ttl=60, no_reply=True) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 T60\r\n", with_noop=True + b"ma foo q T60 D1\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1205,7 +1202,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cas_token=123, ) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 C123 J10 N60\r\n", with_noop=True + b"ma foo q N60 J10 D1 C123\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1228,7 +1225,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result is None memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() @@ -1258,7 +1255,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result == 10 memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() diff --git a/tests/ephemeral_cache_client_test.py b/tests/ephemeral_cache_client_test.py index ddc2a30..e1249a3 100644 --- a/tests/ephemeral_cache_client_test.py +++ b/tests/ephemeral_cache_client_test.py @@ -1,5 +1,5 @@ from unittest.mock import call -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from pytest_mock import MockerFixture @@ -29,10 +29,10 @@ def test_ephemeral_cache_client(mocker: MockerFixture) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) c.sendall.assert_has_calls([call(b"mg bar q T60\r\n"), call(b"mg foo q T60\r\n")]) c.sendall.reset_mock() diff --git a/tests/migrating_cache_client_test.py b/tests/migrating_cache_client_test.py index ddede09..0a96457 100644 --- a/tests/migrating_cache_client_test.py +++ b/tests/migrating_cache_client_test.py @@ -3,12 +3,12 @@ import pytest from meta_memcache import ( CacheClient, - IntFlag, Key, SetMode, Value, WriteFailureEvent, ResponseFlags, + RequestFlags, ) from meta_memcache.extras.migrating_cache_client import ( MigratingCacheClient, @@ -167,9 +167,7 @@ def test_migration_mode_origin_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_not_called() @@ -177,9 +175,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_not_called() @@ -187,9 +183,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -227,9 +221,7 @@ def test_migration_mode_destination_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes @@ -237,9 +229,7 @@ def test_migration_mode_destination_only( origin_client.meta_delete.assert_not_called() destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -247,9 +237,7 @@ def test_migration_mode_destination_only( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch @@ -289,41 +277,31 @@ def test_migration_mode_populate_writes( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -451,41 +429,31 @@ def test_migration_mode_populate_writes_and_reads_1pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -566,41 +534,31 @@ def test_migration_mode_populate_writes_and_reads_10pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -644,32 +602,24 @@ def test_migration_mode_use_destination_update_origin( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -677,9 +627,7 @@ def test_migration_mode_use_destination_update_origin( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch diff --git a/tests/probabilistic_hot_cache_test.py b/tests/probabilistic_hot_cache_test.py index 4e02fcf..afb5496 100644 --- a/tests/probabilistic_hot_cache_test.py +++ b/tests/probabilistic_hot_cache_test.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from unittest.mock import Mock from prometheus_client import CollectorRegistry @@ -6,23 +6,21 @@ import pytest -from meta_memcache import CacheClient, IntFlag, Key, Value +from meta_memcache import CacheClient, Key, Value from meta_memcache.errors import MemcacheError from meta_memcache.extras.probabilistic_hot_cache import ( CachedValue, ProbabilisticHotCache, ) from meta_memcache.metrics.prometheus import PrometheusMetricsCollector -from meta_memcache.protocol import Flag, Miss, ReadResponse, TokenFlag, ResponseFlags +from meta_memcache.protocol import Miss, ReadResponse, ResponseFlags, RequestFlags @pytest.fixture def client() -> Mock: def meta_get( key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: if key.key.endswith("hot"): @@ -48,9 +46,7 @@ def meta_get( def meta_multiget( keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return {key: meta_get(key=key) for key in keys} @@ -78,15 +74,13 @@ def random(monkeypatch) -> Mock: DEFAULT_FLAGS = { - "flags": { - Flag.RETURN_TTL, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_VALUE, - Flag.RETURN_FETCHED, - Flag.RETURN_CLIENT_FLAG, - }, - "int_flags": None, - "token_flags": None, + "flags": RequestFlags( + return_ttl=True, + return_last_access=True, + return_value=True, + return_fetched=True, + return_client_flag=True, + ), "failure_handling": DEFAULT_FAILURE_HANDLING, }