Skip to content

Commit

Permalink
feat: conditionally run pipelines in debug mode (#223)
Browse files Browse the repository at this point in the history
* Consider the current log_level

* Consider the current log_level

* Unit test

* Unit test simplify

* Dockerfile and small bug fix

* fix env conversion

* Simplify Dockerfile

* Instructions

* Renames

* Renames
  • Loading branch information
YolanFery authored Dec 4, 2024
1 parent 488b290 commit 6f495f7
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 18 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ python -m venv venv # Create a virtual environment for this project
source venv/bin/activate # Activate the venv
pip install -e ".[dev]" # Necessary to be able to run the openhexa CLI
```

### Using a local installation of OpenHEXA to run pipelines

While it is possible to run pipelines locally using only the SDK, if you want to run OpenHEXA in a more realistic
Expand All @@ -127,6 +126,20 @@ openhexa config set_url http://localhost:8000

Note: you can monitor the status of your pipelines at http://localhost:8000/pipelines/status

### Using a local version of the SDK to run pipelines

If you want to run pipelines against a local version of the SDK, you can build a Docker image with that local version installed in it:

```shell
docker build --platform linux/amd64 -t local_image:v1 -f images/Dockerfile .
```

Then reference the image name and tag in the `.env` file of your OpenHEXA app:

```
DEFAULT_WORKSPACE_IMAGE=local_image:v1
```

### Running the tests

You can run the tests using pytest:
Expand Down
11 changes: 11 additions & 0 deletions images/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Image used to run pipelines against the SDK sources of this checkout
# instead of the SDK version shipped in the base environment.
FROM blsq/openhexa-base-environment:latest

# NOTE(review): the image keeps running as root after this point; confirm
# whether the base image's default user should be restored at the end.
USER root

WORKDIR /app
COPY . /app

# Build and install in a single layer: files created in one RUN and deleted
# in a later one still occupy space in the earlier layer, so the cleanup only
# shrinks the image when it happens in the same RUN as the build.
RUN pip install --no-cache-dir build \
    && python -m build . \
    && pip install --no-cache-dir /app/dist/*.tar.gz \
    && rm -rf /app/dist/*.tar.gz
7 changes: 7 additions & 0 deletions openhexa/cli/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import click

from openhexa.sdk.pipelines.log_level import LogLevel

CONFIGFILE_PATH = os.path.expanduser("~") + "/.openhexa.ini"


Expand Down Expand Up @@ -76,6 +78,11 @@ def workspaces(self):
"""Return the workspaces from the settings file."""
return self._file_config["workspaces"]

@property
def log_level(self) -> LogLevel:
    """Return the log level configured through the ``HEXA_LOG_LEVEL`` environment variable."""
    # The raw value may be None when the variable is unset; parsing is delegated to LogLevel.
    raw_level = os.environ.get("HEXA_LOG_LEVEL")
    return LogLevel.parse_log_level(raw_level)

def activate(self, workspace: str):
"""Set the current workspace in the settings file."""
if workspace not in self.workspaces:
Expand Down
35 changes: 35 additions & 0 deletions openhexa/sdk/pipelines/log_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Log levels for the pipeline runs."""
from enum import IntEnum


class LogLevel(IntEnum):
    """Log levels for pipeline run messages, ordered from least to most severe.

    Members are integers, so they can be compared directly
    (``LogLevel.DEBUG < LogLevel.INFO``).

    Attributes
    ----------
    DEBUG : int
        Debug level, value 0.
    INFO : int
        Info level, value 1.
    WARNING : int
        Warning level, value 2.
    ERROR : int
        Error level, value 3.
    CRITICAL : int
        Critical level, value 4.
    """

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @classmethod
    def parse_log_level(cls, value) -> "LogLevel":
        """Parse a log level from a string or integer, never raising.

        Parameters
        ----------
        value
            A member value as an integer (``2``), a digit string (``"2"``) or a
            member name in any letter case (``"warning"``). Surrounding
            whitespace in strings is ignored.

        Returns
        -------
        LogLevel
            The matching member, or ``LogLevel.INFO`` when ``value`` is ``None``,
            of an unsupported type, or does not designate a member.
        """
        if isinstance(value, str):
            text = value.strip()
            # Look names up in the member map rather than with hasattr(), so that
            # only real members (never methods or other attributes) can be returned.
            member = cls.__members__.get(text.upper())
            if member is not None:
                return member
            if not text.isdigit():
                return cls.INFO
            try:
                value = int(text)
            except ValueError:
                # str.isdigit() accepts characters such as "²" that int() rejects.
                return cls.INFO

        if isinstance(value, int):
            try:
                return cls(value)
            except ValueError:
                # Integer outside the set of member values.
                return cls.INFO

        return cls.INFO
34 changes: 17 additions & 17 deletions openhexa/sdk/pipelines/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import datetime
import errno
import os
import typing

from openhexa.sdk.pipelines.log_level import LogLevel
from openhexa.sdk.utils import Environment, get_environment, graphql
from openhexa.sdk.workspaces import workspace

Expand Down Expand Up @@ -75,45 +75,45 @@ def add_database_output(self, table_name: str):
print(f"Sending output with table_name {table_name}")

def log_debug(self, message: str):
"""Log a message with the DEBUG priority."""
self._log_message("DEBUG", message)
"""Log a message with the DEBUG level."""
self._log_message(LogLevel.DEBUG, message)

def log_info(self, message: str):
"""Log a message with the INFO priority."""
self._log_message("INFO", message)
"""Log a message with the INFO level."""
self._log_message(LogLevel.INFO, message)

def log_warning(self, message: str):
"""Log a message with the WARNING priority."""
self._log_message("WARNING", message)
"""Log a message with the WARNING level."""
self._log_message(LogLevel.WARNING, message)

def log_error(self, message: str):
"""Log a message with the ERROR priority."""
self._log_message("ERROR", message)
"""Log a message with the ERROR level."""
self._log_message(LogLevel.ERROR, message)

def log_critical(self, message: str):
"""Log a message with the CRITICAL priority."""
self._log_message("CRITICAL", message)
"""Log a message with the CRITICAL level."""
self._log_message(LogLevel.CRITICAL, message)

def _log_message(
self,
priority: typing.Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
log_level: LogLevel,
message: str,
):
valid_priorities = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if priority not in valid_priorities:
raise ValueError(f"priority must be one of {', '.join(valid_priorities)}")
from openhexa.cli.settings import settings

if log_level < settings.log_level: # Ignore messages with lower log level than the settings
return
if self._connected:
graphql(
"""
mutation logPipelineMessage ($input: LogPipelineMessageInput!) {
logPipelineMessage(input: $input) { success errors }
}""",
{"input": {"priority": priority, "message": str(message)}},
{"input": {"priority": log_level.name, "message": str(message)}},
)
else:
now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0).isoformat()
print(now, priority, message)
print(now, log_level.name, message)


if get_environment() == Environment.CLOUD_JUPYTER:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ include-package-data = true
[tool.ruff]
line-length = 120
ignore = ["E501"]
per-file-ignores = { "tests/**/test_*.py" = ["D100","D101","D102", "D103"] } # Ignore missing docstrings in tests

[tool.ruff.lint]
extend-select = [
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import openhexa.cli
import openhexa.sdk
from openhexa.sdk.pipelines.log_level import LogLevel


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -40,4 +41,5 @@ def settings(monkeypatch):
settings_mock.workspaces = {"workspace-slug": "token", "another-workspace-slug": "token"}
settings_mock.debug = False
settings_mock.access_token = "token"
settings_mock.log_level = LogLevel.INFO
return settings_mock
30 changes: 30 additions & 0 deletions tests/test_current_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from unittest.mock import ANY, patch

from openhexa.sdk.pipelines.run import CurrentRun, LogLevel


@patch.object(CurrentRun, "_connected", True)
@patch("openhexa.sdk.pipelines.run.graphql")
def test_default_log_level(mock_graphql, monkeypatch):
    """With no log level configured, DEBUG messages are dropped and INFO messages are sent."""
    # Isolate the test from the developer/CI environment: an exported
    # HEXA_LOG_LEVEL (e.g. DEBUG or ERROR) would otherwise change which
    # messages pass the filter and make the assertions below fail.
    monkeypatch.delenv("HEXA_LOG_LEVEL", raising=False)
    current_run = CurrentRun()

    current_run.log_debug("This is a debug message")
    current_run.log_info("This is an info message")

    assert mock_graphql.call_count == 1
    mock_graphql.assert_any_call(ANY, {"input": {"priority": "INFO", "message": "This is an info message"}})


@patch.object(CurrentRun, "_connected", True)
@patch("openhexa.sdk.pipelines.run.graphql")
def test_filtering_log_messages_based_on_settings(mock_graphql, settings):
    """Only messages at or above the level set in the settings are sent through graphql."""
    settings.log_level = LogLevel.ERROR
    run = CurrentRun()

    run.log_warning("This is a warning message")  # below ERROR: expected to be filtered out
    run.log_error("This is an error message")
    run.log_critical("This is a critical message")

    forwarded = [
        ("ERROR", "This is an error message"),
        ("CRITICAL", "This is a critical message"),
    ]
    assert mock_graphql.call_count == 2
    for priority, message in forwarded:
        mock_graphql.assert_any_call(ANY, {"input": {"priority": priority, "message": message}})
29 changes: 29 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Pipeline test module."""

import os
from unittest import TestCase
from unittest.mock import Mock, patch

import pytest
Expand All @@ -12,6 +13,7 @@
PostgreSQLConnection,
S3Connection,
)
from openhexa.sdk.pipelines.log_level import LogLevel
from openhexa.sdk.pipelines.parameter import Parameter, ParameterValueError
from openhexa.sdk.pipelines.pipeline import Pipeline

Expand Down Expand Up @@ -218,3 +220,30 @@ def test_pipeline_parameters_spec():
"default": None,
},
]


class TestLogLevel(TestCase):
    """Tests for ``LogLevel.parse_log_level``."""

    def test_parse_log_level(self):
        """Integers, digit strings and names map to members; anything else falls back to INFO."""
        test_cases = [
            # Integer member values
            (0, LogLevel.DEBUG),
            (1, LogLevel.INFO),
            (2, LogLevel.WARNING),
            (3, LogLevel.ERROR),
            (4, LogLevel.CRITICAL),
            # Digit strings
            ("0", LogLevel.DEBUG),
            ("1", LogLevel.INFO),
            ("2", LogLevel.WARNING),
            ("3", LogLevel.ERROR),
            ("4", LogLevel.CRITICAL),
            # Member names
            ("DEBUG", LogLevel.DEBUG),
            ("INFO", LogLevel.INFO),
            ("WARNING", LogLevel.WARNING),
            ("ERROR", LogLevel.ERROR),
            ("CRITICAL", LogLevel.CRITICAL),
            # Names are matched case-insensitively
            ("debug", LogLevel.DEBUG),
            ("Critical", LogLevel.CRITICAL),
            # Fallback to INFO
            ("invalid", LogLevel.INFO),
            ("", LogLevel.INFO),
            ("10", LogLevel.INFO),
            (6, LogLevel.INFO),
            (-1, LogLevel.INFO),
            # None is what os.getenv() returns when the variable is not set
            (None, LogLevel.INFO),
        ]
        for input_value, expected in test_cases:
            with self.subTest(input_value=input_value, expected=expected):
                self.assertEqual(LogLevel.parse_log_level(input_value), expected)

0 comments on commit 6f495f7

Please sign in to comment.