diff --git a/src/webmon_app/reporting/dasmon/view_util.py b/src/webmon_app/reporting/dasmon/view_util.py index 0cf5d217..647c721d 100644 --- a/src/webmon_app/reporting/dasmon/view_util.py +++ b/src/webmon_app/reporting/dasmon/view_util.py @@ -121,13 +121,13 @@ def get_latest(instrument_id, key_id): # First get it from the cache try: last_value = StatusCache.objects.filter(instrument_id=instrument_id, key_id=key_id).latest("timestamp") - except: # noqa: E722 + except StatusCache.DoesNotExist: # If that didn't work, get it from the table of values - values = StatusVariable.objects.filter(instrument_id=instrument_id, key_id=key_id) - # If we don't have any entry yet, just return Non - if len(values) == 0: + try: + last_value = StatusVariable.objects.filter(instrument_id=instrument_id, key_id=key_id).latest("timestamp") + except StatusVariable.DoesNotExist: + # If we don't have any entry yet, just return None return None - last_value = values.latest("timestamp") # Put the latest value in the cache so we don't have to go through this again cached = StatusCache( diff --git a/src/webmon_app/reporting/metrics/urls.py b/src/webmon_app/reporting/metrics/urls.py new file mode 100644 index 00000000..f6eb9755 --- /dev/null +++ b/src/webmon_app/reporting/metrics/urls.py @@ -0,0 +1,13 @@ +from django.urls import path +from . import views + +app_name = "metrics" + +urlpatterns = [ + path("/", views.metrics, name="metrics"), + path("/workflow_diagnostics/", views.workflow_diagnostics, name="workflow_diagnostics"), + path("/postprocessing_diagnostics/", views.postprocessing_diagnostics, name="postprocessing_diagnostics"), + path("/instrument_status/", views.instrument_status, name="instrument_status"), + path("/run_statuses/", views.run_statuses, name="run_statuses"), + path("/run_statuses//", views.run_statuses, name="run_statuses"), +] diff --git a/src/webmon_app/reporting/metrics/view_util.py b/src/webmon_app/reporting/metrics/view_util.py new file mode 100644 index 00000000..b23c84fc --- /dev/null +++ b/src/webmon_app/reporting/metrics/view_util.py @@ -0,0 +1,83 @@ +from reporting.report.models import Instrument, DataRun, WorkflowSummary, Information +from reporting.dasmon.models import Parameter, StatusCache, ActiveInstrument +from reporting.report.view_util import is_acquisition_complete +from reporting.dasmon.view_util import is_running +from django.conf import settings +from django.utils import timezone +from django.db.models import Q + + +def postprocessing_diagnostics(): + common_services = Instrument.objects.get(name="common") + agents = [] + + for node_prefix in settings.POSTPROCESS_NODE_PREFIX: + params = Parameter.objects.filter( + ~Q(name__endswith="_pid"), name__startswith=settings.SYSTEM_STATUS_PREFIX + node_prefix + ) + for param in params: + node = param.name.removeprefix(settings.SYSTEM_STATUS_PREFIX) + info = {"name": node} + value = StatusCache.objects.filter(instrument_id=common_services, key_id=param).latest("timestamp") + info["timestamp"] = value.timestamp + + try: + pid = Parameter.objects.get(name=param.name + "_pid") + info["PID"] = ( + StatusCache.objects.filter(instrument_id=common_services, key_id=pid).latest("timestamp").value + ) + + except (Parameter.DoesNotExist, StatusCache.DoesNotExist): + pass + + try: + last_status = Information.objects.filter(description=node).latest("id") + info["last_message"] = str(last_status.run_status_id) + info["last_message_timestamp"] = last_status.run_status_id.created_on + except Information.DoesNotExist: + pass + agents.append(info) + + return agents + + +def instrument_status(): + # return map of instrument name to run status + + instruments = Instrument.objects.all().order_by("name") + status = {} + + for instrument_id in instruments: + if ActiveInstrument.objects.is_alive(instrument_id): + status[str(instrument_id)] = is_running(instrument_id) + + return status + + +def run_statuses(minutes=60): + """Of all the runs created in the last n minutes, + return the number that are acquiring, complete, incomplete, + error along with the total number""" + + runs = DataRun.objects.filter(created_on__gte=timezone.now() - timezone.timedelta(minutes=minutes)).order_by( + "created_on" + ) + + statuses = {"count": len(runs), "acquiring": 0, "incomplete": 0, "complete": 0, "error": 0} + + for run_id in runs: + try: + s = WorkflowSummary.objects.get(run_id=run_id) + except WorkflowSummary.DoesNotExist: + continue + + if not is_acquisition_complete(run_id): + statuses["acquiring"] += 1 + elif s.complete: + statuses["complete"] += 1 + elif run_id.last_error() is None: + statuses["incomplete"] += 1 + else: + statuses["error"] += 1 + + return statuses diff --git a/src/webmon_app/reporting/metrics/views.py b/src/webmon_app/reporting/metrics/views.py new file mode 100644 index 00000000..76142751 --- /dev/null +++ b/src/webmon_app/reporting/metrics/views.py @@ -0,0 +1,41 @@ +from django.http import JsonResponse +from django.conf import settings +from django.views.decorators.cache import cache_page +import reporting.users.view_util as users_view_util +import reporting.dasmon.view_util as dasmon_view_util +from . import view_util + + +@users_view_util.login_or_local_required_401 +@cache_page(settings.FAST_PAGE_CACHE_TIMEOUT) +def metrics(request): + data = {} + data["workflow_diagnostics"] = dasmon_view_util.workflow_diagnostics() + data["postprocessing_diagnostics"] = view_util.postprocessing_diagnostics() + data["instrument_status"] = view_util.instrument_status() + data["run_statuses"] = view_util.run_statuses() + return JsonResponse(data) + + +@users_view_util.login_or_local_required_401 +@cache_page(settings.FAST_PAGE_CACHE_TIMEOUT) +def workflow_diagnostics(request): + return JsonResponse(dasmon_view_util.workflow_diagnostics()) + + +@users_view_util.login_or_local_required_401 +@cache_page(settings.FAST_PAGE_CACHE_TIMEOUT) +def postprocessing_diagnostics(request): + return JsonResponse(view_util.postprocessing_diagnostics(), safe=False) + + +@users_view_util.login_or_local_required_401 +@cache_page(settings.FAST_PAGE_CACHE_TIMEOUT) +def instrument_status(request): + return JsonResponse(view_util.instrument_status()) + + +@users_view_util.login_or_local_required_401 +@cache_page(settings.FAST_PAGE_CACHE_TIMEOUT) +def run_statuses(request, minutes=60): + return JsonResponse(view_util.run_statuses(minutes)) diff --git a/src/webmon_app/reporting/report/views.py b/src/webmon_app/reporting/report/views.py index 52762132..31d1b896 100644 --- a/src/webmon_app/reporting/report/views.py +++ b/src/webmon_app/reporting/report/views.py @@ -114,16 +114,16 @@ def summary(request): adara_start = datetime.datetime(2012, 10, 1).replace(tzinfo=timezone.get_current_timezone()) today = datetime.datetime.today().replace(tzinfo=timezone.get_current_timezone()) # Fill in the partial data for the current month - runs = DataRun.objects.filter(created_on__gte=max_date) + number_of_runs = DataRun.objects.filter(created_on__gte=max_date).count() run_rate = [] run_summary = [ { "min_date": max_date, "max_date": datetime.datetime.today(), - "number_of_runs": len(runs), + "number_of_runs": number_of_runs, } ] - run_rate.append([1000 * int((today - epoch).total_seconds()), len(runs)]) + run_rate.append([1000 * int((today - epoch).total_seconds()), number_of_runs]) while True: # Make sure we don't display zeros for the period before # the system was installed diff --git a/src/webmon_app/reporting/reporting_app/settings/base.py b/src/webmon_app/reporting/reporting_app/settings/base.py index c175566b..0bad2296 100644 --- a/src/webmon_app/reporting/reporting_app/settings/base.py +++ b/src/webmon_app/reporting/reporting_app/settings/base.py @@ -206,6 +206,7 @@ def validate_ldap_settings(server_uri, user_dn_template): "reporting.dasmon", "reporting.pvmon", "reporting.reduction", + "reporting.metrics", "health_check", "health_check.db", "health_check.cache", diff --git a/src/webmon_app/reporting/reporting_app/urls.py b/src/webmon_app/reporting/reporting_app/urls.py index e4b524ec..44f890c1 100644 --- a/src/webmon_app/reporting/reporting_app/urls.py +++ b/src/webmon_app/reporting/reporting_app/urls.py @@ -20,6 +20,7 @@ path("reduction/", include("reporting.reduction.urls", namespace="reduction")), path("pvmon/", include("reporting.pvmon.urls", namespace="pvmon")), path("users/", include("reporting.users.urls", namespace="users")), + path("metrics", include("reporting.metrics.urls", namespace="metrics")), path("database/", admin.site.urls), path("ht/", include("health_check.urls")), ] diff --git a/src/webmon_app/reporting/tests/test_metrics/test_view_util.py b/src/webmon_app/reporting/tests/test_metrics/test_view_util.py new file mode 100644 index 00000000..1c518c96 --- /dev/null +++ b/src/webmon_app/reporting/tests/test_metrics/test_view_util.py @@ -0,0 +1,187 @@ +from django.test import TestCase + +from django.conf import settings +from django.utils import timezone + +from reporting.report.models import Instrument, Information, RunStatus, StatusQueue, WorkflowSummary, Error +from reporting.dasmon.models import ActiveInstrument, Parameter, StatusCache +from workflow.database.report.models import DataRun +from workflow.database.report.models import IPTS + + +class ViewUtilTest(TestCase): + @classmethod + def setUpClass(cls): + inst = Instrument.objects.create(name="testinst") + inst.save() + ActiveInstrument.objects.create( + instrument_id=inst, + is_alive=True, + is_adara=True, + has_pvsd=True, + has_pvstreamer=True, + ) + recording = Parameter.objects.create(name="recording") + recording.save() + paused = Parameter.objects.create(name="paused") + paused.save() + StatusCache.objects.create( + instrument_id=inst, + key_id=recording, + value="true", + ) + StatusCache.objects.create( + instrument_id=inst, + key_id=paused, + value="false", + ) + + # add common services + common = Instrument.objects.create(name="common") + common.save() + ActiveInstrument.objects.create( + instrument_id=common, + is_alive=False, + is_adara=False, + has_pvsd=False, + has_pvstreamer=False, + ) + + @classmethod + def tearDownClass(cls): + Instrument.objects.all().delete() + Parameter.objects.all().delete() + + def test_postprocessing_diagnostics(self): + from reporting.metrics.view_util import postprocessing_diagnostics + + # add postprocessing services + common = Instrument.objects.get(name="common") + for i in range(1, 3): + name_postprocessor = settings.SYSTEM_STATUS_PREFIX + f"autoreducer{i}.com" + para_postprocessor = Parameter.objects.create(name=name_postprocessor) + para_postprocessor.save() + StatusCache.objects.create( + instrument_id=common, + key_id=para_postprocessor, + value=0, + timestamp=timezone.now(), + ) + para_postprocessor_pid = Parameter.objects.create(name=name_postprocessor + "_pid") + para_postprocessor_pid.save() + StatusCache.objects.create( + instrument_id=common, + key_id=para_postprocessor_pid, + value=7, + timestamp=timezone.now(), + ) + + # create StatusQueue, DataRun, RunStatus and Information needed for test + inst = Instrument.objects.get(name="testinst") + queue = StatusQueue(name="REDUCTION.COMPLETE") + queue.save() + ipts = IPTS(expt_name="IPTS-42") + ipts.save() + dataRun = DataRun(run_number=42, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun.save() + runStatus = RunStatus(run_id=dataRun, queue_id=queue) + runStatus.save() + info = Information(run_status_id=runStatus, description="autoreducer1.com") + info.save() + + diag = postprocessing_diagnostics() + node1 = diag[0] + assert node1["name"] == "autoreducer1.com" + assert node1["PID"] == "7" + assert "timestamp" in node1 + assert "last_message_timestamp" in node1 + assert node1["last_message"] == "testinst_42: REDUCTION.COMPLETE" + node2 = diag[1] + assert node2["name"] == "autoreducer2.com" + assert node2["PID"] == "7" + assert "timestamp" in node2 + assert "last_message_timestamp" not in node2 + assert "last_message" not in node2 + + def test_instrument_status(self): + from reporting.metrics.view_util import instrument_status + + assert instrument_status() == {"testinst": "Recording"} + + def test_run_statuses(self): + from reporting.metrics.view_util import run_statuses + + inst = Instrument.objects.get(name="testinst") + ipts = IPTS(expt_name="IPTS-13") + ipts.save() + + queue = StatusQueue(name="POSTPROCESS.DATA_READY") + queue.save() + + # run too old, should not be included in output + dataRun0 = DataRun(run_number=0, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun0.save() # need to save once so auto time can be written first so we can override + dataRun0.created_on = timezone.now() - timezone.timedelta(minutes=120) + dataRun0.save() + RunStatus(run_id=dataRun0, queue_id=queue).save() + WorkflowSummary.objects.create(run_id=dataRun0, complete=True) + + statuses = run_statuses() + assert statuses["count"] == 0 + assert statuses["acquiring"] == 0 + assert statuses["incomplete"] == 0 + assert statuses["complete"] == 0 + assert statuses["error"] == 0 + + # run should be acquiring + dataRun1 = DataRun(run_number=1, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun1.save() + WorkflowSummary.objects.create(run_id=dataRun1) + + statuses = run_statuses() + assert statuses["count"] == 1 + assert statuses["acquiring"] == 1 + assert statuses["incomplete"] == 0 + assert statuses["complete"] == 0 + assert statuses["error"] == 0 + + # run should be incomplete + dataRun2 = DataRun(run_number=2, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun2.save() + RunStatus(run_id=dataRun2, queue_id=queue).save() + WorkflowSummary.objects.create(run_id=dataRun2) + + statuses = run_statuses() + assert statuses["count"] == 2 + assert statuses["acquiring"] == 1 + assert statuses["incomplete"] == 1 + assert statuses["complete"] == 0 + assert statuses["error"] == 0 + + # run should be complete + dataRun3 = DataRun(run_number=3, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun3.save() + RunStatus(run_id=dataRun3, queue_id=queue).save() + WorkflowSummary.objects.create(run_id=dataRun3, complete=True) + + statuses = run_statuses() + assert statuses["count"] == 3 + assert statuses["acquiring"] == 1 + assert statuses["incomplete"] == 1 + assert statuses["complete"] == 1 + assert statuses["error"] == 0 + + # run should be error + dataRun4 = DataRun(run_number=4, ipts_id=ipts, instrument_id=inst, file="/filename") + dataRun4.save() + runStatus = RunStatus(run_id=dataRun4, queue_id=queue) + runStatus.save() + Error(run_status_id=runStatus, description="error").save() + WorkflowSummary.objects.create(run_id=dataRun4) + + statuses = run_statuses() + assert statuses["count"] == 4 + assert statuses["acquiring"] == 1 + assert statuses["incomplete"] == 1 + assert statuses["complete"] == 1 + assert statuses["error"] == 1