diff --git a/jupyterhub_traefik_proxy/consul.py b/jupyterhub_traefik_proxy/consul.py index 6461b956..5865746c 100644 --- a/jupyterhub_traefik_proxy/consul.py +++ b/jupyterhub_traefik_proxy/consul.py @@ -111,7 +111,6 @@ def _stop_traefik(self): os.environ.pop("CONSUL_HTTP_TOKEN") async def persist_dynamic_config(self): - self.log.debug("Saving dynamic config to consul store") data = self.flatten_dict_for_kv( self.dynamic_config, prefix=self.kv_traefik_prefix ) @@ -128,7 +127,6 @@ def append_payload(key, val): append_payload(k, v) try: - self.log.debug(f"Uploading payload to KV store. Payload: {repr(payload)}") results = await self.kv_client.txn.put(payload=payload) status = 1 response = "" @@ -140,17 +138,13 @@ def append_payload(key, val): else: self.log.debug("Successfully uploaded payload to KV store") - # Let's check if it's in there then... - #index, result = await self.kv_client.kv.get(k) - #self.log.debug(f"And the survey says, at {k} we have: {result}") return status, response async def _kv_atomic_add_route_parts( self, jupyterhub_routespec, target, data, route_keys, rule ): - escaped_target = escapism.escape(target, safe=self.key_safe_chars) - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] ) try: @@ -158,14 +152,14 @@ async def _kv_atomic_add_route_parts( { "KV": { "Verb": "set", - "Key": escaped_jupyterhub_routespec, + "Key": jupyterhub_routespec, "Value": base64.b64encode(target.encode()).decode(), } }, { "KV": { "Verb": "set", - "Key": escaped_target, + "Key": jupyterhub_target, "Value": base64.b64encode(data.encode()).decode(), } }, @@ -176,13 +170,6 @@ async def _kv_atomic_add_route_parts( "Value": base64.b64encode(target.encode()).decode(), } }, - #{ - # "KV": { - # "Verb": "set", - # "Key": route_keys.service_weight_path, - # "Value": base64.b64encode(b"1").decode(), - # } - #}, { "KV": { "Verb": "set", @@ -211,26 +198,24 @@ async def _kv_atomic_add_route_parts( return status, response async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars - ) - index, v = await self.kv_client.kv.get(escaped_jupyterhub_routespec) + index, v = await self.kv_client.kv.get(jupyterhub_routespec) if v is None: self.log.warning( "Route %s doesn't exist. Nothing to delete", jupyterhub_routespec ) return True, None target = v["Value"] - escaped_target = escapism.escape(target, safe=self.key_safe_chars) + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) try: status, response = await self.kv_client.txn.put( payload=[ - {"KV": {"Verb": "delete", "Key": escaped_jupyterhub_routespec}}, - {"KV": {"Verb": "delete", "Key": escaped_target}}, + {"KV": {"Verb": "delete", "Key": jupyterhub_routespec}}, + {"KV": {"Verb": "delete", "Key": jupyterhub_target}}, {"KV": {"Verb": "delete", "Key": route_keys.service_url_path}}, - #{"KV": {"Verb": "delete", "Key": route_keys.service_weight_path}}, {"KV": {"Verb": "delete", "Key": route_keys.router_service_path}}, {"KV": {"Verb": "delete", "Key": route_keys.router_rule_path}}, ] @@ -244,17 +229,13 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): return status, response async def _kv_get_target(self, jupyterhub_routespec): - escaped_jupyterhub_routespec = escapism.escape( - jupyterhub_routespec, safe=self.key_safe_chars - ) - _, res = await self.kv_client.kv.get(escaped_jupyterhub_routespec) + _, res = await self.kv_client.kv.get(jupyterhub_routespec) if res is None: return None return res["Value"].decode() async def _kv_get_data(self, target): - escaped_target = escapism.escape(target, safe=self.key_safe_chars) - _, res = await self.kv_client.kv.get(escaped_target) + _, res = await self.kv_client.kv.get(target) if res is None: return None @@ -264,10 +245,18 @@ async def _kv_get_route_parts(self, kv_entry): key = escapism.unescape(kv_entry["KV"]["Key"]) value = kv_entry["KV"]["Value"] - # Strip the "/jupyterhub" prefix from the routespec - routespec = key.replace(self.kv_jupyterhub_prefix, "") + # Strip the "jupyterhub/routes/" prefix from the routespec + route_prefix = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes/"] + ) + routespec = key.replace(route_prefix, "") + target = base64.b64decode(value.encode()).decode() - data = await self._kv_get_data(target) + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) + + data = await self._kv_get_data(jupyterhub_target) return routespec, target, data @@ -277,9 +266,10 @@ async def _kv_get_jupyterhub_prefixed_entries(self): { "KV": { "Verb": "get-tree", - "Key": escapism.escape( - self.kv_jupyterhub_prefix, safe=self.key_safe_chars - ), + "Key": f"{self.kv_jupyterhub_prefix}/routes" + #escapism.escape( + # self.kv_jupyterhub_prefix, safe=self.key_safe_chars + #)+ "/routes", } } ] diff --git a/jupyterhub_traefik_proxy/etcd.py b/jupyterhub_traefik_proxy/etcd.py index 8afa9f7d..63cf4ab0 100644 --- a/jupyterhub_traefik_proxy/etcd.py +++ b/jupyterhub_traefik_proxy/etcd.py @@ -19,15 +19,13 @@ # Distributed under the terms of the Modified BSD License. from concurrent.futures import ThreadPoolExecutor -import json -import os +import escapism from urllib.parse import urlparse from tornado.concurrent import run_on_executor from traitlets import Any, default, Unicode from jupyterhub.utils import maybe_future -from . import traefik_utils from jupyterhub_traefik_proxy import TKvProxy @@ -78,23 +76,23 @@ def _default_client(self): import etcd3 except ImportError: raise ImportError("Please install etcd3 package to use traefik-proxy with etcd3") + kwargs = { + 'host': etcd_service.hostname, + 'port': etcd_service.port, + 'ca_cert': self.etcd_client_ca_cert, + 'cert_cert': self.etcd_client_cert_crt, + 'cert_key': self.etcd_client_cert_key, + } if self.kv_password: - return etcd3.client( - host=str(etcd_service.hostname), - port=etcd_service.port, - user=self.kv_username, - password=self.kv_password, - ca_cert=self.etcd_client_ca_cert, - cert_cert=self.etcd_client_cert_crt, - cert_key=self.etcd_client_cert_key, - ) - return etcd3.client( - host=str(etcd_service.hostname), - port=etcd_service.port, - ca_cert=self.etcd_client_ca_cert, - cert_cert=self.etcd_client_cert_crt, - cert_key=self.etcd_client_cert_key, - ) + kwargs.update({ + 'user': self.kv_username, + 'password': self.kv_password + }) + return etcd3.client(**kwargs) + + def _clean_resources(self): + super()._clean_resources() + self.kv_client.close() @run_on_executor def _etcd_transaction(self, success_actions): @@ -119,7 +117,7 @@ def _define_kv_specific_static_config(self): self.static_config.update({"providers" : { "etcd" : { "endpoints": [url.netloc], - "rootKey": self.kv_traefik_prefix, # Is rootKey the new prefix? + "rootKey": self.kv_traefik_prefix, } } }) if self.kv_username and self.kv_password: @@ -131,14 +129,13 @@ def _define_kv_specific_static_config(self): async def _kv_atomic_add_route_parts( self, jupyterhub_routespec, target, data, route_keys, rule ): + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) success = [ self.kv_client.transactions.put(jupyterhub_routespec, target), - self.kv_client.transactions.put(target, data), + self.kv_client.transactions.put(jupyterhub_target, data), self.kv_client.transactions.put(route_keys.service_url_path, target), - # The weight is used to balance services, not servers. - # Traefik by default will use round-robin load-balancing anyway. - # See: https://doc.traefik.io/traefik/routing/services/#load-balancing - #self.kv_client.transactions.put(route_keys.service_weight_path, "1"), self.kv_client.transactions.put( route_keys.router_service_path, route_keys.service_alias ), @@ -151,15 +148,17 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): value = await maybe_future(self._etcd_get(jupyterhub_routespec)) if value is None: self.log.warning( - "Route {jupyterhub_routespec} doesn't exist. Nothing to delete" + f"Route {jupyterhub_routespec} doesn't exist. Nothing to delete" ) return True, None - target = value.decode() + jupyterhub_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(value.decode())] + ) success = [ self.kv_client.transactions.delete(jupyterhub_routespec), - self.kv_client.transactions.delete(target), + self.kv_client.transactions.delete(jupyterhub_target), self.kv_client.transactions.delete(route_keys.service_url_path), #self.kv_client.transactions.delete(route_keys.service_weight_path), self.kv_client.transactions.delete(route_keys.router_service_path), @@ -170,7 +169,7 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys): async def _kv_get_target(self, jupyterhub_routespec): value = await maybe_future(self._etcd_get(jupyterhub_routespec)) - if value == None: + if value is None: return None return value.decode() @@ -182,17 +181,23 @@ async def _kv_get_data(self, target): async def _kv_get_route_parts(self, kv_entry): key = kv_entry[1].key.decode() - value = kv_entry[0] + value = kv_entry[0].decode() - # Strip the "/jupyterhub" prefix from the routespec - routespec = key.replace(self.kv_jupyterhub_prefix, "") - target = value.decode() - data = await self._kv_get_data(target) + # Strip the "/jupyterhub/routes/" prefix from the routespec and unescape it + sep = self.kv_separator + route_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep]) + target_prefix = sep.join([self.kv_jupyterhub_prefix, "targets" + sep]) + routespec = escapism.unescape(key.replace(route_prefix, "", 1)) + etcd_target = sep.join([target_prefix, escapism.escape(value)]) + target = escapism.unescape(etcd_target.replace(target_prefix, "", 1)) + data = await self._kv_get_data(etcd_target) return routespec, target, data async def _kv_get_jupyterhub_prefixed_entries(self): - routes = await maybe_future(self._etcd_get_prefix(self.kv_jupyterhub_prefix)) + sep = self.kv_separator + routespecs_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep]) + routes = await maybe_future(self._etcd_get_prefix(routespecs_prefix)) return routes async def persist_dynamic_config(self): diff --git a/jupyterhub_traefik_proxy/fileprovider.py b/jupyterhub_traefik_proxy/fileprovider.py index 091fd7a4..1e3d5e33 100644 --- a/jupyterhub_traefik_proxy/fileprovider.py +++ b/jupyterhub_traefik_proxy/fileprovider.py @@ -18,10 +18,8 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -import json import os import asyncio -import string import escapism from traitlets import Any, default, Unicode, observe @@ -61,7 +59,6 @@ def _set_dynamic_config_file(self, change): def __init__(self, **kwargs): super().__init__(**kwargs) - #self._set_dynamic_config_file(None) try: # Load initial dynamic config from disk self.dynamic_config = self.dynamic_config_handler.load() @@ -106,7 +103,7 @@ def _clean_resources(self): def _get_route_unsafe(self, traefik_routespec): service_alias = traefik_utils.generate_alias(traefik_routespec, "service") router_alias = traefik_utils.generate_alias(traefik_routespec, "router") - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) result = {"data": None, "target": None, "routespec": routespec} def get_target_data(d, to_find): @@ -133,8 +130,6 @@ def get_target_data(d, to_find): if result["data"] is None and result["target"] is None: self.log.info(f"No route for {routespec} found!") result = None - self.log.debug(f"traefik routespec: {traefik_routespec}") - self.log.debug(f"result for routespec {routespec}:-\n{result}") return result async def start(self): @@ -179,11 +174,9 @@ async def add_route(self, routespec, target, data): The proxy implementation should also have a way to associate the fact that a route came from JupyterHub. """ - self.log.debug(f"\tTraefikFileProviderProxy.add_route: Adding {routespec} for {target}") - traefik_routespec = self._routespec_to_traefik_path(routespec) + traefik_routespec = self.validate_routespec(routespec) service_alias = traefik_utils.generate_alias(traefik_routespec, "service") router_alias = traefik_utils.generate_alias(traefik_routespec, "router") - #data = json.dumps(data) rule = traefik_utils.generate_rule(traefik_routespec) async with self.mutex: @@ -222,9 +215,6 @@ async def add_route(self, routespec, target, data): } self.persist_dynamic_config() - self.log.debug(f"traefik routespec: {traefik_routespec}") - self.log.debug(f"data for routespec {routespec}:-\n{data}") - if self.should_start: try: # Check if traefik was launched @@ -247,17 +237,17 @@ async def delete_route(self, routespec): **Subclasses must define this method** """ - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) service_alias = traefik_utils.generate_alias(routespec, "service") router_alias = traefik_utils.generate_alias(routespec, "router") async with self.mutex: + # Pop each entry and if it's the last one, delete the key self.dynamic_config["http"]["routers"].pop(router_alias, None) self.dynamic_config["http"]["services"].pop(service_alias, None) self.dynamic_config["jupyter"]["routers"].pop(router_alias, None) - # If empty, delete the keys if not self.dynamic_config["http"]["routers"]: self.dynamic_config["http"].pop("routers") if not self.dynamic_config["http"]["services"]: @@ -293,7 +283,7 @@ async def get_all_routes(self): continue escaped_routespec = "".join(router.split("_", 1)[1:]) traefik_routespec = escapism.unescape(escaped_routespec) - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) all_routes.update({ routespec : self._get_route_unsafe(traefik_routespec) }) @@ -320,7 +310,7 @@ async def get_route(self, routespec): None: if there are no routes matching the given routespec """ - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) async with self.mutex: return self._get_route_unsafe(routespec) diff --git a/jupyterhub_traefik_proxy/install.py b/jupyterhub_traefik_proxy/install.py index ddc82824..ac49a699 100644 --- a/jupyterhub_traefik_proxy/install.py +++ b/jupyterhub_traefik_proxy/install.py @@ -28,8 +28,8 @@ "https://github.com/etcd-io/etcd/releases/download/v3.4.15/etcd-v3.4.15-darwin-amd64.tar.gz": "c596709069193bffc639a22558bdea4d801128e635909ea01a6fd5b5c85da729", "https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-linux-amd64.tar.gz": "1620a59150ec0a0124a65540e23891243feb2d9a628092fb1edcc23974724a45", "https://github.com/etcd-io/etcd/releases/download/v3.3.10/etcd-v3.3.10-darwin-amd64.tar.gz": "fac4091c7ba6f032830fad7809a115909d0f0cae5cbf5b34044540def743577b", - "https://github.com/etcd-io/etcd/releases/download/v3.2.25/etcd-v3.2.25-linux-amd64.tar.gz": "8a509ffb1443088d501f19e339a0d9c0058ce20599752c3baef83c1c68790ef7", - "https://github.com/etcd-io/etcd/releases/download/v3.2.25/etcd-v3.2.25-darwin-amd64.tar.gz": "9950684a01d7431bc12c3dba014f222d55a862c6f8af64c09c42d7a59ed6790d", + "https://github.com/etcd-io/etcd/releases/download/v3.2.26/etcd-v3.2.26-linux-amd64.tar.gz": "127d4f2097c09d929beb9d3784590cc11102f4b4d4d4da7ad82d5c9e856afd38", + "https://github.com/etcd-io/etcd/releases/download/v3.2.26/etcd-v3.2.26-darwin-amd64.zip": "0393e650ffa3e61b1fd07c61f8c78af1556896c300c9814545ff0e91f52c3513", } checksums_consul = { diff --git a/jupyterhub_traefik_proxy/kv_proxy.py b/jupyterhub_traefik_proxy/kv_proxy.py index cc69629e..17dd5f2d 100644 --- a/jupyterhub_traefik_proxy/kv_proxy.py +++ b/jupyterhub_traefik_proxy/kv_proxy.py @@ -18,6 +18,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import escapism import json import os @@ -38,8 +39,6 @@ class TKvProxy(TraefikProxy): kv_client = Any() # Key-value store client - #kv_name = Unicode(config=False, help="""The name of the key value store""") - kv_username = Unicode( config=True, help="""The username for key value store login""" ) @@ -170,8 +169,8 @@ async def _kv_get_route_parts(self, kv_entry): from the key-value store given a `kv_entry`. A `kv_entry` is a key-value store entry where the key starts with - `proxy.jupyterhub_prefix`. It is expected that only the routespecs - will be prefixed with `proxy.jupyterhub_prefix` when added to the kv store. + `proxy.kv_jupyterhub_prefix`. It is expected that only the routespecs + will be prefixed with `proxy.kv_jupyterhub_prefix` when added to the kv store. **Subclasses must define this method** @@ -186,15 +185,15 @@ async def _kv_get_route_parts(self, kv_entry): async def _kv_get_jupyterhub_prefixed_entries(self): """Retrive from the kv store all the key-value pairs where the key starts with - `proxy.jupyterhub_prefix`. - It is expected that only the routespecs will be prefixed with `proxy.jupyterhub_prefix` + `proxy.kv_jupyterhub_prefix`. + It is expected that only the routespecs will be prefixed with `proxy.kv_jupyterhub_prefix` when added to the kv store. **Subclasses must define this method** Returns: 'routes': A list of key-value store entries where the keys start - with `proxy.jupyterhub_prefix`. + with `proxy.kv_jupyterhub_prefix`. """ raise NotImplementedError() @@ -208,12 +207,10 @@ def _clean_resources(self): raise async def _setup_traefik_static_config(self): - self.log.debug("Setup the KV-specific static config") self._define_kv_specific_static_config() await super()._setup_traefik_static_config() async def _setup_traefik_dynamic_config(self): - self.log.info("Loading traefik dynamic config into kv store.") await super()._setup_traefik_dynamic_config() await self.persist_dynamic_config() @@ -244,13 +241,13 @@ async def add_route(self, routespec, target, data): Will raise an appropriate Exception (FIXME: find what?) if the route could not be added. - This proxy implementation prefixes the routespec with `proxy.jupyterhub_prefix` when + This proxy implementation prefixes the routespec with `proxy.kv_jupyterhub_prefix` when adding it to the kv store in orde to associate the fact that the route came from JupyterHub. Everything traefik related is prefixed with `proxy.traefik_prefix`. """ - self.log.info("Adding route for %s to %s.", routespec, target) + self.log.debug("Adding route for %s to %s.", routespec, target) - routespec = self._routespec_to_traefik_path(routespec) + routespec = self.validate_routespec(routespec) route_keys = traefik_utils.generate_route_keys(self, routespec, separator=self.kv_separator) # Store the data dict passed in by JupyterHub @@ -259,7 +256,9 @@ async def add_route(self, routespec, target, data): rule = traefik_utils.generate_rule(routespec) # To be able to delete the route when only routespec is provided - jupyterhub_routespec = self.kv_jupyterhub_prefix + routespec + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) status, response = await self._kv_atomic_add_route_parts( jupyterhub_routespec, target, data, route_keys, rule @@ -295,8 +294,10 @@ async def delete_route(self, routespec): """Delete a route and all the traefik related info associated given a routespec, (if it exists). """ - routespec = self._routespec_to_traefik_path(routespec) - jupyterhub_routespec = self.kv_jupyterhub_prefix + routespec + routespec = self.validate_routespec(routespec) + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) route_keys = traefik_utils.generate_route_keys(self, routespec, separator=self.kv_separator) status, response = await self._kv_atomic_delete_route_parts( @@ -327,7 +328,7 @@ async def get_all_routes(self): for kv_entry in routes: traefik_routespec, target, data = await self._kv_get_route_parts(kv_entry) - routespec = self._routespec_from_traefik_path(traefik_routespec) + routespec = self.validate_routespec(traefik_routespec) all_routes[routespec] = { "routespec": routespec, "target": target, @@ -357,13 +358,17 @@ async def get_route(self, routespec): None: if there are no routes matching the given routespec """ routespec = self.validate_routespec(routespec) - traefik_routespec = self._routespec_to_traefik_path(routespec) - jupyterhub_routespec = self.kv_jupyterhub_prefix + traefik_routespec + jupyterhub_routespec = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "routes", escapism.escape(routespec)] + ) target = await self._kv_get_target(jupyterhub_routespec) if target is None: return None - data = await self._kv_get_data(target) + traefik_target = self.kv_separator.join( + [self.kv_jupyterhub_prefix, "targets", escapism.escape(target)] + ) + data = await self._kv_get_data(traefik_target) return { "routespec": routespec, @@ -376,16 +381,32 @@ def flatten_dict_for_kv(self, data, prefix='traefik'): prefixing each key with :arg:`prefix` and joining each key with `self.kv_separator`. - e.g. flatten_dict_for_kv( {'x' : {'y' : {'z': 'a'} }, {'foo': 'bar'} } ) + If the final value is a `list`, then the provided bottom-level key + shall be appended with an incrementing numeric number, in the style + that is used by traefik's KV store, e.g. + + flatten_dict_for_kv({ + 'x' : { + 'y' : { + 'z': 'a' + } + }, { + 'foo': 'bar' + }, + 'baz': [ 'a', 'b', 'c' ] + }) Returns: result (dict): { - 'traefik.x.y.z' : 'a', - 'traefik.x.foo': 'bar' + 'traefik/x/y/z' : 'a', + 'traefik/x/foo': 'bar', + 'traefik/baz/0': 'a', + 'traefik/baz/1': 'b', + 'traefik/baz/2': 'c', } - - Ref: Taken from https://stackoverflow.com/a/6027615 + + Ref: Inspired by https://stackoverflow.com/a/6027615 """ sep = self.kv_separator items = {} @@ -393,15 +414,11 @@ def flatten_dict_for_kv(self, data, prefix='traefik'): new_key = prefix + sep + k if prefix else k if isinstance(v, MutableMapping): items.update(self.flatten_dict_for_kv(v, prefix=new_key)) - #else: - #items.update({new_key: v}) elif isinstance(v, str): items.update({new_key: v}) elif isinstance(v, list): for n, item in enumerate(v): items.update({ f"{new_key}{sep}{n}" : item }) - #items.update({new_key: ", ".join(v)}) - #transations.append(self.kv_client.transactions.put(k, ", ".join(v))) else: raise ValueError(f"Cannot upload {v} of type {type(v)} to etcd store") return items diff --git a/jupyterhub_traefik_proxy/proxy.py b/jupyterhub_traefik_proxy/proxy.py index 98e07ee9..c7e8df5a 100644 --- a/jupyterhub_traefik_proxy/proxy.py +++ b/jupyterhub_traefik_proxy/proxy.py @@ -58,7 +58,8 @@ class TraefikProxy(Proxy): debug = Bool(False, config=True, help="""Debug the proxy class?""") - traefik_log_level = Unicode("DEBUG", config=True, help="""traefik's log level""") + traefik_log_level = Unicode(config=True, help="""traefik's log level""") + log_level = Unicode(config=True, help="""The Proxy's log level""") traefik_api_password = Unicode( config=True, help="""The password for traefik api login""" @@ -70,21 +71,23 @@ class TraefikProxy(Proxy): def __init__(self, **kwargs): super().__init__(**kwargs) - if kwargs.get('debug', self.debug) == True: - import sys, logging - # Check we don't already have a StreamHandler - addHandler = True - for handler in self.log.handlers: - if isinstance(handler, logging.StreamHandler): - addHandler = False - if addHandler: - self.log.setLevel("DEBUG") - handler = logging.StreamHandler(sys.stdout) - handler.setLevel("DEBUG") - self.log.addHandler(handler) - self.log.debug(f"Initialising {type(self).__name__}") - - #if kwargs.get('debug', self.debug) is True: + if self.log_level: + self._set_log_level() + + def _set_log_level(self): + import sys, logging + # Check we don't already have a StreamHandler + # and add one if necessary + addHandler = True + for handler in self.log.handlers: + if isinstance(handler, logging.StreamHandler): + addHandler = False + level = self.log_level + if addHandler: + self.log.setLevel(level) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + self.log.addHandler(handler) @default("traefik_api_password") def _warn_empty_password(self): @@ -260,7 +263,8 @@ async def _setup_traefik_static_config(self): """ self.log.info("Setting up traefik's static config...") - self.static_config["log"] = { "level": self.traefik_log_level } + if self.traefik_log_level: + self.static_config["log"] = { "level": self.traefik_log_level } entryPoints = {} @@ -336,14 +340,10 @@ async def _setup_traefik_dynamic_config(self): } }) - - def _routespec_to_traefik_path(self, routespec): - path = self.validate_routespec(routespec) - if path != "/" and path.endswith("/"): - path = path.rstrip("/") - return path - - def _routespec_from_traefik_path(self, routespec): + def validate_routespec(self, routespec): + """Override jupyterhub's default Proxy.validate_routespec method, as traefik + can set router rule's on both Host and PathPrefix rules combined. + """ if not routespec.endswith("/"): routespec = routespec + "/" return routespec diff --git a/jupyterhub_traefik_proxy/traefik_utils.py b/jupyterhub_traefik_proxy/traefik_utils.py index 62d64335..9dca25ae 100644 --- a/jupyterhub_traefik_proxy/traefik_utils.py +++ b/jupyterhub_traefik_proxy/traefik_utils.py @@ -27,12 +27,11 @@ def generate_rule(routespec): routespec = unquote(routespec) if routespec.startswith("/"): # Path-based route, e.g. /proxy/path/ - rule = "PathPrefix(`{0}`)".format(routespec) + rule = f"PathPrefix(`{routespec}`)" else: # Host-based routing, e.g. host.tld/proxy/path/ host, path_prefix = routespec.split("/", 1) - path_prefix = "/" + path_prefix - rule = "Host(`{0}`) && PathPrefix(`{1}`)".format(host, path_prefix) + rule = f"Host(`{host}`) && PathPrefix(`/{path_prefix}`)" return rule @@ -51,12 +50,6 @@ def generate_service_entry( proxy, service_alias, separator="/", url=False): service_entry += separator + "url" return service_entry -def generate_service_weight_entry( proxy, service_alias, separator="/"): - return separator.join( - [proxy.kv_traefik_prefix, "http", "services", service_alias, - "weighted", "services", "0", "weight"] - ) - def generate_router_service_entry(proxy, router_alias): return "/".join( @@ -87,7 +80,6 @@ def generate_route_keys(proxy, routespec, separator="/"): [ "service_alias", "service_url_path", - #"service_weight_path", "router_alias", "router_service_path", "router_rule_path", @@ -97,8 +89,6 @@ def generate_route_keys(proxy, routespec, separator="/"): if separator != ".": service_url_path = generate_service_entry(proxy, service_alias, url=True) router_rule_path = generate_router_rule_entry(proxy, router_alias) - #service_weight_path = generate_service_entry(proxy, service_alias, weight=True) - #service_weight_path = generate_service_weight_entry(proxy, service_alias) router_service_path = generate_router_service_entry(proxy, router_alias) else: service_url_path = generate_service_entry( @@ -107,13 +97,11 @@ def generate_route_keys(proxy, routespec, separator="/"): router_rule_path = generate_router_rule_entry( proxy, router_alias, separator=separator ) - #service_weight_path = "" router_service_path = "" return RouteKeys( service_alias, service_url_path, - #service_weight_path, router_alias, router_service_path, router_rule_path, diff --git a/tests/conftest.py b/tests/conftest.py index 71ba58d2..6dcd6379 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,13 +27,18 @@ class Config: # commandss, e.g. txn # Must be passed to the env parameter of any subprocess.Popen call that runs # etcdctl - etcdctl_env = os.environ.copy().update({"ETCDCTL_API": "3"}) + etcdctl_env = dict(os.environ, ETCDCTL_API="3") + # Etcd3 auth login credentials etcd_password = "secret" etcd_user = "root" + # Consol auth login credentials consul_token = "secret" + # Traefik api auth login credentials + traefik_api_user = "api_admin" + traefik_api_pass = "admin" # Define a "slow" test marker so that we can run the slow tests at the end # ref: https://docs.pytest.org/en/6.0.1/example/simple.html#control-skipping-of-tests-according-to-command-line-option @@ -66,11 +71,11 @@ async def no_auth_consul_proxy(launch_consul): """ proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -85,12 +90,12 @@ async def auth_consul_proxy(launch_consul_acl): """ proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.consul_token, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -98,18 +103,18 @@ async def auth_consul_proxy(launch_consul_acl): @pytest.fixture -async def no_auth_etcd_proxy(launch_etcd): +async def no_auth_etcd_proxy(launch_etcd, wait_for_etcd): """ Fixture returning a configured TraefikEtcdProxy. No etcd authentication. """ proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -124,13 +129,13 @@ async def auth_etcd_proxy(launch_etcd_auth): """ proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_username="root", kv_password=Config.etcd_password, check_route_timeout=45, should_start=True, - debug=True + log_level='DEBUG' ) await proxy.start() yield proxy @@ -177,11 +182,11 @@ def _file_proxy(dynamic_config_file, **kwargs): ) return TraefikFileProviderProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, dynamic_config_file = dynamic_config_file, check_route_timeout=60, - debug=True, + log_level='DEBUG', **kwargs ) @@ -208,8 +213,8 @@ async def external_file_proxy_toml(launch_traefik_file): async def external_consul_proxy(launch_consul, configure_consul, launch_traefik_consul): proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=False, debug=True @@ -218,11 +223,11 @@ async def external_consul_proxy(launch_consul, configure_consul, launch_traefik_ @pytest.fixture -def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_traefik_consul_auth): +async def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_traefik_consul_auth): proxy = TraefikConsulProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.consul_token, check_route_timeout=45, should_start=False, @@ -232,32 +237,33 @@ def auth_external_consul_proxy(launch_consul_acl, configure_consul_auth, launch_ @pytest.fixture -def external_etcd_proxy(launch_etcd, configure_etcd, launch_traefik_etcd): +async def external_etcd_proxy(launch_etcd, configure_etcd, launch_traefik_etcd): proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, check_route_timeout=45, should_start=False, - debug=True + log_level="DEBUG" ) yield proxy + proxy.kv_client.close() @pytest.fixture def auth_external_etcd_proxy(launch_etcd_auth, configure_etcd_auth, launch_traefik_etcd_auth): proxy = TraefikEtcdProxy( public_url="http://127.0.0.1:8000", - traefik_api_password="admin", - traefik_api_username="api_admin", + traefik_api_password=Config.traefik_api_pass, + traefik_api_username=Config.traefik_api_user, kv_password=Config.etcd_password, kv_username="root", check_route_timeout=45, should_start=False, - debug=True + log_level="DEBUG" ) - #traefik_process = configure_and_launch_traefik(provider="etcd", password=Config.etcd_password) yield proxy + proxy.kv_client.close() @@ -267,7 +273,7 @@ def auth_external_etcd_proxy(launch_etcd_auth, configure_etcd_auth, launch_traef ######################################################################### @pytest.fixture -async def launch_traefik_file(): +def launch_traefik_file(): args = ("--configfile", "./tests/config_files/traefik.toml") print(f"\nLAUNCHING TRAEFIK with args: {args}\n") proc = _launch_traefik(*args) @@ -276,31 +282,32 @@ async def launch_traefik_file(): @pytest.fixture -async def launch_traefik_etcd(): - proc = _launch_traefik_cli("--providers.etcd") +def launch_traefik_etcd(): + env = Config.etcdctl_env + proc = _launch_traefik_cli("--providers.etcd", env=env) yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_etcd_auth(): +def launch_traefik_etcd_auth(): extra_args = ( "--providers.etcd.username=" + Config.etcd_user, "--providers.etcd.password=" + Config.etcd_password ) - proc = _launch_traefik_cli(*extra_args) + proc = _launch_traefik_cli(*extra_args, env=Config.etcdctl_env) yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_consul(): +def launch_traefik_consul(): proc = _launch_traefik_cli("--providers.consul") yield proc shutdown_traefik(proc) @pytest.fixture -async def launch_traefik_consul_auth(): +def launch_traefik_consul_auth(): extra_args = ( "--providers.consul.username=root", "--providers.consul.password=" + Config.consul_token @@ -336,7 +343,7 @@ def _launch_traefik(*extra_args, env=None): ################################## @pytest.fixture -def configure_etcd(): +def configure_etcd(wait_for_etcd): """Load traefik api rules into the etcd kv store""" yield _config_etcd() @@ -358,7 +365,7 @@ def _config_etcd(*extra_args): proc.wait() @pytest.fixture -def _enable_auth_in_etcd(): +def enable_auth_in_etcd(): user = Config.etcd_user pw = Config.etcd_password subprocess.call(["etcdctl", "user", "add", f"{user}:{pw}"], env=Config.etcdctl_env) @@ -382,25 +389,18 @@ def _enable_auth_in_etcd(): @pytest.fixture -def launch_etcd_auth(launch_etcd, _enable_auth_in_etcd): +def launch_etcd_auth(launch_etcd, wait_for_etcd, enable_auth_in_etcd): yield @pytest.fixture() -async def launch_etcd(): - etcd_proc = subprocess.Popen(["etcd", "--debug"]) - await _wait_for_etcd() +def launch_etcd(): + etcd_proc = subprocess.Popen(["etcd", "--log-level=debug"]) yield etcd_proc - terminate_process(etcd_proc, timeout=15) - - # There have been cases where default.etcd didn't exist... - # Not sure why, but guess it doesn't really matter, just - # check to be safe. - default_etcd = os.path.join(os.getcwd(), "default.etcd") - if os.path.exists(default_etcd): - shutil.rmtree(default_etcd) + shutdown_etcd(etcd_proc) -async def _wait_for_etcd(user=None, pw=None): +@pytest.fixture +def wait_for_etcd(): """Etcd may not be ready if we jump straight into the tests. Make sure it's running before we continue with configuring it or running tests against it. @@ -409,31 +409,20 @@ async def _wait_for_etcd(user=None, pw=None): proxy classes. """ import etcd3 - async def _check_etcd(): - try: - cli = etcd3.client( - user=user, - password=pw - ) - routes = cli.get_prefix('/') - except Exception as e: - print(f"Etcd not up: {e}") - return False - - print( "Etcd is up!" ) - return True - - await exponential_backoff( - _check_etcd, - "Etcd not available", - timeout=20, + assert ( + "is healthy" in + subprocess.check_output( + ["etcdctl", "endpoint", "health"], + env=Config.etcdctl_env, + stderr=subprocess.STDOUT + ).decode(sys.stdout.encoding) ) #@pytest.fixture(scope="function", autouse=True) # Is this referenced anywhere?? #@pytest.fixture -#def clean_etcd(): -# subprocess.run(["etcdctl", "del", '""', "--from-key=true"], env=Config.etcdctl_env) +def clean_etcd(): + subprocess.run(["etcdctl", "del", '""', "--from-key=true"], env=Config.etcdctl_env) # Consul Launchers and configurers # @@ -446,7 +435,7 @@ async def launch_consul(): ) await _wait_for_consul() yield consul_proc - await shutdown_consul(consul_proc) + shutdown_consul(consul_proc) @pytest.fixture @@ -462,7 +451,7 @@ async def launch_consul_acl(): await _wait_for_consul(token=Config.consul_token) yield consul_proc - await shutdown_consul(consul_proc, secret=Config.consul_token) + shutdown_consul(consul_proc, secret=Config.consul_token) shutil.rmtree(os.getcwd() + "/consul.data") @@ -492,13 +481,13 @@ async def _check_consul(): @pytest.fixture -async def configure_consul(): +def configure_consul(): """Load an initial config into the consul KV store""" yield _config_consul() @pytest.fixture -async def configure_consul_auth(): +def configure_consul_auth(): """Load an initial config into the consul KV store, using authentication""" yield _config_consul(secret=Config.consul_token) @@ -533,7 +522,7 @@ def _config_consul(secret=None): # Teardown functions # ######################################################################### -async def shutdown_consul(consul_proc, secret=None): +def shutdown_consul(consul_proc, secret=None): # For some reason, without running `consul leave`, subsequent consul tests fail consul_env = None if secret is not None: @@ -542,6 +531,17 @@ async def shutdown_consul(consul_proc, secret=None): subprocess.call(["consul", "leave"], env=consul_env) terminate_process(consul_proc, timeout=30) +def shutdown_etcd(etcd_proc): + clean_etcd() + terminate_process(etcd_proc, timeout=20) + + # There have been cases where default.etcd didn't exist... + # Not sure why, but guess it doesn't really matter, just + # check to be safe and remove it if there. + default_etcd = os.path.join(os.getcwd(), "default.etcd") + if os.path.exists(default_etcd): + shutil.rmtree(default_etcd) + def shutdown_traefik(traefik_process): terminate_process(traefik_process) diff --git a/tests/proxytest.py b/tests/proxytest.py index c6243697..8bfc17d7 100644 --- a/tests/proxytest.py +++ b/tests/proxytest.py @@ -20,6 +20,9 @@ import websockets +import pprint +pp = pprint.PrettyPrinter(indent=2) + class MockApp: def __init__(self): self.hub = Hub(routespec="/") @@ -73,6 +76,13 @@ def __init__(self, name): def _new_spawner(self, spawner_name, **kwargs): return MockSpawner(spawner_name, user=self, **kwargs) +def assert_equal(value, expected): + try: + assert value == expected + except AssertionError: + pp.pprint({'value': value}) + pp.pprint({"expected": expected}) + raise @pytest.fixture def launch_backend(): @@ -192,10 +202,12 @@ async def test_route_exist(spec, backend): if not expect_value_error(spec): try: del( route["data"]["last_activity"] ) # CHP + except TypeError as e: + raise TypeError(f"{e}\nRoute got:{route}") except KeyError: pass - assert route == expected_output(spec, backend.geturl()) + assert_equal(route, expected_output(spec, backend.geturl())) # Test the actual routing responding_backend1 = await utils.get_responding_backend_port( @@ -204,10 +216,8 @@ async def test_route_exist(spec, backend): responding_backend2 = await utils.get_responding_backend_port( proxy_url, normalize_spec(spec) + "something" ) - assert ( - responding_backend1 == backend.port - and responding_backend2 == backend.port - ) + assert_equal(responding_backend1, backend.port) + assert_equal(responding_backend2, backend.port) for i, spec in enumerate(existing_routes, start=1): backend = default_backend._replace( @@ -226,8 +236,8 @@ async def test_route_exist(spec, backend): for i, spec in enumerate(existing_routes): try: await proxy.add_route(spec, extra_backends[i].geturl(), copy.copy(data)) - except Exception: - pass + except Exception as e: + raise type(e)(f"{e}\nProblem adding Route {spec}") def finalizer(): async def cleanup(): @@ -258,7 +268,7 @@ async def cleanup(): # Test that deleted route does not exist anymore if not expect_value_error(routespec): - assert route == None + assert_equal(route, None) async def _wait_for_deletion(): deleted = 0 @@ -327,7 +337,7 @@ async def test_get_all_routes(proxy, launch_backend): except KeyError: pass - assert routes == expected_output + assert_equal(routes, expected_output) async def test_host_origin_headers(proxy, launch_backend): @@ -369,8 +379,8 @@ async def test_host_origin_headers(proxy, launch_backend): ) resp = await AsyncHTTPClient().fetch(req) - assert resp.headers["Host"] == expected_host_header - assert resp.headers["Origin"] == expected_origin_header + assert_equal(resp.headers["Host"], expected_host_header) + assert_equal(resp.headers["Origin"], expected_origin_header) @pytest.mark.parametrize("username", ["zoe", "50fia", "秀樹", "~TestJH", "has@"])