From a6cf2a4e665def8aa86f9c24bc119bf3f53beffe Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 28 Nov 2024 15:36:31 +1100 Subject: [PATCH 1/3] Add StatusQueueMessageCount --- src/webmon_app/reporting/report/admin.py | 7 +++++++ src/workflow_app/workflow/database/report/models.py | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/src/webmon_app/reporting/report/admin.py b/src/webmon_app/reporting/report/admin.py index 4c434044..267107cb 100644 --- a/src/webmon_app/reporting/report/admin.py +++ b/src/webmon_app/reporting/report/admin.py @@ -1,6 +1,7 @@ from reporting.report.models import ( DataRun, StatusQueue, + StatusQueueMessageCount, RunStatus, WorkflowSummary, IPTS, @@ -93,6 +94,11 @@ class StatusQueueAdmin(admin.ModelAdmin): list_filter = ("is_workflow_input",) +class StatusQueueMessageCountAdmin(admin.ModelAdmin): + list_display = ("id", "queue_id", "message_count", "created_on") + list_filter = ("queue_id",) + + class WorkflowSummaryAdmin(admin.ModelAdmin): readonly_fields = ("run_id",) list_filter = ( @@ -151,6 +157,7 @@ class InstrumentStatusAdmin(admin.ModelAdmin): admin.site.register(DataRun, DataRunAdmin) admin.site.register(StatusQueue, StatusQueueAdmin) +admin.site.register(StatusQueueMessageCount, StatusQueueMessageCountAdmin) admin.site.register(RunStatus, RunStatusAdmin) admin.site.register(WorkflowSummary, WorkflowSummaryAdmin) admin.site.register(IPTS, IPTSAdmin) diff --git a/src/workflow_app/workflow/database/report/models.py b/src/workflow_app/workflow/database/report/models.py index c2a64a5e..6bd4a183 100644 --- a/src/workflow_app/workflow/database/report/models.py +++ b/src/workflow_app/workflow/database/report/models.py @@ -235,6 +235,15 @@ def __str__(self): return self.name +class StatusQueueMessageCount(models.Model): + queue = models.ForeignKey(StatusQueue, on_delete=models.CASCADE) + message_count = models.IntegerField() + created_on = models.DateTimeField("Timestamp", auto_now_add=True) + + class Meta: + app_label = "report" + + class RunStatusManager(models.Manager): def status(self, run_id, status_description): """ From 95c40cd375d2af936dfc966c4b29fe46e4a625e8 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 5 Dec 2024 11:31:59 +1100 Subject: [PATCH 2/3] Add Artemis Data Collector to docker compose --- docker-compose.yml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index dfc10471..281d846a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -183,6 +183,26 @@ services: webmon: condition: service_healthy + artemis_data_collector: + restart: always + image: ghcr.io/neutrons/artemis_data_collector/artemis_data_collector:latest-prod + env_file: + - .env + environment: + INTERVAL: 60 + ARTEMIS_URL: http://activemq:8161 + ARTEMIS_USER: artemis + ARTEMIS_PASSWORD: artemis + ARTEMIS_BROKER_NAME: Artemis-Broker + QUEUE_LIST: "['REDUCTION.DATA_READY', 'REDUCTION.HIMEM.DATA_READY', 'CATALOG.ONCAT.DATA_READY', 'REDUCTION_CATALOG.DATA_READY']" + depends_on: + db: + condition: service_healthy + webmon: + condition: service_healthy + activemq: + condition: service_healthy + autoheal: restart: always image: willfarrell/autoheal From 07a80d807a6b8fd8b93170e2cb25c29b8a823618 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 6 Dec 2024 13:33:33 +1100 Subject: [PATCH 3/3] Add reduction queue length to diagnostics page --- src/webmon_app/reporting/dasmon/views.py | 3 +++ src/webmon_app/reporting/report/view_util.py | 13 ++++++++++ .../templates/dasmon/diagnostics.html | 24 +++++++++++++++++++ .../tests/test_report/test_view_util.py | 21 ++++++++++++++++ .../workflow/database/report/models.py | 10 ++++++++ tests/test_DASMONPageView.py | 9 ++++++- 6 files changed, 79 insertions(+), 1 deletion(-) diff --git a/src/webmon_app/reporting/dasmon/views.py b/src/webmon_app/reporting/dasmon/views.py index b788ad49..dea1f3bc 100644 --- a/src/webmon_app/reporting/dasmon/views.py +++ b/src/webmon_app/reporting/dasmon/views.py @@ -342,6 +342,8 @@ def diagnostics(request, instrument): wf_diag = view_util.workflow_diagnostics() # Post-processing red_diag = view_util.postprocessing_diagnostics() + # reduction queue size + red_queue_size = report_view_util.reduction_queue_sizes() breadcrumbs = view_util.get_monitor_breadcrumbs(instrument_id, "diagnostics") template_values = { @@ -371,6 +373,7 @@ def diagnostics(request, instrument): template_values["wf_diagnostics"] = wf_diag template_values["post_diagnostics"] = red_diag + template_values["reduction_queue_size"] = red_queue_size template_values["action_messages"] = actions notices = [] diff --git a/src/webmon_app/reporting/report/view_util.py b/src/webmon_app/reporting/report/view_util.py index 3cb0c025..642a9f4e 100644 --- a/src/webmon_app/reporting/report/view_util.py +++ b/src/webmon_app/reporting/report/view_util.py @@ -21,6 +21,7 @@ StatusQueue, Task, WorkflowSummary, + StatusQueueMessageCount, ) from django.urls import reverse from django.http import HttpResponseServerError @@ -629,3 +630,15 @@ def find_skipped_runs(instrument_id, start_run_number=0): except: # noqa: E722 logging.exception("Error finding missing runs:") return missing_runs + + +def reduction_queue_sizes(): + """ + Get the size of the message queues + """ + queue_sizes = [] + + for q in StatusQueueMessageCount.objects.values_list("queue_id").distinct(): + queue_sizes.append(StatusQueueMessageCount.objects.filter(queue_id=q[0]).latest("created_on").to_dict()) + + return queue_sizes diff --git a/src/webmon_app/reporting/templates/dasmon/diagnostics.html b/src/webmon_app/reporting/templates/dasmon/diagnostics.html index 4fa1afea..9759f5ac 100644 --- a/src/webmon_app/reporting/templates/dasmon/diagnostics.html +++ b/src/webmon_app/reporting/templates/dasmon/diagnostics.html @@ -142,6 +142,30 @@ {% endif %} +{% if reduction_queue_size %} +

+ Reduction queue length: +

+ + + + + + + + {% for item in reduction_queue_size %} + + + + + + {% endfor %} + +
QueueLast updatedCount
{{ item.queue }}{{ item.created_on }}{{ item.message_count }}
+

+

+{% endif %} + {% endblock %} {% block nocontent %}{% endblock %} diff --git a/src/webmon_app/reporting/tests/test_report/test_view_util.py b/src/webmon_app/reporting/tests/test_report/test_view_util.py index 91780219..8f6ef27d 100644 --- a/src/webmon_app/reporting/tests/test_report/test_view_util.py +++ b/src/webmon_app/reporting/tests/test_report/test_view_util.py @@ -9,6 +9,7 @@ from reporting.report.models import StatusQueue from reporting.report.models import Task from reporting.report.models import WorkflowSummary +from reporting.report.models import StatusQueueMessageCount import json @@ -368,6 +369,26 @@ def test_find_skipped_runs(self): missing_runs = find_skipped_runs(inst) self.assertEqual(missing_runs[0], 5) + def test_get_status_queue_message_count(self): + from reporting.report.view_util import reduction_queue_sizes + + sq1 = StatusQueue(name="TEST_QUEUE", is_workflow_input=True) + sq1.save() + sq2 = StatusQueue(name="TEST_QUEUE2", is_workflow_input=True) + sq2.save() + + StatusQueueMessageCount(queue=sq1, message_count=10).save() + StatusQueueMessageCount(queue=sq1, message_count=11).save() + StatusQueueMessageCount(queue=sq1, message_count=12).save() + StatusQueueMessageCount(queue=sq2, message_count=42).save() + + queue_size_list = reduction_queue_sizes() + self.assertEqual(len(queue_size_list), 2) + assert queue_size_list[0]["queue"] == "TEST_QUEUE" + assert queue_size_list[0]["message_count"] == 12 + assert queue_size_list[1]["queue"] == "TEST_QUEUE2" + assert queue_size_list[1]["message_count"] == 42 + if __name__ == "__main__": pytest.main([__file__]) diff --git a/src/workflow_app/workflow/database/report/models.py b/src/workflow_app/workflow/database/report/models.py index 6bd4a183..ec0b658e 100644 --- a/src/workflow_app/workflow/database/report/models.py +++ b/src/workflow_app/workflow/database/report/models.py @@ -243,6 +243,16 @@ class StatusQueueMessageCount(models.Model): class Meta: app_label = "report" + def __str__(self): + return f"{self.queue}: {self.message_count} {self.created_on}" + + def to_dict(self): + return { + "queue": str(self.queue), + "message_count": self.message_count, + "created_on": self.created_on, + } + class RunStatusManager(models.Manager): def status(self, run_id, status_description): diff --git a/tests/test_DASMONPageView.py b/tests/test_DASMONPageView.py index 6f93b6a0..1001caa2 100644 --- a/tests/test_DASMONPageView.py +++ b/tests/test_DASMONPageView.py @@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics): tree = etree.parse(StringIO(dasmon_diagnostics.text), parser) table_content = tree.xpath("//tr/td//text()") # verify number of entries in the tables - expected_number_of_entries = 48 + expected_number_of_entries = 57 assert len(table_content) == expected_number_of_entries # -- DASMON diagnostics status = table_content[1] @@ -63,6 +63,13 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics): autoreducer_pid = table_content[21] assert len(autoreducer) > 0 assert len(autoreducer_pid) > 0 + # -- Reduction queue lengths + assert table_content[31] == "REDUCTION.DATA_READY" + assert table_content[33] == "0" + assert table_content[34] == "REDUCTION_CATALOG.DATA_READY" + assert table_content[36] == "0" + assert table_content[37] == "CATALOG.ONCAT.DATA_READY" + assert table_content[39] == "0" if __name__ == "__main__":