diff --git a/benchmark.py b/benchmark.py index 52fd157..a5ff173 100644 --- a/benchmark.py +++ b/benchmark.py @@ -13,7 +13,10 @@ default_key_encoder, ) from meta_memcache.connection.pool import ConnectionPool -from meta_memcache.connection.providers import HashRingConnectionPoolProvider +from meta_memcache.connection.providers import ( + HashRingConnectionPoolProvider, + NonConsistentHashPoolProvider, +) from meta_memcache.executors.default import DefaultExecutor from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import Success @@ -70,6 +73,7 @@ def connection_pool_builder(server_address: ServerAddress) -> ConnectionPool: class Benchmark: __slots__ = ( "server", + "consistent_sharding", "concurrency", "runs", "ops_per_run", @@ -80,12 +84,14 @@ class Benchmark: def __init__( self, server: str, + consistent_sharding: bool, concurrency: int, runs: int, ops_per_run: int, with_gc: bool, ) -> None: self.server = server + self.consistent_sharding = consistent_sharding self.concurrency = concurrency self.runs = runs self.ops_per_run = ops_per_run @@ -116,7 +122,11 @@ def _build_client(self) -> CacheApi: connection_pool_factory_fn=connection_pool_factory_fn, ) - pool_provider = HashRingConnectionPoolProvider(server_pool=server_pool) + if self.consistent_sharding: + pool_provider = HashRingConnectionPoolProvider(server_pool=server_pool) + else: + pool_provider = NonConsistentHashPoolProvider(server_pool=server_pool) + executor = DefaultExecutor( serializer=MixedSerializer(), key_encoder_fn=default_key_encoder, @@ -146,6 +156,7 @@ def run(self) -> None: total = self.runs * self.ops_per_run * self.concurrency print("=== Starting benchmark ===") print(f" - server: {self.server or ''}") + print(f" - consistent_sharding: {'ON' if self.consistent_sharding else 'OFF'}") print(f" - concurrency: {self.concurrency} threads") print(f" - Requests: {total/1_000_000:.2f}M") print(f" ({self.runs} runs of {self.ops_per_run} reqs per thread)") @@ -179,6 +190,7 @@ def run(self) -> None: @click.command() @click.option("--server", default="", help="Server address as :.") +@click.option("--consistent-sharding/--no-consistent-sharding", default=True) @click.option("--concurrency", default=1, help="Number of threads [1 by default].") @click.option("--runs", default=10, help="Number of runs [10 by default].") @click.option( @@ -194,6 +206,7 @@ def run(self) -> None: ) def cli( server: str, + consistent_sharding: bool = True, concurrency: int = 1, runs: int = 10, ops_per_run: int = 100_000, @@ -201,6 +214,7 @@ def cli( ) -> None: Benchmark( server=server, + consistent_sharding=consistent_sharding, concurrency=concurrency, runs=runs, ops_per_run=ops_per_run, diff --git a/src/meta_memcache/connection/providers.py b/src/meta_memcache/connection/providers.py index f49e161..d8874b2 100644 --- a/src/meta_memcache/connection/providers.py +++ b/src/meta_memcache/connection/providers.py @@ -1,4 +1,5 @@ from typing import Dict, List, Protocol +import zlib from uhashring import HashRing # type: ignore @@ -51,3 +52,22 @@ def get_counters(self) -> Dict[ServerAddress, PoolCounters]: return { server: pool.get_counters() for server, pool in self._server_pool.items() } + + +class NonConsistentHashPoolProvider: + def __init__(self, server_pool: Dict[ServerAddress, ConnectionPool]) -> None: + self._server_pool = server_pool + self._server_count = len(server_pool) + self._servers: List[ServerAddress] = [x for x in server_pool.keys()] + + def get_pool(self, key: Key) -> ConnectionPool: + routing_key = key.routing_key or key.key + server: ServerAddress = self._servers[ + zlib.crc32(routing_key.encode()) % self._server_count + ] + return self._server_pool[server] + + def get_counters(self) -> Dict[ServerAddress, PoolCounters]: + return { + server: pool.get_counters() for server, pool in self._server_pool.items() + }