Skip to content

Commit

Permalink
Remove monitoring queue tag switch monitoring db pre-router
Browse files Browse the repository at this point in the history
The main goals of this PR is to make _migrate_logs_to_internal much more
clearly a message forwarder, rather than a message interpreter.

This follows on from PR #2168 which introduces _dispatch_to_internal to
dispatches messages based on their tag rather than on the queue the message
was received on, and is part of an ongoing series to simplify the queue
and routing structure inside the monitoring router and database code.

Further PRs in preparation (in draft PR #3315) contain further simplifications
building on this PR.

After this PR:

* the database manager will respond to a STOP message on any incoming queue,
vs previously only on the priority queue. This is a consequence of treating
the queues all the same now.

* the database manager will not perform such strong validation of message
structure based on message tag at this point. That's part of expecting the
code to forward messages, not inspect them, with later inspecting code
being the place to care abou structure. This only affects behaviour when
invalid messages are sent.

Related PRs and context:

#3567 changes the monitoring router to be more of a router and
to not inspect and modify certain in-transit messages.

There is a long slow project to regularise queues: PR #2117 makes resource
info messages look like other message so they can be dispatched alongside
other message types.

The priority queue was initially (as I understand it) introduced to attempt
to address a race condition of message order arrival vs SQL database key
constraints. The priority queue is an attempt to force certain messages to
be processed before others (not in the priority queue). However a subsequent
commit in 2019, 0a4b685, introduces a
more robust approach because this priority queue approach does not work and
so is not needed.
  • Loading branch information
benclifford committed Aug 15, 2024
1 parent ffb3644 commit dab3431
Showing 1 changed file with 13 additions and 30 deletions.
43 changes: 13 additions & 30 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,31 +316,31 @@ def start(self,
self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
priority_queue, 'priority', self._kill_event,),
priority_queue, self._kill_event,),
name="Monitoring-migrate-priority",
daemon=True,
)
self._priority_queue_pull_thread.start()

self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
node_queue, 'node', self._kill_event,),
node_queue, self._kill_event,),
name="Monitoring-migrate-node",
daemon=True,
)
self._node_queue_pull_thread.start()

self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
block_queue, 'block', self._kill_event,),
block_queue, self._kill_event,),
name="Monitoring-migrate-block",
daemon=True,
)
self._block_queue_pull_thread.start()

self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
resource_queue, 'resource', self._kill_event,),
resource_queue, self._kill_event,),
name="Monitoring-migrate-resource",
daemon=True,
)
Expand Down Expand Up @@ -577,43 +577,26 @@ def start(self,
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")

@wrap_with_logs(target="database_manager")
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None:
logger.info("Starting processing for queue {}".format(queue_tag))
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None:
logger.info("Starting _migrate_logs_to_internal")

while not kill_event.is_set() or logs_queue.qsize() != 0:
logger.debug("""Checking STOP conditions for {} threads: {}, {}"""
.format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0))
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
kill_event.is_set(), logs_queue.qsize() != 0)
try:
x, addr = logs_queue.get(timeout=0.1)
except queue.Empty:
continue
else:
if queue_tag == 'priority' and x == 'STOP':
if x == 'STOP':
self.close()
elif queue_tag == 'priority': # implicitly not 'STOP'
assert isinstance(x, tuple)
assert len(x) == 2
assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \
"_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0])
self._dispatch_to_internal(x)
elif queue_tag == 'resource':
assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x)
assert x[0] == MessageType.RESOURCE_INFO, (
"_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, "
"got tag {}, message {}".format(x[0], x)
)
self._dispatch_to_internal(x)
elif queue_tag == 'node':
assert len(x) == 2, "expected message tuple to have exactly two elements"
assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue"

self._dispatch_to_internal(x)
elif queue_tag == "block":
self._dispatch_to_internal(x)
else:
logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}")
self._dispatch_to_internal(x)

def _dispatch_to_internal(self, x: Tuple) -> None:
assert isinstance(x, tuple)
assert len(x) == 2, "expected message tuple to have exactly two elements"

if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]:
self.pending_priority_queue.put(cast(Any, x))
elif x[0] == MessageType.RESOURCE_INFO:
Expand Down

0 comments on commit dab3431

Please sign in to comment.