Skip to content

Commit

Permalink
add redis implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Jan 18, 2024
1 parent eb7b74b commit 2a3ade1
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 1 deletion.
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pytest
pytest-asyncio>=0.17,<0.23
pytest-cov
python-consul2
redis
websockets
4 changes: 3 additions & 1 deletion jupyterhub_traefik_proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,9 @@ async def stop(self):
if the proxy is to be started by the Hub
"""
self._stop_traefik()
self._cleanup()
_cleanup_result = self._cleanup()
if _cleanup_result is not None:
await _cleanup_result

def _cleanup(self):
"""Cleanup after stop
Expand Down
166 changes: 166 additions & 0 deletions jupyterhub_traefik_proxy/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Redis backend"""
import asyncio
from urllib.parse import urlparse

from traitlets import Any, Dict, Unicode, default

from .kv_proxy import TKvProxy
from .traefik_utils import deep_merge


class TraefikRedisProxy(TKvProxy):
"""JupyterHub Proxy implementation using traefik and redis"""

provider_name = "redis"

redis_url = Unicode(
'redis://localhost:6379', config=True, help="""The URL for the redis endpoint"""
)
redis_username = Unicode(config=True, help="The redis username")
redis_password = Unicode(config=True, help="The redis password")

redis_client_kwargs = Dict(
config=True,
help="Additional keyword arguments to pass through to the `redis.Redis` constructor",
)

redis = Any()

@default("redis")
def _connect_redis(self):
try:
from redis.asyncio import Redis
except ImportError:
raise ImportError(
"Please install `redis` package to use traefik-proxy with redis"
)

url = urlparse(self.redis_url)
if url.port:
port = url.port
else:
# default port
port = 6379
kwargs = dict(
host=url.hostname,
port=port,
decode_responses=True,
)
if self.redis_password:
kwargs["password"] = self.redis_password
if self.redis_username:
kwargs["username"] = self.redis_username
kwargs.update(self.redis_client_kwargs)
return Redis(**kwargs)

async def _cleanup(self):
f = super()._cleanup()
if f is not None:
await f
await self.redis.close()

def _setup_traefik_static_config(self):
self.log.debug("Setting up the redis provider in the static config")
url = urlparse(self.redis_url)
redis_config = {
"endpoints": [url.netloc],
"rootKey": self.kv_traefik_prefix,
}
if self.redis_username:
redis_config["username"] = self.redis_username
if self.redis_password:
redis_config["password"] = self.redis_password

self.static_config = deep_merge(
self.static_config, {"providers": {"redis": redis_config}}
)
return super()._setup_traefik_static_config()

async def _kv_atomic_set(self, to_set: dict):
"""Set a collection of keys and values
Should be done atomically (i.e. in a transaction),
setting nothing on failure.
Args:
to_set (dict): key/value pairs to set
Will always be a flattened dict
of single key-value pairs,
not a nested structure.
"""
self.log.debug("Setting redis keys %s", to_set.keys())
await self.redis.mset(to_set)

_delete_script = Any()

@default("_delete_script")
def _register_delete_script(self):
"""Register LUA script for deleting all keys matching in a prefix
Doing the scan & delete from Python is _extremely_ slow
for some reason
"""
_delete_lua = """
local all_keys = {};
local cursor = "";
repeat
local result = redis.call("SCAN", cursor, "match", ARGV[1], "count", ARGV[2])
cursor = result[1];
for i, key in ipairs(result[2]) do
table.insert(all_keys, key);
end
until cursor == "0"
for i, key in ipairs(all_keys) do
redis.call("DEL", key);
end
return #all_keys;
"""
return self.redis.register_script(_delete_lua)

async def _kv_atomic_delete(self, *keys):
"""Delete one or more keys
If a key ends with `self.kv_separator`, it should be a recursive delete
"""

to_delete = []

futures = []
for key in keys:
if key.endswith(self.kv_separator):
prefix = key + "*"
self.log.debug("Deleting redis tree %s", prefix)
f = asyncio.ensure_future(self._delete_script(args=[prefix, 100]))
f.add_done_callback(
lambda f: self.log.debug(
"Deleted %i keys in %s", f.result(), prefix
)
)
futures.append(f)
else:
to_delete.append(key)

if to_delete:
self.log.debug("Deleting redis keys %s", to_delete)
futures.append(self.redis.delete(*to_delete))

await asyncio.gather(*futures)

async def _kv_get_tree(self, prefix):
"""Return all data under prefix as a dict
Should probably use `unflatten_dict_from_kv`
"""
if not prefix.endswith(self.kv_separator):
prefix = prefix + self.kv_separator

keys = []
# is there a possibility this could get too big?
# should we batch?
async for key in self.redis.scan_iter(match=prefix + "*"):
keys.append(key)
self.log.debug("Getting redis keys %s", keys)
values = list(await self.redis.mget(keys))
kv_list = zip(keys, values)
return self.unflatten_dict_from_kv(kv_list, root_key=prefix)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"traefik_consul = jupyterhub_traefik_proxy.consul:TraefikConsulProxy",
"traefik_etcd = jupyterhub_traefik_proxy.etcd:TraefikEtcdProxy",
"traefik_file = jupyterhub_traefik_proxy.fileprovider:TraefikFileProviderProxy",
"traefik_redis = jupyterhub_traefik_proxy.redis:TraefikRedisProxy",
"traefik_toml = jupyterhub_traefik_proxy.toml:TraefikTomlProxy",
]
},
Expand Down
103 changes: 103 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from jupyterhub_traefik_proxy.consul import TraefikConsulProxy
from jupyterhub_traefik_proxy.etcd import TraefikEtcdProxy
from jupyterhub_traefik_proxy.fileprovider import TraefikFileProviderProxy
from jupyterhub_traefik_proxy.redis import TraefikRedisProxy
from jupyterhub_traefik_proxy.traefik_utils import deep_merge

from . import utils
Expand Down Expand Up @@ -51,6 +52,11 @@ class Config:
consul_port = 8500
consul_auth_port = 8501

# redis auth credentials
redis_port = 9988
redis_username = "redisuser"
redis_password = "redispass"

# Traefik api auth login credentials
traefik_api_user = "api_admin"
traefik_api_pass = "admin"
Expand Down Expand Up @@ -176,6 +182,23 @@ async def no_auth_consul_proxy(launch_consul, proxy_args):
await proxy.stop()


@pytest.fixture
async def redis_proxy(launch_redis, proxy_args):
"""
Fixture returning a configured TraefikRedisProxy.
"""
proxy = TraefikRedisProxy(
redis_url=f"redis://127.0.0.1:{Config.redis_port}",
redis_username=Config.redis_username,
redis_password=Config.redis_password,
should_start=True,
**proxy_args,
)
await proxy.start()
yield proxy
await proxy.stop()


@pytest.fixture
async def auth_consul_proxy(launch_consul_auth, proxy_args):
"""
Expand Down Expand Up @@ -384,6 +407,19 @@ async def auth_external_consul_proxy(launch_traefik_consul_auth, proxy_args):
yield proxy


