diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d651d85d..f125abfa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [UNRELEASED] +### Changed + +- Terraform output to use scrolling buffer. +- Terraform output handling to show errors. + ## [0.231.0-rc.0] - 2023-11-28 ### Authors @@ -14,7 +19,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ara Ghukasyan <38226926+araghukas@users.noreply.github.com> - Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> - ### Added - check for `/bin/bash` AND `/bin/sh` (in that order) to execute bash leptons diff --git a/covalent/cloud_resource_manager/core.py b/covalent/cloud_resource_manager/core.py index 035efd412..4721fb70c 100644 --- a/covalent/cloud_resource_manager/core.py +++ b/covalent/cloud_resource_manager/core.py @@ -121,9 +121,9 @@ def get_plugin_settings( for key, value in executor_options.items(): try: settings_dict[key]["value"] = value - except: + except Exception: logger.error(f"No such option '{key}'. Use --help for available options") - sys.exit() + sys.exit(1) return settings_dict @@ -164,10 +164,9 @@ def __init__( self._terraform_log_env_vars = { "TF_LOG": "ERROR", "TF_LOG_PATH": os.path.join(self.executor_tf_path, "terraform-error.log"), - "PATH": "$PATH:/usr/bin", } - def _print_stdout(self, process: subprocess.Popen, print_callback: Callable) -> int: + def _poll_process(self, process: subprocess.Popen, print_callback: Callable) -> int: """ Print the stdout from the subprocess to console @@ -179,12 +178,10 @@ def _print_stdout(self, process: subprocess.Popen, print_callback: Callable) -> Return code of the process. """ - while (retcode := process.poll()) is None: - if (proc_stdout := process.stdout.readline()) and print_callback: - print_callback(proc_stdout.strip().decode("utf-8")) - return retcode - - # TODO: Return the command output along with return code + while (returncode := process.poll()) is None: + if print_callback: + print_callback(process.stdout.readline()) + return returncode def _parse_terraform_error_log(self) -> List[str]: """Parse the terraform error logs. @@ -235,16 +232,21 @@ def _get_resource_status( Returns: status: str - status of plugin """ - _, stderr = proc.communicate() + cmds = cmd.split(" ") tfstate_path = cmds[-1].split("=")[-1] - if stderr is None: - return self._terraform_error_validator(tfstate_path=tfstate_path) - else: - raise subprocess.CalledProcessError( - returncode=1, cmd=cmd, stderr=self._parse_terraform_error_log() + + returncode = self._poll_process(proc, print_callback=None) + stderr = proc.stderr.read() + if returncode != 0 and "No state file was found!" not in stderr: + print( + "Unable to get resource status due to the following error:\n\n", + stderr, + file=sys.stderr, ) + return self._terraform_error_validator(tfstate_path=tfstate_path) + def _log_error_msg(self, cmd) -> None: """ Log error msg with valid command to terraform-erro.log @@ -261,7 +263,6 @@ def _log_error_msg(self, cmd) -> None: def _run_in_subprocess( self, cmd: str, - workdir: str, env_vars: Optional[Dict[str, str]] = None, print_callback: Optional[Callable] = None, ) -> Union[None, str]: @@ -270,39 +271,50 @@ def _run_in_subprocess( Args: cmd: Command to execute in the subprocess - workdir: Working directory of the subprocess env_vars: Dictionary of environment variables to set in the processes execution environment Returns: Union[None, str] - For 'covalent deploy status' - returns status of the deplyment + returns status of the deployment - Others return None """ - if git := shutil.which("git"): - proc = subprocess.Popen( - args=cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=workdir, - shell=True, - env=env_vars, - ) - TERRAFORM_STATE = "state list -state" - if TERRAFORM_STATE in cmd: - return self._get_resource_status(proc=proc, cmd=cmd) - retcode = self._print_stdout(proc, print_callback) - - if retcode != 0: - self._log_error_msg(cmd=cmd) - raise subprocess.CalledProcessError( - returncode=retcode, cmd=cmd, stderr=self._parse_terraform_error_log() - ) - else: + if not shutil.which("git"): self._log_error_msg(cmd=cmd) logger.error("Git not found on the system.") - sys.exit() + sys.exit(1) + + env_vars = env_vars or {} + env_vars.update({"PATH": os.environ["PATH"]}) + + proc = subprocess.Popen( + args=cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=self.executor_tf_path, + universal_newlines=True, + shell=True, + env=env_vars, + ) + + if "state list -state" in cmd: + return self._get_resource_status(proc=proc, cmd=cmd) + + returncode = self._poll_process(proc, print_callback) + + if returncode != 0: + self._log_error_msg(cmd=cmd) + + _, stderr = proc.communicate() + raise subprocess.CalledProcessError( + returncode=returncode, + cmd=cmd, + stderr=self._parse_terraform_error_log(), + output=stderr, + ) + + return None def _update_config(self, tf_executor_config_file: str) -> None: """ @@ -348,18 +360,22 @@ def _get_tf_path(self) -> str: """ if terraform := shutil.which("terraform"): result = subprocess.run( - ["terraform --version"], shell=True, capture_output=True, text=True + ["terraform --version"], + shell=True, + capture_output=True, + text=True, + check=True, ) version = result.stdout.split("v", 1)[1][:3] if float(version) < 1.4: logger.error( "Old version of terraform found. Please update it to version greater than 1.3" ) - sys.exit() + sys.exit(1) return terraform - else: - logger.error("Terraform not found on system") - exit() + + logger.error("Terraform not found on system") + sys.exit(1) def _get_tf_statefile_path(self) -> str: """ @@ -401,14 +417,16 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: # Run `terraform init` self._run_in_subprocess( - cmd=tf_init, workdir=self.executor_tf_path, env_vars=self._terraform_log_env_vars + cmd=tf_init, + env_vars=self._terraform_log_env_vars, + print_callback=print_callback, ) # Setup terraform infra variables as passed by the user tf_vars_env_dict = os.environ.copy() if self.executor_options: - with open(tfvars_file, "w") as f: + with open(tfvars_file, "w", encoding="utf-8") as f: for key, value in self.executor_options.items(): tf_vars_env_dict[f"TF_VAR_{key}"] = value @@ -418,7 +436,6 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: # Run `terraform plan` self._run_in_subprocess( cmd=tf_plan, - workdir=self.executor_tf_path, env_vars=self._terraform_log_env_vars, print_callback=print_callback, ) @@ -426,9 +443,8 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: # Create infrastructure as per the plan # Run `terraform apply` if not dry_run: - cmd_output = self._run_in_subprocess( + self._run_in_subprocess( cmd=tf_apply, - workdir=self.executor_tf_path, env_vars=tf_vars_env_dict.update(self._terraform_log_env_vars), print_callback=print_callback, ) @@ -472,7 +488,6 @@ def down(self, print_callback: Callable) -> None: # Run `terraform destroy` self._run_in_subprocess( cmd=tf_destroy, - workdir=self.executor_tf_path, print_callback=print_callback, env_vars=self._terraform_log_env_vars, ) @@ -510,6 +525,4 @@ def status(self) -> None: tf_state = " ".join([terraform, "state", "list", f"-state={tf_state_file}"]) # Run `terraform state list` - return self._run_in_subprocess( - cmd=tf_state, workdir=self.executor_tf_path, env_vars=self._terraform_log_env_vars - ) + return self._run_in_subprocess(cmd=tf_state, env_vars=self._terraform_log_env_vars) diff --git a/covalent_dispatcher/_cli/groups/__init__.py b/covalent_dispatcher/_cli/groups/__init__.py index 3eee4cd19..3ae5d8d46 100644 --- a/covalent_dispatcher/_cli/groups/__init__.py +++ b/covalent_dispatcher/_cli/groups/__init__.py @@ -13,5 +13,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .db import db -from .deploy import deploy +from .db_group import db +from .deploy_group import deploy diff --git a/covalent_dispatcher/_cli/groups/db.py b/covalent_dispatcher/_cli/groups/db_group.py similarity index 100% rename from covalent_dispatcher/_cli/groups/db.py rename to covalent_dispatcher/_cli/groups/db_group.py diff --git a/covalent_dispatcher/_cli/groups/deploy.py b/covalent_dispatcher/_cli/groups/deploy_group.py similarity index 70% rename from covalent_dispatcher/_cli/groups/deploy.py rename to covalent_dispatcher/_cli/groups/deploy_group.py index d6fb85ff1..a9510c09f 100644 --- a/covalent_dispatcher/_cli/groups/deploy.py +++ b/covalent_dispatcher/_cli/groups/deploy_group.py @@ -17,10 +17,12 @@ """Covalent deploy CLI group.""" - import subprocess +import sys +from functools import partial +from importlib import import_module from pathlib import Path -from typing import Dict, Tuple +from typing import Callable, Dict, Tuple import boto3 import click @@ -30,9 +32,9 @@ from covalent.cloud_resource_manager.core import CloudResourceManager from covalent.executor import _executor_manager -RESOURCE_ALREADY_EXISTS = "Resources already deployed" -RESOURCE_ALREADY_DESTROYED = "Resources already destroyed" -COMPLETED = "Completed" +from .deploy_group_print_callbacks import ScrollBufferCallback + +_TEMPLATE = "[bold green]{message} [default]\n {text}" def get_crm_object(executor_name: str, options: Dict = None) -> CloudResourceManager: @@ -43,36 +45,11 @@ def get_crm_object(executor_name: str, options: Dict = None) -> CloudResourceMan CloudResourceManager object. """ - executor_module_path = Path( - __import__(_executor_manager.executor_plugins_map[executor_name].__module__).__path__[0] - ) + executor_plugin = _executor_manager.executor_plugins_map[executor_name] + executor_module_path = Path(import_module(executor_plugin.__module__).__file__).parent return CloudResourceManager(executor_name, executor_module_path, options) -def get_print_callback( - console: Console, console_status: Console.status, prepend_msg: str, verbose: bool -): - """Get print callback method. - - Args: - console: Rich console object. - console_status: Console status object. - prepend_msg: Message to prepend to the output. - verbose: Whether to print the output inline or not. - - Returns: - Callback method. - - """ - if verbose: - return console.print - - def inline_print_callback(msg): - console_status.update(f"{prepend_msg} {msg}") - - return inline_print_callback - - def get_settings_table(crm: CloudResourceManager) -> Table: """Get resource provisioning settings table. @@ -116,6 +93,43 @@ def get_up_help_table(crm: CloudResourceManager) -> Table: return table +def _run_command_and_show_output( + _command: Callable[[Callable], None], _status_message: str, *, verbose: bool +) -> None: + """Run the command and show the output in the console. + + This function handles execution and outputs from the `up` and `down` commands. + + Args: + _command: command to run, e.g. `partial(crm.up, dry_run=dry_run)` + _status_message: message to show in the console status bar, e.g. "Provisioning resources..." + verbose: whether to show the full Terraform output or not. + """ + console = Console(record=True) + msg_template = _TEMPLATE.format(message=_status_message, text="{text}") + + with console.status(msg_template.format(text="")) as console_status: + print_callback = ScrollBufferCallback( + console=console, + console_status=console_status, + msg_template=msg_template, + verbose=verbose, + ) + + try: + _command(print_callback=print_callback) + + except subprocess.CalledProcessError as e: + console_status.stop() + click.echo(e.stdout) # display error + sys.exit(1) + + if not verbose: + console_status.stop() + if (complete_msg := print_callback.complete_msg) is not None: + console.print("\n", complete_msg, style="bold green") + + @click.group(invoke_without_command=True) def deploy(): """ @@ -130,7 +144,6 @@ def deploy(): 4. Show status of all resources via `covalent deploy status`. """ - pass @deploy.command(context_settings={"ignore_unknown_options": True}) @@ -165,39 +178,16 @@ def up(executor_name: str, vars: Dict, help: bool, dry_run: bool, verbose: bool) """ cmd_options = {key[2:]: value for key, value in (var.split("=") for var in vars)} if msg := validate_args(cmd_options): + # Message is not None, so there was an error. click.echo(msg) - return + sys.exit(1) crm = get_crm_object(executor_name, cmd_options) if help: click.echo(Console().print(get_up_help_table(crm))) - return - - console = Console(record=True) - prepend_msg = "[bold green] Provisioning resources..." - - with console.status(prepend_msg) as status: - try: - crm.up( - dry_run=dry_run, - print_callback=get_print_callback( - console=console, - console_status=status, - prepend_msg=prepend_msg, - verbose=verbose, - ), - ) - except subprocess.CalledProcessError as e: - click.echo(f"Unable to provision resources due to the following error:\n\n{e}") - return + sys.exit(0) - click.echo(Console().print(get_settings_table(crm))) - exists_msg_with_verbose = "Apply complete! Resources: 0 added, 0 changed, 0 destroyed" - exists_msg_without_verbose = "found no differences, so no changes are needed" - export_data = console.export_text() - if exists_msg_with_verbose in export_data or exists_msg_without_verbose in export_data: - click.echo(RESOURCE_ALREADY_EXISTS) - else: - click.echo(COMPLETED) + _command = partial(crm.up, dry_run=dry_run) + _run_command_and_show_output(_command, "Provisioning resources...", verbose=verbose) @deploy.command() @@ -223,28 +213,8 @@ def down(executor_name: str, verbose: bool) -> None: """ crm = get_crm_object(executor_name) - - console = Console(record=True) - prepend_msg = "[bold green] Destroying resources..." - with console.status(prepend_msg) as status: - try: - crm.down( - print_callback=get_print_callback( - console=console, - console_status=status, - prepend_msg=prepend_msg, - verbose=verbose, - ) - ) - except subprocess.CalledProcessError as e: - click.echo(f"Unable to destroy resources due to the following error:\n\n{e}") - return - destroyed_msg = "Destroy complete! Resources: 0 destroyed." - export_data = console.export_text() - if destroyed_msg in export_data: - click.echo(RESOURCE_ALREADY_DESTROYED) - else: - click.echo(COMPLETED) + _command = partial(crm.down) + _run_command_and_show_output(_command, "Destroying resources...", verbose=verbose) # TODO - Color code status. @@ -271,11 +241,10 @@ def status(executor_names: Tuple[str]) -> None: "*up": "Warning: Provisioning error, retry 'up'.", "*down": "Warning: Teardown error, retry 'down'.", } - if not executor_names: executor_names = [ name - for name in _executor_manager.executor_plugins_map.keys() + for name in _executor_manager.executor_plugins_map if name not in ["dask", "local", "remote_executor"] ] click.echo(f"Executors: {', '.join(executor_names)}") @@ -289,8 +258,8 @@ def status(executor_names: Tuple[str]) -> None: for executor_name in executor_names: try: crm = get_crm_object(executor_name) - status = crm.status() - table.add_row(executor_name, status, description[status]) + crm_status = crm.status() + table.add_row(executor_name, crm_status, description[crm_status]) except KeyError: invalid_executor_names.append(executor_name) diff --git a/covalent_dispatcher/_cli/groups/deploy_group_print_callbacks.py b/covalent_dispatcher/_cli/groups/deploy_group_print_callbacks.py new file mode 100644 index 000000000..949e5ceb4 --- /dev/null +++ b/covalent_dispatcher/_cli/groups/deploy_group_print_callbacks.py @@ -0,0 +1,99 @@ +# Copyright 2023 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the Apache License 2.0 (the "License"). A copy of the +# License may be obtained with this software package or at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Use of this file is prohibited except in compliance with the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Print callbacks for deploy up and deploy down commands.""" + +import re +from typing import Union + +from rich.console import Console +from rich.status import Status + + +class ScrollBufferCallback: + """Callable print callback that refreshes a buffer of length `max_lines`""" + + _complete_msg_pattern = re.compile( + r"^(Apply complete\! Resources: \d+ added, \d+ changed, \d+ destroyed\." + "|" + r"Destroy complete\! Resources: \d+ destroyed\.)$" + ) + + def __init__( + self, + console: Console, + console_status: Status, + msg_template: str, + verbose: bool, + max_lines: int = 12, + ): + """Create a new scroll buffer callback. + + Args: + console: Rich console object. + console_status: Rich console status object. + msg_template: Message template pre-formatted with provision or destroy message. + verbose: Whether to print the output inline or not. + max_lines: Number of lines in the buffer. Defaults to 5. + """ + self.console = console + self.console_status = console_status + self.msg_template = msg_template + self.verbose = verbose + self.max_lines = max_lines + self.buffer = [] + + self._complete_msg = None + + @property + def complete_msg(self) -> Union[str, None]: + """Return a complete message matching: + + 'Apply complete! Resources: 19 added, 0 changed, 0 destroyed.' + or + 'Destroy complete! Resources: 19 destroyed.' + + Returns: + The complete message, if it exists, else None. + """ + return self._complete_msg + + def __call__(self, msg: str): + if self.verbose: + self._verbose_print(msg) + else: + self._inline_print(msg) + + def _inline_print(self, msg: str) -> None: + """Print into a scrolling buffer of size `self.max_lines`.""" + if len(self.buffer) > self.max_lines: + self.buffer.pop(0) + self.buffer.append(msg) + + if self._complete_msg_pattern.match(msg): + self._complete_msg = msg + + text = self.render_buffer() + self.console_status.update(self.msg_template.format(text=text)) + + def _verbose_print(self, msg: str) -> None: + """Print normally, line by line.""" + print(msg.rstrip() if msg != "\n" else msg) + + def render_buffer(self, sep: str = " ") -> str: + """Render the current buffer as a string.""" + return sep.join(self.buffer) diff --git a/tests/covalent_dispatcher_tests/_cli/cli_test.py b/tests/covalent_dispatcher_tests/_cli/cli_test.py index a8b546e90..46d056b1f 100644 --- a/tests/covalent_dispatcher_tests/_cli/cli_test.py +++ b/tests/covalent_dispatcher_tests/_cli/cli_test.py @@ -18,7 +18,10 @@ """Test for Covalent CLI Tool.""" +import subprocess + import click +import pytest from click.testing import CliRunner from covalent_dispatcher._cli.cli import cli @@ -65,3 +68,205 @@ def test_cli_commands(): "status", "stop", ] + + +@pytest.mark.parametrize( + ("error", "verbose"), + [ + (False, False), + (False, True), + (True, False), + (True, True), + ], +) +def test_run_command_and_show_output(mocker, error, verbose): + """ + Unit test for `_run_command_and_show_output` function. + + Test that errors are raised and messages printed when expected. + """ + from covalent_dispatcher._cli.groups.deploy_group import _run_command_and_show_output + + mock_console_print = mocker.patch("rich.console.Console.print") + mock_click_echo = mocker.patch("click.echo") + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group_print_callbacks.ScrollBufferCallback.complete_msg", + return_value="Apply complete! Resources: 19 added, 0 changed, 0 destroyed.", + ) + + def _command(*args, **kwargs): + _command.calls += 1 + _output = "Testing Error..." + _error = subprocess.CalledProcessError(1, "terraform", _output) + if error: + raise _error + + _command.calls = 0 + + if error: + msg = "Testing Error..." + with pytest.raises(SystemExit): + _run_command_and_show_output(_command, msg, verbose=verbose) + else: + msg = "Testing Success..." + _run_command_and_show_output(_command, msg, verbose=verbose) + + if error: + mock_click_echo.assert_called_once_with("Testing Error...") + elif not verbose: + mock_console_print.assert_called_once() + else: + assert _command.calls == 1 + + +@pytest.mark.parametrize( + ("verbose", "mode"), + [ + (False, "provision"), + (False, "destroy"), + (True, "provision"), + (True, "destroy"), + ], +) +def test_scroll_buffer_print_callback(mocker, verbose, mode): + """ + Unit test for the custom buffered print callback. + """ + from rich.console import Console + from rich.status import Status + + from covalent_dispatcher._cli.groups.deploy_group import _TEMPLATE + from covalent_dispatcher._cli.groups.deploy_group_print_callbacks import ScrollBufferCallback + + console = Console(record=True) + console_status = Status("Testing...", console=console) + + mock_print = mocker.patch("covalent_dispatcher._cli.groups.deploy_group_print_callbacks.print") + mock_console_status_update = mocker.patch("rich.status.Status.update") + + print_callback = ScrollBufferCallback( + console=console, + console_status=console_status, + msg_template=_TEMPLATE.format(message="Testing...", text="{text}"), + verbose=verbose, + max_lines=3, # shorten to hit pop inside `._inline_print` + ) + + complete_msg = ( + "Apply complete! Resources: 19 added, 0 changed, 0 destroyed." + if mode == "provision" + else "Destroy complete! Resources: 19 destroyed." + ) + messages = ["fake", "fake", "fake", complete_msg, "fake", "fake", "fake"] + + for msg in messages: + print_callback(msg) + if verbose: + mock_print.assert_called_with(msg) + else: + mock_console_status_update.assert_called_with( + print_callback.msg_template.format(text=print_callback.render_buffer()) + ) + + if not verbose: + assert print_callback.complete_msg == complete_msg + + +def test_deploy_up(mocker): + """ + Unit test for `covalent deploy up [executor_name]` command. + """ + + from covalent_dispatcher._cli.groups.deploy_group import up + + # Patch function that executes commands. + mock_run_command_and_show_output = mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group._run_command_and_show_output", + ) + + # Fail with invalid command options. + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group.validate_args", + return_value="Non-empty msg", + ) + with pytest.raises(SystemExit) as exc_info: + ctx = click.Context(up) + ctx.invoke(up) + + assert exc_info.value.code == 1 + + # Succeed but exit after help message. + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group.validate_args", + return_value=None, + ) + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group.get_crm_object", + ) + with pytest.raises(SystemExit) as exc_info: + ctx = click.Context(up) + ctx.invoke(up, help=True) + + assert exc_info.value.code == 0 + + # Succeed with valid command options. + ctx = click.Context(up) + ctx.invoke(up, verbose=True) + + mock_run_command_and_show_output.assert_called_once() + + +def test_deploy_down(mocker): + """ + Unit test for `covalent deploy down [executor_name]` command. + """ + + from covalent_dispatcher._cli.groups.deploy_group import down + + # Patch function that executes commands. + mock_run_command_and_show_output = mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group._run_command_and_show_output", + ) + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group.get_crm_object", + ) + + ctx = click.Context(down) + ctx.invoke(down, verbose=True) + mock_run_command_and_show_output.assert_called_once() + + +def test_deploy_status(mocker): + """ + Unit test for `covalent deploy status [executor_name]` command. + """ + + from covalent_dispatcher._cli.groups.deploy_group import status + + # Succeed with empty `executor_names` argument. + ctx = click.Context(status) + ctx.invoke(status, executor_names=[]) + + # Succeed with invalid `executor_names` argument. + mock_click_style = mocker.patch("click.style") + + ctx = click.Context(status) + ctx.invoke(status, executor_names=["invalid"]) + + mock_click_style.assert_called_once() + + # Succeed with 'valid' `executor_names` argument. + mocker.patch( + "covalent_dispatcher._cli.groups.deploy_group.get_crm_object", + ) + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager.status", + return_value="down", + ) + + mock_console_print = mocker.patch("rich.console.Console.print") + + ctx = click.Context(status) + ctx.invoke(status, executor_names=["awsbatch"]) # OK if not installed + + mock_console_print.assert_called_once() diff --git a/tests/covalent_dispatcher_tests/_cli/groups/db_test.py b/tests/covalent_dispatcher_tests/_cli/groups/db_test.py index 80cd42eec..a5b17bdd4 100644 --- a/tests/covalent_dispatcher_tests/_cli/groups/db_test.py +++ b/tests/covalent_dispatcher_tests/_cli/groups/db_test.py @@ -18,7 +18,7 @@ from click.testing import CliRunner -from covalent_dispatcher._cli.groups.db import MIGRATION_WARNING_MSG, alembic, migrate +from covalent_dispatcher._cli.groups.db_group import MIGRATION_WARNING_MSG, alembic, migrate from covalent_dispatcher._db.datastore import DataStore @@ -46,7 +46,9 @@ def test_alembic_command_args(mocker): MOCK_ALEMBIC_ARGS_INVALID = "some alembic command --with-flags -m 'comment'" ALEMBIC_ERROR_STDERR = b"alembic: error: invalid..." ALEMBIC_ERROR_STDOUT = b"b60c5 (head)" - popen_mock = mocker.patch.object(sys.modules["covalent_dispatcher._cli.groups.db"], "Popen") + popen_mock = mocker.patch.object( + sys.modules["covalent_dispatcher._cli.groups.db_group"], "Popen" + ) # test valid alembic args popen_mock().communicate.return_value = (ALEMBIC_ERROR_STDOUT, b"") res = runner.invoke(alembic, MOCK_ALEMBIC_ARGS_VALID, catch_exceptions=False) @@ -61,7 +63,9 @@ def test_alembic_not_installed_error(mocker): runner = CliRunner() MOCK_ALEMBIC_ARGS = "current" EXCEPTION_MESSAGE = "[Errno 2] No such file or directory: 'alembic'" - popen_mock = mocker.patch.object(sys.modules["covalent_dispatcher._cli.groups.db"], "Popen") + popen_mock = mocker.patch.object( + sys.modules["covalent_dispatcher._cli.groups.db_group"], "Popen" + ) popen_mock.side_effect = FileNotFoundError(EXCEPTION_MESSAGE) res = runner.invoke(alembic, MOCK_ALEMBIC_ARGS, catch_exceptions=False) assert EXCEPTION_MESSAGE in res.output diff --git a/tests/covalent_tests/cloud_resource_manager/core_test.py b/tests/covalent_tests/cloud_resource_manager/core_test.py index ea2ef058a..7fdca350d 100644 --- a/tests/covalent_tests/cloud_resource_manager/core_test.py +++ b/tests/covalent_tests/cloud_resource_manager/core_test.py @@ -16,6 +16,7 @@ import os import subprocess +import tempfile from configparser import ConfigParser from functools import partial from pathlib import Path @@ -56,6 +57,35 @@ def crm(mocker, executor_name, executor_module_path): ) +class _FakeIO: + """Mocks process stdout and stderr.""" + + def __init__(self, message): + self.message = message + + def read(self): + return self.message + + def readline(self): + return self.read() + + +class _FakeProc: + """Mocks process""" + + def __init__(self, returncode, stdout="", stderr="", fake_stream=True): + self.returncode = returncode + self.args = () + self.stdout = _FakeIO(stdout) if fake_stream else stdout + self.stderr = _FakeIO(stderr) if fake_stream else stderr + + def poll(self): + return self.returncode + + def communicate(self): + return self.stdout.read(), self.stderr.read() + + def test_get_executor_module(mocker): """ Unit test for get_executor_module method @@ -163,9 +193,9 @@ def test_cloud_resource_manager_init(mocker, options, executor_name, executor_mo ) -def test_print_stdout(mocker, crm): +def test_poll_process(mocker, crm): """ - Unit test for CloudResourceManager._print_stdout() method + Unit test for CloudResourceManager._poll_process() method """ test_stdout = "test_stdout".encode("utf-8") @@ -177,7 +207,7 @@ def test_print_stdout(mocker, crm): mock_process.stdout.readline.side_effect = partial(next, iter([test_stdout, None])) mock_print = mocker.patch("covalent.cloud_resource_manager.core.print") - return_code = crm._print_stdout( + return_code = crm._poll_process( mock_process, print_callback=mock_print( test_stdout.decode("utf-8"), @@ -203,17 +233,15 @@ def test_run_in_subprocess(mocker, test_retcode, crm): """ test_cmd = "test_cmd" - test_workdir = "test_workdir" test_env_vars = {"test_env_key": "test_env_value"} - mock_process = mocker.MagicMock() mock_popen = mocker.patch( "covalent.cloud_resource_manager.core.subprocess.Popen", - return_value=mock_process, + return_value=_FakeProc(test_retcode), ) - mock_print_stdout = mocker.patch( - "covalent.cloud_resource_manager.core.CloudResourceManager._print_stdout", + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._poll_process", return_value=int(test_retcode), ) @@ -222,37 +250,33 @@ def test_run_in_subprocess(mocker, test_retcode, crm): ) mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._log_error_msg", - return_value=None, - side_effect=None, ) if test_retcode != 0: exception = subprocess.CalledProcessError(returncode=test_retcode, cmd=test_cmd) - print("sam exception ", exception) - with pytest.raises(Exception, match=str(exception)): + print("some exception ", exception) + with pytest.raises(subprocess.CalledProcessError) as excinfo: crm._run_in_subprocess( cmd=test_cmd, - workdir=test_workdir, env_vars=test_env_vars, ) + # Errors are contained in the output for printing. + assert excinfo.value.output == "some exception " else: crm._run_in_subprocess( cmd=test_cmd, - workdir=test_workdir, env_vars=test_env_vars, ) mock_popen.assert_called_once_with( args=test_cmd, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=test_workdir, + stderr=subprocess.PIPE, + cwd=crm.executor_tf_path, + universal_newlines=True, shell=True, env=test_env_vars, ) - # print("sam mocker process : ", mock_process) - # print("sam mocker print : ", mock_print_stdout) - # mock_print_stdout.assert_called_once_with(mock_process) def test_update_config(mocker, crm, executor_name): @@ -361,14 +385,14 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa test_tf_path = "test_tf_path" test_tf_state_file = "test_tf_state_file" - mock_get_tf_path = mocker.patch( + mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", return_value=test_tf_path, ) mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", ) - mock_get_tf_statefile_path = mocker.patch( + mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_statefile_path", return_value=test_tf_state_file, ) @@ -402,7 +426,7 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa if executor_options: with pytest.raises(SystemExit): - crm = CloudResourceManager( + CloudResourceManager( executor_name=executor_name, executor_module_path=executor_module_path, options=executor_options, @@ -420,42 +444,26 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa ) as mock_file: crm.up(dry_run=dry_run, print_callback=None) - env_vars = { - "PATH": "$PATH:/usr/bin", - "TF_LOG": "ERROR", - "TF_LOG_PATH": os.path.join(crm.executor_tf_path + "/terraform-error.log"), - } # mock_get_tf_path.assert_called_once() init_cmd = f"{test_tf_path} init" mock_run_in_subprocess.assert_any_call( cmd=init_cmd, - workdir=crm.executor_tf_path, - env_vars=env_vars, - # print_callback=None, + env_vars=crm._terraform_log_env_vars, + print_callback=None, ) mock_environ_copy.assert_called_once() - if executor_options: - mock_file.assert_called_once_with( - f"{crm.executor_tf_path}/terraform.tfvars", - "w", - ) - - key, value = list(executor_options.items())[0] - mock_file().write.assert_called_once_with(f'{key}="{value}"\n') mock_run_in_subprocess.assert_any_call( cmd=f"{test_tf_path} plan -out tf.plan", # -state={test_tf_state_file}", - workdir=crm.executor_tf_path, - env_vars=env_vars, + env_vars=crm._terraform_log_env_vars, print_callback=None, ) if not dry_run: mock_run_in_subprocess.assert_any_call( cmd=f"{test_tf_path} apply tf.plan -state={test_tf_state_file}", - workdir=crm.executor_tf_path, - env_vars=env_vars, + env_vars=crm._terraform_log_env_vars, print_callback=None, ) @@ -464,6 +472,68 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa ) +def test_up_executor_options(mocker, executor_name, executor_module_path): + """ + Unit test for CloudResourceManager.up() method with executor options. + + Test expected behavior with 'valid' options. Note that *actual* valid options + require executor plugins to be installed, so not suitable for CI tests. + """ + # Disable validation. + mocker.patch( + "covalent.cloud_resource_manager.core.validate_options", + return_value=None, + ) + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", + ) + + # Disable actually finding executor. + mocker.patch( + "covalent.cloud_resource_manager.core.get_executor_module", + ) + + # Disable plugin settings. + mocker.patch( + "covalent.cloud_resource_manager.core.get_plugin_settings", + return_value={}, + ) + + # Disable path checks so nothing deleted (as it would be, if exists). + mocker.patch("covalent.cloud_resource_manager.core.Path.exists", return_value=False) + + # Disable _run_in_subprocess to avoid side effects. + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._run_in_subprocess", + ) + + # Disable _update_config to avoid side effects. + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._update_config", + ) + + # For CI tests, pretend homebrew exists. + mocker.patch("shutil.which", return_value="/opt/homebrew/bin/terraform") + mocker.patch( + "covalent.cloud_resource_manager.core.subprocess.run", + return_value=_FakeProc(0, stdout="v99.99", fake_stream=False), + ) + + crm = CloudResourceManager( + executor_name=executor_name, + executor_module_path=executor_module_path, + options={"test_key": "test_value"}, + ) + + with tempfile.TemporaryDirectory() as d: + # Create fake vars file to avoid side effects. + fake_tfvars_file = Path(d) / "terraform.tfvars" + fake_tfvars_file.touch() + + crm.executor_tf_path = d + crm.up(dry_run=False, print_callback=None) + + def test_down(mocker, crm): """ Unit test for CloudResourceManager.down() method. @@ -471,7 +541,6 @@ def test_down(mocker, crm): test_tf_path = "test_tf_path" test_tf_state_file = "test_tf_state_file" - test_tf_log_file = "terraform-error.log" mock_get_tf_path = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", @@ -521,10 +590,8 @@ def test_down(mocker, crm): "-auto-approve", ] ) - env_vars = {"PATH": "$PATH:/usr/bin", "TF_LOG": "ERROR", "TF_LOG_PATH": log_file_path} - mock_run_in_subprocess.assert_called_once_with( - cmd=cmd, print_callback=None, workdir=crm.executor_tf_path, env_vars=env_vars - ) + env_vars = crm._terraform_log_env_vars + mock_run_in_subprocess.assert_called_once_with(cmd=cmd, print_callback=None, env_vars=env_vars) assert mock_path_exists.call_count == 5 assert mock_path_unlink.call_count == 4 @@ -537,7 +604,6 @@ def test_status(mocker, crm): test_tf_path = "test_tf_path" test_tf_state_file = "test_tf_state_file" - log_file_path = os.path.join(crm.executor_tf_path + "/terraform-error.log") mock_get_tf_path = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", return_value=test_tf_path, @@ -548,20 +614,76 @@ def test_status(mocker, crm): return_value=test_tf_state_file, ) + mock_terraform_error_validator = mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._terraform_error_validator", + ) + mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", ) - mock_run_in_subprocess = mocker.patch( - "covalent.cloud_resource_manager.core.CloudResourceManager._run_in_subprocess", + mocker.patch( + "covalent.cloud_resource_manager.core.subprocess.Popen", ) crm.status() mock_get_tf_path.assert_called_once() mock_get_tf_statefile_path.assert_called_once() - mock_run_in_subprocess.assert_called_once_with( - cmd=f"{test_tf_path} state list -state={test_tf_state_file}", - workdir=crm.executor_tf_path, - env_vars={"PATH": "$PATH:/usr/bin", "TF_LOG": "ERROR", "TF_LOG_PATH": log_file_path}, + mock_terraform_error_validator.assert_called_once_with(tfstate_path=test_tf_state_file) + + +def test_crm_get_resource_status(mocker, crm): + """ + Unit test for CloudResourceManager._get_resource_status() method. + + Test that errors while getting resource status don't exit, rather print and report status. + """ + + mock_terraform_error_validator = mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._terraform_error_validator", ) + mock_print = mocker.patch( + "covalent.cloud_resource_manager.core.print", + ) + + crm._get_resource_status(proc=_FakeProc(1), cmd="fake command") + mock_print.assert_called_once() + mock_terraform_error_validator.assert_called_once() + + +def test_no_git(crm, mocker): + """ + Test for exit with status 1 if `git` is not available. + """ + + mocker.patch("shutil.which", return_value=None) + mocker.patch("covalent.cloud_resource_manager.CloudResourceManager._log_error_msg") + + with pytest.raises(SystemExit): + crm._run_in_subprocess("fake command") + + +def test_tf_version_error(mocker, crm): + """ + Unit test for CloudResourceManager._get_tf_path() method. + """ + + # Fail. Terraform not found on system. + mocker.patch("shutil.which", return_value=None) + with pytest.raises(SystemExit): + crm._get_tf_path() + + fake_proc_1 = _FakeProc(0, stdout="v0.0", fake_stream=False) + fake_proc_2 = _FakeProc(0, stdout="v99.99", fake_stream=False) + + # Fail. Old version of terraform found. + mocker.patch("shutil.which", return_value="/opt/homebrew/bin/terraform") + mocker.patch("subprocess.run", return_value=fake_proc_1) + with pytest.raises(SystemExit): + crm._get_tf_path() + + # Succeed. + mocker.patch("subprocess.run", return_value=fake_proc_2) + mocker.patch("covalent.cloud_resource_manager.core.logger.error") + assert "terraform" in crm._get_tf_path()