Skip to content

Commit

Permalink
Get environments from Kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
t-persson committed Dec 12, 2024
1 parent 3cab1b7 commit c5a2bf3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 21 deletions.
8 changes: 4 additions & 4 deletions projects/etos_suite_runner/src/etos_suite_runner/esr.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __environment_request_status(self) -> None:
reason = condition.get("reason", "").lower()
if status == "false" and reason == "failed":
failed.append(condition)
if status == "false" and reason == "done":
if status == "true" and reason == "done":
success.append(condition)
if found and len(failed) > 0:
for condition in failed:
Expand All @@ -122,14 +122,13 @@ def __environment_request_status(self) -> None:
)
break
if found and len(success) == len(requests):
self.params.set_status(
"SUCCESS", "Successfully created an environment for test"
)
self.params.set_status("SUCCESS", None)
self.logger.info(
"Environment provider has finished creating an environment for test.",
extra={"user_log": True},
)
break
self.logger.info("Environmentrequest finished")

def __request_environment(self, ids: list[str]) -> None:
"""Request an environment from the environment provider.
Expand Down Expand Up @@ -175,6 +174,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No
runners.
:param otel_context_carrier: a dict carrying current OpenTelemetry context.
"""
FORMAT_CONFIG.identifier = self.params.testrun_id
# OpenTelemetry contexts aren't propagated to threads automatically.
# For this reason otel_context needs to be reinstantiated due to
# this method running in a separate thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, etos: ETOS) -> None:
self.issuer = {"name": "ETOS Suite Runner"}
self.environment_status = {"status": "NOT_STARTED", "error": None}

def set_status(self, status: str, error: str) -> None:
def set_status(self, status: str, error: Optional[str] = None) -> None:
"""Set environment provider status."""
with self.lock:
self.logger.debug("Setting environment status to %r, error %r", status, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __decrypt(self, password: Union[str, dict]) -> str:
self.logger.debug("No encryption key available, won't decrypt password")
return password
password_value = password.get("$decrypt", {}).get("value")
if password_value is None:
if password_value is None or password_value == "":
self.logger.debug("No '$decrypt' JSONTas struct for password, won't decrypt password")
return password
return Fernet(key).decrypt(password_value).decode()
Expand Down
86 changes: 71 additions & 15 deletions projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import time
from typing import Iterator, Union

from eiffellib.events import EiffelTestSuiteStartedEvent
from eiffellib.events import EiffelTestSuiteStartedEvent, EiffelEnvironmentDefinedEvent
from environment_provider.lib.registry import ProviderRegistry
from environment_provider.environment import release_environment
from etos_lib import ETOS
Expand Down Expand Up @@ -268,16 +268,11 @@ def __del__(self):
"""Destructor."""
opentelemetry.context.detach(self.otel_context_token)

@property
def sub_suite_environments(self) -> Iterator[dict]:
def _sub_suite_environments_from_eiffel(self) -> Iterator[dict]:
"""All sub suite environments from the environment provider.
Each sub suite environment is an environment for the sub suites to execute in.
"""
self.logger.debug(
"Start collecting sub suite definitions (timeout=%ds).",
self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"),
)
environments = []
timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT")
while time.time() < timeout:
Expand All @@ -300,7 +295,13 @@ def sub_suite_environments(self) -> Iterator[dict]:
):
if environment["meta"]["id"] not in environments:
environments.append(environment["meta"]["id"])
yield environment
sub_suite_definition = self._download_sub_suite(environment)
if sub_suite_definition is None:
raise EnvironmentProviderException(
"URL to sub suite is missing", self.etos.config.get("task_id")
)
sub_suite_definition["id"] = environment["meta"]["id"]
yield sub_suite_definition
if activity_finished is not None:
if activity_finished["data"]["activityOutcome"]["conclusion"] != "SUCCESSFUL":
exc = EnvironmentProviderException(
Expand All @@ -318,6 +319,67 @@ def sub_suite_environments(self) -> Iterator[dict]:
self._record_exception(exc)
raise exc

def _sub_suite_environments_from_kubernetes(self) -> Iterator[dict]:
"""All sub suite environments from Kubernetes.
Each sub suite environment is an environment for the sub suites to execute in.
"""
environments = []
timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT")
while time.time() < timeout:
time.sleep(5)
for environment in self.params.environments:
if environment.spec.sub_suite_id in environments:
continue

# Send eiffel event for the ETR.
event = EiffelEnvironmentDefinedEvent()
event.meta.event_id = environment.metadata.name
url = f"{os.getenv('ETOS_API')}/v1alpha/testrun/{environment.metadata.name}"
self.etos.events.send(
event,
{"CONTEXT": self.test_suite_started_id},
{"name": environment.spec.name, "uri": url},
)

environments.append(environment.spec.sub_suite_id)
executor = environment.spec.executor
yield {
"executor": executor,
"id": environment.spec.sub_suite_id,
"name": environment.spec.name,
}
status = self.params.get_status()
if status.get("status") == "FAILURE":
exc = EnvironmentProviderException(
status.get("error"), self.etos.config.get("task_id")
)
self._record_exception(exc)
raise exc
if status.get("status") == "SUCCESS" and len(environments) > 0:
return
else: # pylint:disable=useless-else-on-loop
exc = TimeoutError(
f"Timed out after {self.etos.config.get('WAIT_FOR_ENVIRONMENT_TIMEOUT')} seconds."
)
self._record_exception(exc)
raise exc

@property
def sub_suite_environments(self) -> Iterator[dict]:
"""All sub suite environments from the environment provider.
Each sub suite environment is an environment for the sub suites to execute in.
"""
self.logger.debug(
"Start collecting sub suite definitions (timeout=%ds).",
self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"),
)
if os.getenv("IDENTIFIER") is not None:
yield from self._sub_suite_environments_from_kubernetes()
else:
yield from self._sub_suite_environments_from_eiffel()

@property
def all_finished(self) -> bool:
"""Whether or not all sub suites are finished."""
Expand Down Expand Up @@ -400,16 +462,10 @@ def _start(self):
self.test_suite_started.meta.event_id,
extra={"user_log": True},
)
for sub_suite_environment in self.sub_suite_environments:
for sub_suite_definition in self.sub_suite_environments:
self.logger.info(
"Environment received. Starting up a sub suite", extra={"user_log": True}
)
sub_suite_definition = self._download_sub_suite(sub_suite_environment)
if sub_suite_definition is None:
raise EnvironmentProviderException(
"URL to sub suite is missing", self.etos.config.get("task_id")
)
sub_suite_definition["id"] = sub_suite_environment["meta"]["id"]
sub_suite = SubSuite(self.etos, sub_suite_definition, self.test_suite_started_id)
self.sub_suites.append(sub_suite)
thread = threading.Thread(
Expand Down

0 comments on commit c5a2bf3

Please sign in to comment.