diff --git a/src/webmon_app/reporting/dasmon/views.py b/src/webmon_app/reporting/dasmon/views.py
index b788ad49..dea1f3bc 100644
--- a/src/webmon_app/reporting/dasmon/views.py
+++ b/src/webmon_app/reporting/dasmon/views.py
@@ -342,6 +342,8 @@ def diagnostics(request, instrument):
wf_diag = view_util.workflow_diagnostics()
# Post-processing
red_diag = view_util.postprocessing_diagnostics()
+ # reduction queue size
+ red_queue_size = report_view_util.reduction_queue_sizes()
breadcrumbs = view_util.get_monitor_breadcrumbs(instrument_id, "diagnostics")
template_values = {
@@ -371,6 +373,7 @@ def diagnostics(request, instrument):
template_values["wf_diagnostics"] = wf_diag
template_values["post_diagnostics"] = red_diag
+ template_values["reduction_queue_size"] = red_queue_size
template_values["action_messages"] = actions
notices = []
diff --git a/src/webmon_app/reporting/report/view_util.py b/src/webmon_app/reporting/report/view_util.py
index 3cb0c025..642a9f4e 100644
--- a/src/webmon_app/reporting/report/view_util.py
+++ b/src/webmon_app/reporting/report/view_util.py
@@ -21,6 +21,7 @@
StatusQueue,
Task,
WorkflowSummary,
+ StatusQueueMessageCount,
)
from django.urls import reverse
from django.http import HttpResponseServerError
@@ -629,3 +630,15 @@ def find_skipped_runs(instrument_id, start_run_number=0):
except: # noqa: E722
logging.exception("Error finding missing runs:")
return missing_runs
+
+
+def reduction_queue_sizes():
+ """
+ Get the size of the message queues
+ """
+ queue_sizes = []
+
+ for q in StatusQueueMessageCount.objects.values_list("queue_id").distinct():
+ queue_sizes.append(StatusQueueMessageCount.objects.filter(queue_id=q[0]).latest("created_on").to_dict())
+
+ return queue_sizes
diff --git a/src/webmon_app/reporting/templates/dasmon/diagnostics.html b/src/webmon_app/reporting/templates/dasmon/diagnostics.html
index 4fa1afea..9759f5ac 100644
--- a/src/webmon_app/reporting/templates/dasmon/diagnostics.html
+++ b/src/webmon_app/reporting/templates/dasmon/diagnostics.html
@@ -142,6 +142,30 @@
{% endif %}
+{% if reduction_queue_size %}
+
+ Reduction queue length:
+
+
+
+
+ Queue |
+ Last updated |
+ Count |
+
+ {% for item in reduction_queue_size %}
+
+ {{ item.queue }} |
+ {{ item.created_on }} |
+ {{ item.message_count }} |
+
+ {% endfor %}
+
+
+
+
+{% endif %}
+
{% endblock %}
{% block nocontent %}{% endblock %}
diff --git a/src/webmon_app/reporting/tests/test_report/test_view_util.py b/src/webmon_app/reporting/tests/test_report/test_view_util.py
index 91780219..8f6ef27d 100644
--- a/src/webmon_app/reporting/tests/test_report/test_view_util.py
+++ b/src/webmon_app/reporting/tests/test_report/test_view_util.py
@@ -9,6 +9,7 @@
from reporting.report.models import StatusQueue
from reporting.report.models import Task
from reporting.report.models import WorkflowSummary
+from reporting.report.models import StatusQueueMessageCount
import json
@@ -368,6 +369,26 @@ def test_find_skipped_runs(self):
missing_runs = find_skipped_runs(inst)
self.assertEqual(missing_runs[0], 5)
+ def test_get_status_queue_message_count(self):
+ from reporting.report.view_util import reduction_queue_sizes
+
+ sq1 = StatusQueue(name="TEST_QUEUE", is_workflow_input=True)
+ sq1.save()
+ sq2 = StatusQueue(name="TEST_QUEUE2", is_workflow_input=True)
+ sq2.save()
+
+ StatusQueueMessageCount(queue=sq1, message_count=10).save()
+ StatusQueueMessageCount(queue=sq1, message_count=11).save()
+ StatusQueueMessageCount(queue=sq1, message_count=12).save()
+ StatusQueueMessageCount(queue=sq2, message_count=42).save()
+
+ queue_size_list = reduction_queue_sizes()
+ self.assertEqual(len(queue_size_list), 2)
+ assert queue_size_list[0]["queue"] == "TEST_QUEUE"
+ assert queue_size_list[0]["message_count"] == 12
+ assert queue_size_list[1]["queue"] == "TEST_QUEUE2"
+ assert queue_size_list[1]["message_count"] == 42
+
if __name__ == "__main__":
pytest.main([__file__])
diff --git a/src/workflow_app/workflow/database/report/models.py b/src/workflow_app/workflow/database/report/models.py
index 6bd4a183..ec0b658e 100644
--- a/src/workflow_app/workflow/database/report/models.py
+++ b/src/workflow_app/workflow/database/report/models.py
@@ -243,6 +243,16 @@ class StatusQueueMessageCount(models.Model):
class Meta:
app_label = "report"
+ def __str__(self):
+ return f"{self.queue}: {self.message_count} {self.created_on}"
+
+ def to_dict(self):
+ return {
+ "queue": str(self.queue),
+ "message_count": self.message_count,
+ "created_on": self.created_on,
+ }
+
class RunStatusManager(models.Manager):
def status(self, run_id, status_description):
diff --git a/tests/test_DASMONPageView.py b/tests/test_DASMONPageView.py
index 6f93b6a0..1001caa2 100644
--- a/tests/test_DASMONPageView.py
+++ b/tests/test_DASMONPageView.py
@@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
tree = etree.parse(StringIO(dasmon_diagnostics.text), parser)
table_content = tree.xpath("//tr/td//text()")
# verify number of entries in the tables
- expected_number_of_entries = 48
+ expected_number_of_entries = 57
assert len(table_content) == expected_number_of_entries
# -- DASMON diagnostics
status = table_content[1]
@@ -63,6 +63,13 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
autoreducer_pid = table_content[21]
assert len(autoreducer) > 0
assert len(autoreducer_pid) > 0
+ # -- Reduction queue lengths
+ assert table_content[31] == "REDUCTION.DATA_READY"
+ assert table_content[33] == "0"
+ assert table_content[34] == "REDUCTION_CATALOG.DATA_READY"
+ assert table_content[36] == "0"
+ assert table_content[37] == "CATALOG.ONCAT.DATA_READY"
+ assert table_content[39] == "0"
if __name__ == "__main__":