Skip to content

Commit

Permalink
Merge pull request #203 from neutrons/queue_message_count
Browse files Browse the repository at this point in the history
Add Artemis Data Collector and display queue lengths
  • Loading branch information
rosswhitfield authored Dec 8, 2024
2 parents 4e9e7d1 + 07a80d8 commit 807b4a1
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 1 deletion.
20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,26 @@ services:
webmon:
condition: service_healthy

artemis_data_collector:
restart: always
image: ghcr.io/neutrons/artemis_data_collector/artemis_data_collector:latest-prod
env_file:
- .env
environment:
INTERVAL: 60
ARTEMIS_URL: http://activemq:8161
ARTEMIS_USER: artemis
ARTEMIS_PASSWORD: artemis
ARTEMIS_BROKER_NAME: Artemis-Broker
QUEUE_LIST: "['REDUCTION.DATA_READY', 'REDUCTION.HIMEM.DATA_READY', 'CATALOG.ONCAT.DATA_READY', 'REDUCTION_CATALOG.DATA_READY']"
depends_on:
db:
condition: service_healthy
webmon:
condition: service_healthy
activemq:
condition: service_healthy

autoheal:
restart: always
image: willfarrell/autoheal
Expand Down
3 changes: 3 additions & 0 deletions src/webmon_app/reporting/dasmon/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ def diagnostics(request, instrument):
wf_diag = view_util.workflow_diagnostics()
# Post-processing
red_diag = view_util.postprocessing_diagnostics()
# reduction queue size
red_queue_size = report_view_util.reduction_queue_sizes()

