Skip to content

Commit

Permalink
Add NonConsistentHashPoolProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
bisho committed Nov 6, 2023
1 parent aea5ace commit ec883ec
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
18 changes: 16 additions & 2 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
default_key_encoder,
)
from meta_memcache.connection.pool import ConnectionPool
from meta_memcache.connection.providers import HashRingConnectionPoolProvider
from meta_memcache.connection.providers import (
HashRingConnectionPoolProvider,
NonConsistentHashPoolProvider,
)
from meta_memcache.executors.default import DefaultExecutor
from meta_memcache.interfaces.cache_api import CacheApi
from meta_memcache.protocol import Success
Expand Down Expand Up @@ -70,6 +73,7 @@ def connection_pool_builder(server_address: ServerAddress) -> ConnectionPool:
class Benchmark:
__slots__ = (
"server",
"consistent_sharding",
"concurrency",
"runs",
"ops_per_run",
Expand All @@ -80,12 +84,14 @@ class Benchmark:
def __init__(
self,
server: str,
consistent_sharding: bool,
concurrency: int,
runs: int,
ops_per_run: int,
with_gc: bool,
) -> None:
self.server = server
self.consistent_sharding = consistent_sharding
self.concurrency = concurrency
self.runs = runs
self.ops_per_run = ops_per_run
Expand Down Expand Up @@ -116,7 +122,11 @@ def _build_client(self) -> CacheApi:
connection_pool_factory_fn=connection_pool_factory_fn,
)

pool_provider = HashRingConnectionPoolProvider(server_pool=server_pool)
if self.consistent_sharding:
pool_provider = HashRingConnectionPoolProvider(server_pool=server_pool)
else:
pool_provider = NonConsistentHashPoolProvider(server_pool=server_pool)

executor = DefaultExecutor(
serializer=MixedSerializer(),
key_encoder_fn=default_key_encoder,
Expand Down Expand Up @@ -146,6 +156,7 @@ def run(self) -> None:
total = self.runs * self.ops_per_run * self.concurrency
print("=== Starting benchmark ===")
print(f" - server: {self.server or '<mocked>'}")
print(f" - consistent_sharding: {'ON' if self.consistent_sharding else 'OFF'}")
print(f" - concurrency: {self.concurrency} threads")
print(f" - Requests: {total/1_000_000:.2f}M")
print(f" ({self.runs} runs of {self.ops_per_run} reqs per thread)")
Expand Down Expand Up @@ -179,6 +190,7 @@ def run(self) -> None:

@click.command()
@click.option("--server", default="", help="Server address as <IP>:<PORT>.")
@click.option("--consistent-sharding/--no-consistent-sharding", default=True)
@click.option("--concurrency", default=1, help="Number of threads [1 by default].")
@click.option("--runs", default=10, help="Number of runs [10 by default].")
@click.option(
Expand All @@ -194,13 +206,15 @@ def run(self) -> None:
)
def cli(
server: str,
consistent_sharding: bool = True,
concurrency: int = 1,
runs: int = 10,
ops_per_run: int = 100_000,
gc: bool = False,
) -> None:
Benchmark(
server=server,
consistent_sharding=consistent_sharding,
concurrency=concurrency,
runs=runs,
ops_per_run=ops_per_run,
Expand Down
20 changes: 20 additions & 0 deletions src/meta_memcache/connection/providers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, List, Protocol
import zlib

from uhashring import HashRing # type: ignore

Expand Down Expand Up @@ -51,3 +52,22 @@ def get_counters(self) -> Dict[ServerAddress, PoolCounters]:
return {
server: pool.get_counters() for server, pool in self._server_pool.items()
}


class NonConsistentHashPoolProvider:
def __init__(self, server_pool: Dict[ServerAddress, ConnectionPool]) -> None:
self._server_pool = server_pool
self._server_count = len(server_pool)
self._servers: List[ServerAddress] = [x for x in server_pool.keys()]

def get_pool(self, key: Key) -> ConnectionPool:
routing_key = key.routing_key or key.key
server: ServerAddress = self._servers[
zlib.crc32(routing_key.encode()) % self._server_count
]
return self._server_pool[server]

def get_counters(self) -> Dict[ServerAddress, PoolCounters]:
return {
server: pool.get_counters() for server, pool in self._server_pool.items()
}

0 comments on commit ec883ec

Please sign in to comment.