Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new WebhookKernelSessionManager for Kernel Persistence #1101

Merged
merged 9 commits into from
Jun 10, 2022
142 changes: 142 additions & 0 deletions enterprise_gateway/services/sessions/kernelsessionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import os
import threading

import requests
from jupyter_core.paths import jupyter_data_dir
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from traitlets import Bool, Unicode, default
from traitlets.config.configurable import LoggingConfigurable

Expand Down Expand Up @@ -385,3 +387,143 @@ def _get_sessions_loc(self):
if not os.path.exists(path):
os.makedirs(path, 0o755)
return path


class WebhookKernelSessionManager(KernelSessionManager):
"""
Performs kernel session persistence operations against URL provided (EG_WEBHOOK_URL). The URL must have 4 endpoints
associated with it. 1 delete endpoint that takes a list of kernel ids in the body, 1 post endpoint that takes kernels id as a
url param and the kernel session as the body, 1 get endpoint that returns all kernel sessions, and 1 get endpoint that returns
a specific kernel session based on kernel id as url param.
"""

# Webhook URL
webhook_url_env = "EG_WEBHOOK_URL"
webhook_url = Unicode(
config=True,
help="""URL endpoint for webhook kernel session manager""",
)

@default("webhook_url")
def webhook_url_default(self):
return os.getenv(self.webhook_url_env, "")

# Webhook Username
webhook_username_env = "EG_WEBHOOK_USERNAME"
webhook_username = Unicode(
config=True,
help="""Username for webhook kernel session manager API auth""",
)

@default("webhook_username")
def webhook_username_default(self):
return os.getenv(self.webhook_username_env, "")

# Webhook Password
webhook_password_env = "EG_WEBHOOK_PASSWORD"
webhook_password = Unicode(
config=True,
help="""Password for webhook kernel session manager API auth""",
)

@default("webhook_password")
def webhook_password_default(self):
return os.getenv(self.webhook_password_env, "")
amazingarman marked this conversation as resolved.
Show resolved Hide resolved

# Auth Type
auth_type_env = "EG_AUTH_TYPE"
auth_type = Unicode(
config=True,
help="""ROPC for webhook kernel session manager API auth Basic, Digest or None""",
)

@default("auth_type")
def auth_type_default(self):
return os.getenv(self.auth_type_env, "")

def __init__(self, kernel_manager, **kwargs):
super().__init__(kernel_manager, **kwargs)
if self.enable_persistence:
self.log.info("Webhook kernel session persistence activated")
self.auth = ""
if self.auth_type:
if self.webhook_username and self.webhook_password:
if self.auth_type.lower() == "basic":
self.auth = HTTPBasicAuth(self.webhook_username, self.webhook_password)
elif self.auth_type.lower() == "digest":
self.auth = HTTPDigestAuth(self.webhook_username, self.webhook_password)
elif self.auth_type.lower() == "none":
self.auth = ""
else:
self.log.error("No such option for auth_type/EG_AUTH_TYPE")
else:
self.log.error("Username and/or password aren't set")

def delete_sessions(self, kernel_ids):
"""
Deletes kernel sessions from database

:param list of strings kernel_ids: A list of kernel ids
"""
if self.enable_persistence:
response = requests.delete(self.webhook_url, auth=self.auth, json=kernel_ids)
self.log.debug(f"Webhook kernel session deleting: {kernel_ids}")
if response.status_code != 204:
self.log.error(response.raise_for_status())

def save_session(self, kernel_id):
"""
Saves kernel session to database

:param string kernel_id: A kernel id
"""
if self.enable_persistence:
if kernel_id is not None:
temp_session = dict()
temp_session[kernel_id] = self._sessions[kernel_id]
body = KernelSessionManager.pre_save_transformation(temp_session)
response = requests.post(
f"{self.webhook_url}/{kernel_id}", auth=self.auth, json=body
)
self.log.debug(f"Webhook kernel session saving: {kernel_id}")
if response.status_code != 204:
self.log.error(response.raise_for_status())

def load_sessions(self):
"""
Loads kernel sessions from database
"""
if self.enable_persistence:
response = requests.get(self.webhook_url, auth=self.auth)
if response.status_code == 200:
kernel_sessions = response.content
for kernel_session in kernel_sessions:
self._load_session_from_file(kernel_session)
amazingarman marked this conversation as resolved.
Show resolved Hide resolved
else:
self.log.error(response.raise_for_status())

def load_session(self, kernel_id):
"""
Loads a kernel session from database

:param string kernel_id: A kernel id
"""
if self.enable_persistence:
if kernel_id is not None:
response = requests.get(f"{self.webhook_url}/{kernel_id}", auth=self.auth)
if response.status_code == 200:
kernel_session = response.content
self._load_session_from_file(kernel_session)
amazingarman marked this conversation as resolved.
Show resolved Hide resolved
else:
self.log.error(response.raise_for_status())

def _load_session_from_file(self, kernel):
amazingarman marked this conversation as resolved.
Show resolved Hide resolved
"""
Loads kernel session to current session

:param dictionary kernel: Kernel session information
"""
self.log.debug("Loading saved session(s)")
self._sessions.update(
KernelSessionManager.post_load_transformation(json.loads(kernel)["kernel"])
)