Skip to content

Commit

Permalink
Further work to upgrade the proxy to support the Traefik v2 API, in r…
Browse files Browse the repository at this point in the history
…elation to

issue: jupyterhub#97

Although all tests passed on my test system, the github workflow tests failed with the
etcd tests, for the following reasons:-

    - Hadn't actually tested etcd-3.4.15, the default version installed by the
      install script. I'd used the default ubuntu version, 3.2.26. I'm not sure
      if this caused issues (maybe?, see next point), but some new warnings are
      emitted about the log parameters for instance.

    - The tests mainly failed due to what I thought was a nasty race-condition
      between successive TraefikEtcdProxy test fixture calls. The `proxy()`
      fixture in `tests/test_proxy.py` doesn't appear to fully destroy the
      dependent Proxy classes between test runs. In the case of
      TraefikEtcdProxy, this leaves the etcd3/grpc client open through the end
      of one test and into the next test. Then the next TraefikEtcdProxy test
      gets a connection error.  I don't know why only one grpc client is
      allowed - is this related to the etcd version? - but regardless, the less
      than ideal solution is to manually call
      `TraefikEtcdProxy.kv_client.close()` during the teardown of the
      external_etcd* test runs. This is also manually called now by
      `TraefikEtcdProxy.stop()`, when `should_start=True`.
            (This took me days to figure out!!)

Some further modifications to the code include:-

    - Changed the KV store structure for jupyterhub-specific information. All
      jupyterhub routespecs, targets and data are now stored as:-

          jupyterhub/
                   -/routes/{escaped_routespec} = {target}
                   -/targets/{escaped_target}   = {data}

      N.B. I think this can be condensed to one single request. Atm, to get the
      {data} for a routespec, two KV get requests are required: 1. to get the
      {target}, and 2. to get the {data} using that {target}. The {target} is
      also in the traefik configuration, so it seems like unnecessary
      duplication of information and redirection.

    - Added `log_level` and `traefik_log_level` config parameters to the base
      Proxy class. The first sets the log level for the logger (and handler)
      of the Proxy class. The latter sets traefik's log level, if
      `should_start=True`.

    - General tidy up, removing excessive debug statements and commented-out
      code.
  • Loading branch information
alexleach committed Jun 23, 2021
1 parent 8459b13 commit c90686b
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 245 deletions.
64 changes: 27 additions & 37 deletions jupyterhub_traefik_proxy/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def _stop_traefik(self):
os.environ.pop("CONSUL_HTTP_TOKEN")

async def persist_dynamic_config(self):
self.log.debug("Saving dynamic config to consul store")
data = self.flatten_dict_for_kv(
self.dynamic_config, prefix=self.kv_traefik_prefix
)
Expand All @@ -128,7 +127,6 @@ def append_payload(key, val):
append_payload(k, v)

try:
self.log.debug(f"Uploading payload to KV store. Payload: {repr(payload)}")
results = await self.kv_client.txn.put(payload=payload)
status = 1
response = ""
Expand All @@ -140,32 +138,28 @@ def append_payload(key, val):
else:
self.log.debug("Successfully uploaded payload to KV store")

# Let's check if it's in there then...
#index, result = await self.kv_client.kv.get(k)
#self.log.debug(f"And the survey says, at {k} we have: {result}")
return status, response

async def _kv_atomic_add_route_parts(
self, jupyterhub_routespec, target, data, route_keys, rule
):
escaped_target = escapism.escape(target, safe=self.key_safe_chars)
escaped_jupyterhub_routespec = escapism.escape(
jupyterhub_routespec, safe=self.key_safe_chars
jupyterhub_target = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "targets", escapism.escape(target)]
)

