feat: conditionally run pipelines in debug mode (#862)
* Add log level to model + wire it in env

* Remove todo

* Merge

* Fix

* Fix

* Wire backend

* Wire backend

* Wire backend

* Fix wiring

* Add tests

* Working test

* Simplify test

* Simplify test

* Add to view

* Add safe parsing and tests

* Cleaner tests
YolanFery authored and nazarfil committed Dec 9, 2024
1 parent 281e908 commit 29e8e9a
Showing 10 changed files with 154 additions and 4 deletions.
2 changes: 2 additions & 0 deletions hexa/pipelines/graphql/schema.graphql
@@ -144,6 +144,7 @@ type PipelineRun {
outputs: [PipelineRunOutput!]! # The outputs generated by the pipeline run.
code: String! # The code of the pipeline run.
sendMailNotifications: Boolean! # Indicates if email notifications should be sent for the pipeline run.
enableDebugLogs: Boolean! # Indicates if debug logs should be stored for the pipeline run.
timeout: Int # The timeout value for the pipeline run.
datasetVersions: [DatasetVersion!]! # The dataset versions associated with the pipeline run.
stoppedBy: User # The user who stopped the pipeline run.
@@ -377,6 +378,7 @@ input RunPipelineInput {
versionId: UUID # The ID of the pipeline version to use for the run.
config: JSON! # The configuration for the pipeline run.
sendMailNotifications: Boolean # Indicates if email notifications should be sent for the pipeline run.
enableDebugLogs: Boolean # Indicates if debug logs should be stored for the pipeline run.
}
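
From a client's perspective, enableDebugLogs is just one more optional boolean in the mutation variables. A minimal sketch of the variables a caller might send to runPipeline (the pipeline id is a placeholder; on the server this flag maps to a DEBUG log level on the created run, otherwise INFO):

run_pipeline_variables = {
    "input": {
        "id": "<pipeline-uuid>",         # placeholder pipeline id
        "config": {},                    # run configuration (JSON)
        "sendMailNotifications": False,
        "enableDebugLogs": True,         # store debug-level logs for this run
    }
}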

"""
1 change: 1 addition & 0 deletions hexa/pipelines/management/commands/pipelines_runner.py
@@ -315,6 +315,7 @@ def run_pipeline(run: PipelineRun):
"HEXA_RUN_ID": str(run.id),
"HEXA_PIPELINE_NAME": run.pipeline.name,
"HEXA_PIPELINE_TYPE": run.pipeline.type,
"HEXA_LOG_LEVEL": run.log_level,
}
if run.pipeline.type == PipelineType.NOTEBOOK:
env_vars.update({"HEXA_NOTEBOOK_PATH": run.pipeline.notebook_path})
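
The runner only exports the level; how the container consumes HEXA_LOG_LEVEL is not part of this diff. A minimal sketch of the consuming side, assuming the integer choices introduced below (0=DEBUG through 4=CRITICAL) are mapped onto Python's standard logging levels:

import logging
import os

# PipelineRunLogLevel integers (0-4) mapped to stdlib logging levels -- an assumption for illustration.
LEVELS = {0: logging.DEBUG, 1: logging.INFO, 2: logging.WARNING, 3: logging.ERROR, 4: logging.CRITICAL}

def configure_run_logging() -> None:
    raw = os.environ.get("HEXA_LOG_LEVEL", "1")  # "1" == INFO, matching the model default
    level = LEVELS.get(int(raw), logging.INFO) if raw.isdigit() else logging.INFO
    logging.basicConfig(level=level)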
26 changes: 26 additions & 0 deletions hexa/pipelines/migrations/0053_pipelinerun_log_level.py
@@ -0,0 +1,26 @@
# Generated by Django 4.2.16 on 2024-12-02 13:12

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("pipelines", "0052_pipelineversion_version_number_revert_logic"),
]

operations = [
migrations.AddField(
model_name="pipelinerun",
name="log_level",
field=models.IntegerField(
choices=[
(0, "Debug"),
(1, "Info"),
(2, "Warning"),
(3, "Error"),
(4, "Critical"),
],
default=1,
),
),
]
26 changes: 25 additions & 1 deletion hexa/pipelines/models.py
@@ -234,6 +234,26 @@ class PipelineType(models.TextChoices):
ZIPFILE = "zipFile", _("ZipFile")


class PipelineRunLogLevel(models.IntegerChoices):
DEBUG = 0
INFO = 1
WARNING = 2
ERROR = 3
CRITICAL = 4

@classmethod
def parse_log_level(cls, value):
if isinstance(value, int) and 0 <= value <= 4:
return value
if isinstance(value, str):
if value.isdigit():
return cls.parse_log_level(int(value))
value = value.upper()
if hasattr(cls, value):
return getattr(cls, value)
return cls.INFO


class Pipeline(SoftDeletedModel):
class Meta:
verbose_name = "Pipeline"
@@ -283,6 +303,7 @@ def run(
trigger_mode: PipelineRunTrigger,
config: typing.Mapping[typing.Dict, typing.Any] | None = None,
send_mail_notifications: bool = True,
log_level: PipelineRunLogLevel = PipelineRunLogLevel.INFO,
):
timeout = settings.PIPELINE_RUN_DEFAULT_TIMEOUT
if pipeline_version and pipeline_version.timeout:
@@ -303,6 +324,7 @@
access_token=str(uuid.uuid4()),
send_mail_notifications=send_mail_notifications,
timeout=timeout,
log_level=log_level,
)

return run
@@ -557,7 +579,9 @@ class Meta:
on_delete=models.SET_NULL,
related_name="+",
)

log_level = models.IntegerField(
choices=PipelineRunLogLevel.choices, default=PipelineRunLogLevel.INFO
)
objects = PipelineRunQuerySet.as_manager()

@property
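parse_log_level is deliberately forgiving: in-range integers, numeric strings, and level names all resolve, and anything unrecognised falls back to INFO. A quick illustration (the new tests further down cover the full matrix):

from hexa.pipelines.models import PipelineRunLogLevel

assert PipelineRunLogLevel.parse_log_level("DEBUG") == PipelineRunLogLevel.DEBUG
assert PipelineRunLogLevel.parse_log_level("3") == PipelineRunLogLevel.ERROR
assert PipelineRunLogLevel.parse_log_level(5) == PipelineRunLogLevel.INFO            # out of range -> INFO
assert PipelineRunLogLevel.parse_log_level("verbose") == PipelineRunLogLevel.INFO    # unknown name -> INFO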
4 changes: 4 additions & 0 deletions hexa/pipelines/schema/mutations.py
@@ -19,6 +19,7 @@
PipelineDoesNotSupportParametersError,
PipelineRecipient,
PipelineRun,
PipelineRunLogLevel,
PipelineRunState,
PipelineRunTrigger,
PipelineType,
@@ -215,6 +216,9 @@ def resolve_run_pipeline(_, info, **kwargs):
trigger_mode=PipelineRunTrigger.MANUAL,
config=input.get("config", {}),
send_mail_notifications=input.get("sendMailNotifications", False),
log_level=PipelineRunLogLevel.DEBUG
if input.get("enableDebugLogs", False)
else PipelineRunLogLevel.INFO,
)
track(
request,
28 changes: 28 additions & 0 deletions hexa/pipelines/tests/test_models.py
@@ -5,6 +5,7 @@
Pipeline,
PipelineNotificationLevel,
PipelineRecipient,
PipelineRunLogLevel,
PipelineRunState,
PipelineRunTrigger,
)
@@ -15,6 +16,33 @@
)


class TestPipelineRunLogLevel(TestCase):
def test_parse_log_level(self):
test_cases = [
(0, PipelineRunLogLevel.DEBUG),
(1, PipelineRunLogLevel.INFO),
(2, PipelineRunLogLevel.WARNING),
(3, PipelineRunLogLevel.ERROR),
(4, PipelineRunLogLevel.CRITICAL),
("0", PipelineRunLogLevel.DEBUG),
("1", PipelineRunLogLevel.INFO),
("2", PipelineRunLogLevel.WARNING),
("3", PipelineRunLogLevel.ERROR),
("4", PipelineRunLogLevel.CRITICAL),
("DEBUG", PipelineRunLogLevel.DEBUG),
("INFO", PipelineRunLogLevel.INFO),
("WARNING", PipelineRunLogLevel.WARNING),
("ERROR", PipelineRunLogLevel.ERROR),
("CRITICAL", PipelineRunLogLevel.CRITICAL),
("invalid", PipelineRunLogLevel.INFO),
(5, PipelineRunLogLevel.INFO),
(-1, PipelineRunLogLevel.INFO),
]
for value, expected in test_cases:
with self.subTest(value=value):
self.assertEqual(PipelineRunLogLevel.parse_log_level(value), expected)


class PipelineTest(TestCase):
def create_recipient(
self,
47 changes: 47 additions & 0 deletions hexa/pipelines/tests/test_pipelines_runner.py
@@ -0,0 +1,47 @@
from unittest.mock import MagicMock, patch

from django.test import TestCase, override_settings

from hexa.pipelines.management.commands.pipelines_runner import run_pipeline
from hexa.pipelines.models import PipelineRun, PipelineRunLogLevel, PipelineType


class TestRunPipeline(TestCase):
@override_settings(
INTERNAL_BASE_URL="http://testserver",
DEFAULT_WORKSPACE_IMAGE="default_workspace_image",
PIPELINE_SCHEDULER_SPAWNER="docker",
)
@patch("hexa.pipelines.management.commands.pipelines_runner.run_pipeline_docker")
@patch("os.fork", return_value=0)
def test_env_vars(self, _, mock_run_pipeline_docker):
mock_run = MagicMock(spec=PipelineRun)
mock_run.id = 123
mock_run.access_token = "someAccessToken"
mock_run.log_level = PipelineRunLogLevel.DEBUG
mock_run.pipeline.workspace.slug = "test_workspace"
mock_run.pipeline.workspace.docker_image = "docker_image"
mock_run.pipeline.name = "test_pipeline"
mock_run.pipeline.type = PipelineType.NOTEBOOK
mock_run.pipeline.notebook_path = "/path/to/notebook"
mock_run.pipeline.code = "pipeline_code"
mock_run.send_mail_notifications = False

mock_run_pipeline_docker.return_value = (True, "SomeLogs")

with self.assertRaises(SystemExit):
run_pipeline(mock_run)

expected_env_vars = {
"HEXA_SERVER_URL": "http://testserver",
"HEXA_TOKEN": "InNvbWVBY2Nlc3NUb2tlbiI:6jqo7CX79O6IOh7lgqpwOXBBYSxzNOBtXeSNb4ry9EM",
"HEXA_WORKSPACE": "test_workspace",
"HEXA_RUN_ID": "123",
"HEXA_PIPELINE_NAME": "test_pipeline",
"HEXA_PIPELINE_TYPE": PipelineType.NOTEBOOK,
"HEXA_LOG_LEVEL": PipelineRunLogLevel.DEBUG,
"HEXA_NOTEBOOK_PATH": "/path/to/notebook",
}
mock_run_pipeline_docker.assert_called_once_with(
mock_run, "docker_image", expected_env_vars
)
17 changes: 16 additions & 1 deletion hexa/pipelines/tests/test_schema/test_pipelines.py
@@ -18,6 +18,7 @@
PipelineNotificationLevel,
PipelineRecipient,
PipelineRun,
PipelineRunLogLevel,
PipelineRunState,
PipelineRunTrigger,
PipelineType,
@@ -1880,6 +1881,10 @@ def test_pipeline_new_run_with_timeout(self):
},
r["data"]["runPipeline"],
)
self.assertEqual(1, len(PipelineRun.objects.all()))
pipeline_run = PipelineRun.objects.first()
self.assertEqual(pipeline_run.log_level, PipelineRunLogLevel.INFO)
self.assertFalse(pipeline_run.send_mail_notifications)

def test_pipeline_new_run_default_timeout(self):
self.assertEqual(0, len(PipelineRun.objects.all()))
@@ -1902,7 +1907,14 @@ def test_pipeline_new_run_default_timeout(self):
}
}
""",
{"input": {"id": str(id1), "config": {}}},
{
"input": {
"id": str(id1),
"config": {},
"sendMailNotifications": True,
"enableDebugLogs": True,
}
},
)
self.assertEqual(
{
@@ -1913,6 +1925,9 @@
r["data"]["runPipeline"],
)
self.assertEqual(1, len(PipelineRun.objects.all()))
pipeline_run = PipelineRun.objects.first()
self.assertEqual(pipeline_run.log_level, PipelineRunLogLevel.DEBUG)
self.assertTrue(pipeline_run.send_mail_notifications)

def test_stop_running_pipeline(self):
self.test_create_pipeline_version()
2 changes: 1 addition & 1 deletion hexa/pipelines/tests/test_views.py
@@ -267,7 +267,7 @@ def test_urlencoded_int_parameter(self):
"required": False,
}
],
{"send_mail_notifications": True},
{"send_mail_notifications": True, "log_level": "INFO"},
{},
content_type="application/x-www-form-urlencoded",
)
5 changes: 4 additions & 1 deletion hexa/pipelines/views.py
@@ -19,7 +19,7 @@
from hexa.analytics.api import track
from hexa.app import get_hexa_app_configs
from hexa.core.views_utils import disable_cors
from hexa.pipelines.models import Environment
from hexa.pipelines.models import Environment, PipelineRunLogLevel

from .credentials import PipelinesCredentials
from .models import Pipeline, PipelineRunTrigger, PipelineType, PipelineVersion
@@ -148,6 +148,7 @@ def run_pipeline(
config = {}
if content_type == "application/x-www-form-urlencoded":
send_mail_notifications = request.POST.get("send_mail_notifications", False)
log_level = PipelineRunLogLevel.parse_log_level(request.POST.get("log_level"))
for parameter in pipeline_version.parameters:
if parameter["code"] not in request.POST:
continue
@@ -173,6 +174,7 @@

elif content_type == "application/json":
send_mail_notifications = request.GET.get("send_mail_notifications", False)
log_level = PipelineRunLogLevel.parse_log_level(request.GET.get("log_level"))
try:
config = json.loads(request.body)
except json.JSONDecodeError:
@@ -189,6 +191,7 @@
trigger_mode=PipelineRunTrigger.WEBHOOK,
config=config,
send_mail_notifications=send_mail_notifications,
log_level=log_level,
)
track(
request,
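For the webhook entry point the level arrives as a plain request parameter and goes through parse_log_level, so "DEBUG", "0", or "debug" all work and anything unparseable degrades to INFO. A minimal sketch of a JSON call, assuming a hypothetical webhook URL (authentication details are outside this diff and omitted); for the application/json content type, log_level is read from the query string and the request body is the run configuration:

import requests

WEBHOOK_URL = "https://openhexa.example.com/pipelines/<webhook-id>/run"  # hypothetical placeholder

response = requests.post(
    WEBHOOK_URL,
    params={"log_level": "DEBUG"},   # parsed server-side via PipelineRunLogLevel.parse_log_level
    json={"some_parameter": 42},     # run configuration; sent as application/json
    timeout=30,
)
response.raise_for_status()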
