Skip to content

Commit

Permalink
Get environments from Kubernetes (#79)
Browse files Browse the repository at this point in the history
* Get environments from Kubernetes
  • Loading branch information
t-persson authored Dec 13, 2024
1 parent 3cab1b7 commit ce3e3b4
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 30 deletions.
14 changes: 7 additions & 7 deletions projects/etos_suite_runner/src/etos_suite_runner/esr.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __environment_request_status(self) -> None:
reason = condition.get("reason", "").lower()
if status == "false" and reason == "failed":
failed.append(condition)
if status == "false" and reason == "done":
if status == "true" and reason == "done":
success.append(condition)
if found and len(failed) > 0:
for condition in failed:
Expand All @@ -122,14 +122,13 @@ def __environment_request_status(self) -> None:
)
break
if found and len(success) == len(requests):
self.params.set_status(
"SUCCESS", "Successfully created an environment for test"
)
self.params.set_status("SUCCESS", None)
self.logger.info(
"Environment provider has finished creating an environment for test.",
extra={"user_log": True},
)
break
self.logger.info("Environmentrequest finished")

def __request_environment(self, ids: list[str]) -> None:
"""Request an environment from the environment provider.
Expand Down Expand Up @@ -175,6 +174,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No
runners.
:param otel_context_carrier: a dict carrying current OpenTelemetry context.
"""
FORMAT_CONFIG.identifier = self.params.testrun_id
# OpenTelemetry contexts aren't propagated to threads automatically.
# For this reason otel_context needs to be reinstantiated due to
# this method running in a separate thread.
Expand All @@ -183,7 +183,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No
try:
# TestRun identifier, correlates to the TERCC that should be sent when
# running in the ETOS Kubernetes controller environment.
if os.getenv("IDENTIFIER") is not None:
if self.params.etos_controller:
self.__environment_request_status()
else:
self.__request_environment(ids)
Expand Down Expand Up @@ -246,7 +246,7 @@ def run_suites(self, triggered: EiffelActivityTriggeredEvent):
runner.start_suites_and_wait(suites)
except EnvironmentProviderException as exc:
# Not running as part of the ETOS Kubernetes controller environment
if os.getenv("IDENTIFIER") is None:
if not self.params.etos_controller:
self.logger.info("Release test environment.")
self._release_environment()
self._record_exception(exc)
Expand Down Expand Up @@ -281,7 +281,7 @@ def _run(self):
self.logger.info("ETOS suite runner is starting up", extra={"user_log": True})
# TestRun identifier, correlates to the TERCC that should be sent when
# running in the ETOS Kubernetes controller environment.
if os.getenv("IDENTIFIER") is not None:
if self.params.etos_controller:
# We are probably running in the ETOS Kubernetes controller environment
self.logger.info("Checking TERCC")
if request_tercc(self.etos, testrun_id) is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, etos: ETOS) -> None:
self.issuer = {"name": "ETOS Suite Runner"}
self.environment_status = {"status": "NOT_STARTED", "error": None}

def set_status(self, status: str, error: str) -> None:
def set_status(self, status: str, error: Optional[str] = None) -> None:
"""Set environment provider status."""
with self.lock:
self.logger.debug("Setting environment status to %r, error %r", status, error)
Expand All @@ -59,6 +59,11 @@ def get_status(self) -> dict:
with self.lock:
return self.environment_status.copy()

@property
def etos_controller(self) -> bool:
"""Whether or not the suite runner is running as a part of the ETOS controller."""
return os.getenv("IDENTIFIER") is not None

def _get_id(
self,
config_key: str,
Expand Down Expand Up @@ -112,7 +117,7 @@ def main_suite_ids(self) -> list[str]:
taken from the Environment requests to the environment provider, and are
used to correlate the environments created with the main test suites.
"""
if os.getenv("IDENTIFIER") is None:
if not self.etos_controller:
return [str(uuid4()) for _ in range(len(self.test_suite))]
return [request.spec.id for request in self.environment_requests]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __decrypt(self, password: Union[str, dict]) -> str:
self.logger.debug("No encryption key available, won't decrypt password")
return password
password_value = password.get("$decrypt", {}).get("value")
if password_value is None:
if password_value is None or password_value == "":
self.logger.debug("No '$decrypt' JSONTas struct for password, won't decrypt password")
return password
return Fernet(key).decrypt(password_value).decode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""ETOS suite runner executor."""
import os
import logging
from multiprocessing.pool import ThreadPool

Expand Down Expand Up @@ -90,7 +89,7 @@ def start_suites_and_wait(self, suites: list[tuple[str, Suite]]):
raise exc
finally:
# Not running as part of controller
if os.getenv("IDENTIFIER") is None:
if not self.params.etos_controller:
self.logger.info("Release the full test environment.")
self._release_environment()

Expand Down
98 changes: 80 additions & 18 deletions projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import time
from typing import Iterator, Union

from eiffellib.events import EiffelTestSuiteStartedEvent
from eiffellib.events import EiffelTestSuiteStartedEvent, EiffelEnvironmentDefinedEvent
from environment_provider.lib.registry import ProviderRegistry
from environment_provider.environment import release_environment
from etos_lib import ETOS
Expand Down Expand Up @@ -53,11 +53,12 @@ class SubSuite(OpenTelemetryBase): # pylint:disable=too-many-instance-attribute
released = False
test_start_exception_caught = False

def __init__(self, etos: ETOS, environment: dict, main_suite_id: str) -> None:
def __init__(self, etos: ETOS, environment: dict, main_suite_id: str, controller: bool) -> None:
"""Initialize a sub suite."""
self.etos = etos
self.environment = environment
self.main_suite_id = main_suite_id
self.controller = controller
self.logger = logging.getLogger(f"SubSuite - {self.environment.get('name')}")
self.logger.addFilter(DuplicateFilter(self.logger))
self.otel_tracer = opentelemetry.trace.get_tracer(__name__)
Expand Down Expand Up @@ -213,7 +214,7 @@ def release(self, testrun_id) -> None:
"Check in test environment %r", self.environment["id"], extra={"user_log": True}
)
# Running as part of ETOS controller
if os.getenv("IDENTIFIER") is not None:
if self.controller:
success = self._delete_environment()
else:
success = self._release_environment(testrun_id)
Expand Down Expand Up @@ -268,16 +269,11 @@ def __del__(self):
"""Destructor."""
opentelemetry.context.detach(self.otel_context_token)