try:
payload=[
{
"KV": {
"Verb": "set",
"Key": escaped_jupyterhub_routespec,
"Key": jupyterhub_routespec,
"Value": base64.b64encode(target.encode()).decode(),
}
},
{
"KV": {
"Verb": "set",
"Key": escaped_target,
"Key": jupyterhub_target,
"Value": base64.b64encode(data.encode()).decode(),
}
},
Expand All @@ -176,13 +170,6 @@ async def _kv_atomic_add_route_parts(
"Value": base64.b64encode(target.encode()).decode(),
}
},
#{
# "KV": {
# "Verb": "set",
# "Key": route_keys.service_weight_path,
# "Value": base64.b64encode(b"1").decode(),
# }
#},
{
"KV": {
"Verb": "set",
Expand Down Expand Up @@ -211,26 +198,24 @@ async def _kv_atomic_add_route_parts(
return status, response

async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys):
escaped_jupyterhub_routespec = escapism.escape(
jupyterhub_routespec, safe=self.key_safe_chars
)

index, v = await self.kv_client.kv.get(escaped_jupyterhub_routespec)
index, v = await self.kv_client.kv.get(jupyterhub_routespec)
if v is None:
self.log.warning(
"Route %s doesn't exist. Nothing to delete", jupyterhub_routespec
)
return True, None
target = v["Value"]
escaped_target = escapism.escape(target, safe=self.key_safe_chars)
jupyterhub_target = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "targets", escapism.escape(target)]
)

try:
status, response = await self.kv_client.txn.put(
payload=[
{"KV": {"Verb": "delete", "Key": escaped_jupyterhub_routespec}},
{"KV": {"Verb": "delete", "Key": escaped_target}},
{"KV": {"Verb": "delete", "Key": jupyterhub_routespec}},
{"KV": {"Verb": "delete", "Key": jupyterhub_target}},
{"KV": {"Verb": "delete", "Key": route_keys.service_url_path}},
#{"KV": {"Verb": "delete", "Key": route_keys.service_weight_path}},
{"KV": {"Verb": "delete", "Key": route_keys.router_service_path}},
{"KV": {"Verb": "delete", "Key": route_keys.router_rule_path}},
]
Expand All @@ -244,17 +229,13 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys):
return status, response

async def _kv_get_target(self, jupyterhub_routespec):
escaped_jupyterhub_routespec = escapism.escape(
jupyterhub_routespec, safe=self.key_safe_chars
)
_, res = await self.kv_client.kv.get(escaped_jupyterhub_routespec)
_, res = await self.kv_client.kv.get(jupyterhub_routespec)
if res is None:
return None
return res["Value"].decode()

async def _kv_get_data(self, target):
escaped_target = escapism.escape(target, safe=self.key_safe_chars)
_, res = await self.kv_client.kv.get(escaped_target)
_, res = await self.kv_client.kv.get(target)

if res is None:
return None
Expand All @@ -264,10 +245,18 @@ async def _kv_get_route_parts(self, kv_entry):
key = escapism.unescape(kv_entry["KV"]["Key"])
value = kv_entry["KV"]["Value"]

# Strip the "/jupyterhub" prefix from the routespec
routespec = key.replace(self.kv_jupyterhub_prefix, "")
# Strip the "jupyterhub/routes/" prefix from the routespec
route_prefix = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "routes/"]
)
routespec = key.replace(route_prefix, "")

target = base64.b64decode(value.encode()).decode()
data = await self._kv_get_data(target)
jupyterhub_target = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "targets", escapism.escape(target)]
)

data = await self._kv_get_data(jupyterhub_target)

return routespec, target, data

Expand All @@ -277,9 +266,10 @@ async def _kv_get_jupyterhub_prefixed_entries(self):
{
"KV": {
"Verb": "get-tree",
"Key": escapism.escape(
self.kv_jupyterhub_prefix, safe=self.key_safe_chars
),
"Key": f"{self.kv_jupyterhub_prefix}/routes"
#escapism.escape(
# self.kv_jupyterhub_prefix, safe=self.key_safe_chars
#)+ "/routes",
}
}
]
Expand Down
75 changes: 40 additions & 35 deletions jupyterhub_traefik_proxy/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
# Distributed under the terms of the Modified BSD License.

from concurrent.futures import ThreadPoolExecutor
import json
import os
import escapism
from urllib.parse import urlparse

from tornado.concurrent import run_on_executor
from traitlets import Any, default, Unicode

from jupyterhub.utils import maybe_future
from . import traefik_utils
from jupyterhub_traefik_proxy import TKvProxy


