From c90686b8b31c73bb2dc2fc86ea070c65a423b728 Mon Sep 17 00:00:00 2001 From: Alex Leach Date: Wed, 23 Jun 2021 09:38:37 +0000 Subject: [PATCH] Further work to upgrade the proxy to support the Traefik v2 API, in relation to issue: https://github.com/jupyterhub/traefik-proxy/issues/97 Although all tests passed on my test system, the github workflow tests failed with the etcd tests, for the following reasons:- - Hadn't actually tested etcd-3.4.15, the default version installed by the install script. I'd used the default ubuntu version, 3.2.26. I'm not sure if this caused issues (maybe?, see next point), but some new warnings are emitted about the log parameters for instance. - The tests mainly failed due to what I thought was a nasty race-condition between successive TraefikEtcdProxy test fixture calls. The `proxy()` fixture in `tests/test_proxy.py` doesn't appear to fully destroy the dependent Proxy classes between test runs. In the case of TraefikEtcdProxy, this leaves the etcd3/grpc client open through the end of one test and into the next test. Then the next TraefikEtcdProxy test gets a connection error. I don't know why only one grpc client is allowed - is this related to the etcd version? - but regardless, the less than ideal solution is to manually call `TraefikEtcdProxy.kv_client.close()` during the teardown of the external_etcd* test runs. This is also manually called now by `TraefikEtcdProxy.stop()`, when `should_start=True`. (This took me days to figure out!!) Some further modifications to the code include:- - Changed the KV store structure for jupyterhub-specific information. All jupyterhub routespecs, targets and data are now stored as:- jupyterhub/ -/routes/{escaped_routespec} = {target} -/targets/{escaped_target} = {data} N.B. I think this can be condensed to one single request. Atm, to get the {data} for a routespec, two KV get requests are required: 1. to get the {target}, and 2. to get the {data} using that {target}. The {target} is also in the traefik configuration, so it seems like unnecessary duplication of information and redirection. - Added `log_level` and `traefik_log_level` config parameters to the base Proxy class. The first sets the log level for the logger (and handler) of the Proxy class. The latter sets traefik's log level, if `should_start=True`. - General tidy up, removing excessive debug statements and commented-out code. --- jupyterhub_traefik_proxy/consul.py | 64 ++++----- jupyterhub_traefik_proxy/etcd.py | 75 ++++++----- jupyterhub_traefik_proxy/fileprovider.py | 22 +--- jupyterhub_traefik_proxy/install.py | 4 +- jupyterhub_traefik_proxy/kv_proxy.py | 73 ++++++---- jupyterhub_traefik_proxy/proxy.py | 50 +++---- jupyterhub_traefik_proxy/traefik_utils.py | 16 +-- tests/conftest.py | 154 +++++++++++----------- tests/proxytest.py | 32 +++-- 9 files changed, 245 insertions(+), 245 deletions(-) diff --git a/jupyterhub_traefik_proxy/consul.py b/jupyterhub_traefik_proxy/consul.py index 6461b956..5865746c 100644 --- a/jupyterhub_traefik_proxy/consul.py +++ b/jupyterhub_traefik_proxy/consul.py @@ -111,7 +111,6 @@ def _stop_traefik(self): os.environ.pop("CONSUL_HTTP_TOKEN") async def persist_dynamic_config(self): - self.log.debug("Saving dynamic config to consul store") data = self.flatten_dict_for_kv( self.dynamic_config, prefix=self.kv_traefik_prefix ) @@ -128,7 +127,6 @@ def append_payload(key, val): append_payload(k, v) try: - self.log.debug(f"Uploading payload to KV store. Payload: {repr(payload)}") results = await self.kv_client.txn.put(payload=payload) status = 1 response = "" @@ -140,17 +138,13 @@ def append_payload(key, val): else: self.log.debug("Successfully uploaded payload to KV store") - # Let's check if it's in there then... - #index, result = await self.kv_client.kv.get(k) - #self.log.debug(f"And the survey says, at {k} we have: {result}") return status, response async def _kv_atomic_add_route_parts( self, jupyterhub_routespec, target, data, route_keys, rule ): - escaped_target = escapism.escape(target, safe=self.key_safe_chars) - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] ) try: @@ -158,14 +152,14 @@ async def _kv_atomic_add_route_parts( { "KV": { "Verb": "set", - "Key": escaped_jupyterhub_routespec, + "Key": jupyterhub_routespec, "Value": base64.b64encode(target.encode()).decode(), } }, { "KV": { "Verb": "set", - "Key": escaped_target, + "Key": jupyterhub_target, "Value": base64.b64encode(data.encode()).decode(), } }, @@ -176,13 +170,6 @@ async def _kv_atomic_add_route_parts( "Value": base64.b64encode(target.encode()).decode(), } }, - #{ - # "KV": { - # "Verb": "set", - # "Key": route_keys.service_weight_path, - # "Value": base64.b64encode(b"1").decode(), - # } - #}, { "KV": { "Verb": "set", @@ -211,26 +198,24 @@ async def _kv_atomic_add_route_parts( return status, response async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars - ) - index, v = await self.kv_client.kv.get(escaped_jupyterhub_routespec) + index, v = await self.kv_client.kv.get(jupyterhub_routespec) if v is None: self.log.warning( "Route %s doesn't exist. Nothing to delete", jupyterhub_routespec ) return True, None target = v["Value"] - escaped_target = escapism.escape(target, safe=self.key_safe_chars) + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) try: status, response = await self.kv_client.txn.put( payload=[ - {"KV": {"Verb": "delete", "Key": escaped_jupyterhub_routespec}}, - {"KV": {"Verb": "delete", "Key": escaped_target}}, + {"KV": {"Verb": "delete", "Key": jupyterhub_routespec}}, + {"KV": {"Verb": "delete", "Key": jupyterhub_target}}, {"KV": {"Verb": "delete", "Key": route_keys.service_url_path}}, - #{"KV": {"Verb": "delete", "Key": route_keys.service_weight_path}}, {"KV": {"Verb": "delete", "Key": route_keys.router_service_path}}, {"KV": {"Verb": "delete", "Key": route_keys.router_rule_path}}, ] @@ -244,17 +229,13 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): return status, response async def _kv_get_target(self, jupyterhub_routespec): - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars - ) - _, res = await self.kv_client.kv.get(escaped_jupyterhub_routespec) + _, res = await self.kv_client.kv.get(jupyterhub_routespec) if res is None: return None return res["Value"].decode() async def _kv_get_data(self, target): - escaped_target = escapism.escape(target, safe=self.key_safe_chars) - _, res = await self.kv_client.kv.get(escaped_target) + _, res = await self.kv_client.kv.get(target) if res is None: return None @@ -264,10 +245,18 @@ async def _kv_get_route_parts(self, kv_entry): key = escapism.unescape(kv_entry["KV"]["Key"]) value = kv_entry["KV"]["Value"] - # Strip the "/jupyterhub" prefix from the routespec - routespec = key.replace(self.kv_jupyterhub_prefix, "") + # Strip the "jupyterhub/routes/" prefix from the routespec + route_prefix = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes/"] + ) + routespec = key.replace(route_prefix, "") + target = base64.b64decode(value.encode()).decode() - data = await self._kv_get_data(target) + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) + + data = await self._kv_get_data(jupyterhub_target) return routespec, target, data @@ -277,9 +266,10 @@ async def _kv_get_jupyterhub_prefixed_entries(self): { "KV": { "Verb": "get-tree", - "Key": escapism.escape( - self.kv_jupyterhub_prefix, safe=self.key_safe_chars - ), + "Key": f"{self.kv_jupyterhub_prefix}/routes" + #escapism.escape( + # self.kv_jupyterhub_prefix, safe=self.key_safe_chars + #)+ "/routes", } } ] diff --git a/jupyterhub_traefik_proxy/etcd.py b/jupyterhub_traefik_proxy/etcd.py index 8afa9f7d..63cf4ab0 100644 --- a/jupyterhub_traefik_proxy/etcd.py +++ b/jupyterhub_traefik_proxy/etcd.py @@ -19,15 +19,13 @@ # Distributed under the terms of the Modified BSD License. from concurrent.futures import ThreadPoolExecutor -import json -import os +import escapism from urllib.parse import urlparse from tornado.concurrent import run_on_executor from traitlets import Any, default, Unicode from jupyterhub.utils import maybe_future -from . import traefik_utils from jupyterhub_traefik_proxy import TKvProxy @@ -78,23 +76,23 @@ def _default_client(self): import etcd3 except ImportError: raise ImportError("Please install etcd3 package to use traefik-proxy with etcd3") + kwargs = { + 'host': etcd_service.hostname, + 'port': etcd_service.port, + 'ca_cert': self.etcd_client_ca_cert, + 'cert_cert': self.etcd_client_cert_crt, + 'cert_key': self.etcd_client_cert_key, + } if self.kv_password: - return etcd3.client( - host=str(etcd_service.hostname), - port=etcd_service.port, - user=self.kv_username, - password=self.kv_password, - ca_cert=self.etcd_client_ca_cert, - cert_cert=self.etcd_client_cert_crt, - cert_key=self.etcd_client_cert_key, - ) - return etcd3.client( - host=str(etcd_service.hostname), - port=etcd_service.port, - ca_cert=self.etcd_client_ca_cert, - cert_cert=self.etcd_client_cert_crt, - cert_key=self.etcd_client_cert_key, - ) + kwargs.update({ + 'user': self.kv_username, + 'password': self.kv_password + }) + return etcd3.client(**kwargs) + + def _clean_resources(self): + super()._clean_resources() + self.kv_client.close() @run_on_executor def _etcd_transaction(self, success_actions): @@ -119,7 +117,7 @@ def _define_kv_specific_static_config(self): self.static_config.update({"providers" : { "etcd" : { "endpoints": [url.netloc], - "rootKey": self.kv_traefik_prefix, # Is rootKey the new prefix? + "rootKey": self.kv_traefik_prefix, } } }) if self.kv_username and self.kv_password: @@ -131,14 +129,13 @@ def _define_kv_specific_static_config(self): async def _kv_atomic_add_route_parts( self, jupyterhub_routespec, target, data, route_keys, rule ): + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) success = [ self.kv_client.transactions.put(jupyterhub_routespec, target), - self.kv_client.transactions.put(target, data), + self.kv_client.transactions.put(jupyterhub_target, data), self.kv_client.transactions.put(route_keys.service_url_path, target), - # The weight is used to balance services, not servers. - # Traefik by default will use round-robin load-balancing anyway. - # See: https://doc.traefik.io/traefik/routing/services/#load-balancing - #self.kv_client.transactions.put(route_keys.service_weight_path, "1"), self.kv_client.transactions.put( route_keys.router_service_path, route_keys.service_alias ), @@ -151,15 +148,17 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): value = await maybe_future(self._etcd_get(jupyterhub_routespec)) if value is None: self.log.warning( - "Route {jupyterhub_routespec} doesn't exist. Nothing to delete" + f"Route {jupyterhub_routespec} doesn't exist. Nothing to delete" ) return True, None - target = value.decode() + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(value.decode())] + ) success = [ self.kv_client.transactions.delete(jupyterhub_routespec), - self.kv_client.transactions.delete(target), + self.kv_client.transactions.delete(jupyterhub_target), self.kv_client.transactions.delete(route_keys.service_url_path), #self.kv_client.transactions.delete(route_keys.service_weight_path), self.kv_client.transactions.delete(route_keys.router_service_path), @@ -170,7 +169,7 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): async def _kv_get_target(self, jupyterhub_routespec): value = await maybe_future(self._etcd_get(jupyterhub_routespec)) - if value == None: + if value is None: return None return value.decode() @@ -182,17 +181,23 @@ async def _kv_get_data(self, target): async def _kv_get_route_parts(self, kv_entry): key = kv_entry[1].key.decode() - value = kv_entry[0] + value = kv_entry[0].decode() - # Strip the "/jupyterhub" prefix from the routespec - routespec = key.replace(self.kv_jupyterhub_prefix, "") - target = value.decode() - data = await self._kv_get_data(target) + # Strip the "/jupyterhub/routes/" prefix from the routespec and unescape it + sep = self.kv_separator + route_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep]) + target_prefix = sep.join([self.kv_jupyterhub_prefix, "targets" + sep]) + routespec = escapism.unescape(key.replace(route_prefix, "", 1)) + etcd_target = sep.join([target_prefix, escapism.escape(value)]) + target = escapism.unescape(etcd_target.replace(target_prefix, "", 1)) + data = await self._kv_get_data(etcd_target) return routespec, target, data async def _kv_get_jupyterhub_prefixed_entries(self): - routes = await maybe_future(self._etcd_get_prefix(self.kv_jupyterhub_prefix)) + sep = self.kv_separator + routespecs_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep]) + routes = await maybe_future(self._etcd_get_prefix(routespecs_prefix)) return routes async def persist_dynamic_config(self): diff --git a/jupyterhub_traefik_proxy/fileprovider.py b/jupyterhub_traefik_proxy/fileprovider.py index 091fd7a4..1e3d5e33 100644 --- a/jupyterhub_traefik_proxy/fileprovider.py +++ b/jupyterhub_traefik_proxy/fileprovider.py @@ -18,10 +18,8 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -import json import os import asyncio -import string import escapism from traitlets import Any, default, Unicode, observe @@ -61,7 +59,6 @@ def _set_dynamic_config_file(self, change): def __init__(self, **kwargs): super().__init__(**kwargs) - #self._set_dynamic_config_file(None) try: # Load initial dynamic config from disk self.dynamic_config = self.dynamic_config_handler.load() @@ -106,7 +103,7 @@ def _clean_resources(self): def _get_route_unsafe(self, traefik_routespec): service_alias = traefik_utils.generate_alias(traefik_routespec, "service") router_alias = traefik_utils.generate_alias(traefik_routespec, "router") - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) result = {"data": None, "target": None, "routespec": routespec} def get_target_data(d, to_find): @@ -133,8 +130,6 @@ def get_target_data(d, to_find): if result["data"] is None and result["target"] is None: self.log.info(f"No route for {routespec} found!") result = None - self.log.debug(f"traefik routespec: {traefik_routespec}") - self.log.debug(f"result for routespec {routespec}:-\n{result}") return result async def start(self): @@ -179,11 +174,9 @@ async def add_route(self, routespec, target, data): The proxy implementation should also have a way to associate the fact that a route came from JupyterHub. """ - self.log.debug(f"\tTraefikFileProviderProxy.add_route: Adding {routespec} for {target}") - traefik_routespec = self._routespec_to_traefik_path(routespec) + traefik_routespec = self.validate_routespec(routespec) service_alias = traefik_utils.generate_alias(traefik_routespec, "service") router_alias = traefik_utils.generate_alias(traefik_routespec, "router") - #data = json.dumps(data) rule = traefik_utils.generate_rule(traefik_routespec) async with self.mutex: @@ -222,9 +215,6 @@ async def add_route(self, routespec, target, data): } self.persist_dynamic_config() - self.log.debug(f"traefik routespec: {traefik_routespec}") - self.log.debug(f"data for routespec {routespec}:-\n{data}") - if self.should_start: try: # Check if traefik was launched @@ -247,17 +237,17 @@ async def delete_route(self, routespec): **Subclasses must define this method** """ - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) service_alias = traefik_utils.generate_alias(routespec, "service") router_alias = traefik_utils.generate_alias(routespec, "router") async with self.mutex: + # Pop each entry and if it's the last one, delete the key self.dynamic_config["http"]["routers"].pop(router_alias, None) self.dynamic_config["http"]["services"].pop(service_alias, None) self.dynamic_config["jupyter"]["routers"].pop(router_alias, None) - # If empty, delete the keys if not self.dynamic_config["http"]["routers"]: self.dynamic_config["http"].pop("routers") if not self.dynamic_config["http"]["services"]: @@ -293,7 +283,7 @@ async def get_all_routes(self): continue escaped_routespec = "".join(router.split("_", 1)[1:]) traefik_routespec = escapism.unescape(escaped_routespec) - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) all_routes.update({ routespec : self._get_route_unsafe(traefik_routespec) }) @@ -320,7 +310,7 @@ async def get_route(self, routespec): None: if there are no routes matching the given routespec """ - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) async with self.mutex: return self._get_route_unsafe(routespec) diff --git a/jupyterhub_traefik_proxy/install.py b/jupyterhub_traefik_proxy/install.py index ddc82824..ac49a699 100644 --- a/jupyterhub_traefik_proxy/install.py +++ b/jupyterhub_traefik_proxy/install.py @@ -28,8 +28,8 @@ "https://github.com/etcd-io/etcd/releases/download/v3.4.15/etcd-v3.4.15-darwin-amd64.tar.gz": "c596709069193bffc639a22558bdea4d801128e635909ea01a6fd5b5c85da729", "https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-linux-amd64.tar.gz": "1620a59150ec0a0124a65540e23891243feb2d9a628092fb1edcc23974724a45", "https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-darwin-amd64.tar.gz": "fac4091c7ba6f032830fad7809a115909d0f0cae5cbf5b34044540def743577b", - "https://github.com/etcd-io/etcd/releases/download/v3.2.25/etcd-v3.2.25-linux-amd64.tar.gz": "8a509ffb1443088d501f19e339a0d9c0058ce20599752c3baef83c1c68790ef7", - "https://github.com/etcd-io/etcd/releases/download/v3.2.25/etcd-v3.2.25-darwin-amd64.tar.gz": "9950684a01d7431bc12c3dba014f222d55a862c6f8af64c09c42d7a59ed6790d", + "https://github.com/etcd-io/etcd/releases/download/v3.2.26/etcd-v3.2.26-linux-amd64.tar.gz": "127d4f2097c09d929beb9d3784590cc11102f4b4d4d4da7ad82d5c9e856afd38", + "https://github.com/etcd-io/etcd/releases/download/v3.2.26/etcd-v3.2.26-darwin-amd64.zip": "0393e650ffa3e61b1fd07c61f8c78af1556896c300c9814545ff0e91f52c3513", } checksums_consul = { diff --git a/jupyterhub_traefik_proxy/kv_proxy.py b/jupyterhub_traefik_proxy/kv_proxy.py index cc69629e..17dd5f2d 100644 --- a/jupyterhub_traefik_proxy/kv_proxy.py +++ b/jupyterhub_traefik_proxy/kv_proxy.py @@ -18,6 +18,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import escapism import json import os @@ -38,8 +39,6 @@ class TKvProxy(TraefikProxy): kv_client = Any() # Key-value store client - #kv_name = Unicode(config=False, help="""The name of the key value store""") - kv_username = Unicode( config=True, help="""The username for key value store login""" ) @@ -170,8 +169,8 @@ async def _kv_get_route_parts(self, kv_entry): from the key-value store given a `kv_entry`. A `kv_entry` is a key-value store entry where the key starts with - `proxy.jupyterhub_prefix`. It is expected that only the routespecs - will be prefixed with `proxy.jupyterhub_prefix` when added to the kv store. + `proxy.kv_jupyterhub_prefix`. It is expected that only the routespecs + will be prefixed with `proxy.kv_jupyterhub_prefix` when added to the kv store. **Subclasses must define this method** @@ -186,15 +185,15 @@ async def _kv_get_route_parts(self, kv_entry): async def _kv_get_jupyterhub_prefixed_entries(self): """Retrive from the kv store all the key-value pairs where the key starts with - `proxy.jupyterhub_prefix`. - It is expected that only the routespecs will be prefixed with `proxy.jupyterhub_prefix` + `proxy.kv_jupyterhub_prefix`. + It is expected that only the routespecs will be prefixed with `proxy.kv_jupyterhub_prefix` when added to the kv store. **Subclasses must define this method** Returns: 'routes': A list of key-value store entries where the keys start - with `proxy.jupyterhub_prefix`. + with `proxy.kv_jupyterhub_prefix`. """ raise NotImplementedError() @@ -208,12 +207,10 @@ def _clean_resources(self): raise async def _setup_traefik_static_config(self): - self.log.debug("Setup the KV-specific static config") self._define_kv_specific_static_config() await super()._setup_traefik_static_config() async def _setup_traefik_dynamic_config(self): - self.log.info("Loading traefik dynamic config into kv store.") await super()._setup_traefik_dynamic_config() await self.persist_dynamic_config() @@ -244,13 +241,13 @@ async def add_route(self, routespec, target, data): Will raise an appropriate Exception (FIXME: find what?) if the route could not be added. - This proxy implementation prefixes the routespec with `proxy.jupyterhub_prefix` when + This proxy implementation prefixes the routespec with `proxy.kv_jupyterhub_prefix` when adding it to the kv store in orde to associate the fact that the route came from JupyterHub. Everything traefik related is prefixed with `proxy.traefik_prefix`. """ - self.log.info("Adding route for %s to %s.", routespec, target) + self.log.debug("Adding route for %s to %s.", routespec, target) - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) route_keys = traefik_utils.generate_route_keys(self, routespec, separator=self.kv_separator) # Store the data dict passed in by JupyterHub @@ -259,7 +256,9 @@ async def add_route(self, routespec, target, data): rule = traefik_utils.generate_rule(routespec) # To be able to delete the route when only routespec is provided - jupyterhub_routespec = self.kv_jupyterhub_prefix + routespec + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) status, response = await self._kv_atomic_add_route_parts( jupyterhub_routespec, target, data, route_keys, rule @@ -295,8 +294,10 @@ async def delete_route(self, routespec): """Delete a route and all the traefik related info associated given a routespec, (if it exists). """ - routespec = self._routespec_to_traefik_path(routespec) - jupyterhub_routespec = self.kv_jupyterhub_prefix + routespec + routespec = self.validate_routespec(routespec) + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) route_keys = traefik_utils.generate_route_keys(self, routespec, separator=self.kv_separator) status, response = await self._kv_atomic_delete_route_parts( @@ -327,7 +328,7 @@ async def get_all_routes(self): for kv_entry in routes: traefik_routespec, target, data = await self._kv_get_route_parts(kv_entry) - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) all_routes[routespec] = { "routespec": routespec, "target": target, @@ -357,13 +358,17 @@ async def get_route(self, routespec): None: if there are no routes matching the given routespec """ routespec = self.validate_routespec(routespec) - traefik_routespec = self._routespec_to_traefik_path(routespec) - jupyterhub_routespec = self.kv_jupyterhub_prefix + traefik_routespec + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) target = await self._kv_get_target(jupyterhub_routespec) if target is None: return None - data = await self._kv_get_data(target) + traefik_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) + data = await self._kv_get_data(traefik_target) return { "routespec": routespec, @@ -376,16 +381,32 @@ def flatten_dict_for_kv(self, data, prefix='traefik'): prefixing each key with :arg:`prefix` and joining each key with `self.kv_separator`. - e.g. flatten_dict_for_kv( {'x' : {'y' : {'z': 'a'} }, {'foo': 'bar'} } ) + If the final value is a `list`, then the provided bottom-level key + shall be appended with an incrementing numeric number, in the style + that is used by traefik's KV store, e.g. + + flatten_dict_for_kv({ + 'x' : { + 'y' : { + 'z': 'a' + } + }, { + 'foo': 'bar' + }, + 'baz': [ 'a', 'b', 'c' ] + }) Returns: result (dict): { - 'traefik.x.y.z' : 'a', - 'traefik.x.foo': 'bar' + 'traefik/x/y/z' : 'a', + 'traefik/x/foo': 'bar', + 'traefik/baz/0': 'a', + 'traefik/baz/1': 'b', + 'traefik/baz/2': 'c', } - - Ref: Taken from https://stackoverflow.com/a/6027615 + + Ref: Inspired by https://stackoverflow.com/a/6027615 """ sep = self.kv_separator items = {} @@ -393,15 +414,11 @@ def flatten_dict_for_kv(self, data, prefix='traefik'): new_key = prefix + sep + k if prefix else k if isinstance(v, MutableMapping): items.update(self.flatten_dict_for_kv(v, prefix=new_key)) - #else: - #items.update({new_key: v}) elif isinstance(v, str): items.update({new_key: v}) elif isinstance(v, list): for n, item in enumerate(v): items.update({ f"{new_key}{sep}{n}" : item }) - #items.update({new_key: ", ".join(v)}) - #transations.append(self.kv_client.transactions.put(k, ", ".join(v))) else: raise ValueError(f"Cannot upload {v} of type {type(v)} to etcd store") return items diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index 98e07ee9..c7e8df5a 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -58,7 +58,8 @@ class TraefikProxy(Proxy): debug = Bool(False, config=True, help="""Debug the proxy class?""") - traefik_log_level = Unicode("DEBUG", config=True, help="""traefik's log level""") + traefik_log_level = Unicode(config=True, help="""traefik's log level""") + log_level = Unicode(config=True, help="""The Proxy's log level""") traefik_api_password = Unicode( config=True, help="""The password for traefik api login""" @@ -70,21 +71,23 @@ class TraefikProxy(Proxy): def __init__(self, **kwargs): super().__init__(**kwargs) - if kwargs.get('debug', self.debug) == True: - import sys, logging - # Check we don't already have a StreamHandler - addHandler = True - for handler in self.log.handlers: - if isinstance(handler, logging.StreamHandler): - addHandler = False - if addHandler: - self.log.setLevel("DEBUG") - handler = logging.StreamHandler(sys.stdout) - handler.setLevel("DEBUG") - self.log.addHandler(handler) - self.log.debug(f"Initialising {type(self).__name__}") - - #if kwargs.get('debug', self.debug) is True: + if self.log_level: + self._set_log_level() + + def _set_log_level(self): + import sys, logging + # Check we don't already have a StreamHandler + # and add one if necessary + addHandler = True + for handler in self.log.handlers: + if isinstance(handler, logging.StreamHandler): + addHandler = False + level = self.log_level + if addHandler: + self.log.setLevel(level) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + self.log.addHandler(handler) @default("traefik_api_password") def _warn_empty_password(self): @@ -260,7 +263,8 @@ async def _setup_traefik_static_config(self): """ self.log.info("Setting up traefik's static config...") - self.static_config["log"] = { "level": self.traefik_log_level } + if self.traefik_log_level: + self.static_config["log"] = { "level": self.traefik_log_level } entryPoints = {} @@ -336,14 +340,10 @@ async def _setup_traefik_dynamic_config(self): } }) - - def _routespec_to_traefik_path(self, routespec): - path = self.validate_routespec(routespec) - if path != "/" and path.endswith("/"): - path = path.rstrip("/") - return path - - def _routespec_from_traefik_path(self, routespec): + def validate_routespec(self, routespec): + """Override jupyterhub's default Proxy.validate_routespec method, as traefik + can set router rule's on both Host and PathPrefix rules combined. + """ if not routespec.endswith("/"): routespec = routespec + "/" return routespec diff --git a/jupyterhub_traefik_proxy/traefik_utils.py b/jupyterhub_traefik_proxy/traefik_utils.py index 62d64335..9dca25ae 100644 --- a/jupyterhub_traefik_proxy/traefik_utils.py +++ b/jupyterhub_traefik_proxy/traefik_utils.py @@ -27,12 +27,11 @@ def generate_rule(routespec): routespec = unquote(routespec) if routespec.startswith("/"): # Path-based route, e.g. /proxy/path/ - rule = "PathPrefix(`{0}`)".format(routespec) + rule = f"PathPrefix(`{routespec}`)" else: # Host-based routing, e.g. host.tld/proxy/path/ host, path_prefix = routespec.split("/", 1) - path_prefix = "/" + path_prefix - rule = "Host(`{0}`) && PathPrefix(`{1}`)".format(host, path_prefix) + rule = f"Host(`{host}`) && PathPrefix(`/{path_prefix}`)" return rule @@ -51,12 +50,6 @@ def generate_service_entry( proxy, service_alias, separator="/", url=False): service_entry += separator + "url" return service_entry -def generate_service_weight_entry( proxy, service_alias, separator="/"): - return separator.join( - [proxy.kv_traefik_prefix, "http", "services", service_alias, - "weighted", "services", "0", "weight"] - ) - def generate_router_service_entry(proxy, router_alias): return "/".join( @@ -87,7 +80,6 @@ def generate_route_keys(proxy, routespec, separator="/"): [ "service_alias", "service_url_path", - #"service_weight_path", "router_alias", "router_service_path", "router_rule_path", @@ -97,8 +89,6 @@ def generate_route_keys(proxy, routespec, separator="/"): if separator != ".": service_url_path = generate_service_entry(proxy, service_alias, url=True) router_rule_path = generate_router_rule_entry(proxy, router_alias) - #service_weight_path = generate_service_entry(proxy, service_alias, weight=True) - #service_weight_path = generate_service_weight_entry(proxy, service_alias) router_service_path = generate_router_service_entry(proxy, router_alias) else: service_url_path = generate_service_entry( @@ -107,13 +97,11 @@ def generate_route_keys(proxy, routespec, separator="/"): router_rule_path = generate_router_rule_entry( proxy, router_alias, separator=separator ) - #service_weight_path = "" router_service_path = "" return RouteKeys( service_alias, service_url_path, - #service_weight_path, router_alias, router_service_path, router_rule_path, diff --git a/tests/conftest.py b/tests/conftest.py index 71ba58d2..6dcd6379 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,13 +27,18 @@ class Config: # commandss, e.g. txn # Must be passed to the env parameter of any subprocess.Popen call that runs # etcdctl - etcdctl_env = os.environ.copy().update({"ETCDCTL_API": "3"}) + etcdctl_env = dict(os.environ, ETCDCTL_API="3") + # Etcd3 auth login credentials etcd_password = "secret" etcd_user = "root" + # Consol auth login credentials consul_token = "secret" + # Traefik api auth login credentials + traefik_api_user = "api_admin" + traefik_api_pass = "admin" # Define a "slow" test marker so that we can run the slow tests at the end # ref: https://docs.pytest.org/en/6.0.1/example/simple.html#control-skipping-of-tests-according-to-command-line-option @@ -66,11 +71,11 @@ async def no_auth_consul_proxy(launch_consul): """ proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -85,12 +90,12 @@ async def auth_consul_proxy(launch_consul_acl): """ proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.consul_token, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -98,18 +103,18 @@ async def auth_consul_proxy(launch_consul_acl): @pytest.fixture -async def no_auth_etcd_proxy(launch_etcd): +async def no_auth_etcd_proxy(launch_etcd, wait_for_etcd): """ Fixture returning a configured TraefikEtcdProxy. No etcd authentication. """ proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -124,13 +129,13 @@ async def auth_etcd_proxy(launch_etcd_auth): """ proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_username="root", kv_password=Config.etcd_password, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -177,11 +182,11 @@ def _file_proxy(dynamic_config_file, **kwargs): ) return TraefikFileProviderProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, dynamic_config_file = dynamic_config_file, check_route_timeout=60, - debug=True, + log_level='DEBUG', **kwargs ) @@ -208,8 +213,8 @@ async def external_file_proxy_toml(launch_traefik_file): async def external_consul_proxy(launch_consul, configure_consul, launch_traefik_consul): proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=False, debug=True @@ -218,11 +223,11 @@ async def external_consul_proxy(launch_consul, configure_consul, launch_traefik_ @pytest.fixture -def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_traefik_consul_auth): +async def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_traefik_consul_auth): proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.consul_token, check_route_timeout=45, should_start=False, @@ -232,32 +237,33 @@ def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_ @pytest.fixture -def external_etcd_proxy(launch_etcd, configure_etcd, launch_traefik_etcd): +async def external_etcd_proxy(launch_etcd, configure_etcd, launch_traefik_etcd): proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=False, - debug=True + log_level="DEBUG" ) yield proxy + proxy.kv_client.close() @pytest.fixture def auth_external_etcd_proxy(launch_etcd_auth, configure_etcd_auth, launch_traefik_etcd_auth): proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.etcd_password, kv_username="root", check_route_timeout=45, should_start=False, - debug=True + log_level="DEBUG" ) - #traefik_process = configure_and_launch_traefik(provider="etcd", password=Config.etcd_password) yield proxy + proxy.kv_client.close() @@ -267,7 +273,7 @@ def auth_external_etcd_proxy(launch_etcd_auth, configure_etcd_auth, launch_traef ######################################################################### @pytest.fixture -async def launch_traefik_file(): +def launch_traefik_file(): args = ("--configfile", "./tests/config_files/traefik.toml") print(f"\nLAUNCHING TRAEFIK with args: {args}\n") proc = _launch_traefik(*args) @@ -276,31 +282,32 @@ async def launch_traefik_file(): @pytest.fixture -async def launch_traefik_etcd(): - proc = _launch_traefik_cli("--providers.etcd") +def launch_traefik_etcd(): + env = Config.etcdctl_env + proc = _launch_traefik_cli("--providers.etcd", env=env) yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_etcd_auth(): +def launch_traefik_etcd_auth(): extra_args = ( "--providers.etcd.username=" + Config.etcd_user, "--providers.etcd.password=" + Config.etcd_password ) - proc = _launch_traefik_cli(*extra_args) + proc = _launch_traefik_cli(*extra_args, env=Config.etcdctl_env) yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_consul(): +def launch_traefik_consul(): proc = _launch_traefik_cli("--providers.consul") yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_consul_auth(): +def launch_traefik_consul_auth(): extra_args = ( "--providers.consul.username=root", "--providers.consul.password=" + Config.consul_token @@ -336,7 +343,7 @@ def _launch_traefik(*extra_args, env=None): ################################## @pytest.fixture -def configure_etcd(): +def configure_etcd(wait_for_etcd): """Load traefik api rules into the etcd kv store""" yield _config_etcd() @@ -358,7 +365,7 @@ def _config_etcd(*extra_args): proc.wait() @pytest.fixture -def _enable_auth_in_etcd(): +def enable_auth_in_etcd(): user = Config.etcd_user pw = Config.etcd_password subprocess.call(["etcdctl", "user", "add", f"{user}:{pw}"], env=Config.etcdctl_env) @@ -382,25 +389,18 @@ def _enable_auth_in_etcd(): @pytest.fixture -def launch_etcd_auth(launch_etcd, _enable_auth_in_etcd): +def launch_etcd_auth(launch_etcd, wait_for_etcd, enable_auth_in_etcd): yield @pytest.fixture() -async def launch_etcd(): - etcd_proc = subprocess.Popen(["etcd", "--debug"]) - await _wait_for_etcd() +def launch_etcd(): + etcd_proc = subprocess.Popen(["etcd", "--log-level=debug"]) yield etcd_proc - terminate_process(etcd_proc, timeout=15) - - # There have been cases where default.etcd didn't exist... - # Not sure why, but guess it doesn't really matter, just - # check to be safe. - default_etcd = os.path.join(os.getcwd(), "default.etcd") - if os.path.exists(default_etcd): - shutil.rmtree(default_etcd) + shutdown_etcd(etcd_proc) -async def _wait_for_etcd(user=None, pw=None): +@pytest.fixture +def wait_for_etcd(): """Etcd may not be ready if we jump straight into the tests. Make sure it's running before we continue with configuring it or running tests against it. @@ -409,31 +409,20 @@ async def _wait_for_etcd(user=None, pw=None): proxy classes. """ import etcd3 - async def _check_etcd(): - try: - cli = etcd3.client( - user=user, - password=pw - ) - routes = cli.get_prefix('/') - except Exception as e: - print(f"Etcd not up: {e}") - return False - - print( "Etcd is up!" ) - return True - - await exponential_backoff( - _check_etcd, - "Etcd not available", - timeout=20, + assert ( + "is healthy" in + subprocess.check_output( + ["etcdctl", "endpoint", "health"], + env=Config.etcdctl_env, + stderr=subprocess.STDOUT + ).decode(sys.stdout.encoding) ) #@pytest.fixture(scope="function", autouse=True) # Is this referenced anywhere?? #@pytest.fixture -#def clean_etcd(): -# subprocess.run(["etcdctl", "del", '""', "--from-key=true"], env=Config.etcdctl_env) +def clean_etcd(): + subprocess.run(["etcdctl", "del", '""', "--from-key=true"], env=Config.etcdctl_env) # Consul Launchers and configurers # @@ -446,7 +435,7 @@ async def launch_consul(): ) await _wait_for_consul() yield consul_proc - await shutdown_consul(consul_proc) + shutdown_consul(consul_proc) @pytest.fixture @@ -462,7 +451,7 @@ async def launch_consul_acl(): await _wait_for_consul(token=Config.consul_token) yield consul_proc - await shutdown_consul(consul_proc, secret=Config.consul_token) + shutdown_consul(consul_proc, secret=Config.consul_token) shutil.rmtree(os.getcwd() + "/consul.data") @@ -492,13 +481,13 @@ async def _check_consul(): @pytest.fixture -async def configure_consul(): +def configure_consul(): """Load an initial config into the consul KV store""" yield _config_consul() @pytest.fixture -async def configure_consul_auth(): +def configure_consul_auth(): """Load an initial config into the consul KV store, using authentication""" yield _config_consul(secret=Config.consul_token) @@ -533,7 +522,7 @@ def _config_consul(secret=None): # Teardown functions # ######################################################################### -async def shutdown_consul(consul_proc, secret=None): +def shutdown_consul(consul_proc, secret=None): # For some reason, without running `consul leave`, subsequent consul tests fail consul_env = None if secret is not None: @@ -542,6 +531,17 @@ async def shutdown_consul(consul_proc, secret=None): subprocess.call(["consul", "leave"], env=consul_env) terminate_process(consul_proc, timeout=30) +def shutdown_etcd(etcd_proc): + clean_etcd() + terminate_process(etcd_proc, timeout=20) + + # There have been cases where default.etcd didn't exist... + # Not sure why, but guess it doesn't really matter, just + # check to be safe and remove it if there. + default_etcd = os.path.join(os.getcwd(), "default.etcd") + if os.path.exists(default_etcd): + shutil.rmtree(default_etcd) + def shutdown_traefik(traefik_process): terminate_process(traefik_process) diff --git a/tests/proxytest.py b/tests/proxytest.py index c6243697..8bfc17d7 100644 --- a/tests/proxytest.py +++ b/tests/proxytest.py @@ -20,6 +20,9 @@ import websockets +import pprint +pp = pprint.PrettyPrinter(indent=2) + class MockApp: def __init__(self): self.hub = Hub(routespec="/") @@ -73,6 +76,13 @@ def __init__(self, name): def _new_spawner(self, spawner_name, **kwargs): return MockSpawner(spawner_name, user=self, **kwargs) +def assert_equal(value, expected): + try: + assert value == expected + except AssertionError: + pp.pprint({'value': value}) + pp.pprint({"expected": expected}) + raise @pytest.fixture def launch_backend(): @@ -192,10 +202,12 @@ async def test_route_exist(spec, backend): if not expect_value_error(spec): try: del( route["data"]["last_activity"] ) # CHP + except TypeError as e: + raise TypeError(f"{e}\nRoute got:{route}") except KeyError: pass - assert route == expected_output(spec, backend.geturl()) + assert_equal(route, expected_output(spec, backend.geturl())) # Test the actual routing responding_backend1 = await utils.get_responding_backend_port( @@ -204,10 +216,8 @@ async def test_route_exist(spec, backend): responding_backend2 = await utils.get_responding_backend_port( proxy_url, normalize_spec(spec) + "something" ) - assert ( - responding_backend1 == backend.port - and responding_backend2 == backend.port - ) + assert_equal(responding_backend1, backend.port) + assert_equal(responding_backend2, backend.port) for i, spec in enumerate(existing_routes, start=1): backend = default_backend._replace( @@ -226,8 +236,8 @@ async def test_route_exist(spec, backend): for i, spec in enumerate(existing_routes): try: await proxy.add_route(spec, extra_backends[i].geturl(), copy.copy(data)) - except Exception: - pass + except Exception as e: + raise type(e)(f"{e}\nProblem adding Route {spec}") def finalizer(): async def cleanup(): @@ -258,7 +268,7 @@ async def cleanup(): # Test that deleted route does not exist anymore if not expect_value_error(routespec): - assert route == None + assert_equal(route, None) async def _wait_for_deletion(): deleted = 0 @@ -327,7 +337,7 @@ async def test_get_all_routes(proxy, launch_backend): except KeyError: pass - assert routes == expected_output + assert_equal(routes, expected_output) async def test_host_origin_headers(proxy, launch_backend): @@ -369,8 +379,8 @@ async def test_host_origin_headers(proxy, launch_backend): ) resp = await AsyncHTTPClient().fetch(req) - assert resp.headers["Host"] == expected_host_header - assert resp.headers["Origin"] == expected_origin_header + assert_equal(resp.headers["Host"], expected_host_header) + assert_equal(resp.headers["Origin"], expected_origin_header) @pytest.mark.parametrize("username", ["zoe", "50fia", "秀樹", "~TestJH", "has@"])