breadcrumbs = view_util.get_monitor_breadcrumbs(instrument_id, "diagnostics")
template_values = {
Expand Down Expand Up @@ -371,6 +373,7 @@ def diagnostics(request, instrument):

template_values["wf_diagnostics"] = wf_diag
template_values["post_diagnostics"] = red_diag
template_values["reduction_queue_size"] = red_queue_size
template_values["action_messages"] = actions

notices = []
Expand Down
7 changes: 7 additions & 0 deletions src/webmon_app/reporting/report/admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from reporting.report.models import (
DataRun,
StatusQueue,
StatusQueueMessageCount,
RunStatus,
WorkflowSummary,
IPTS,
Expand Down Expand Up @@ -93,6 +94,11 @@ class StatusQueueAdmin(admin.ModelAdmin):
list_filter = ("is_workflow_input",)


class StatusQueueMessageCountAdmin(admin.ModelAdmin):
list_display = ("id", "queue_id", "message_count", "created_on")
list_filter = ("queue_id",)


class WorkflowSummaryAdmin(admin.ModelAdmin):
readonly_fields = ("run_id",)
list_filter = (
Expand Down Expand Up @@ -151,6 +157,7 @@ class InstrumentStatusAdmin(admin.ModelAdmin):

admin.site.register(DataRun, DataRunAdmin)
admin.site.register(StatusQueue, StatusQueueAdmin)
admin.site.register(StatusQueueMessageCount, StatusQueueMessageCountAdmin)
admin.site.register(RunStatus, RunStatusAdmin)
admin.site.register(WorkflowSummary, WorkflowSummaryAdmin)
admin.site.register(IPTS, IPTSAdmin)
Expand Down
13 changes: 13 additions & 0 deletions src/webmon_app/reporting/report/view_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
StatusQueue,
Task,
WorkflowSummary,
StatusQueueMessageCount,
)
from django.urls import reverse
from django.http import HttpResponseServerError
Expand Down Expand Up @@ -629,3 +630,15 @@ def find_skipped_runs(instrument_id, start_run_number=0):
except: # noqa: E722
logging.exception("Error finding missing runs:")
return missing_runs


def reduction_queue_sizes():
"""
Get the size of the message queues
"""
queue_sizes = []

for q in StatusQueueMessageCount.objects.values_list("queue_id").distinct():
queue_sizes.append(StatusQueueMessageCount.objects.filter(queue_id=q[0]).latest("created_on").to_dict())

return queue_sizes
24 changes: 24 additions & 0 deletions src/webmon_app/reporting/templates/dasmon/diagnostics.html
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@
{% endif %}
</div>

{% if reduction_queue_size %}
<p>
<b>Reduction queue length:</b>
<div style="margin-left:20px">
<table>
<tbody>
<tr>
<th>Queue</th>
<th>Last updated</th>
<th>Count</th>
</tr>
{% for item in reduction_queue_size %}
<tr>
<td>{{ item.queue }}</td>
<td>{{ item.created_on }}</td>
<td>{{ item.message_count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<p>
</div>
{% endif %}

{% endblock %}

{% block nocontent %}{% endblock %}
21 changes: 21 additions & 0 deletions src/webmon_app/reporting/tests/test_report/test_view_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from reporting.report.models import StatusQueue
from reporting.report.models import Task
from reporting.report.models import WorkflowSummary
from reporting.report.models import StatusQueueMessageCount

import json

Expand Down Expand Up @@ -368,6 +369,26 @@ def test_find_skipped_runs(self):
missing_runs = find_skipped_runs(inst)
self.assertEqual(missing_runs[0], 5)

def test_get_status_queue_message_count(self):
from reporting.report.view_util import reduction_queue_sizes

sq1 = StatusQueue(name="TEST_QUEUE", is_workflow_input=True)
sq1.save()
sq2 = StatusQueue(name="TEST_QUEUE2", is_workflow_input=True)
sq2.save()

StatusQueueMessageCount(queue=sq1, message_count=10).save()
StatusQueueMessageCount(queue=sq1, message_count=11).save()
StatusQueueMessageCount(queue=sq1, message_count=12).save()
StatusQueueMessageCount(queue=sq2, message_count=42).save()

queue_size_list = reduction_queue_sizes()
self.assertEqual(len(queue_size_list), 2)
assert queue_size_list[0]["queue"] == "TEST_QUEUE"
assert queue_size_list[0]["message_count"] == 12
assert queue_size_list[1]["queue"] == "TEST_QUEUE2"
assert queue_size_list[1]["message_count"] == 42


if __name__ == "__main__":
pytest.main([__file__])
19 changes: 19 additions & 0 deletions src/workflow_app/workflow/database/report/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,25 @@ def __str__(self):
return self.name


class StatusQueueMessageCount(models.Model):
queue = models.ForeignKey(StatusQueue, on_delete=models.CASCADE)
message_count = models.IntegerField()
created_on = models.DateTimeField("Timestamp", auto_now_add=True)

class Meta:
app_label = "report"

def __str__(self):
return f"{self.queue}: {self.message_count} {self.created_on}"

def to_dict(self):
return {
"queue": str(self.queue),
"message_count": self.message_count,
"created_on": self.created_on,
}


class RunStatusManager(models.Manager):
def status(self, run_id, status_description):
"""
Expand Down
9 changes: 8 additions & 1 deletion tests/test_DASMONPageView.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
tree = etree.parse(StringIO(dasmon_diagnostics.text), parser)
table_content = tree.xpath("//tr/td//text()")
# verify number of entries in the tables
expected_number_of_entries = 48
expected_number_of_entries = 57
assert len(table_content) == expected_number_of_entries
# -- DASMON diagnostics
status = table_content[1]
Expand Down Expand Up @@ -63,6 +63,13 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
autoreducer_pid = table_content[21]
assert len(autoreducer) > 0
assert len(autoreducer_pid) > 0
# -- Reduction queue lengths
assert table_content[31] == "REDUCTION.DATA_READY"
assert table_content[33] == "0"
assert table_content[34] == "REDUCTION_CATALOG.DATA_READY"
assert table_content[36] == "0"
assert table_content[37] == "CATALOG.ONCAT.DATA_READY"
assert table_content[39] == "0"


if __name__ == "__main__":
Expand Down

0 comments on commit 807b4a1

Please sign in to comment.