Expand Down Expand Up @@ -78,23 +76,23 @@ def _default_client(self):
import etcd3
except ImportError:
raise ImportError("Please install etcd3 package to use traefik-proxy with etcd3")
kwargs = {
'host': etcd_service.hostname,
'port': etcd_service.port,
'ca_cert': self.etcd_client_ca_cert,
'cert_cert': self.etcd_client_cert_crt,
'cert_key': self.etcd_client_cert_key,
}
if self.kv_password:
return etcd3.client(
host=str(etcd_service.hostname),
port=etcd_service.port,
user=self.kv_username,
password=self.kv_password,
ca_cert=self.etcd_client_ca_cert,
cert_cert=self.etcd_client_cert_crt,
cert_key=self.etcd_client_cert_key,
)
return etcd3.client(
host=str(etcd_service.hostname),
port=etcd_service.port,
ca_cert=self.etcd_client_ca_cert,
cert_cert=self.etcd_client_cert_crt,
cert_key=self.etcd_client_cert_key,
)
kwargs.update({
'user': self.kv_username,
'password': self.kv_password
})
return etcd3.client(**kwargs)

def _clean_resources(self):
super()._clean_resources()
self.kv_client.close()

@run_on_executor
def _etcd_transaction(self, success_actions):
Expand All @@ -119,7 +117,7 @@ def _define_kv_specific_static_config(self):
self.static_config.update({"providers" : {
"etcd" : {
"endpoints": [url.netloc],
"rootKey": self.kv_traefik_prefix, # Is rootKey the new prefix?
"rootKey": self.kv_traefik_prefix,
}
} })
if self.kv_username and self.kv_password:
Expand All @@ -131,14 +129,13 @@ def _define_kv_specific_static_config(self):
async def _kv_atomic_add_route_parts(
self, jupyterhub_routespec, target, data, route_keys, rule
):
jupyterhub_target = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "targets", escapism.escape(target)]
)
success = [
self.kv_client.transactions.put(jupyterhub_routespec, target),
self.kv_client.transactions.put(target, data),
self.kv_client.transactions.put(jupyterhub_target, data),
self.kv_client.transactions.put(route_keys.service_url_path, target),
# The weight is used to balance services, not servers.
# Traefik by default will use round-robin load-balancing anyway.
# See: https://doc.traefik.io/traefik/routing/services/#load-balancing
#self.kv_client.transactions.put(route_keys.service_weight_path, "1"),
self.kv_client.transactions.put(
route_keys.router_service_path, route_keys.service_alias
),
Expand All @@ -151,15 +148,17 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys):
value = await maybe_future(self._etcd_get(jupyterhub_routespec))
if value is None:
self.log.warning(
"Route {jupyterhub_routespec} doesn't exist. Nothing to delete"
f"Route {jupyterhub_routespec} doesn't exist. Nothing to delete"
)
return True, None

target = value.decode()
jupyterhub_target = self.kv_separator.join(
[self.kv_jupyterhub_prefix, "targets", escapism.escape(value.decode())]
)

success = [
self.kv_client.transactions.delete(jupyterhub_routespec),
self.kv_client.transactions.delete(target),
self.kv_client.transactions.delete(jupyterhub_target),
self.kv_client.transactions.delete(route_keys.service_url_path),
#self.kv_client.transactions.delete(route_keys.service_weight_path),
self.kv_client.transactions.delete(route_keys.router_service_path),
Expand All @@ -170,7 +169,7 @@ async def _kv_atomic_delete_route_parts(self, jupyterhub_routespec, route_keys):

async def _kv_get_target(self, jupyterhub_routespec):
value = await maybe_future(self._etcd_get(jupyterhub_routespec))
if value == None:
if value is None:
return None
return value.decode()

Expand All @@ -182,17 +181,23 @@ async def _kv_get_data(self, target):

async def _kv_get_route_parts(self, kv_entry):
key = kv_entry[1].key.decode()
value = kv_entry[0]
value = kv_entry[0].decode()

# Strip the "/jupyterhub" prefix from the routespec
routespec = key.replace(self.kv_jupyterhub_prefix, "")
target = value.decode()
data = await self._kv_get_data(target)
# Strip the "/jupyterhub/routes/" prefix from the routespec and unescape it
sep = self.kv_separator
route_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep])
target_prefix = sep.join([self.kv_jupyterhub_prefix, "targets" + sep])
routespec = escapism.unescape(key.replace(route_prefix, "", 1))
etcd_target = sep.join([target_prefix, escapism.escape(value)])
target = escapism.unescape(etcd_target.replace(target_prefix, "", 1))
data = await self._kv_get_data(etcd_target)

return routespec, target, data

