From ce3e3b402e93de65eea976722c9df1a04425ba85 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Fri, 13 Dec 2024 10:41:24 +0100 Subject: [PATCH] Get environments from Kubernetes (#79) * Get environments from Kubernetes --- .../src/etos_suite_runner/esr.py | 14 +-- .../etos_suite_runner/lib/esr_parameters.py | 9 +- .../src/etos_suite_runner/lib/executor.py | 2 +- .../src/etos_suite_runner/lib/runner.py | 3 +- .../src/etos_suite_runner/lib/suite.py | 98 +++++++++++++++---- 5 files changed, 96 insertions(+), 30 deletions(-) diff --git a/projects/etos_suite_runner/src/etos_suite_runner/esr.py b/projects/etos_suite_runner/src/etos_suite_runner/esr.py index 613bb96..f6bd84b 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/esr.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/esr.py @@ -110,7 +110,7 @@ def __environment_request_status(self) -> None: reason = condition.get("reason", "").lower() if status == "false" and reason == "failed": failed.append(condition) - if status == "false" and reason == "done": + if status == "true" and reason == "done": success.append(condition) if found and len(failed) > 0: for condition in failed: @@ -122,14 +122,13 @@ def __environment_request_status(self) -> None: ) break if found and len(success) == len(requests): - self.params.set_status( - "SUCCESS", "Successfully created an environment for test" - ) + self.params.set_status("SUCCESS", None) self.logger.info( "Environment provider has finished creating an environment for test.", extra={"user_log": True}, ) break + self.logger.info("Environmentrequest finished") def __request_environment(self, ids: list[str]) -> None: """Request an environment from the environment provider. @@ -175,6 +174,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No runners. :param otel_context_carrier: a dict carrying current OpenTelemetry context. """ + FORMAT_CONFIG.identifier = self.params.testrun_id # OpenTelemetry contexts aren't propagated to threads automatically. # For this reason otel_context needs to be reinstantiated due to # this method running in a separate thread. @@ -183,7 +183,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No try: # TestRun identifier, correlates to the TERCC that should be sent when # running in the ETOS Kubernetes controller environment. - if os.getenv("IDENTIFIER") is not None: + if self.params.etos_controller: self.__environment_request_status() else: self.__request_environment(ids) @@ -246,7 +246,7 @@ def run_suites(self, triggered: EiffelActivityTriggeredEvent): runner.start_suites_and_wait(suites) except EnvironmentProviderException as exc: # Not running as part of the ETOS Kubernetes controller environment - if os.getenv("IDENTIFIER") is None: + if not self.params.etos_controller: self.logger.info("Release test environment.") self._release_environment() self._record_exception(exc) @@ -281,7 +281,7 @@ def _run(self): self.logger.info("ETOS suite runner is starting up", extra={"user_log": True}) # TestRun identifier, correlates to the TERCC that should be sent when # running in the ETOS Kubernetes controller environment. - if os.getenv("IDENTIFIER") is not None: + if self.params.etos_controller: # We are probably running in the ETOS Kubernetes controller environment self.logger.info("Checking TERCC") if request_tercc(self.etos, testrun_id) is None: diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py index f6e1fb3..7d6f345 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py @@ -44,7 +44,7 @@ def __init__(self, etos: ETOS) -> None: self.issuer = {"name": "ETOS Suite Runner"} self.environment_status = {"status": "NOT_STARTED", "error": None} - def set_status(self, status: str, error: str) -> None: + def set_status(self, status: str, error: Optional[str] = None) -> None: """Set environment provider status.""" with self.lock: self.logger.debug("Setting environment status to %r, error %r", status, error) @@ -59,6 +59,11 @@ def get_status(self) -> dict: with self.lock: return self.environment_status.copy() + @property + def etos_controller(self) -> bool: + """Whether or not the suite runner is running as a part of the ETOS controller.""" + return os.getenv("IDENTIFIER") is not None + def _get_id( self, config_key: str, @@ -112,7 +117,7 @@ def main_suite_ids(self) -> list[str]: taken from the Environment requests to the environment provider, and are used to correlate the environments created with the main test suites. """ - if os.getenv("IDENTIFIER") is None: + if not self.etos_controller: return [str(uuid4()) for _ in range(len(self.test_suite))] return [request.spec.id for request in self.environment_requests] diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py index 4210264..186052f 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py @@ -64,7 +64,7 @@ def __decrypt(self, password: Union[str, dict]) -> str: self.logger.debug("No encryption key available, won't decrypt password") return password password_value = password.get("$decrypt", {}).get("value") - if password_value is None: + if password_value is None or password_value == "": self.logger.debug("No '$decrypt' JSONTas struct for password, won't decrypt password") return password return Fernet(key).decrypt(password_value).decode() diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py index ce5b812..76ae107 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/runner.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS suite runner executor.""" -import os import logging from multiprocessing.pool import ThreadPool @@ -90,7 +89,7 @@ def start_suites_and_wait(self, suites: list[tuple[str, Suite]]): raise exc finally: # Not running as part of controller - if os.getenv("IDENTIFIER") is None: + if not self.params.etos_controller: self.logger.info("Release the full test environment.") self._release_environment() diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py index 3c37f3a..dd6743c 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py @@ -21,7 +21,7 @@ import time from typing import Iterator, Union -from eiffellib.events import EiffelTestSuiteStartedEvent +from eiffellib.events import EiffelTestSuiteStartedEvent, EiffelEnvironmentDefinedEvent from environment_provider.lib.registry import ProviderRegistry from environment_provider.environment import release_environment from etos_lib import ETOS @@ -53,11 +53,12 @@ class SubSuite(OpenTelemetryBase): # pylint:disable=too-many-instance-attribute released = False test_start_exception_caught = False - def __init__(self, etos: ETOS, environment: dict, main_suite_id: str) -> None: + def __init__(self, etos: ETOS, environment: dict, main_suite_id: str, controller: bool) -> None: """Initialize a sub suite.""" self.etos = etos self.environment = environment self.main_suite_id = main_suite_id + self.controller = controller self.logger = logging.getLogger(f"SubSuite - {self.environment.get('name')}") self.logger.addFilter(DuplicateFilter(self.logger)) self.otel_tracer = opentelemetry.trace.get_tracer(__name__) @@ -213,7 +214,7 @@ def release(self, testrun_id) -> None: "Check in test environment %r", self.environment["id"], extra={"user_log": True} ) # Running as part of ETOS controller - if os.getenv("IDENTIFIER") is not None: + if self.controller: success = self._delete_environment() else: success = self._release_environment(testrun_id) @@ -268,16 +269,11 @@ def __del__(self): """Destructor.""" opentelemetry.context.detach(self.otel_context_token) - @property - def sub_suite_environments(self) -> Iterator[dict]: + def _sub_suite_environments_from_eiffel(self) -> Iterator[dict]: """All sub suite environments from the environment provider. Each sub suite environment is an environment for the sub suites to execute in. """ - self.logger.debug( - "Start collecting sub suite definitions (timeout=%ds).", - self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"), - ) environments = [] timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") while time.time() < timeout: @@ -300,7 +296,13 @@ def sub_suite_environments(self) -> Iterator[dict]: ): if environment["meta"]["id"] not in environments: environments.append(environment["meta"]["id"]) - yield environment + sub_suite_definition = self._download_sub_suite(environment) + if sub_suite_definition is None: + raise EnvironmentProviderException( + "URL to sub suite is missing", self.etos.config.get("task_id") + ) + sub_suite_definition["id"] = environment["meta"]["id"] + yield sub_suite_definition if activity_finished is not None: if activity_finished["data"]["activityOutcome"]["conclusion"] != "SUCCESSFUL": exc = EnvironmentProviderException( @@ -318,6 +320,67 @@ def sub_suite_environments(self) -> Iterator[dict]: self._record_exception(exc) raise exc + def _sub_suite_environments_from_kubernetes(self) -> Iterator[dict]: + """All sub suite environments from Kubernetes. + + Each sub suite environment is an environment for the sub suites to execute in. + """ + environments = [] + timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") + while time.time() < timeout: + time.sleep(5) + for environment in self.params.environments: + if environment.spec.sub_suite_id in environments: + continue + + # Send eiffel event for the ETR. + event = EiffelEnvironmentDefinedEvent() + event.meta.event_id = environment.metadata.name + url = f"{os.getenv('ETOS_API')}/v1alpha/testrun/{environment.metadata.name}" + self.etos.events.send( + event, + {"CONTEXT": self.test_suite_started_id}, + {"name": environment.spec.name, "uri": url}, + ) + + environments.append(environment.spec.sub_suite_id) + executor = environment.spec.executor + yield { + "executor": executor, + "id": environment.spec.sub_suite_id, + "name": environment.spec.name, + } + status = self.params.get_status() + if status.get("status") == "FAILURE": + exc = EnvironmentProviderException( + status.get("error"), self.etos.config.get("task_id") + ) + self._record_exception(exc) + raise exc + if status.get("status") == "SUCCESS" and len(environments) > 0: + return + else: # pylint:disable=useless-else-on-loop + exc = TimeoutError( + f"Timed out after {self.etos.config.get('WAIT_FOR_ENVIRONMENT_TIMEOUT')} seconds." + ) + self._record_exception(exc) + raise exc + + @property + def sub_suite_environments(self) -> Iterator[dict]: + """All sub suite environments from the environment provider. + + Each sub suite environment is an environment for the sub suites to execute in. + """ + self.logger.debug( + "Start collecting sub suite definitions (timeout=%ds).", + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"), + ) + if self.params.etos_controller: + yield from self._sub_suite_environments_from_kubernetes() + else: + yield from self._sub_suite_environments_from_eiffel() + @property def all_finished(self) -> bool: """Whether or not all sub suites are finished.""" @@ -400,17 +463,16 @@ def _start(self): self.test_suite_started.meta.event_id, extra={"user_log": True}, ) - for sub_suite_environment in self.sub_suite_environments: + for sub_suite_definition in self.sub_suite_environments: self.logger.info( "Environment received. Starting up a sub suite", extra={"user_log": True} ) - sub_suite_definition = self._download_sub_suite(sub_suite_environment) - if sub_suite_definition is None: - raise EnvironmentProviderException( - "URL to sub suite is missing", self.etos.config.get("task_id") - ) - sub_suite_definition["id"] = sub_suite_environment["meta"]["id"] - sub_suite = SubSuite(self.etos, sub_suite_definition, self.test_suite_started_id) + sub_suite = SubSuite( + self.etos, + sub_suite_definition, + self.test_suite_started_id, + self.params.etos_controller, + ) self.sub_suites.append(sub_suite) thread = threading.Thread( target=sub_suite.start,