@pytest.fixture
async def external_redis_proxy(launch_traefik_redis, proxy_args):
proxy = TraefikRedisProxy(
redis_url=f"redis://127.0.0.1:{Config.redis_port}",
redis_username=Config.redis_username,
redis_password=Config.redis_password,
should_start=False,
**proxy_args,
)
await proxy._start_future
yield proxy


@pytest.fixture
async def external_etcd_proxy(launch_traefik_etcd, etcd_client_ca, proxy_args):
proxy = _make_etcd_proxy(
Expand Down Expand Up @@ -420,6 +456,8 @@ async def auth_external_etcd_proxy(
"auth_external_etcd_proxy",
"external_file_proxy_toml",
"external_file_proxy_yaml",
"redis_proxy",
"external_redis_proxy",
]
)
def proxy(request, client_ca, proxy_ssl_key_cert):
Expand Down Expand Up @@ -466,6 +504,16 @@ def launch_traefik_etcd_auth(launch_traefik, launch_etcd_auth, etcd_client_ca):
)


@pytest.fixture
def launch_traefik_redis(launch_traefik, launch_redis):
return launch_traefik(
"--providers.redis",
f"--providers.redis.endpoints=127.0.0.1:{Config.redis_port}",
f"--providers.redis.username={Config.redis_username}",
f"--providers.redis.password={Config.redis_password}",
)


@pytest.fixture
def launch_traefik_consul(launch_traefik, launch_consul):
return launch_traefik("--providers.consul")
Expand Down Expand Up @@ -717,6 +765,61 @@ async def _check_consul():
)


# Redis launch


@pytest.fixture(scope="module")
def launch_redis():
with TemporaryDirectory() as path:
print(f"Launching redis in {path}")
redis_proc = subprocess.Popen(
[
"redis-server",
"--port",
str(Config.redis_port),
"--",
"user",
Config.redis_username,
f">{Config.redis_password}",
"allcommands",
"allkeys",
"on",
],
cwd=path,
)
try:
# asyncio.run instead of await because this fixture's scope
# is module-scoped, while event_loop is 'function'-scoped
asyncio.run(_wait_for_redis())
yield redis_proc
finally:
terminate_process(redis_proc)


async def _wait_for_redis():
import redis
from redis.asyncio import Redis

async def _check_redis():
try:
r = Redis(
port=Config.redis_port,
username=Config.redis_username,
password=Config.redis_password,
)
await r.get("x")
except redis.exceptions.ConnectionError as e:
print(e)
return False
return True

await exponential_backoff(
_check_redis,
"Redis not available",
timeout=20,
)


#########################################################################
# Teardown functions #
#########################################################################
Expand Down

0 comments on commit 2a3ade1

Please sign in to comment.