Skip to content

Commit

Permalink
Merge pull request #8 from EVOLVED-5G/Flow
Browse files Browse the repository at this point in the history
Flow
  • Loading branch information
NaniteBased authored Oct 10, 2022
2 parents 7e973cc + bed2f10 commit ab5d8eb
Show file tree
Hide file tree
Showing 41 changed files with 1,729 additions and 931 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
**10/10/2022** [Version 3.6.0]

- Implemented Child tasks, flow control:
- Sequence
- Parallel
- Repeat
- While
- Select
- Added Task labels
- Updated log views in dashboard
- Documentation reorganization
- Bug fixes

**30/06/2022** [Version 3.5.0 - Release A]

- Add Variables parameter to Robot Framework task
Expand Down
2 changes: 1 addition & 1 deletion Composer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .composer import Composer
from .platform_configuration import PlatformConfiguration
from .platform_configuration import PlatformConfiguration, TaskDefinition
67 changes: 39 additions & 28 deletions Composer/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@
class Composer:
facility: Facility = None

@staticmethod
def getMessageAction(severity: str, message: str) -> ActionInformation:
action = ActionInformation()
action.TaskName = "Run.Message"
action.Order = -9999
action.Config = {'Severity': severity, 'Message': message}
return action

@staticmethod
def getMessageTask(severity: str, message: str) -> TaskDefinition:
task = TaskDefinition()
task.Task = Message
task.Params = {'Severity': severity, 'Message': message}
return task

@classmethod
def Compose(cls, descriptor: ExperimentDescriptor) -> PlatformConfiguration:
def _messageAction(severity: str, message: str) -> ActionInformation:
action = ActionInformation()
action.TaskName = "Run.Message"
action.Order = -9999
action.Config = {'Severity': severity, 'Message': message}
return action

def _messageTask(severity: str, message: str) -> TaskDefinition:
task = TaskDefinition()
task.Task = Message
task.Params = {'Severity': severity, 'Message': message}
return task

if cls.facility is None:
cls.facility = Facility()

Expand Down Expand Up @@ -54,17 +56,17 @@ def _messageTask(severity: str, message: str) -> TaskDefinition:
configuration.NetworkServices.append(nsInfo)
except Exception as e:
errored = True
actions.append(
_messageAction("ERROR",
f"Exception while obtaining information about network service {nsId}: {e}"))
actions.append(cls.getMessageAction(
"ERROR", f"Exception while obtaining information about network service {nsId}: {e}"))

if not errored:
nest, error = cls.composeNest(descriptor.Slice, descriptor.Scenario, configuration.NetworkServices)
if error is None:
configuration.Nest = nest
else:
errored = True
actions.append(_messageAction("ERROR", f'Error while generating NEST data for experiment: {error}'))
actions.append(cls.getMessageAction(
"ERROR", f'Error while generating NEST data for experiment: {error}'))

if not errored:
if descriptor.Type == ExperimentType.MONROE:
Expand All @@ -78,28 +80,21 @@ def _messageTask(severity: str, message: str) -> TaskDefinition:
if len(testcaseActions) != 0:
actions.extend(testcaseActions)
else:
actions.append(_messageAction("WARNING", # Notify, but do not cancel execution
f'TestCase "{testcase}" did not generate any actions'))
actions.append(cls.getMessageAction( # Notify, but do not cancel execution
"WARNING", f'TestCase "{testcase}" did not generate any actions'))
panels.extend(cls.facility.GetTestCaseDashboards(testcase))
else:
delay = ActionInformation()
delay.TaskName = "Run.Delay"
delay.Config = {'Time': descriptor.Duration*60}
actions.append(delay)

actions.sort(key=lambda action: action.Order) # Sort by Order
actions.sort(key=lambda action: action.Order) # Sort by Order (only those at first level)
requirements = set()

for action in actions:
requirements.update(action.Requirements)

task = cls.getTaskClass(action.TaskName)
if task is None:
taskDefinition = _messageTask("ERROR", f"Could not find task {action.TaskName}")
else:
taskDefinition = TaskDefinition()
taskDefinition.Params = action.Config
taskDefinition.Task = task
taskDefinition = cls.getTaskDefinition(action)
configuration.RunTasks.append(taskDefinition)

configuration.Requirements = list(requirements)
Expand All @@ -117,6 +112,22 @@ def getTaskClass(taskName: str):
Log.E(f'Task "{taskName}" not found')
return None

@classmethod
def getTaskDefinition(cls, action: ActionInformation):
task = cls.getTaskClass(action.TaskName)
if task is None:
taskDefinition = cls.getMessageTask("ERROR", f"Could not find task {action.TaskName}")
else:
taskDefinition = TaskDefinition()
taskDefinition.Params = action.Config
taskDefinition.Task = task
taskDefinition.Label = action.Label
taskDefinition.Children = []
for child in action.Children:
childDefinition = cls.getTaskDefinition(child) # Either will be a _real_ task or a message
taskDefinition.Children.append(childDefinition)
return taskDefinition

