From 59629f17d49bb94f2f8acf347bdeb2a698c7c738 Mon Sep 17 00:00:00 2001
From: catileptic
Date: Tue, 8 Oct 2024 14:27:45 +0200
Subject: [PATCH] Fix Redis keys (#201)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Fix Redis keys (temporary solution)

* Fix unused variable (linter)

* Fix double increment bug, remove deprecated key from test

* Bump version: 1.23.0-rc34 → 1.23.0-rc35

* Execute redis commands at the end of mark_task_for_retry

* Bump version: 1.23.0-rc35 → 1.23.0-rc36

* Remove wrong keys clean-up code

* Fix failing tests
---
 .bumpversion.cfg          |   2 +-
 servicelayer/__init__.py  |   2 +-
 servicelayer/taskqueue.py | 105 +++++++++++++++++---------------------
 setup.py                  |   2 +-
 tests/test_dataset.py     |  39 ++++++++++++++
 5 files changed, 89 insertions(+), 61 deletions(-)
 create mode 100644 tests/test_dataset.py

diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index fc99921..86ab3bb 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.23.0-rc34
+current_version = 1.23.0-rc36
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(pre|rc))(?P<build>\d+))?
diff --git a/servicelayer/__init__.py b/servicelayer/__init__.py
index 5ab37d9..85da306 100644
--- a/servicelayer/__init__.py
+++ b/servicelayer/__init__.py
@@ -1,6 +1,6 @@
 import logging
 
-__version__ = "1.23.0-rc34"
+__version__ = "1.23.0-rc36"
 
 logging.getLogger("boto3").setLevel(logging.WARNING)
 logging.getLogger("botocore").setLevel(logging.WARNING)
diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py
index cc7f568..af0a654 100644
--- a/servicelayer/taskqueue.py
+++ b/servicelayer/taskqueue.py
@@ -96,11 +96,10 @@ def flush_status(self, pipe):
 
         # delete information about running stages
         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
-            pipe.delete(stage_key)
-            pipe.delete(make_key(stage_key, "pending"))
-            pipe.delete(make_key(stage_key, "running"))
-            pipe.delete(make_key(stage_key, "finished"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))
 
         # delete information about tasks per dataset
         pipe.delete(self.pending_key)
@@ -125,20 +124,23 @@ def get_status(self):
         status["last_update"] = last_update
 
         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
+            num_pending = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            )
+            num_running = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
+            )
+            num_finished = unpack_int(
+                self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
+            )
+
             status["stages"].append(
                 {
                     "job_id": "",
                     "stage": stage,
-                    "pending": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "pending")))
-                    ),
-                    "running": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "running")))
-                    ),
-                    "finished": max(
-                        0, unpack_int(self.conn.get(make_key(stage_key, "finished")))
-                    ),
+                    "pending": max(0, num_pending),
+                    "running": max(0, num_running),
+                    "finished": max(0, num_finished),
                 }
             )
 
@@ -207,14 +209,10 @@ def add_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)
 
-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
         pipe.sadd(self.active_stages_key, stage)
-
-        # add the task to the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
         # add the task to the set of pending tasks per dataset
         pipe.sadd(self.pending_key, task_id)
@@ -232,11 +230,8 @@ def remove_task(self, task_id, stage):
         # remove the task from the set of pending tasks per dataset
         pipe.srem(self.pending_key, task_id)
 
-        # remove the task from the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.srem(stage_key, task_id)
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
         # delete the retry key for this task
         pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))
@@ -255,16 +250,11 @@ def checkout_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)
 
-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
         pipe.sadd(self.active_stages_key, stage)
-
-        # add the task to the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "running"), task_id)
-        # remove the task from the set of pending tasks per stage
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
 
         # add the task to the set of running tasks per dataset
         pipe.sadd(self.running_key, task_id)
@@ -291,14 +281,12 @@ def mark_done(self, task: Task):
         # delete the retry key for the task
         pipe.delete(task.retry_key)
 
-        # remove the task from the set of tasks per stage
-        # and the pending and running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "pending"), task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
-        # increase the number of finished tasks per stage
-        pipe.incr(make_key(stage_key, "finished"))
+        stage = task.operation
+        task_id = task.task_id
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished"))
 
         # update dataset timestamps
         pipe.set(self.last_update_key, pack_now())
@@ -312,26 +300,30 @@ def mark_for_retry(self, task):
         pipe = self.conn.pipeline()
+
         log.info(
             f"Marking task {task.task_id} (stage {task.operation})"
             f" for retry after NACK"
         )
+        stage = task.operation
+        task_id = task.task_id
+
         # remove the task from the pending and running sets of tasks per dataset
-        pipe.srem(self.pending_key, task.task_id)
-        pipe.srem(self.running_key, task.task_id)
+        pipe.srem(self.pending_key, task_id)
+        pipe.srem(self.running_key, task_id)
 
-        # remove the task from the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
 
         # delete the retry key for the task
         pipe.delete(task.retry_key)
 
         pipe.set(self.last_update_key, pack_now())
 
+        pipe.execute()
+
     def is_done(self):
         status = self.get_status()
         return status["pending"] == 0 and status["running"] == 0
@@ -339,9 +331,6 @@ def is_done(self):
 
     def __str__(self):
         return self.name
 
-    def get_stage_key(self, stage):
-        return make_key(PREFIX, "qds", self.name, stage)
-
     def is_task_tracked(self, task: Task):
         tracked = True
@@ -349,8 +338,6 @@ def is_task_tracked(self, task: Task):
         task_id = task.task_id
         stage = task.operation
 
-        stage_key = self.get_stage_key(stage)
-
         # A task is considered tracked if
         # the dataset is in the list of active datasets
         if dataset not in self.conn.smembers(self.key):
@@ -359,7 +346,9 @@ def is_task_tracked(self, task: Task):
             tracked = False
         # and the stage is in the list of active stages
         elif stage not in self.conn.smembers(self.active_stages_key):
             tracked = False
         # and the task_id is in the list of task_ids per stage
-        elif task_id not in self.conn.smembers(stage_key):
+        elif task_id not in self.conn.smembers(
+            make_key(PREFIX, "qds", self.name, stage)
+        ):
             tracked = False
 
         return tracked
@@ -687,7 +676,7 @@ def process():
             return self.process(blocking=True)
 
         if not self.num_threads:
-            # TODO - seems like we need at least one thread
+            # we need at least one thread
             # consuming and processing require separate threads
             self.num_threads = 1
 
diff --git a/setup.py b/setup.py
index be46f92..f686f66 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
 
 setup(
     name="servicelayer",
-    version="1.23.0-rc34",
+    version="1.23.0-rc36",
     description="Basic remote service functions for alephdata components",
     classifiers=[
         "Development Status :: 3 - Alpha",
diff --git a/tests/test_dataset.py b/tests/test_dataset.py
new file mode 100644
index 0000000..b7498b2
--- /dev/null
+++ b/tests/test_dataset.py
@@ -0,0 +1,39 @@
+from unittest import TestCase
+
+
+from servicelayer.cache import get_fakeredis
+from servicelayer.taskqueue import (
+    Dataset,
+    dataset_from_collection_id,
+)
+
+
+class TestDataset(TestCase):
+    def setUp(self):
+        self.connection = get_fakeredis()
+        self.connection.flushdb()
+        self.collection_id = 1
+
+        self.dataset = Dataset(
+            conn=self.connection, name=dataset_from_collection_id(self.collection_id)
+        )
+
+    def test_get_active_datasets_key(self):
+        assert self.dataset.key == "tq:qdatasets"
+
+    def test_get_active_stages_key(self):
+        assert (
+            self.dataset.active_stages_key
+            == f"tq:qds:{self.collection_id}:active_stages"
+        )
+
+    def test_get_timestamp_keys(self):
+        assert self.dataset.start_key == f"tq:qdj:{self.collection_id}:start"
+        assert (
+            self.dataset.last_update_key == f"tq:qdj:{self.collection_id}:last_update"
+        )
+
+    def test_tasks_per_collection_keys(self):
+        assert self.dataset.finished_key == f"tq:qdj:{self.collection_id}:finished"
+        assert self.dataset.running_key == f"tq:qdj:{self.collection_id}:running"
+        assert self.dataset.pending_key == f"tq:qdj:{self.collection_id}:pending"
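
A minimal sketch of the per-stage key layout that this patch now composes inline in place of the removed get_stage_key() helper. It assumes make_key joins its non-None arguments with ":" (mirroring servicelayer.cache.make_key) and that PREFIX is "tq", which matches the key strings asserted in tests/test_dataset.py; the dataset and stage values are hypothetical example inputs:

    PREFIX = "tq"

    def make_key(*parts):
        # assumption: behaves like servicelayer.cache.make_key,
        # joining the non-None parts with ":"
        return ":".join(str(p) for p in parts if p is not None)

    dataset, stage = "1", "ingest"  # hypothetical example values

    # per-stage keys written inline by flush_status, get_status, add_task,
    # remove_task, checkout_task, mark_done and mark_for_retry
    make_key(PREFIX, "qds", dataset, stage)              # tq:qds:1:ingest
    make_key(PREFIX, "qds", dataset, stage, "pending")   # tq:qds:1:ingest:pending
    make_key(PREFIX, "qds", dataset, stage, "running")   # tq:qds:1:ingest:running
    make_key(PREFIX, "qds", dataset, stage, "finished")  # tq:qds:1:ingest:finished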