#110 Added SaaS support for the extension wrappers (#111)
* #110 Added SaaS support for the extension wrappers

* #110 Fixed issues with calling the extensions interface.

* #110 Ignored mypy error

* #110 Addressed issues found in the review
ahsimb authored Jun 4, 2024
1 parent ef2a391 commit bf0ddd4
Showing 9 changed files with 334 additions and 172 deletions.
3 changes: 3 additions & 0 deletions doc/changes/unreleased.md
@@ -5,4 +5,7 @@
* #103 Enabled SaaS connections for both the database and the BucketFS.
* #105 Added the new configuration element - storage_backend.
* #108 Supplied the BucketFS service name when opening an on-prem bucketfs bucket.
* #110 Added SaaS support to the extension wrappers.
- Added SaaS configuration parameters to the language container deployer call.
- Changed how the BucketFS parameters are stored in a connection object.

27 changes: 17 additions & 10 deletions exasol/nb_connector/connections.py
@@ -87,6 +87,20 @@ def get_udf_bucket_path(conf: Secrets) -> str:
return bucket.udf_path


def get_saas_database_id(conf: Secrets) -> str:
"""
Gets the SaaS database id using the available configuration elements.
"""
saas_database_id = conf.get(CKey.saas_database_id)
if saas_database_id:
return saas_database_id
return saas_api.get_database_id(
host=conf.get(CKey.saas_url),
account_id=conf.get(CKey.saas_account_id),
pat=conf.get(CKey.saas_token),
database_name=conf.get(CKey.saas_database_name))


def open_pyexasol_connection(conf: Secrets, **kwargs) -> pyexasol.ExaConnection:
"""
Opens a pyexasol connection using provided configuration parameters.
@@ -248,17 +262,10 @@ def open_bucketfs_connection(conf: Secrets) -> bfs.BucketLike:
return bucketfs[conf.get(CKey.bfs_bucket)]

else:
saas_url, saas_token, saas_account_id, saas_database_id = [
conf.get(key) for key in [
CKey.saas_url, CKey.saas_token, CKey.saas_account_id, CKey.saas_database_id]
saas_url, saas_token, saas_account_id = [
conf.get(key) for key in [CKey.saas_url, CKey.saas_token, CKey.saas_account_id]
]
saas_database_id = (saas_database_id or
saas_api.get_database_id(
host=saas_url,
account_id=saas_account_id,
pat=saas_token,
database_name=conf.get(CKey.saas_database_name)
))
saas_database_id = get_saas_database_id(conf)
return bfs.SaaSBucket(url=saas_url,
account_id=saas_account_id,
database_id=saas_database_id,
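The new helper centralises the SaaS database-id resolution that open_bucketfs_connection previously did inline. Below is a brief, hypothetical usage sketch (not part of this commit); it assumes `conf` is a Secrets store already populated with the SaaS parameters.

```python
# Hypothetical sketch: `conf` is assumed to hold saas_url, saas_account_id,
# saas_token, and either saas_database_id or saas_database_name.
from exasol.nb_connector.connections import (get_saas_database_id,
                                             open_bucketfs_connection)

database_id = get_saas_database_id(conf)  # stored id, or looked up via the SaaS API
bucket = open_bucketfs_connection(conf)   # performs the same resolution internally
```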
83 changes: 65 additions & 18 deletions exasol/nb_connector/extension_wrapper_common.py
@@ -1,10 +1,13 @@
from __future__ import annotations
from typing import Optional

from exasol.nb_connector.connections import open_pyexasol_connection
import exasol.bucketfs as bfs # type: ignore

from exasol.nb_connector.connections import (open_pyexasol_connection,
get_backend, get_saas_database_id)
from exasol.nb_connector.secret_store import Secrets
from exasol.nb_connector.utils import optional_str_to_bool
from exasol.nb_connector.ai_lab_config import AILabConfig as CKey
from exasol.nb_connector.ai_lab_config import AILabConfig as CKey, StorageBackend


def str_to_bool(conf: Secrets, key: CKey, default_value: bool) -> bool:
@@ -34,16 +37,41 @@ def encapsulate_bucketfs_credentials(
Parameters:
conf:
The secret store. The store must hold the BucketFS service
The secret store.
For an On-Prem database the store must hold the BucketFS service
parameters (bfs_host_name or db_host_name, bfs_port,
bfs_service), the access credentials (bfs_user,
bfs_password), and the bucket name (bfs_bucket), as well
as the DB connection parameters.
For a SaaS database the store must hold the SaaS connection
parameters (saas_url, saas_account_id, saas_database_id or
saas_database_name, saas_token).
path_in_bucket:
Path identifying a location in the bucket.
connection_name:
Name for the connection object to be created.
The parameters will be stored in JSON strings. The distribution
of the parameters among the connection entities is as follows.
On-Prem:
TO: backend, url, service_name, bucket_name, verify, path
USER: username
IDENTIFIED BY: password
SaaS:
TO: backend, url, account_id, path
USER: database_id
IDENTIFIED BY: pat
Note that the parameter names correspond to the arguments of the build_path
function provided by bucketfs-python. Hence, the most convenient way to
handle these parameters is to combine the JSON structures from all three
entities and pass them as kwargs to build_path. The code below gives a usage example.
bfs_params = json.loads(conn_obj.address)
bfs_params.update(json.loads(conn_obj.user))
bfs_params.update(json.loads(conn_obj.password))
bfs_path = build_path(**bfs_params)
A note about handling the TLS certificate verification settings.
If the server certificate verification is turned on, either through
reliance on the default https request settings or by setting the cert_vld
@@ -53,30 +81,49 @@
the connection object will instead turn the verification off. This is
because there is no guarantee that the consumer of the connection object,
i.e. a UDF, would have this custom CA list, and even if it would, its location
is unknown.
is unknown. This is only applicable for an On-Prem backend.
"""

bfs_host = conf.get(CKey.bfs_host_name, conf.get(CKey.db_host_name))
bfs_protocol = "https" if str_to_bool(conf, CKey.bfs_encryption, True) else "http"
bfs_dest = (
f"{bfs_protocol}://{bfs_host}:{conf.get(CKey.bfs_port)}/"
f"{conf.get(CKey.bfs_bucket)}/{path_in_bucket};{conf.get(CKey.bfs_service)}"
)
# TLS certificate verification option shall be provided in the fragment field.
verify: Optional[bool] = (False if conf.get(CKey.trusted_ca)
else optional_str_to_bool(conf.get(CKey.cert_vld)))
if verify is not None:
bfs_dest += f'#{verify}'
def to_json_str(**kwargs) -> str:
def format_value(v):
return f'"{v}"' if isinstance(v, str) else v

return ", ".join(f'"{k}":{format_value(v)}' for k, v in kwargs.items()
if v is not None)

backend = get_backend(conf)
if backend == StorageBackend.onprem:
host = conf.get(CKey.bfs_host_name, conf.get(CKey.db_host_name))
protocol = "https" if str_to_bool(conf, CKey.bfs_encryption, True) else "http"
url = f"{protocol}://{host}:{conf.get(CKey.bfs_port)}"
verify: Optional[bool] = (False if conf.get(CKey.trusted_ca)
else optional_str_to_bool(conf.get(CKey.cert_vld)))
conn_to = to_json_str(backend=bfs.path.StorageBackend.onprem.name,
url=url, service_name=conf.get(CKey.bfs_service),
bucket_name=conf.get(CKey.bfs_bucket),
path=path_in_bucket,
verify=verify)
conn_user = to_json_str(username=conf.get(CKey.bfs_user))
conn_password = to_json_str(password=conf.get(CKey.bfs_password))
else:
database_id = get_saas_database_id(conf)
conn_to = to_json_str(backend=bfs.path.StorageBackend.saas.name,
url=conf.get(CKey.saas_url),
account_id=conf.get(CKey.saas_account_id),
path=path_in_bucket)
conn_user = to_json_str(database_id=database_id)
conn_password = to_json_str(pat=conf.get(CKey.saas_token))

sql = f"""
CREATE OR REPLACE CONNECTION [{connection_name}]
TO '{bfs_dest}'
TO {{BUCKETFS_ADDRESS!s}}
USER {{BUCKETFS_USER!s}}
IDENTIFIED BY {{BUCKETFS_PASSWORD!s}}
"""
query_params = {
"BUCKETFS_USER": conf.get(CKey.bfs_user),
"BUCKETFS_PASSWORD": conf.get(CKey.bfs_password),
"BUCKETFS_ADDRESS": conn_to,
"BUCKETFS_USER": conn_user,
"BUCKETFS_PASSWORD": conn_password
}
with open_pyexasol_connection(conf, compression=True) as conn:
conn.execute(query=sql, query_params=query_params)
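The docstring above sketches how a consumer rebuilds a BucketFS path from the three connection entities. The following is a slightly fuller, hypothetical version of that snippet (not part of this commit). It assumes an Exasol Python UDF context, where the `exa` object is provided by the runtime, and the build_path helper from bucketfs-python that the docstring refers to; the connection name MY_BFS_CONNECTION is a placeholder.

```python
# Hypothetical consumption sketch, following the docstring's own example.
# `exa` is provided by the Exasol UDF runtime; "MY_BFS_CONNECTION" is a
# placeholder for a connection created by encapsulate_bucketfs_credentials().
import json

import exasol.bucketfs as bfs

conn_obj = exa.get_connection("MY_BFS_CONNECTION")
bfs_params = json.loads(conn_obj.address)         # TO: backend, url, ...
bfs_params.update(json.loads(conn_obj.user))      # USER: username or database_id
bfs_params.update(json.loads(conn_obj.password))  # IDENTIFIED BY: password or pat
bfs_path = bfs.path.build_path(**bfs_params)
```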
20 changes: 18 additions & 2 deletions exasol/nb_connector/sagemaker_extension_wrapper.py
@@ -20,6 +20,8 @@
# Extension, including its language container, will be uploaded.
PATH_IN_BUCKET = "SME"

LANGUAGE_ALIAS = "PYTHON3_SME"

LATEST_KNOWN_VERSION = "0.7.0"

# Activation SQL for the Sagemaker Extension will be saved in the secret
@@ -31,7 +33,9 @@
AWS_CONNECTION_PREFIX = "SME_AWS"


def deploy_language_container(conf: Secrets, version: str) -> None:
def deploy_language_container(conf: Secrets,
version: str,
language_alias: str) -> None:
"""
Calls the Sagemaker Extension's language container deployment API.
Downloads the specified released version of the extension from the GitHub
@@ -51,6 +55,8 @@ def deploy_language_container(conf: Secrets, version: str) -> None:
and the parameters of the BucketFS service.
version:
Sagemaker Extension version.
language_alias:
The language alias of the extension's language container.
"""

deployer = SmeLanguageContainerDeployer.create(
@@ -64,7 +70,13 @@ def deploy_scripts(conf: Secrets) -> None:
bucketfs_password=conf.get(CKey.bfs_password),
bucketfs_use_https=str_to_bool(conf, CKey.bfs_encryption, True),
bucket=conf.get(CKey.bfs_bucket),
saas_url=conf.get(CKey.saas_url),
saas_account_id=conf.get(CKey.saas_account_id),
saas_database_id=conf.get(CKey.saas_database_id),
saas_database_name=conf.get(CKey.saas_database_name),
saas_token=conf.get(CKey.saas_token),
path_in_bucket=PATH_IN_BUCKET,
language_alias=language_alias,
use_ssl_cert_validation=str_to_bool(conf, CKey.cert_vld, True),
ssl_trusted_ca=conf.get(CKey.trusted_ca),
ssl_client_certificate=conf.get(CKey.client_cert),
@@ -103,6 +115,7 @@ def deploy_scripts(conf: Secrets) -> None:

def initialize_sme_extension(conf: Secrets,
version: str = LATEST_KNOWN_VERSION,
language_alias: str = LANGUAGE_ALIAS,
run_deploy_container: bool = True,
run_deploy_scripts: bool = True,
run_encapsulate_aws_credentials: bool = True) -> None:
@@ -118,6 +131,9 @@ def initialize_sme_extension(conf: Secrets,
version:
Sagemaker Extension version. If not specified the hardcoded
latest known version will be used.
language_alias:
The language alias of the extension's language container. Normally
this parameter would only be used for testing.
run_deploy_container:
If set to False will skip the language container deployment.
run_deploy_scripts:
@@ -131,7 +147,7 @@
aws_conn_name = "_".join([AWS_CONNECTION_PREFIX, str(conf.get(CKey.db_user))])

if run_deploy_container:
deploy_language_container(conf, version)
deploy_language_container(conf, version, language_alias)

# Create the required objects in the database
if run_deploy_scripts:
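With the language_alias parameter and the SaaS connection parameters wired through, the entry point can be used unchanged for both backends. Here is a hedged usage sketch (not part of this commit); it assumes `conf` is a Secrets store populated with either the on-prem or the SaaS parameters, plus the AWS credentials used when run_encapsulate_aws_credentials is enabled, and the test alias below is a hypothetical value.

```python
# Usage sketch, assuming `conf` is a pre-populated Secrets store.
from exasol.nb_connector.sagemaker_extension_wrapper import initialize_sme_extension

# Default: deploys the language container and the scripts, and creates the
# AWS connection object.
initialize_sme_extension(conf)

# For testing, the language alias of the container can be overridden:
initialize_sme_extension(conf, language_alias="PYTHON3_SME_TEST")
```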
22 changes: 7 additions & 15 deletions exasol/nb_connector/transformers_extension_wrapper.py
@@ -1,7 +1,3 @@
from exasol_transformers_extension.utils.bucketfs_operations import get_model_path # type: ignore
from exasol_transformers_extension.utils.bucketfs_operations import upload_model_files_to_bucketfs # type: ignore
from exasol_transformers_extension.utils.bucketfs_operations import create_bucketfs_location # type: ignore

from exasol_transformers_extension.deployment.scripts_deployer import ScriptsDeployer # type: ignore
from exasol_transformers_extension.deployment.te_language_container_deployer import TeLanguageContainerDeployer # type: ignore

@@ -88,6 +84,11 @@ def deploy_language_container(conf: Secrets,
bucketfs_password=conf.get(CKey.bfs_password),
bucketfs_use_https=str_to_bool(conf, CKey.bfs_encryption, True),
bucket=conf.get(CKey.bfs_bucket),
saas_url=conf.get(CKey.saas_url),
saas_account_id=conf.get(CKey.saas_account_id),
saas_database_id=conf.get(CKey.saas_database_id),
saas_database_name=conf.get(CKey.saas_database_name),
saas_token=conf.get(CKey.saas_token),
path_in_bucket=PATH_IN_BUCKET,
language_alias=language_alias,
use_ssl_cert_validation=str_to_bool(conf, CKey.cert_vld, True),
@@ -210,17 +211,8 @@ def upload_model_from_cache(
should have its own cache directory.
"""

# Create bucketfs location
bfs_host = conf.get(CKey.bfs_host_name, conf.get(CKey.db_host_name))
bucketfs_location = create_bucketfs_location(
conf.get(CKey.bfs_service), bfs_host,
int(str(conf.get(CKey.bfs_port))), str(conf.get(CKey.bfs_encryption)).lower() == 'true',
conf.get(CKey.bfs_user), conf.get(CKey.bfs_password), conf.get(CKey.bfs_bucket),
PATH_IN_BUCKET)

# Upload the downloaded model files into bucketfs
upload_path = get_model_path(conf.get(CKey.te_models_bfs_dir), model_name)
upload_model_files_to_bucketfs(cache_dir, upload_path, bucketfs_location)
raise NotImplementedError('Uploading the model is temporarily unavailable. '
'Awaiting changes in the Transformers Extension module.')


def upload_model(
(The remaining changed files are not shown.)
