-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/main' into feature/44-start-stop…
…-ITDE
- Loading branch information
Showing
8 changed files
with
471 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
""" | ||
Bucketfs-related functions. | ||
""" | ||
import pathlib | ||
import logging | ||
import exasol.bucketfs as bfs # type: ignore | ||
|
||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
def put_file(bucket: bfs.Bucket, file_path: pathlib.Path, | ||
skip_if_exists: bool = True) -> pathlib.Path: | ||
""" | ||
Uploads given file into bucketfs | ||
:param bucket: bucket to use | ||
:param file_path: local file path to uplaod. File have to exist. | ||
:param skip_if_exists: Do not upload if file already present in the bucketfs. | ||
:return: Path in the bucketfs. | ||
""" | ||
if not file_path.exists(): | ||
raise ValueError(f"Local file doesn't exist: {file_path}") | ||
local_name = file_path.name | ||
if skip_if_exists and local_name in list(bucket): | ||
_logger.info("File %s is already present in the bucketfs", local_name) | ||
else: | ||
_logger.info("Uploading file %s to bucketfs", local_name) | ||
with file_path.open("rb") as file: | ||
bucket.upload(local_name, file) | ||
return pathlib.Path("/buckets/bfsdefault/") / bucket.name / local_name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import pyexasol # type: ignore | ||
|
||
|
||
_SETUP_SQL = [ | ||
"OPEN SCHEMA {schema!i}", | ||
""" | ||
--/ | ||
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS | ||
%scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator; | ||
%jar {jar_path!r}; | ||
/ | ||
""", | ||
""" | ||
--/ | ||
CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) | ||
EMITS ( | ||
filename VARCHAR(2000), | ||
partition_index VARCHAR(100), | ||
start_index DECIMAL(36, 0), | ||
end_index DECIMAL(36, 0) | ||
) AS | ||
%scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader; | ||
%jar {jar_path!r}; | ||
/ | ||
""", | ||
""" | ||
--/ | ||
CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS | ||
%scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter; | ||
%jar {jar_path!r}; | ||
/ | ||
""" | ||
] | ||
|
||
|
||
def setup_scripts(db_connection: pyexasol.ExaConnection, schema_name: str, bucketfs_jar_path: str): | ||
""" | ||
Perform initialization of scripts for could-storage-extension. | ||
:param db_connection: DB connection | ||
:param schema_name: name of the schema to be used. | ||
:param bucketfs_jar_path: path to cloud-storage-extension jar in BucketFS | ||
:return: | ||
""" | ||
for sql in _SETUP_SQL: | ||
db_connection.execute(sql, query_params={ | ||
"schema": schema_name, | ||
"jar_path": bucketfs_jar_path, | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
""" | ||
Github-related utility functions - check for latest release of | ||
project, retrieval of artefacts, etc. | ||
""" | ||
import enum | ||
import requests | ||
import pathlib | ||
import logging | ||
from typing import Tuple, Optional | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
class Project(enum.Enum): | ||
""" | ||
Names of github projects to be retrieved. Values have to | ||
match github project names. | ||
""" | ||
CLOUD_STORAGE_EXTENSION = "cloud-storage-extension" | ||
KAFKA_CONNECTOR_EXTENSION = "kafka-connector-extension" | ||
|
||
|
||
def get_latest_version_and_jar_url(project: Project) -> Tuple[str, str]: | ||
""" | ||
Retrieves the latest version of stable project release | ||
and url with jar file from the release. | ||
:param project: name of the project | ||
:return: tuple with version and url to retrieve the artefact. | ||
""" | ||
req = requests.get(f"https://api.github.com/repos/exasol/{project.value}" | ||
f"/releases/latest", timeout=10) | ||
if req.status_code != 200: | ||
raise RuntimeError("Error sending request to the github, code: %d" % | ||
req.status_code) | ||
data = req.json() | ||
version = data.get('tag_name') | ||
if version is None: | ||
raise RuntimeError(f"The latest version of {project.value} " | ||
f"has no tag, something is wrong") | ||
for asset in data.get('assets', []): | ||
name = asset['name'] | ||
if name.endswith(f"{version}.jar"): | ||
dl_url = asset['browser_download_url'] | ||
return version, dl_url | ||
raise RuntimeError("Could not find proper jar url for the latest release") | ||
|
||
|
||
def retrieve_jar(project: Project, use_local_cache: bool = True, | ||
storage_path: Optional[pathlib.Path] = None) -> pathlib.Path: | ||
""" | ||
Returns latest jar file for the project, possibly using local cache. | ||
:param project: project to be used | ||
:param use_local_cache: should local cache be used or file always retrieved anew | ||
:param storage_path: path to be used for downloading. | ||
If None, current directory will be used. | ||
:return: path to the jar file on the local filesystem | ||
""" | ||
version, jar_url = get_latest_version_and_jar_url(project) | ||
_, local_jar_name = jar_url.rsplit('/', maxsplit=1) | ||
local_jar_path = pathlib.Path(local_jar_name) | ||
if storage_path is not None: | ||
if not storage_path.exists(): | ||
raise ValueError(f"Local storage path doesn't exist: {storage_path}") | ||
local_jar_path = storage_path / local_jar_path | ||
|
||
if use_local_cache and local_jar_path.exists(): | ||
_logger.info("Jar for version %s already exists in %s, skip downloading", | ||
version, local_jar_path) | ||
else: | ||
_logger.info("Fetching jar for version %s from %s...", version, jar_url) | ||
req = requests.get(jar_url, stream=True, timeout=10) | ||
try: | ||
count_bytes = local_jar_path.write_bytes(req.content) | ||
_logger.info("Saved %d bytes in %s", count_bytes, local_jar_path) | ||
finally: | ||
req.close() | ||
return local_jar_path |
Oops, something went wrong.