Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rosswhitfield committed Sep 16, 2024
1 parent a0f584c commit caae959
Showing 1 changed file with 46 additions and 2 deletions.
48 changes: 46 additions & 2 deletions src/webmon_app/reporting/tests/test_dasmon/test_view_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.contrib.auth.models import Group
from django.utils import timezone

from reporting.report.models import Instrument
from reporting.report.models import Instrument, Information, RunStatus, StatusQueue
from reporting.dasmon.models import ActiveInstrument, Parameter, StatusCache, StatusVariable, Signal
from workflow.database.report.models import DataRun
from workflow.database.report.models import IPTS
Expand Down Expand Up @@ -446,14 +446,58 @@ def test_workflow_diagnostics(self):
def test_postprocessing_diagnostics(self):
from reporting.dasmon.view_util import postprocessing_diagnostics

# add postprocessing services
common = Instrument.objects.get(name="common")
name_postprocessor = settings.SYSTEM_STATUS_PREFIX + "autoreducer4.com"
para_postprocessor = Parameter.objects.create(name=name_postprocessor)
para_postprocessor.save()
StatusCache.objects.create(
instrument_id=common,
key_id=para_postprocessor,
value=0,
timestamp=timezone.now(),
)
para_postprocessor_pid = Parameter.objects.create(name=name_postprocessor + "_pid")
para_postprocessor_pid.save()
StatusCache.objects.create(
instrument_id=common,
key_id=para_postprocessor_pid,
value=7,
timestamp=timezone.now(),
)

# create StatusQueue, DataRun, RunStatus and Information needed for test
inst = Instrument.objects.get(name="testinst")
queue = StatusQueue(name="REDUCTION.COMPLETE")
queue.save()
ipts = IPTS(expt_name="IPTS-42")
ipts.save()
dataRun = DataRun(run_number=42, ipts_id=ipts, instrument_id=inst, file="/filename")
dataRun.save()
runStatus = RunStatus(run_id=dataRun, queue_id=queue)
runStatus.save()
info = Information(run_status_id=runStatus, description="autoreducer4.com")
info.save()

red_diag = postprocessing_diagnostics()
# NOTE: we don't have any postprocessing data during testing, so only
# test the entry that does exist
assert red_diag["catalog_status"] == 0
assert red_diag["reduction_status"] == 0
assert len(red_diag["ar_nodes"]) == 0
assert len(red_diag["conditions"]) == 0

# for nodes we have data to check
assert len(red_diag["ar_nodes"]) == 3
for i in range(3):
assert "time" in red_diag["ar_nodes"][i]
assert red_diag["ar_nodes"][i]["node"] == "autoreducer4.com"

msgs = [node["msg"] for node in red_diag["ar_nodes"] if "msg" in node]
print(msgs)
assert len(msgs) == 2
assert "PID: 7" in msgs
assert "Last msg: testinst_42: REDUCTION.COMPLETE" in msgs

def test_pvstreamer_diagnostics(self):
from reporting.dasmon.view_util import pvstreamer_diagnostics

Expand Down

0 comments on commit caae959

Please sign in to comment.