diff --git a/control/action.py b/control/action.py
index 8339914..ca9be6c 100644
--- a/control/action.py
+++ b/control/action.py
@@ -1,48 +1,51 @@
-str_about = '''
+str_about = """
Module that provides the several "action" abstracted classes --
Essentially this provides a base class for CLI operations that
sets up a jobber object, as well some derived classes that are
used to run specific CLI apps as well as some API calling in
the larger context of CUBE/ChRIS
-'''
+"""
-from . import jobber
-from state import data
-import os
-import re
-import pudb
-import json
-from argparse import ArgumentParser, Namespace
-from chrisclient import client
-import time
+from . import jobber
+from state import data
+import os
+import re
+import pudb
+import json
+from argparse import ArgumentParser, Namespace
+from chrisclient import client
+import time
-pluginexec = lambda s: ((s.split('/')[-1]).split('-')[-1]).split(':')[0]
-pluginname = lambda s: ((s.split('/')[-1])).split(':')[0]
-public_repo = lambda u, s: '%s/%s' % (u, pluginname(s))
+pluginexec = lambda s: ((s.split("/")[-1]).split("-")[-1]).split(":")[0]
+pluginname = lambda s: (s.split("/")[-1]).split(":")[0]
+public_repo = lambda u, s: "%s/%s" % (u, pluginname(s))
+
class Shexec:
- '''
+ """
A thin base class providing the chassis for executing sh-based apps.
A simple class wrapper that runs a container image to determine its
json description
- '''
+ """
+
def __init__(self, *args, **kwargs):
- self.env = None
- self.plugin = ''
- self.options : Namespace = None
+ self.env = None
+ self.plugin = ""
+ self.options: Namespace = None
for k, v in kwargs.items():
- if k == 'env' : self.env = v
- if k == 'options' : self.options = v
- self.shell : jobber.Jobber = jobber.Jobber({
- 'verbosity' : 0,
- 'noJobLogging': True
- })
+ if k == "env":
+ self.env = v
+ if k == "options":
+ self.options = v
+ self.shell: jobber.Jobber = jobber.Jobber(
+ {"verbosity": 0, "noJobLogging": True}
+ )
- self.l_runCMDresp : list = []
+ self.l_runCMDresp: list = []
- def string_clean(self, str_string : str) -> str:
+ def string_clean(self, str_string: str) -> str:
"""
Clean/strip/whitespace in a string
@@ -52,11 +55,11 @@ def string_clean(self, str_string : str) -> str:
Returns:
str: cleaned up string
"""
- str_clean = re.sub(r';\n.*--', ';--', str_string)
- str_clean = str_clean.strip()
+ str_clean = re.sub(r";\n.*--", ";--", str_string)
+ str_clean = str_clean.strip()
return str_clean
- def cmd_checkResponse(self, d_resp : dict) -> bool:
+ def cmd_checkResponse(self, d_resp: dict) -> bool:
"""
Check the d_resp from a jobber call,
log a response, and return True/False
@@ -67,47 +70,43 @@ def cmd_checkResponse(self, d_resp : dict) -> bool:
Returns:
bool: True/False pending d_resp
"""
- b_OK : bool = False
- if not d_resp.get('returncode'):
+ b_OK: bool = False
+ if not d_resp.get("returncode"):
self.env.INFO("\tOK!")
- self.env.INFO("\n%s" % json.dumps(d_resp, indent = 4), level = 2)
- b_OK = True
+ self.env.INFO("\n%s" % json.dumps(d_resp, indent=4), level=2)
+ b_OK = True
else:
- self.env.ERROR("\tFailed!", level = 1)
- self.env.ERROR("\n%s" % json.dumps(d_resp, indent = 4), level = 2)
- b_OK = False
+ self.env.ERROR("\tFailed!", level=1)
+ self.env.ERROR("\n%s" % json.dumps(d_resp, indent=4), level=2)
+ b_OK = False
return b_OK
- def __call__(self) ->dict:
- '''
+ def __call__(self) -> dict:
+ """
Base entry point
- '''
- b_status : bool = False
- d_runCMDresp : dict = {'returncode' : 1}
- d_json : dict = {'nop' : 'dummy return'}
- return {
- 'status' : b_status,
- 'run' : d_runCMDresp,
- 'detail' : d_json
- }
+ """
+ b_status: bool = False
+ d_runCMDresp: dict = {"returncode": 1}
+ d_json: dict = {"nop": "dummy return"}
+ return {"status": b_status, "run": d_runCMDresp, "detail": d_json}
+
class PluginRep(Shexec):
- '''
+ """
A specialization of Shexec that runs a container image to determine its
json description
- '''
+ """
+
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def chrispl_onCUBEargs(self):
- '''
+ """
Return a string specifying the CUBE instance
- '''
- return {
- 'onCUBE': json.dumps(self.env.CUBE.onCUBE())
- }
+ """
+ return {"onCUBE": json.dumps(self.env.CUBE.onCUBE())}
- def chris_plugin_info_args(self, str_containerspec : str) -> dict:
+ def chris_plugin_info_args(self, str_containerspec: str) -> dict:
"""
Args needed to determine json rep for chris template style plugins.
@@ -117,15 +116,18 @@ def chris_plugin_info_args(self, str_containerspec : str) -> dict:
Returns:
dict: args to execute
"""
- self.env.INFO('Attempting chris_plugin_info call to find JSON representation...')
- str_args : str = """
+ self.env.INFO(
+ "Attempting chris_plugin_info call to find JSON representation..."
+ )
+ str_args: str = (
+ """
run --rm %s chris_plugin_info
- """ % (str_containerspec)
- return {
- 'args': self.string_clean(str_args)
- }
+ """
+ % (str_containerspec)
+ )
+ return {"args": self.string_clean(str_args)}
- def containerspec_parse(self, str_containerspec : str, **kwargs) -> str:
+ def containerspec_parse(self, str_containerspec: str, **kwargs) -> str:
"""
Given a containerspec of pattern
@@ -144,19 +146,22 @@ def containerspec_parse(self, str_containerspec : str, **kwargs) -> str:
Returns:
str: a parsed string such as the pluginexec name or the pluginfullname
"""
- str_result : str = ""
- str_parse : str = 'pluginexec'
- for k,v in kwargs.items():
- if k == 'find' : str_parse = v
- if 'exec' in str_parse.lower():
- str_result = pluginexec(str_containerspec)
- if 'name' in str_parse.lower():
- str_result = pluginname(str_containerspec)
- if 'repo' in str_parse.lower():
- str_result = public_repo(self.env.options.public_repobase, str_containerspec)
+ str_result: str = ""
+ str_parse: str = "pluginexec"
+ for k, v in kwargs.items():
+ if k == "find":
+ str_parse = v
+ if "exec" in str_parse.lower():
+ str_result = pluginexec(str_containerspec)
+ if "name" in str_parse.lower():
+ str_result = pluginname(str_containerspec)
+ if "repo" in str_parse.lower():
+ str_result = public_repo(
+ self.env.options.public_repobase, str_containerspec
+ )
return str_result
- def chris_cookiecutter_info_args(self, str_containerspec : str) -> dict:
+ def chris_cookiecutter_info_args(self, str_containerspec: str) -> dict:
"""
Args needed to determine json rep for cookiecutter style plugins.
@@ -169,31 +174,30 @@ def chris_cookiecutter_info_args(self, str_containerspec : str) -> dict:
Returns:
dict: args to execute
"""
- self.env.INFO('Attempting cookiecutter call to find JSON representation...')
- str_pluginexec : str = self.env.options.pluginexec
+ self.env.INFO("Attempting cookiecutter call to find JSON representation...")
+ str_pluginexec: str = self.env.options.pluginexec
if not len(self.env.options.pluginexec):
- str_pluginexec = self.containerspec_parse(str_containerspec, find = 'pluginexec')
- str_args : str = """
+ str_pluginexec = self.containerspec_parse(
+ str_containerspec, find="pluginexec"
+ )
+ str_args: str = (
+ """
run --rm %s %s --json
- """ % (str_containerspec, str_pluginexec)
- return {
- 'args' : self.string_clean(str_args)
- }
+ """
+ % (str_containerspec, str_pluginexec)
+ )
+ return {"args": self.string_clean(str_args)}
- def plugin_execForJSON(self, func = None) -> dict:
- '''
+ def plugin_execForJSON(self, func=None) -> dict:
+ """
Return the CLI for determining the plugin JSON representation
- '''
+ """
try:
- str_cmd = """docker %s""" % (
- func(self.options.dock_image)['args']
- )
+ str_cmd = """docker %s""" % (func(self.options.dock_image)["args"])
except:
str_cmd = ""
- str_cmd = str_cmd.strip().replace('\n', '')
- return {
- 'cmd' : str_cmd
- }
+ str_cmd = str_cmd.strip().replace("\n", "")
+ return {"cmd": str_cmd}
def docker_pull(self) -> dict:
"""
@@ -202,8 +206,8 @@ def docker_pull(self) -> dict:
Returns:
dict: results from jobber call
"""
- str_cmd : str = "docker pull %s" % self.options.dock_image
- d_dockerpull : dict = self.shell.job_run(str_cmd)
+ str_cmd: str = "docker pull %s" % self.options.dock_image
+ d_dockerpull: dict = self.shell.job_run(str_cmd)
self.env.INFO("\t$> %s" % str_cmd)
return d_dockerpull
@@ -220,18 +224,18 @@ def jsonScript_buildAndExec(self, argfunc) -> dict:
Returns:
dict: the result of executing the script
"""
- d_PLCmd : dict = self.plugin_execForJSON(argfunc)
- str_PLCmd : str = d_PLCmd['cmd']
- str_PLCmdfile : str = '%s/cmd.sh' % self.env.outputdir
- b_status : bool = False
- d_json : dict = {}
+ d_PLCmd: dict = self.plugin_execForJSON(argfunc)
+ str_PLCmd: str = d_PLCmd["cmd"]
+ str_PLCmdfile: str = "%s/cmd.sh" % self.env.outputdir
+ b_status: bool = False
+ d_json: dict = {}
self.env.INFO("\t$> %s" % str_PLCmd)
- with open(str_PLCmdfile, 'w') as f:
- f.write('#!/bin/bash\n')
+ with open(str_PLCmdfile, "w") as f:
+ f.write("#!/bin/bash\n")
f.write(str_PLCmd)
os.chmod(str_PLCmdfile, 0o755)
- d_runCMDresp : dict = self.shell.job_run(str_PLCmdfile)
+ d_runCMDresp: dict = self.shell.job_run(str_PLCmdfile)
return d_runCMDresp
def json_readFromFile(self) -> dict:
@@ -242,24 +246,24 @@ def json_readFromFile(self) -> dict:
Returns:
dict: structure similar to jsonScript_buildAndExec
"""
- d_ret = {
- 'stderr' : 'no error',
- 'stdout' : '',
- 'returncode' : 0
- }
- self.env.INFO("Reading JSON representation from file '%s'..." % self.options.jsonFile)
+ d_ret = {"stderr": "no error", "stdout": "", "returncode": 0}
+ self.env.INFO(
+ "Reading JSON representation from file '%s'..." % self.options.jsonFile
+ )
try:
- with open(self.env.options.json, 'r') as f:
- d_json = json.load(f)
- d_ret['stdout'] = json.dumps(d_json, indent = 4)
+ with open(self.env.options.json, "r") as f:
+ d_json = json.load(f)
+ d_ret["stdout"] = json.dumps(d_json, indent=4)
except:
- d_ret['stderr'] = "An error in reading the file '%s' was raised" % self.options.jsonFile
- d_ret['returncode'] = 1
+ d_ret["stderr"] = (
+ "An error in reading the file '%s' was raised" % self.options.jsonFile
+ )
+ d_ret["returncode"] = 1
self.cmd_checkResponse(d_ret)
return d_ret
- def __call__(self) ->dict:
- '''
+ def __call__(self) -> dict:
+ """
Entry point for determining plugin representation.
If a json file to read has been specified in the CLI, then this file
@@ -271,7 +275,7 @@ def __call__(self) ->dict:
* first, using the chris_plugin_info for template plugins
* failing that, using the cookiecutter calling spec
- '''
+ """
def docker_pullIfNeeded() -> bool:
"""
@@ -281,63 +285,65 @@ def docker_pullIfNeeded() -> bool:
Returns:
bool: True/False pending pull success/fail
"""
- b_pullOK : bool = True
- d_dockerPull : dict = {
- "message" : "no pull needed since user specified readFromJSONFile"
+ b_pullOK: bool = True
+ d_dockerPull: dict = {
+ "message": "no pull needed since user specified readFromJSONFile"
}
if not len(self.options.jsonFile):
- d_dockerPull = self.docker_pull()
- b_pullOK = self.cmd_checkResponse(d_dockerPull)
+ d_dockerPull = self.docker_pull()
+ b_pullOK = self.cmd_checkResponse(d_dockerPull)
return d_dockerPull, b_pullOK
def jsonRepFromImage_get() -> dict:
- for argfunc in [self.chris_plugin_info_args, self.chris_cookiecutter_info_args]:
- d_runCMDresp = self.jsonScript_buildAndExec(argfunc)
- if self.cmd_checkResponse(d_runCMDresp): break
+ for argfunc in [
+ self.chris_plugin_info_args,
+ self.chris_cookiecutter_info_args,
+ ]:
+ d_runCMDresp = self.jsonScript_buildAndExec(argfunc)
+ if self.cmd_checkResponse(d_runCMDresp):
+ break
return d_runCMDresp
def jsonRep_get() -> dict:
- b_jsonOK : bool = False
+ b_jsonOK: bool = False
if len(self.env.options.jsonFile):
- d_runCMDresp = self.json_readFromFile()
+ d_runCMDresp = self.json_readFromFile()
else:
- d_runCMDresp = jsonRepFromImage_get()
- if not d_runCMDresp['returncode']:
- b_jsonOK = True
+ d_runCMDresp = jsonRepFromImage_get()
+ if not d_runCMDresp["returncode"]:
+ b_jsonOK = True
else:
- b_jsonOK = False
+ b_jsonOK = False
return d_runCMDresp, b_jsonOK
- b_status : bool = False
- b_pullOK : bool = False
- b_jsonOK : bool = False
- d_runCMDresp : dict = {'returncode' : 1}
- d_json : dict = {'error' : 'could not pull image'}
+ b_status: bool = False
+ b_pullOK: bool = False
+ b_jsonOK: bool = False
+ d_runCMDresp: dict = {"returncode": 1}
+ d_json: dict = {"error": "could not pull image"}
if not self.options.nodockerpull:
- d_runCMDresp, b_pullOK = docker_pullIfNeeded()
+ d_runCMDresp, b_pullOK = docker_pullIfNeeded()
else:
b_pullOK = True
if b_pullOK:
- d_runCMDresp, b_jsonOK = jsonRep_get()
- if not d_runCMDresp['returncode']:
- b_status = True
+ d_runCMDresp, b_jsonOK = jsonRep_get()
+ if not d_runCMDresp["returncode"]:
+ b_status = True
self.l_runCMDresp.append(d_runCMDresp)
try:
- d_json = json.loads(d_runCMDresp['stdout'])
+ d_json = json.loads(d_runCMDresp["stdout"])
except:
- d_json = {'error' : 'could not parse resultant stdout'}
- return {
- 'status' : b_status,
- 'run' : d_runCMDresp,
- 'rep' : d_json
- }
+ d_json = {"error": "could not parse resultant stdout"}
+ return {"status": b_status, "run": d_runCMDresp, "rep": d_json}
+
class CHRS(Shexec):
- '''
+ """
A specialization of Shexec that runs CHRS
- '''
+ """
+
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -348,18 +354,17 @@ def login(self) -> dict:
Returns:
dict: results from login call.
"""
- str_cmd : str = self.string_clean("""
- chrs login --address %s --username %s --password %s
- """ % (
- self.options.CUBEurl,
- self.options.CUBEuser,
- self.options.CUBEpasswd
- ))
- d_login : dict = self.shell.job_run(str_cmd)
+ str_cmd: str = self.string_clean(
+ """
+ chrs login --cube %s --username %s --password %s
+ """
+ % (self.options.CUBEurl, self.options.CUBEuser, self.options.CUBEpasswd)
+ )
+ d_login: dict = self.shell.job_run(str_cmd)
self.env.INFO("\t$> %s" % str_cmd)
return d_login
- def pipeline_add(self, str_filename : str) -> dict:
+ def pipeline_add(self, str_filename: str) -> dict:
"""
Add a pipeline file
@@ -369,68 +374,65 @@ def pipeline_add(self, str_filename : str) -> dict:
Returns:
dict: results from pipeline addition
"""
- str_cmd : str = self.string_clean("""
+ str_cmd: str = self.string_clean(
+ """
chrs pipeline-file add %s
- """ % (str_filename))
- d_pipeline_add : dict = self.shell.job_run(str_cmd)
+ """
+ % (str_filename)
+ )
+ d_pipeline_add: dict = self.shell.job_run(str_cmd)
self.env.INFO("\t$> %s" % str_cmd)
return d_pipeline_add
def chrispl_onCUBEargs(self):
- '''
+ """
Return a string specifying the CUBE instance
- '''
- return {
- 'onCUBE': json.dumps(self.env.CUBE.onCUBE())
- }
+ """
+ return {"onCUBE": json.dumps(self.env.CUBE.onCUBE())}
- def __call__(self) ->dict:
- '''
+ def __call__(self) -> dict:
+ """
Entry point for adding pipelines and additionally registering
any plugin dependencies.
- '''
- b_status : bool = False
- d_runCMDresp : dict = {'returncode' : 1}
- d_json : dict = {'error' : 'all attempts to add pipeline failed'}
+ """
+ b_status: bool = False
+ d_runCMDresp: dict = {"returncode": 1}
+ d_json: dict = {"error": "all attempts to add pipeline failed"}
- if not d_runCMDresp['returncode']:
- b_status = True
+ if not d_runCMDresp["returncode"]:
+ b_status = True
self.l_runCMDresp.append(d_runCMDresp)
try:
- d_json = json.loads(d_runCMDresp['stdout'])
+ d_json = json.loads(d_runCMDresp["stdout"])
except:
- d_json = {'error' : 'could not parse resultant stdout'}
- return {
- 'status' : b_status,
- 'run' : d_runCMDresp,
- 'rep' : d_json
- }
+ d_json = {"error": "could not parse resultant stdout"}
+ return {"status": b_status, "run": d_runCMDresp, "rep": d_json}
class Register(Shexec):
- '''
+ """
A class to connect to a CUBE and facilitate plugin registration
- '''
+ """
def __init__(self, *args, **kwargs):
- self.env : data.env = None
- self.options : Namespace = None
+ self.env: data.env = None
+ self.options: Namespace = None
for k, v in kwargs.items():
- if k == 'env' : self.env = v
- if k == 'options' : self.options = v
-
- self.env.INFO("Connecting to CUBE and creating client object...", level =2)
- self.cl : client.Client = client.Client(
- self.env.CUBE.url,
- self.env.CUBE.user,
- self.env.CUBE.password
- )
- self.ld_workflowhist : list = []
- self.ld_topologicalNode : dict = {'data': []}
-
- def register_do(self, d_jsonRep : dict) -> dict:
+ if k == "env":
+ self.env = v
+ if k == "options":
+ self.options = v
+
+ self.env.INFO("Connecting to CUBE and creating client object...", level=2)
+ self.cl: client.Client = client.Client(
+ self.env.CUBE.url, self.env.CUBE.user, self.env.CUBE.password
+ )
+ self.ld_workflowhist: list = []
+ self.ld_topologicalNode: dict = {"data": []}
+
+ def register_do(self, d_jsonRep: dict) -> dict:
"""
The actual registration logic
@@ -439,23 +441,25 @@ def register_do(self, d_jsonRep : dict) -> dict:
"""
self.env.INFO("Communicating with CUBE to register plugin...")
try:
- d_apicall = self.cl.admin_upload_plugin(self.options.computenames, d_jsonRep)
- d_response = {
- 'status' : True,
- 'stdout' : d_apicall,
- 'stderr' : '',
- 'returncode' : 0
+ d_apicall = self.cl.admin_upload_plugin(
+ self.options.computenames, d_jsonRep
+ )
+ d_response = {
+ "status": True,
+ "stdout": d_apicall,
+ "stderr": "",
+ "returncode": 0,
}
except Exception as e:
- d_response = {
- 'status' : False,
- 'stdout' : 'an exception occurred -- check the server address (localhost can cause issues!)',
- 'stderr' : str(e),
- 'returncode' : 1
+ d_response = {
+ "status": False,
+ "stdout": "an exception occurred -- check the server address (localhost can cause issues!)",
+ "stderr": str(e),
+ "returncode": 1,
}
return d_response
- def __call__(self, d_jsonRep : dict) -> dict:
+ def __call__(self, d_jsonRep: dict) -> dict:
"""
The main entry point to register a plugin. The JSON representation
can be a "old" style description, i.e. that does not contain the
@@ -471,7 +475,7 @@ def __call__(self, d_jsonRep : dict) -> dict:
dict: the registration return
"""
- def assign_if_defined(str_key : str, d_cli : dict, d_jrep : dict) -> dict:
+ def assign_if_defined(str_key: str, d_cli: dict, d_jrep: dict) -> dict:
"""
If a exists in the and has non-zero value length
add to the dictionary.
@@ -484,49 +488,47 @@ def assign_if_defined(str_key : str, d_cli : dict, d_jrep : dict) -> dict:
Returns:
dict: a possibly updated d_jrep dictionary
"""
- b_status = False
- if not len(d_cli['dock_image']):
- self.env.ERROR('The parameter "--dock_image" must be set and have non-zero length!')
+ b_status = False
+ if not len(d_cli["dock_image"]):
+ self.env.ERROR(
+ 'The parameter "--dock_image" must be set and have non-zero length!'
+ )
if str_key in d_cli:
b_status = True
if len(d_cli[str_key]):
d_jrep[str_key] = d_cli[str_key]
else:
- if str_key == 'name': d_jrep[str_key] = pluginname(d_cli['dock_image'])
- if str_key == 'public_repo' : d_jrep[str_key] = \
- public_repo(d_cli['public_repobase'],
- d_cli['dock_image'])
+ if str_key == "name":
+ d_jrep[str_key] = pluginname(d_cli["dock_image"])
+ if str_key == "public_repo":
+ d_jrep[str_key] = public_repo(
+ d_cli["public_repobase"], d_cli["dock_image"]
+ )
return d_jrep, b_status
- d_register : dict = {
- 'status' : False,
- 'obj' : {
- 'run' : {
- 'stderr' : ""
- }
- }
- }
- b_statusAND : bool = True
- b_status : bool = False
- if d_jsonRep['status']:
- d_cli = vars(self.options)
- d_json = d_jsonRep['rep']
- for f in ['dock_image', 'name', 'public_repo']:
+ d_register: dict = {"status": False, "obj": {"run": {"stderr": ""}}}
+ b_statusAND: bool = True
+ b_status: bool = False
+ if d_jsonRep["status"]:
+ d_cli = vars(self.options)
+ d_json = d_jsonRep["rep"]
+ for f in ["dock_image", "name", "public_repo"]:
d_json, status = assign_if_defined(f, d_cli, d_json)
b_statusAND &= status
if not b_statusAND:
- d_register['obj']['run']['stderr'] = 'A required CLI parameter is missing!'
+ d_register["obj"]["run"][
+ "stderr"
+ ] = "A required CLI parameter is missing!"
else:
- d_register['obj']['run'] = self.register_do(d_json)
- self.cmd_checkResponse(d_register['obj']['run'])
+ d_register["obj"]["run"] = self.register_do(d_json)
+ self.cmd_checkResponse(d_register["obj"]["run"])
else:
d_register = {
- 'status' : b_status,
- 'message' : 'a failure occured',
- 'obj' : d_jsonRep
+ "status": b_status,
+ "message": "a failure occured",
+ "obj": d_jsonRep,
}
- if d_register['obj']['run'].get('status'):
- d_register['status'] = d_register['obj']['run'].get('status')
+ if d_register["obj"]["run"].get("status"):
+ d_register["status"] = d_register["obj"]["run"].get("status")
return d_register
-
diff --git a/plugin2cube/__main__.py b/plugin2cube/__main__.py
index 2cdd3cd..13d2818 100644
--- a/plugin2cube/__main__.py
+++ b/plugin2cube/__main__.py
@@ -2,27 +2,29 @@
import pudb
try:
- from . import plugin2cube
+ from . import plugin2cube
except:
- from plugin2cube import plugin2cube
-
-from pathlib import Path
-from argparse import ArgumentParser, \
- Namespace, \
- ArgumentDefaultsHelpFormatter, \
- RawTextHelpFormatter
+ from plugin2cube import plugin2cube
+
+from pathlib import Path
+from argparse import (
+ ArgumentParser,
+ Namespace,
+ ArgumentDefaultsHelpFormatter,
+ RawTextHelpFormatter,
+)
from importlib.metadata import Distribution
-__pkg = Distribution.from_name(__package__)
+__pkg = Distribution.from_name(__package__)
__version__ = __pkg.version
-import os, sys, json
-import pudb
-from pudb.remote import set_trace
-from state import data
+import os, sys, json
+import pudb
+from pudb.remote import set_trace
+from state import data
-Env = data.env()
+Env = data.env()
DISPLAY_TITLE = r"""
_ _ _____ _
@@ -35,9 +37,13 @@
|_| |___/
"""
-str_desc = DISPLAY_TITLE + """
+str_desc = (
+ DISPLAY_TITLE
+ + """
- -- version """ + __version__ + """ --
+ -- version """
+ + __version__
+ + """ --
Register a plugin to a CUBE instance.
@@ -54,8 +60,9 @@
discussed elsewhere.
"""
+)
-package_CLIself = """
+package_CLIself = """
--dock_image \\
[--nodockerpull] \\
[--name ] \\
@@ -178,21 +185,23 @@
"""
-def synopsis(ab_shortOnly = False):
+
+def synopsis(ab_shortOnly=False):
scriptName = os.path.basename(sys.argv[0])
- shortSynopsis = '''
+ shortSynopsis = f"""
NAME
plugin2cube
SYNOPSIS
- plugin2cube \ '''\
- + package_CLIself + '''
+ plugin2cube \
+ {package_CLIself}
- '''
+ """
- description = '''
+ description = (
+ """
DESCRIPTION
`plugin2cube` is a simple app that allows for the registration of a
@@ -222,138 +231,96 @@ def synopsis(ab_shortOnly = False):
//.../pl-
- ''' + package_CLIsynpsisArgs + package_CLIexample
+ """
+ + package_CLIsynpsisArgs
+ + package_CLIexample
+ )
if ab_shortOnly:
return shortSynopsis
else:
return shortSynopsis + description
-parser = ArgumentParser(
- description = '''
+
+parser = ArgumentParser(
+ description="""
A CLI app to upload a plugin to a CUBE instance.
-''',
- formatter_class = RawTextHelpFormatter
+""",
+ formatter_class=RawTextHelpFormatter,
)
parser.add_argument(
- '--version',
- default = False,
- dest = 'b_version',
- action = 'store_true',
- help = 'print version info'
-)
-parser.add_argument(
- '--man',
- default = False,
- action = 'store_true',
- help = 'show a man page'
-)
-parser.add_argument(
- '--osenv',
- default = False,
- action = 'store_true',
- help = 'show the base os environment'
-)
-parser.add_argument(
- '--synopsis',
- default = False,
- action = 'store_true',
- help = 'show a synopsis'
-)
-parser.add_argument(
- '--inputdir',
- default = './',
- help = 'optional directory specifying extra input-relative data'
-)
-parser.add_argument(
- '--outputdir',
- default = './',
- help = 'optional directory specifying location of any output data'
-)
-parser.add_argument(
- '--computenames',
- default = 'host',
- help = 'comma separated list of compute environments against which to register the plugin'
-)
-parser.add_argument(
- '--dock_image',
- default = '',
- help = 'name of the docker container'
-)
-parser.add_argument(
- '--nodockerpull',
- default = False,
- action = 'store_true',
- help = 'if specified, do not attempt to pull the image from a registry first'
-)
-parser.add_argument(
- '--name',
- default = '',
- help = 'plugin name within CUBE'
+ "--version",
+ default=False,
+ dest="b_version",
+ action="store_true",
+ help="print version info",
)
+parser.add_argument("--man", default=False, action="store_true", help="show a man page")
parser.add_argument(
- '--public_repo',
- default = '',
- help = 'repo hosting the container image'
+ "--osenv", default=False, action="store_true", help="show the base os environment"
)
parser.add_argument(
- '--public_repobase',
- default = 'https://github.com/FNNDSC',
- help = 'a default base public repo'
+ "--synopsis", default=False, action="store_true", help="show a synopsis"
)
parser.add_argument(
- '--pluginexec',
- default = '',
- help = 'plugin executable name for cookiecutter style pluginsp'
+ "--inputdir",
+ default="./",
+ help="optional directory specifying extra input-relative data",
)
parser.add_argument(
- '--jsonFile',
- default = '',
- help = 'plugin JSON representation file'
+ "--outputdir",
+ default="./",
+ help="optional directory specifying location of any output data",
)
parser.add_argument(
- '--CUBEurl',
- default = 'http://localhost:8000/api/v1/',
- help = 'CUBE URL'
+ "--computenames",
+ default="host",
+ help="comma separated list of compute environments against which to register the plugin",
)
+parser.add_argument("--dock_image", default="", help="name of the docker container")
parser.add_argument(
- '--CUBEuser',
- default = 'chirs',
- help = 'CUBE username'
+ "--nodockerpull",
+ default=False,
+ action="store_true",
+ help="if specified, do not attempt to pull the image from a registry first",
)
+parser.add_argument("--name", default="", help="plugin name within CUBE")
parser.add_argument(
- '--CUBEpasswd',
- default = 'chris1234',
- help = 'CUBE password'
+ "--public_repo", default="", help="repo hosting the container image"
)
parser.add_argument(
- '--verbosity',
- default = '0',
- help = 'verbosity level of app'
+ "--public_repobase",
+ default="https://github.com/FNNDSC",
+ help="a default base public repo",
)
parser.add_argument(
- "--debug",
- help = "if true, toggle telnet pudb debugging",
- dest = 'debug',
- action = 'store_true',
- default = False
+ "--pluginexec",
+ default="",
+ help="plugin executable name for cookiecutter style pluginsp",
)
+parser.add_argument("--jsonFile", default="", help="plugin JSON representation file")
parser.add_argument(
- "--debugTermSize",
- help = "the terminal 'cols,rows' size for debugging",
- default = '253,62'
+ "--CUBEurl", default="http://localhost:8000/api/v1/", help="CUBE URL"
)
+parser.add_argument("--CUBEuser", default="chirs", help="CUBE username")
+parser.add_argument("--CUBEpasswd", default="chris1234", help="CUBE password")
+parser.add_argument("--verbosity", default="0", help="verbosity level of app")
parser.add_argument(
- "--debugPort",
- help = "the debugging telnet port",
- default = '7900'
+ "--debug",
+ help="if true, toggle telnet pudb debugging",
+ dest="debug",
+ action="store_true",
+ default=False,
)
parser.add_argument(
- "--debugHost",
- help = "the debugging telnet host",
- default = '0.0.0.0'
+ "--debugTermSize",
+ help="the terminal 'cols,rows' size for debugging",
+ default="253,62",
)
+parser.add_argument("--debugPort", help="the debugging telnet port", default="7900")
+parser.add_argument("--debugHost", help="the debugging telnet host", default="0.0.0.0")
+
def Env_setup(options: Namespace):
"""
@@ -363,25 +330,26 @@ def Env_setup(options: Namespace):
options (Namespace): options passed from the CLI caller
"""
global Env
- status : bool = True
- options.inputdir = Path(options.inputdir)
- options.outputdir = Path(options.outputdir)
- Env.inputdir = options.inputdir
- Env.outputdir = options.outputdir
- Env.CUBE.url = str(options.CUBEurl)
- Env.CUBE.user = str(options.CUBEuser)
- Env.CUBE.password = str(options.CUBEpasswd)
+ status: bool = True
+ options.inputdir = Path(options.inputdir)
+ options.outputdir = Path(options.outputdir)
+ Env.inputdir = options.inputdir
+ Env.outputdir = options.outputdir
+ Env.CUBE.url = str(options.CUBEurl)
+ Env.CUBE.user = str(options.CUBEuser)
+ Env.CUBE.password = str(options.CUBEpasswd)
Env.debug_setup(
- debug = options.debug,
- termsize = options.debugTermSize,
- port = options.debugPort,
- host = options.debugHost
+ debug=options.debug,
+ termsize=options.debugTermSize,
+ port=options.debugPort,
+ host=options.debugHost,
)
if not len(options.dock_image):
Env.ERROR("The '--dock_image ' CLI MUST be specified!")
- status = False
+ status = False
return status
+
def earlyExit_check(args) -> int:
"""
Perform some preliminary checks
@@ -392,9 +360,9 @@ def earlyExit_check(args) -> int:
if args.man or args.synopsis:
print(str_desc)
if args.man:
- str_help = synopsis(False)
+ str_help = synopsis(False)
else:
- str_help = synopsis(True)
+ str_help = synopsis(True)
print(str_help)
return 1
if args.b_version:
@@ -402,29 +370,34 @@ def earlyExit_check(args) -> int:
return 1
return 0
+
def main(args=None):
"""
Main method for the programmatical calling of the plugin2cube
module
"""
global Env
- Env.version = plugin2cube.__version__
+ Env.version = plugin2cube.__version__
- options = parser.parse_args()
- retcode : int = 1
- if earlyExit_check(options): return 1
+ options = parser.parse_args()
+ retcode: int = 1
+ if earlyExit_check(options):
+ return 1
# set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900)
- Env.options = options
+ Env.options = options
if Env_setup(options):
Env.set_telnet_trace_if_specified()
- if int(options.verbosity) > 1: print(DISPLAY_TITLE)
- d_register = plugin2cube.plugin2cube(options = options, env = Env).run()
- if d_register['status']: retcode = 0
+ if int(options.verbosity) > 1:
+ print(DISPLAY_TITLE)
+ d_register = plugin2cube.plugin2cube(options=options, env=Env).run()
+ if d_register["status"]:
+ retcode = 0
- Env.INFO("terminating with code %d..." % retcode, level = 2)
+ Env.INFO("terminating with code %d..." % retcode, level=2)
return retcode
-if __name__ == '__main__':
+
+if __name__ == "__main__":
sys.exit(main(args))
diff --git a/plugin2cube/plugin2cube.py b/plugin2cube/plugin2cube.py
index a20494c..c4788a1 100644
--- a/plugin2cube/plugin2cube.py
+++ b/plugin2cube/plugin2cube.py
@@ -1,32 +1,34 @@
#!/usr/bin/env python
-__version__ = '2.2.12'
+__version__ = "2.2.14"
-from pathlib import Path
+from pathlib import Path
-import os, sys, json
-import pudb
-from pudb.remote import set_trace
+import os, sys, json
+import pudb
+from pudb.remote import set_trace
-from concurrent.futures import ThreadPoolExecutor
-from threading import current_thread
+from concurrent.futures import ThreadPoolExecutor
+from threading import current_thread
-from datetime import datetime, timezone
+from datetime import datetime, timezone
-from state import data
-from logic import behavior
-from control import action
+from state import data
+from logic import behavior
+from control import action
-class plugin2cube:
+class plugin2cube:
def __init__(self, *args, **kwargs):
"""
constructor
"""
- self.env = None
- self.options : Namespace = None
+ self.env = None
+ self.options: Namespace = None
for k, v in kwargs.items():
- if k == 'env' : self.env = v
- if k == 'options' : self.options = v
+ if k == "env":
+ self.env = v
+ if k == "options":
+ self.options = v
def prep_do(self) -> action.PluginRep:
"""
@@ -40,27 +42,24 @@ def prep_do(self) -> action.PluginRep:
plugin JSON representation
"""
- PLjson = action.PluginRep(
- env = self.env,
- options = self.options
- )
+ PLjson = action.PluginRep(env=self.env, options=self.options)
- self.env.INFO("Doing some quick prep...", level = 2)
+ self.env.INFO("Doing some quick prep...", level=2)
- self.env.DEBUG("plugin arguments...", level = 3)
- for k,v in self.options.__dict__.items():
- self.env.DEBUG("%25s: [%s]" % (k, v), level = 3)
- self.env.DEBUG("", level = 3)
+ self.env.DEBUG("plugin arguments...", level=3)
+ for k, v in self.options.__dict__.items():
+ self.env.DEBUG("%25s: [%s]" % (k, v), level=3)
+ self.env.DEBUG("", level=3)
if self.options.osenv:
self.env.DEBUG("base environment...")
- for k,v in os.environ.items():
- self.env.DEBUG("%25s: [%s]" % (k, v), level = 3)
+ for k, v in os.environ.items():
+ self.env.DEBUG("%25s: [%s]" % (k, v), level=3)
self.env.DEBUG("")
return PLjson
- def plugin_add(self, PLjson : action.PluginRep) -> dict:
+ def plugin_add(self, PLjson: action.PluginRep) -> dict:
"""
Add the described plugin to the specified CUBE.
@@ -73,17 +72,19 @@ def plugin_add(self, PLjson : action.PluginRep) -> dict:
dict: the JSON return from the CUBE API for registration
"""
- def file_timestamp(str_stamp : str = ""):
+ def file_timestamp(str_stamp: str = ""):
"""
Simple timestamp to file
Args:
str_prefix (str): an optional prefix string before the timestamp
"""
- timenow = lambda: datetime.now(timezone.utc).astimezone().isoformat()
- str_heartbeat : str = str(self.env.outputdir.joinpath('run-%s.log' % str_threadName))
- fl = open(str_heartbeat, 'a')
- fl.write('{}\t%s\n'.format(timenow()) % str_stamp)
+ timenow = lambda: datetime.now(timezone.utc).astimezone().isoformat()
+ str_heartbeat: str = str(
+ self.env.outputdir.joinpath("run-%s.log" % str_threadName)
+ )
+ fl = open(str_heartbeat, "a")
+ fl.write("{}\t%s\n".format(timenow()) % str_stamp)
fl.close()
def jsonRep_get() -> dict:
@@ -93,25 +94,33 @@ def jsonRep_get() -> dict:
Returns:
dict: JSON representation
"""
- d_jsonRep = PLjson()
+ d_jsonRep = PLjson()
return d_jsonRep
- register = action.Register(env = self.env, options = self.options)
- d_register : dict = None
- str_threadName : str = current_thread().getName()
- file_timestamp('START')
-
- self.env.INFO("Registering plugin %s..." % self.options.dock_image)
- d_register = register(jsonRep_get())
- if d_register['status']:
- self.env.INFO('Registration of plugin %s: OK!' % self.options.dock_image)
- self.env.INFO('\n%s' % json.dumps(d_register, indent = 4), level = 3)
+ register = action.Register(env=self.env, options=self.options)
+ d_register: dict = None
+ str_threadName: str = current_thread().getName()
+ file_timestamp("START")
+
+ self.env.INFO(
+ "Registering plugin %s..." % self.options.dock_image
+ )
+ d_register = register(jsonRep_get())
+ if d_register["status"]:
+ self.env.INFO(
+ "Registration of plugin %s: OK!"
+ % self.options.dock_image
+ )
+ self.env.INFO("\n%s" % json.dumps(d_register, indent=4), level=3)
else:
- self.env.INFO('Registration %s: Failed!' % self.options.dock_image)
- self.env.ERROR(d_register['obj']['run']['stderr'])
- self.env.ERROR('\n%s' % json.dumps(d_register, indent= 4), level = 2)
- file_timestamp('\n%s' % json.dumps(d_register, indent = 4))
- file_timestamp('END')
+ self.env.INFO(
+ "Registration %s: Failed!"
+ % self.options.dock_image
+ )
+ self.env.ERROR(d_register["obj"]["run"]["stderr"])
+ self.env.ERROR("\n%s" % json.dumps(d_register, indent=4), level=2)
+ file_timestamp("\n%s" % json.dumps(d_register, indent=4))
+ file_timestamp("END")
return d_register
def run(self) -> dict:
@@ -122,4 +131,3 @@ def run(self) -> dict:
dict: results from the registration
"""
return self.plugin_add(self.prep_do())
-