@classmethod
def composeNest(cls, baseSlice: str, scenario: str, nss: List[NsInfo]) -> Tuple[Dict, Optional[str]]:
"""Returns the NEST as Dict (possibly empty) and a string indicating the error (None in case of success)"""
Expand Down
8 changes: 8 additions & 0 deletions Composer/platform_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ class TaskDefinition:
def __init__(self):
self.Task: ClassVar = None
self.Params: Dict = {}
self.Label: str = ''
self.Children: List[TaskDefinition] = []

def GetTaskInstance(self, logMethod, parent, params):
taskInstance = self.Task(logMethod, parent, params)
taskInstance.Label = self.Label
taskInstance.Children = self.Children
return taskInstance


class PlatformConfiguration:
Expand Down
2 changes: 1 addition & 1 deletion Executor/Tasks/Evolved5g/jenkins_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def Run(self):
try:
self.client = self.getApiClient()
except Exception as e:
self.Log(Level.Error, f"Unable to create Jenkins API client: {e}")
self.Log(Level.ERROR, f"Unable to create Jenkins API client: {e}")
self.client = None

def getApiClient(self) -> Evolved5gJenkinsApi:
Expand Down
2 changes: 1 addition & 1 deletion Executor/Tasks/Evolved5g/nef_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def Run(self):
try:
self.client = self.getApiClient()
except Exception as e:
self.Log(Level.Error, f"Unable to create NEF Emulator client: {e}")
self.Log(Level.ERROR, f"Unable to create NEF Emulator client: {e}")
self.client = None

def getApiClient(self) -> Evolved5gNefEmulator:
Expand Down
3 changes: 3 additions & 0 deletions Executor/Tasks/Flow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .parallel import Parallel
from .sequences import Sequence, Repeat
from .conditionals import While, Select
165 changes: 165 additions & 0 deletions Executor/Tasks/Flow/conditionals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from Helper import Level
from .sequences import SequenceBase
import re


class ConditionalSequenceBase(SequenceBase):
@staticmethod
def getClassName(instance) -> str:
return type(instance).__name__

def dictIsValidCondition(self, condition: dict, errorPrefix: str) -> bool:
if not isinstance(condition, dict):
self.Log(Level.ERROR, f'{errorPrefix} Expected a dictionary, got {self.getClassName(condition)}. Aborting')
return False

evaluate = "Evaluate" in condition.keys()
key = "Key" in condition.keys()

if evaluate == key:
if evaluate:
self.Log(Level.ERROR, f"{errorPrefix} Only one of 'Evaluate` and 'Key' can be defined "
f"at the same time. Aborting")
else:
self.Log(Level.ERROR, f"{errorPrefix} Either of 'Evaluate` or 'Key' must be defined. Aborting")
return False
return True

@staticmethod
def getConditionText(evaluate, key, pattern, negate) -> str:
if evaluate is not None:
return f"Expression '{evaluate}' (expanded) is True"
else:
matchPattern = f"match regex '{pattern}'"
return f"'{key}' {('does not ' if negate else '')} {('exist' if pattern is None else matchPattern)}"

@staticmethod
def regexConditionIsTrue(key, collection, regex) -> bool:
if key in collection.keys():
if regex is None:
return True
else:
value = str(collection[key])
return regex.match(value) is not None
return False


class While(ConditionalSequenceBase):
def __init__(self, logMethod, parent, params):
super().__init__("While", logMethod, parent, params)
self.paramRules = { # Do not generate 'Key' or 'Evaluate' by default. Will be checked by dictIsValidCondition.
'Pattern': (None, False),
'Negate': (False, False),
'MaxIterations': (None, False)}

def inDepthSanitizeParams(self):
return self.dictIsValidCondition(self.params, "Invalid condition:")

def runMany(self):
evaluate = self.params.get('Evaluate', None)
key = self.params.get("Key", None)
pattern = self.params["Pattern"]
negate = self.params["Negate"]
maxIterations = self.params["MaxIterations"]
regex = re.compile(pattern) if pattern is not None else None
conditionText = self.getConditionText(evaluate, key, pattern, negate)

goOn = True
iteration = 0
while goOn:
if maxIterations is None or iteration < maxIterations:
if evaluate is not None:
try:
conditionIsTrue = eval(evaluate) is True
except Exception as e:
self.Log(Level.DEBUG, f"Exception while evaluating expression '{evaluate}' ({e}): "
f"Condition will be considered as not verified. Moving on.")
conditionIsTrue = False
else:
conditionIsTrue = self.regexConditionIsTrue(key, self.parent.Params, regex)