async def _kv_get_jupyterhub_prefixed_entries(self):
routes = await maybe_future(self._etcd_get_prefix(self.kv_jupyterhub_prefix))
sep = self.kv_separator
routespecs_prefix = sep.join([self.kv_jupyterhub_prefix, "routes" + sep])
routes = await maybe_future(self._etcd_get_prefix(routespecs_prefix))
return routes

async def persist_dynamic_config(self):
Expand Down
22 changes: 6 additions & 16 deletions jupyterhub_traefik_proxy/fileprovider.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import json
import os
import asyncio
import string
import escapism

from traitlets import Any, default, Unicode, observe
Expand Down Expand Up @@ -61,7 +59,6 @@ def _set_dynamic_config_file(self, change):

def __init__(self, **kwargs):
super().__init__(**kwargs)
#self._set_dynamic_config_file(None)
try:
# Load initial dynamic config from disk
self.dynamic_config = self.dynamic_config_handler.load()
Expand Down Expand Up @@ -106,7 +103,7 @@ def _clean_resources(self):
def _get_route_unsafe(self, traefik_routespec):
service_alias = traefik_utils.generate_alias(traefik_routespec, "service")
router_alias = traefik_utils.generate_alias(traefik_routespec, "router")
routespec = self._routespec_from_traefik_path(traefik_routespec)
routespec = self.validate_routespec(traefik_routespec)
result = {"data": None, "target": None, "routespec": routespec}

def get_target_data(d, to_find):
Expand All @@ -133,8 +130,6 @@ def get_target_data(d, to_find):
if result["data"] is None and result["target"] is None:
self.log.info(f"No route for {routespec} found!")
result = None
self.log.debug(f"traefik routespec: {traefik_routespec}")
self.log.debug(f"result for routespec {routespec}:-\n{result}")
return result

async def start(self):
Expand Down Expand Up @@ -179,11 +174,9 @@ async def add_route(self, routespec, target, data):
The proxy implementation should also have a way to associate the fact that a
route came from JupyterHub.
"""
self.log.debug(f"\tTraefikFileProviderProxy.add_route: Adding {routespec} for {target}")
traefik_routespec = self._routespec_to_traefik_path(routespec)
traefik_routespec = self.validate_routespec(routespec)
service_alias = traefik_utils.generate_alias(traefik_routespec, "service")
router_alias = traefik_utils.generate_alias(traefik_routespec, "router")
#data = json.dumps(data)
rule = traefik_utils.generate_rule(traefik_routespec)

async with self.mutex:
Expand Down Expand Up @@ -222,9 +215,6 @@ async def add_route(self, routespec, target, data):
}
self.persist_dynamic_config()

self.log.debug(f"traefik routespec: {traefik_routespec}")
self.log.debug(f"data for routespec {routespec}:-\n{data}")

if self.should_start:
try:
# Check if traefik was launched
Expand All @@ -247,17 +237,17 @@ async def delete_route(self, routespec):
**Subclasses must define this method**
"""
routespec = self._routespec_to_traefik_path(routespec)
routespec = self.validate_routespec(routespec)
service_alias = traefik_utils.generate_alias(routespec, "service")
router_alias = traefik_utils.generate_alias(routespec, "router")

async with self.mutex:

# Pop each entry and if it's the last one, delete the key
self.dynamic_config["http"]["routers"].pop(router_alias, None)
self.dynamic_config["http"]["services"].pop(service_alias, None)
self.dynamic_config["jupyter"]["routers"].pop(router_alias, None)

# If empty, delete the keys
if not self.dynamic_config["http"]["routers"]:
self.dynamic_config["http"].pop("routers")
if not self.dynamic_config["http"]["services"]:
Expand Down Expand Up @@ -293,7 +283,7 @@ async def get_all_routes(self):
continue
escaped_routespec = "".join(router.split("_", 1)[1:])
traefik_routespec = escapism.unescape(escaped_routespec)
routespec = self._routespec_from_traefik_path(traefik_routespec)
routespec = self.validate_routespec(traefik_routespec)
all_routes.update({
routespec : self._get_route_unsafe(traefik_routespec)
})
Expand All @@ -320,7 +310,7 @@ async def get_route(self, routespec):
None: if there are no routes matching the given routespec
"""
routespec = self._routespec_to_traefik_path(routespec)
routespec = self.validate_routespec(routespec)
async with self.mutex:
return self._get_route_unsafe(routespec)

Loading

0 comments on commit c90686b

Please sign in to comment.