From 2a3ade1bdcd494567e15f9ff6a7d700e94cbbc8f Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 18 Jan 2024 15:59:43 +0100 Subject: [PATCH] add redis implementation --- dev-requirements.txt | 1 + jupyterhub_traefik_proxy/proxy.py | 4 +- jupyterhub_traefik_proxy/redis.py | 166 ++++++++++++++++++++++++++++++ setup.py | 1 + tests/conftest.py | 103 ++++++++++++++++++ 5 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 jupyterhub_traefik_proxy/redis.py diff --git a/dev-requirements.txt b/dev-requirements.txt index 83a74a17..1c56e0aa 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,4 +15,5 @@ pytest pytest-asyncio>=0.17,<0.23 pytest-cov python-consul2 +redis websockets diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index 15903484..f8e566eb 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -596,7 +596,9 @@ async def stop(self): if the proxy is to be started by the Hub """ self._stop_traefik() - self._cleanup() + _cleanup_result = self._cleanup() + if _cleanup_result is not None: + await _cleanup_result def _cleanup(self): """Cleanup after stop diff --git a/jupyterhub_traefik_proxy/redis.py b/jupyterhub_traefik_proxy/redis.py new file mode 100644 index 00000000..b4abaf84 --- /dev/null +++ b/jupyterhub_traefik_proxy/redis.py @@ -0,0 +1,166 @@ +"""Redis backend""" +import asyncio +from urllib.parse import urlparse + +from traitlets import Any, Dict, Unicode, default + +from .kv_proxy import TKvProxy +from .traefik_utils import deep_merge + + +class TraefikRedisProxy(TKvProxy): + """JupyterHub Proxy implementation using traefik and redis""" + + provider_name = "redis" + + redis_url = Unicode( + 'redis://localhost:6379', config=True, help="""The URL for the redis endpoint""" + ) + redis_username = Unicode(config=True, help="The redis username") + redis_password = Unicode(config=True, help="The redis password") + + redis_client_kwargs = Dict( + config=True, + help="Additional keyword arguments to pass through to the `redis.Redis` constructor", + ) + + redis = Any() + + @default("redis") + def _connect_redis(self): + try: + from redis.asyncio import Redis + except ImportError: + raise ImportError( + "Please install `redis` package to use traefik-proxy with redis" + ) + + url = urlparse(self.redis_url) + if url.port: + port = url.port + else: + # default port + port = 6379 + kwargs = dict( + host=url.hostname, + port=port, + decode_responses=True, + ) + if self.redis_password: + kwargs["password"] = self.redis_password + if self.redis_username: + kwargs["username"] = self.redis_username + kwargs.update(self.redis_client_kwargs) + return Redis(**kwargs) + + async def _cleanup(self): + f = super()._cleanup() + if f is not None: + await f + await self.redis.close() + + def _setup_traefik_static_config(self): + self.log.debug("Setting up the redis provider in the static config") + url = urlparse(self.redis_url) + redis_config = { + "endpoints": [url.netloc], + "rootKey": self.kv_traefik_prefix, + } + if self.redis_username: + redis_config["username"] = self.redis_username + if self.redis_password: + redis_config["password"] = self.redis_password + + self.static_config = deep_merge( + self.static_config, {"providers": {"redis": redis_config}} + ) + return super()._setup_traefik_static_config() + + async def _kv_atomic_set(self, to_set: dict): + """Set a collection of keys and values + + Should be done atomically (i.e. in a transaction), + setting nothing on failure. + + Args: + + to_set (dict): key/value pairs to set + Will always be a flattened dict + of single key-value pairs, + not a nested structure. + """ + self.log.debug("Setting redis keys %s", to_set.keys()) + await self.redis.mset(to_set) + + _delete_script = Any() + + @default("_delete_script") + def _register_delete_script(self): + """Register LUA script for deleting all keys matching in a prefix + + Doing the scan & delete from Python is _extremely_ slow + for some reason + """ + _delete_lua = """ + local all_keys = {}; + local cursor = ""; + repeat + local result = redis.call("SCAN", cursor, "match", ARGV[1], "count", ARGV[2]) + cursor = result[1]; + for i, key in ipairs(result[2]) do + table.insert(all_keys, key); + end + until cursor == "0" + for i, key in ipairs(all_keys) do + redis.call("DEL", key); + end + return #all_keys; + """ + return self.redis.register_script(_delete_lua) + + async def _kv_atomic_delete(self, *keys): + """Delete one or more keys + + If a key ends with `self.kv_separator`, it should be a recursive delete + """ + + to_delete = [] + + futures = [] + for key in keys: + if key.endswith(self.kv_separator): + prefix = key + "*" + self.log.debug("Deleting redis tree %s", prefix) + f = asyncio.ensure_future(self._delete_script(args=[prefix, 100])) + f.add_done_callback( + lambda f: self.log.debug( + "Deleted %i keys in %s", f.result(), prefix + ) + ) + futures.append(f) + else: + to_delete.append(key) + + if to_delete: + self.log.debug("Deleting redis keys %s", to_delete) + futures.append(self.redis.delete(*to_delete)) + + await asyncio.gather(*futures) + + async def _kv_get_tree(self, prefix): + """Return all data under prefix as a dict + + Should probably use `unflatten_dict_from_kv` + """ + if not prefix.endswith(self.kv_separator): + prefix = prefix + self.kv_separator + + keys = [] + # is there a possibility this could get too big? + # should we batch? + async for key in self.redis.scan_iter(match=prefix + "*"): + keys.append(key) + self.log.debug("Getting redis keys %s", keys) + values = list(await self.redis.mget(keys)) + kv_list = zip(keys, values) + return self.unflatten_dict_from_kv(kv_list, root_key=prefix) diff --git a/setup.py b/setup.py index 3981a573..7d33ca02 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,7 @@ "traefik_consul = jupyterhub_traefik_proxy.consul:TraefikConsulProxy", "traefik_etcd = jupyterhub_traefik_proxy.etcd:TraefikEtcdProxy", "traefik_file = jupyterhub_traefik_proxy.fileprovider:TraefikFileProviderProxy", + "traefik_redis = jupyterhub_traefik_proxy.redis:TraefikRedisProxy", "traefik_toml = jupyterhub_traefik_proxy.toml:TraefikTomlProxy", ] }, diff --git a/tests/conftest.py b/tests/conftest.py index e35c0daf..6c59c0d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,7 @@ from jupyterhub_traefik_proxy.consul import TraefikConsulProxy from jupyterhub_traefik_proxy.etcd import TraefikEtcdProxy from jupyterhub_traefik_proxy.fileprovider import TraefikFileProviderProxy +from jupyterhub_traefik_proxy.redis import TraefikRedisProxy from jupyterhub_traefik_proxy.traefik_utils import deep_merge from . import utils @@ -51,6 +52,11 @@ class Config: consul_port = 8500 consul_auth_port = 8501 + # redis auth credentials + redis_port = 9988 + redis_username = "redisuser" + redis_password = "redispass" + # Traefik api auth login credentials traefik_api_user = "api_admin" traefik_api_pass = "admin" @@ -176,6 +182,23 @@ async def no_auth_consul_proxy(launch_consul, proxy_args): await proxy.stop() +@pytest.fixture +async def redis_proxy(launch_redis, proxy_args): + """ + Fixture returning a configured TraefikRedisProxy. + """ + proxy = TraefikRedisProxy( + redis_url=f"redis://127.0.0.1:{Config.redis_port}", + redis_username=Config.redis_username, + redis_password=Config.redis_password, + should_start=True, + **proxy_args, + ) + await proxy.start() + yield proxy + await proxy.stop() + + @pytest.fixture async def auth_consul_proxy(launch_consul_auth, proxy_args): """ @@ -384,6 +407,19 @@ async def auth_external_consul_proxy(launch_traefik_consul_auth, proxy_args): yield proxy +@pytest.fixture +async def external_redis_proxy(launch_traefik_redis, proxy_args): + proxy = TraefikRedisProxy( + redis_url=f"redis://127.0.0.1:{Config.redis_port}", + redis_username=Config.redis_username, + redis_password=Config.redis_password, + should_start=False, + **proxy_args, + ) + await proxy._start_future + yield proxy + + @pytest.fixture async def external_etcd_proxy(launch_traefik_etcd, etcd_client_ca, proxy_args): proxy = _make_etcd_proxy( @@ -420,6 +456,8 @@ async def auth_external_etcd_proxy( "auth_external_etcd_proxy", "external_file_proxy_toml", "external_file_proxy_yaml", + "redis_proxy", + "external_redis_proxy", ] ) def proxy(request, client_ca, proxy_ssl_key_cert): @@ -466,6 +504,16 @@ def launch_traefik_etcd_auth(launch_traefik, launch_etcd_auth, etcd_client_ca): ) +@pytest.fixture +def launch_traefik_redis(launch_traefik, launch_redis): + return launch_traefik( + "--providers.redis", + f"--providers.redis.endpoints=127.0.0.1:{Config.redis_port}", + f"--providers.redis.username={Config.redis_username}", + f"--providers.redis.password={Config.redis_password}", + ) + + @pytest.fixture def launch_traefik_consul(launch_traefik, launch_consul): return launch_traefik("--providers.consul") @@ -717,6 +765,61 @@ async def _check_consul(): ) +# Redis launch + + +@pytest.fixture(scope="module") +def launch_redis(): + with TemporaryDirectory() as path: + print(f"Launching redis in {path}") + redis_proc = subprocess.Popen( + [ + "redis-server", + "--port", + str(Config.redis_port), + "--", + "user", + Config.redis_username, + f">{Config.redis_password}", + "allcommands", + "allkeys", + "on", + ], + cwd=path, + ) + try: + # asyncio.run instead of await because this fixture's scope + # is module-scoped, while event_loop is 'function'-scoped + asyncio.run(_wait_for_redis()) + yield redis_proc + finally: + terminate_process(redis_proc) + + +async def _wait_for_redis(): + import redis + from redis.asyncio import Redis + + async def _check_redis(): + try: + r = Redis( + port=Config.redis_port, + username=Config.redis_username, + password=Config.redis_password, + ) + await r.get("x") + except redis.exceptions.ConnectionError as e: + print(e) + return False + return True + + await exponential_backoff( + _check_redis, + "Redis not available", + timeout=20, + ) + + ######################################################################### # Teardown functions # #########################################################################