From 4142715a8fcb79b30cace90c03d4e5d63096aad0 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:50:09 +0200 Subject: [PATCH] Extract flushing status data into separate method --- servicelayer/taskqueue.py | 84 ++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 50 deletions(-) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 323c61a..43170ef 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -88,23 +88,7 @@ def __init__(self, conn, name): def cancel(self): """Cancel processing of all tasks belonging to a dataset""" - pipe = self.conn.pipeline() - # remove the dataset from active datasets - pipe.srem(self.key, self.name) - # clean up tasks and task counts - pipe.delete(self.finished_key) - pipe.delete(self.running_key) - pipe.delete(self.pending_key) - pipe.delete(self.start_key) - pipe.delete(self.last_update_key) - for stage in self.conn.smembers(self.active_stages_key): - stage_key = self.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - pipe.delete(self.active_stages_key) - pipe.execute() + self.flush_status() def get_status(self): """Status of a given dataset.""" @@ -235,24 +219,13 @@ def remove_task(self, task_id, stage): pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id)) - status = self.get_status() - if status["running"] == 0 and status["pending"] == 0: - # remove the dataset from active datasets - pipe.srem(self.key, self.name) - # reset finished task count - pipe.delete(self.finished_key) - # delete information about running stages - for stage in self.conn.smembers(self.active_stages_key): - stage_key = self.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - # delete stages key - pipe.delete(self.active_stages_key) pipe.set(self.last_update_key, pack_now()) pipe.execute() + status = self.get_status() + if status["running"] == 0 and status["pending"] == 0: + self.flush_status() + def checkout_task(self, task_id, stage): """Update state when a task is checked out for execution""" log.info(f"Checking out task: {task_id}") @@ -293,24 +266,7 @@ def mark_done(self, task: Task): status = self.get_status() if status["running"] == 0 and status["pending"] == 0: - pipe = self.conn.pipeline() - # remove the dataset from active datasets - self.conn.srem(self.key, self.name) - # reset finished task count - pipe.delete(self.finished_key) - # reset time stamps - pipe.delete(self.start_key) - pipe.delete(self.last_update_key) - # delete information about running stages - for stage in self.conn.smembers(self.active_stages_key): - stage_key = self.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - # delete stages key - pipe.delete(self.active_stages_key) - pipe.execute() + self.flush_status() def mark_for_retry(self, task): pipe = self.conn.pipeline() @@ -358,6 +314,34 @@ def is_task_tracked(self, task: Task): return tracked + def flush_status(self): + """Flush status data such as timestamps and task counts""" + + pipe = self.conn.pipeline() + + # remove the dataset from active datasets + self.conn.srem(self.key, self.name) + + # reset finished task count + pipe.delete(self.finished_key) + + # reset timestamps + pipe.delete(self.start_key) + pipe.delete(self.last_update_key) + + # delete information about running stages + for stage in self.conn.smembers(self.active_stages_key): + stage_key = self.get_stage_key(stage) + pipe.delete(stage_key) + pipe.delete(make_key(stage_key, "pending")) + pipe.delete(make_key(stage_key, "running")) + pipe.delete(make_key(stage_key, "finished")) + + # delete stages key + pipe.delete(self.active_stages_key) + + pipe.execute() + @classmethod def is_low_prio(cls, conn, collection_id): """This Dataset is on the low prio list."""