@property
def sub_suite_environments(self) -> Iterator[dict]:
def _sub_suite_environments_from_eiffel(self) -> Iterator[dict]:
"""All sub suite environments from the environment provider.
Each sub suite environment is an environment for the sub suites to execute in.
"""
self.logger.debug(
"Start collecting sub suite definitions (timeout=%ds).",
self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"),
)
environments = []
timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT")
while time.time() < timeout:
Expand All @@ -300,7 +296,13 @@ def sub_suite_environments(self) -> Iterator[dict]:
):
if environment["meta"]["id"] not in environments:
environments.append(environment["meta"]["id"])
yield environment
sub_suite_definition = self._download_sub_suite(environment)
if sub_suite_definition is None:
raise EnvironmentProviderException(
"URL to sub suite is missing", self.etos.config.get("task_id")
)
sub_suite_definition["id"] = environment["meta"]["id"]
yield sub_suite_definition
if activity_finished is not None:
if activity_finished["data"]["activityOutcome"]["conclusion"] != "SUCCESSFUL":
exc = EnvironmentProviderException(
Expand All @@ -318,6 +320,67 @@ def sub_suite_environments(self) -> Iterator[dict]:
self._record_exception(exc)
raise exc

def _sub_suite_environments_from_kubernetes(self) -> Iterator[dict]:
"""All sub suite environments from Kubernetes.
Each sub suite environment is an environment for the sub suites to execute in.
"""
environments = []
timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT")
while time.time() < timeout:
time.sleep(5)
for environment in self.params.environments:
if environment.spec.sub_suite_id in environments:
continue

# Send eiffel event for the ETR.
event = EiffelEnvironmentDefinedEvent()
event.meta.event_id = environment.metadata.name
url = f"{os.getenv('ETOS_API')}/v1alpha/testrun/{environment.metadata.name}"
self.etos.events.send(
event,
{"CONTEXT": self.test_suite_started_id},
{"name": environment.spec.name, "uri": url},
)

environments.append(environment.spec.sub_suite_id)
executor = environment.spec.executor
yield {
"executor": executor,
"id": environment.spec.sub_suite_id,
"name": environment.spec.name,
}
status = self.params.get_status()
if status.get("status") == "FAILURE":
exc = EnvironmentProviderException(
status.get("error"), self.etos.config.get("task_id")
)
self._record_exception(exc)
raise exc
if status.get("status") == "SUCCESS" and len(environments) > 0:
return
else: # pylint:disable=useless-else-on-loop
exc = TimeoutError(
f"Timed out after {self.etos.config.get('WAIT_FOR_ENVIRONMENT_TIMEOUT')} seconds."
)
self._record_exception(exc)
raise exc

@property
def sub_suite_environments(self) -> Iterator[dict]:
"""All sub suite environments from the environment provider.
Each sub suite environment is an environment for the sub suites to execute in.
"""
self.logger.debug(
"Start collecting sub suite definitions (timeout=%ds).",
self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"),
)
if self.params.etos_controller:
yield from self._sub_suite_environments_from_kubernetes()
else:
yield from self._sub_suite_environments_from_eiffel()

@property
def all_finished(self) -> bool:
"""Whether or not all sub suites are finished."""
Expand Down Expand Up @@ -400,17 +463,16 @@ def _start(self):
self.test_suite_started.meta.event_id,
extra={"user_log": True},
)
for sub_suite_environment in self.sub_suite_environments:
for sub_suite_definition in self.sub_suite_environments:
self.logger.info(
"Environment received. Starting up a sub suite", extra={"user_log": True}
)
sub_suite_definition = self._download_sub_suite(sub_suite_environment)
if sub_suite_definition is None:
raise EnvironmentProviderException(
"URL to sub suite is missing", self.etos.config.get("task_id")
)
sub_suite_definition["id"] = sub_suite_environment["meta"]["id"]
sub_suite = SubSuite(self.etos, sub_suite_definition, self.test_suite_started_id)
sub_suite = SubSuite(
self.etos,
sub_suite_definition,
self.test_suite_started_id,
self.params.etos_controller,
)
self.sub_suites.append(sub_suite)
thread = threading.Thread(
target=sub_suite.start,
Expand Down

0 comments on commit ce3e3b4

Please sign in to comment.