Skip to content

Commit

Permalink
Extract flushing status data into separate method
Browse files Browse the repository at this point in the history
  • Loading branch information
tillprochaska committed Jul 1, 2024
1 parent a2a8dc4 commit 4142715
Showing 1 changed file with 34 additions and 50 deletions.
84 changes: 34 additions & 50 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,7 @@ def __init__(self, conn, name):

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# clean up tasks and task counts
pipe.delete(self.finished_key)
pipe.delete(self.running_key)
pipe.delete(self.pending_key)
pipe.delete(self.start_key)
pipe.delete(self.last_update_key)
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
pipe.delete(self.active_stages_key)
pipe.execute()
self.flush_status()

def get_status(self):
"""Status of a given dataset."""
Expand Down Expand Up @@ -235,24 +219,13 @@ def remove_task(self, task_id, stage):

pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)
pipe.set(self.last_update_key, pack_now())
pipe.execute()

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
self.flush_status()

def checkout_task(self, task_id, stage):
"""Update state when a task is checked out for execution"""
log.info(f"Checking out task: {task_id}")
Expand Down Expand Up @@ -293,24 +266,7 @@ def mark_done(self, task: Task):

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
pipe = self.conn.pipeline()
# remove the dataset from active datasets
self.conn.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# reset time stamps
pipe.delete(self.start_key)
pipe.delete(self.last_update_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)
pipe.execute()
self.flush_status()

def mark_for_retry(self, task):
pipe = self.conn.pipeline()
Expand Down Expand Up @@ -358,6 +314,34 @@ def is_task_tracked(self, task: Task):

return tracked

def flush_status(self):
"""Flush status data such as timestamps and task counts"""

pipe = self.conn.pipeline()

# remove the dataset from active datasets
self.conn.srem(self.key, self.name)

# reset finished task count
pipe.delete(self.finished_key)

# reset timestamps
pipe.delete(self.start_key)
pipe.delete(self.last_update_key)

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

# delete stages key
pipe.delete(self.active_stages_key)

pipe.execute()

@classmethod
def is_low_prio(cls, conn, collection_id):
"""This Dataset is on the low prio list."""
Expand Down

0 comments on commit 4142715

Please sign in to comment.