goOn = not conditionIsTrue if negate else conditionIsTrue

if goOn:
self.Log(Level.INFO, f"Condition ({conditionText}) verified. Starting iteration {iteration}")

flowState = {'Iter0': iteration, 'Iter1': iteration + 1}

for index, child in enumerate(self.Children, start=1):
self.runOne(child, f'It{iteration}Seq{index}', flowState)

else:
self.Log(Level.INFO, f"Condition ({conditionText}) not verified. While loop finalized.")
else:
self.Log(Level.INFO, f"Maximum number of iterations ({maxIterations}) reached. While loop finalized.")
goOn = False

iteration += 1


class Select(ConditionalSequenceBase):
def __init__(self, logMethod, parent, params):
super().__init__("Select", logMethod, parent, params)
self.paramRules = {'Conditions': (None, True)}

def inDepthSanitizeParams(self):
conditions = self.params["Conditions"]

if not isinstance(conditions, list):
self.Log(Level.ERROR,
f"Invalid 'Conditions' type: expected list, got {self.getClassName(conditions)}. Aborting")
return False

numConditions = len(conditions)
numChildren = len(self.Children)
if numConditions > numChildren or numConditions < numChildren-1:
self.Log(Level.ERROR, f"Invalid number of conditions ({numConditions}): Must be the same or one less than "
f"child tasks ({numChildren}). Aborting")

for index, condition in enumerate(conditions):
if not self.dictIsValidCondition(condition, f"Invalid condition in position {index}:"):
return False
return True

def runMany(self):
conditions = self.params["Conditions"]

needle = None
conditionText = None
for index, condition in enumerate(conditions):
evaluate = condition.get("Evaluate", None)

if evaluate is not None:
try:
if eval(evaluate) is True:
needle = index
conditionText = self.getConditionText(evaluate, None, None, None)
break
except Exception as e:
self.Log(Level.DEBUG, f"Exception while evaluating expression '{evaluate}' ({e}): "
f"Condition will be considered as not verified. Moving on.")
else:
key = condition["Key"]
pattern = condition.get("Pattern", None)
negate = condition.get("Negate", False)
regex = re.compile(pattern) if pattern is not None else None

verified = self.regexConditionIsTrue(key, self.parent.Params, regex)
verified = not verified if negate else verified

if verified:
needle = index
conditionText = self.getConditionText(None, key, pattern, negate)
break

else: # Nothing verified, check if there is a default branch
if len(conditions) < len(self.Children):
needle = len(self.Children) - 1
conditionText = "None of the previous conditions are verified (default)"

if needle is not None:
self.Log(Level.INFO, f'Condition "{conditionText}" verified for branch {needle}. Executing.')
flowState = {'Branch': needle}
self.runOne(self.Children[needle], f'Case{needle}', flowState)
else:
self.Log(Level.INFO, "Conditions not verified for any branch. Nothing executed.")
56 changes: 56 additions & 0 deletions Executor/Tasks/Flow/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from Task import Task
from Helper import Level
from threading import Thread
from Composer import TaskDefinition
from Experiment import Expander
from Executor import Verdict


class ChildInfo:
def __init__(self, definition: TaskDefinition):
self.Thread = None
self.TaskDefinition = definition
self.TaskInstance = None


class Parallel(Task):
def __init__(self, logMethod, parent, params):
super().__init__("Parallel", parent, params, logMethod, None)

def Run(self):
if len(self.Children) == 0:
self.Log(Level.WARNING, f"Skipping parallel execution: no children defined.")
return

self.Log(Level.INFO, f"Starting parallel execution ({len(self.Children)} children)")

children: [ChildInfo] = []

for index, child in enumerate(self.Children, start=1):
if child.Label is None:
child.Label = f"Br{index}"

flowState = {'Branch': index}
info = ChildInfo(child)
info.TaskInstance = child.GetTaskInstance(
self.Log, self.parent, Expander.ExpandDict(child.Params, self.parent, flowState))
info.Thread = Thread(target=self.runChild, args=(info.TaskInstance,))
children.append(info)

info.Thread.start()
self.Log(Level.DEBUG, f"Started branch {index}: {child.Label}")

for index, info in enumerate(children, start=1):
info.Thread.join()
self.parent.params.update(info.TaskInstance.Vault) # Propagate any published values
self.Log(Level.DEBUG, f"Branch {index} ({info.TaskDefinition.Label}) joined")
self.Verdict = Verdict.Max(self.Verdict, info.TaskInstance.Verdict)

self.Log(Level.INFO, f"Finished execution of all child tasks")

def runChild(self, taskInstance: Task):
try:
taskInstance.Start()
except Exception as e:
taskInstance.Verdict = Verdict.Error
self.Log(Level.ERROR, str(e))
Loading

0 comments on commit ab5d8eb

Please sign in to comment.