Extract flushing status data into separate method
tillprochaska committed Jul 1, 2024
1 parent a2a8dc4 commit 4e33f37
Showing 2 changed files with 78 additions and 44 deletions.
75 changes: 31 additions & 44 deletions servicelayer/taskqueue.py
@@ -89,21 +89,9 @@ def __init__(self, conn, name):
     def cancel(self):
         """Cancel processing of all tasks belonging to a dataset"""
         pipe = self.conn.pipeline()
-        # remove the dataset from active datasets
-        pipe.srem(self.key, self.name)
-        # clean up tasks and task counts
-        pipe.delete(self.finished_key)
-        pipe.delete(self.running_key)
-        pipe.delete(self.pending_key)
-        pipe.delete(self.start_key)
-        pipe.delete(self.last_update_key)
-        for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
-            pipe.delete(stage_key)
-            pipe.delete(make_key(stage_key, "pending"))
-            pipe.delete(make_key(stage_key, "running"))
-            pipe.delete(make_key(stage_key, "finished"))
-        pipe.delete(self.active_stages_key)
+        self.flush_status(pipe)
         pipe.execute()
 
     def get_status(self):
@@ -235,24 +223,15 @@ def remove_task(self, task_id, stage):
 
         pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))
 
-        status = self.get_status()
-        if status["running"] == 0 and status["pending"] == 0:
-            # remove the dataset from active datasets
-            pipe.srem(self.key, self.name)
-            # reset finished task count
-            pipe.delete(self.finished_key)
-            # delete information about running stages
-            for stage in self.conn.smembers(self.active_stages_key):
-                stage_key = self.get_stage_key(stage)
-                pipe.delete(stage_key)
-                pipe.delete(make_key(stage_key, "pending"))
-                pipe.delete(make_key(stage_key, "running"))
-                pipe.delete(make_key(stage_key, "finished"))
-            # delete stages key
-            pipe.delete(self.active_stages_key)
         pipe.set(self.last_update_key, pack_now())
         pipe.execute()
 
+        status = self.get_status()
+        if status["running"] == 0 and status["pending"] == 0:
+            pipe = self.conn.pipeline()
+            self.flush_status(pipe)
+            pipe.execute()
+
     def checkout_task(self, task_id, stage):
         """Update state when a task is checked out for execution"""
         log.info(f"Checking out task: {task_id}")
@@ -294,22 +273,7 @@ def mark_done(self, task: Task):
         status = self.get_status()
         if status["running"] == 0 and status["pending"] == 0:
             pipe = self.conn.pipeline()
-            # remove the dataset from active datasets
-            self.conn.srem(self.key, self.name)
-            # reset finished task count
-            pipe.delete(self.finished_key)
-            # reset time stamps
-            pipe.delete(self.start_key)
-            pipe.delete(self.last_update_key)
-            # delete information about running stages
-            for stage in self.conn.smembers(self.active_stages_key):
-                stage_key = self.get_stage_key(stage)
-                pipe.delete(stage_key)
-                pipe.delete(make_key(stage_key, "pending"))
-                pipe.delete(make_key(stage_key, "running"))
-                pipe.delete(make_key(stage_key, "finished"))
-            # delete stages key
-            pipe.delete(self.active_stages_key)
+            self.flush_status(pipe)
             pipe.execute()
 
     def mark_for_retry(self, task):
Expand Down Expand Up @@ -358,6 +322,29 @@ def is_task_tracked(self, task: Task):

return tracked

def flush_status(self, pipe):
"""Flush status data such as timestamps and task counts"""
# remove the dataset from active datasets
self.conn.srem(self.key, self.name)

# reset finished task count
pipe.delete(self.finished_key)

# reset timestamps
pipe.delete(self.start_key)
pipe.delete(self.last_update_key)

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

# delete stages key
pipe.delete(self.active_stages_key)

@classmethod
def is_low_prio(cls, conn, collection_id):
"""This Dataset is on the low prio list."""
Expand Down
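All three call sites now share one pattern: open a Redis pipeline, let flush_status queue its cleanup commands on it, and execute the pipeline once. Below is a minimal standalone sketch of that pattern, assuming a reachable Redis instance; the key names are hypothetical stand-ins for the make_key-derived keys used above, not servicelayer's actual keys.

import redis

conn = redis.Redis()
pipe = conn.pipeline()

# Queue commands locally; nothing is sent to Redis yet.
pipe.srem("tq:qdatasets", "abc")      # hypothetical key names,
pipe.delete("tq:qds:abc:finished")    # for illustration only
pipe.delete("tq:qds:abc:start")

# Send every queued command to Redis in a single round trip.
pipe.execute()

One detail visible in the diff: flush_status issues its srem on self.conn directly rather than on the pipeline, so the dataset is removed from the active set immediately, while the delete calls are deferred until pipe.execute().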
47 changes: 47 additions & 0 deletions tests/test_taskqueue.py
@@ -184,6 +184,7 @@ def did_nack():
 
 def test_dataset_get_status():
     conn = get_fakeredis()
+    conn.flushdb()
 
     dataset = Dataset(conn=conn, name="123")
     status = dataset.get_status()
@@ -328,6 +329,52 @@ def test_dataset_get_status():
     assert status["last_update"] is None
 
 
+def test_dataset_cancel():
+    conn = get_fakeredis()
+    conn.flushdb()
+
+    dataset = Dataset(conn=conn, name="abc")
+    assert conn.keys() == []
+
+    # Enqueueing tasks stores status data in Redis
+    dataset.add_task("1", "ingest")
+    dataset.add_task("2", "index")
+    dataset.checkout_task("1", "ingest")
+    assert conn.keys() != []
+
+    # Cancelling a dataset removes associated data from Redis
+    dataset.cancel()
+    assert conn.keys() == []
+
+
+def test_dataset_mark_done():
+    conn = get_fakeredis()
+    conn.flushdb()
+
+    dataset = Dataset(conn=conn, name="abc")
+    assert conn.keys() == []
+
+    task = Task(
+        task_id="1",
+        job_id="abc",
+        delivery_tag="",
+        operation="ingest",
+        context={},
+        payload={},
+        priority=5,
+        collection_id="abc",
+    )
+
+    # Enqueueing a task stores status data in Redis
+    dataset.add_task(task.task_id, task.operation)
+    dataset.checkout_task(task.task_id, task.operation)
+    assert conn.keys() != []
+
+    # Marking the last task as done cleans up status data in Redis
+    dataset.mark_done(task)
+    assert conn.keys() == []
+
+
 def test_get_priority_bucket():
     redis = get_fakeredis()
     rmq_channel = get_rabbitmq_channel()
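The new tests assert directly against conn.keys(), which works because get_fakeredis returns an in-memory Redis substitute rather than a connection to a real server. A minimal sketch of such a helper, assuming the fakeredis package; the actual helper in servicelayer may be configured differently:

import fakeredis

def get_fakeredis():
    # In-memory stand-in for Redis; no server required. Sketch only:
    # servicelayer's real helper may cache a connection or set other options.
    return fakeredis.FakeStrictRedis(decode_responses=True)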
