Fix Redis keys (#201)
* Fix Redis keys (temporary solution)

* Fix unused variable (linter)

* Fix double increment bug, remove deprecated key from test

* Bump version: 1.23.0-rc34 → 1.23.0-rc35

* Execute redis commands at the end of mark_task_for_retry

* Bump version: 1.23.0-rc35 → 1.23.0-rc36

* Remove wrong keys clean-up code

* Fix failing tests
catileptic authored Oct 8, 2024
1 parent 83c62cb commit 59629f1
Showing 5 changed files with 89 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.23.0-rc34
+current_version = 1.23.0-rc36
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(pre|rc))(?P<build>\d+))?
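As a sanity check on the version scheme, the `parse` pattern above can be exercised directly in Python. A minimal sketch (the standalone `re` usage here is illustrative; bump2version compiles this pattern internally):

```python
import re

# The parse pattern from .bumpversion.cfg above.
PARSE = re.compile(
    r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
    r"([-](?P<release>(pre|rc))(?P<build>\d+))?"
)

m = PARSE.match("1.23.0-rc36")
assert m is not None
assert m.group("major", "minor", "patch") == ("1", "23", "0")
assert m.group("release", "build") == ("rc", "36")
```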
2 changes: 1 addition & 1 deletion servicelayer/__init__.py
@@ -1,6 +1,6 @@
 import logging
 
-__version__ = "1.23.0-rc34"
+__version__ = "1.23.0-rc36"
 
 logging.getLogger("boto3").setLevel(logging.WARNING)
 logging.getLogger("botocore").setLevel(logging.WARNING)
105 changes: 47 additions & 58 deletions servicelayer/taskqueue.py
@@ -96,11 +96,10 @@ def flush_status(self, pipe):
 
         # delete information about running stages
         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
-            pipe.delete(stage_key)
-            pipe.delete(make_key(stage_key, "pending"))
-            pipe.delete(make_key(stage_key, "running"))
-            pipe.delete(make_key(stage_key, "finished"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))
 
         # delete information about tasks per dataset
         pipe.delete(self.pending_key)
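The rewrite replaces the `get_stage_key` helper with inline `make_key(PREFIX, "qds", ...)` calls. A sketch of the resulting key layout, assuming `make_key` joins its non-None arguments with `:` and `PREFIX` is `tq` (the `tq:` prefix is asserted by the new tests below):

```python
def make_key(*parts):
    # Assumed behavior of servicelayer.cache.make_key: join non-None
    # parts with ":" (consistent with the keys asserted in tests below).
    return ":".join(str(p) for p in parts if p is not None)

PREFIX = "tq"

# Hypothetical dataset "1" and stage "index":
assert make_key(PREFIX, "qds", "1", "index") == "tq:qds:1:index"
assert make_key(PREFIX, "qds", "1", "index", "pending") == "tq:qds:1:index:pending"
assert make_key(PREFIX, "qds", "1", "index", "finished") == "tq:qds:1:index:finished"
```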
@@ -125,20 +124,23 @@ def get_status(self):
         status["last_update"] = last_update
 
         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
+            num_pending = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            )
+            num_running = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
+            )
+            num_finished = unpack_int(
+                self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
+            )
+
             status["stages"].append(
                 {
                     "job_id": "",
                     "stage": stage,
-                    "pending": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "pending")))
-                    ),
-                    "running": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "running")))
-                    ),
-                    "finished": max(
-                        0, unpack_int(self.conn.get(make_key(stage_key, "finished")))
-                    ),
+                    "pending": max(0, num_pending),
+                    "running": max(0, num_running),
+                    "finished": max(0, num_finished),
                 }
             )
 
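The per-stage counts come from two different Redis reply shapes: `SCARD` returns an integer (0 for a missing set), while `GET` on the `finished` counter returns bytes or None. A sketch of the normalization `unpack_int` is assumed to perform (the helper body here is an assumption, not the library's code):

```python
def unpack_int(value):
    # Assumed helper: coerce a Redis reply (bytes, int, or None) to int,
    # treating a missing key as 0.
    return int(value) if value is not None else 0

assert unpack_int(None) == 0    # GET on a missing "finished" counter
assert unpack_int(b"42") == 42  # GET on an existing counter
assert unpack_int(0) == 0       # SCARD on a missing set
```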
@@ -207,14 +209,10 @@ def add_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)
 
-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
         pipe.sadd(self.active_stages_key, stage)
-
-        # add the task to the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
         # add the task to the set of pending tasks per dataset
        pipe.sadd(self.pending_key, task_id)
@@ -232,11 +230,8 @@ def remove_task(self, task_id, stage):
         # remove the task from the set of pending tasks per dataset
         pipe.srem(self.pending_key, task_id)
 
-        # remove the task from the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.srem(stage_key, task_id)
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
         # delete the retry key for this task
         pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))
@@ -255,16 +250,11 @@ def checkout_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)
 
-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
         pipe.sadd(self.active_stages_key, stage)
-
-        # add the task to the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "running"), task_id)
-        # remove the task from the set of pending tasks per stage
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
 
         # add the task to the set of running tasks per dataset
         pipe.sadd(self.running_key, task_id)
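`checkout_task` keeps the pending-to-running transition as three set operations queued on one pipeline. A sketch of that transition with fakeredis (key names are hypothetical, following the scheme above):

```python
import fakeredis

conn = fakeredis.FakeStrictRedis()
conn.sadd("tq:qds:1:index:pending", "t1")  # as left by add_task()

pipe = conn.pipeline()
pipe.sadd("tq:qds:1:index", "t1")          # task is known to the stage
pipe.srem("tq:qds:1:index:pending", "t1")  # no longer pending
pipe.sadd("tq:qds:1:index:running", "t1")  # now running
pipe.execute()

assert conn.smembers("tq:qds:1:index:pending") == set()
assert conn.smembers("tq:qds:1:index:running") == {b"t1"}
```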
@@ -291,14 +281,12 @@ def mark_done(self, task: Task):
         # delete the retry key for the task
         pipe.delete(task.retry_key)
 
-        # remove the task from the set of tasks per stage
-        # and the pending and running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "pending"), task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
-        # increase the number of finished tasks per stage
-        pipe.incr(make_key(stage_key, "finished"))
+        stage = task.operation
+        task_id = task.task_id
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished"))
 
         # update dataset timestamps
         pipe.set(self.last_update_key, pack_now())
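Per the commit messages, an earlier revision incremented the `finished` counter twice; the rewrite issues exactly one `INCR` per completed task, and nothing hits Redis until `execute()`. A sketch of that pipeline behavior with fakeredis:

```python
import fakeredis

conn = fakeredis.FakeStrictRedis()
pipe = conn.pipeline()

pipe.srem("tq:qds:1:index:running", "t1")
pipe.incr("tq:qds:1:index:finished")  # exactly once per finished task

# Commands are only buffered so far:
assert conn.get("tq:qds:1:index:finished") is None

pipe.execute()
assert conn.get("tq:qds:1:index:finished") == b"1"
```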
@@ -312,45 +300,44 @@ def mark_done(self, task: Task):
 
     def mark_for_retry(self, task):
         pipe = self.conn.pipeline()
+
         log.info(
             f"Marking task {task.task_id} (stage {task.operation})"
             f" for retry after NACK"
         )
 
+        stage = task.operation
+        task_id = task.task_id
+
         # remove the task from the pending and running sets of tasks per dataset
-        pipe.srem(self.pending_key, task.task_id)
-        pipe.srem(self.running_key, task.task_id)
+        pipe.srem(self.pending_key, task_id)
+        pipe.srem(self.running_key, task_id)
 
-        # remove the task from the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
 
         # delete the retry key for the task
         pipe.delete(task.retry_key)
 
         pipe.set(self.last_update_key, pack_now())
 
+        pipe.execute()
+
     def is_done(self):
         status = self.get_status()
         return status["pending"] == 0 and status["running"] == 0
 
     def __str__(self):
         return self.name
 
-    def get_stage_key(self, stage):
-        return make_key(PREFIX, "qds", self.name, stage)
-
     def is_task_tracked(self, task: Task):
         tracked = True
 
         dataset = dataset_from_collection_id(task.collection_id)
         task_id = task.task_id
         stage = task.operation
 
-        stage_key = self.get_stage_key(stage)
-
         # A task is considered tracked if
         # the dataset is in the list of active datasets
         if dataset not in self.conn.smembers(self.key):
@@ -359,7 +346,9 @@ def is_task_tracked(self, task: Task):
         elif stage not in self.conn.smembers(self.active_stages_key):
             tracked = False
         # and the task_id is in the list of task_ids per stage
-        elif task_id not in self.conn.smembers(stage_key):
+        elif task_id not in self.conn.smembers(
+            make_key(PREFIX, "qds", self.name, stage)
+        ):
             tracked = False
 
         return tracked
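A condensed sketch of the three membership checks in `is_task_tracked`, against the flat keys this commit writes (hypothetical dataset `1` and stage `index`; `SISMEMBER` is used here instead of fetching the whole set with `SMEMBERS` as the method does):

```python
import fakeredis

conn = fakeredis.FakeStrictRedis()

# Simulate the bookkeeping add_task() performs for task "t1":
conn.sadd("tq:qdatasets", "1")
conn.sadd("tq:qds:1:active_stages", "index")
conn.sadd("tq:qds:1:index", "t1")

def is_tracked(dataset, stage, task_id):
    # Mirrors is_task_tracked(): active dataset, active stage, known task.
    return bool(
        conn.sismember("tq:qdatasets", dataset)
        and conn.sismember(f"tq:qds:{dataset}:active_stages", stage)
        and conn.sismember(f"tq:qds:{dataset}:{stage}", task_id)
    )

assert is_tracked("1", "index", "t1")
assert not is_tracked("1", "index", "t2")
```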
@@ -687,7 +676,7 @@ def process():
             return self.process(blocking=True)
 
         if not self.num_threads:
-            # TODO - seems like we need at least one thread
+            # we need at least one thread
             # consuming and processing require separate threads
             self.num_threads = 1
 
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
 
 setup(
     name="servicelayer",
-    version="1.23.0-rc34",
+    version="1.23.0-rc36",
     description="Basic remote service functions for alephdata components",
     classifiers=[
         "Development Status :: 3 - Alpha",
39 changes: 39 additions & 0 deletions tests/test_dataset.py
@@ -0,0 +1,39 @@
+from unittest import TestCase
+
+
+from servicelayer.cache import get_fakeredis
+from servicelayer.taskqueue import (
+    Dataset,
+    dataset_from_collection_id,
+)
+
+
+class TestDataset(TestCase):
+    def setUp(self):
+        self.connection = get_fakeredis()
+        self.connection.flushdb()
+        self.collection_id = 1
+
+        self.dataset = Dataset(
+            conn=self.connection, name=dataset_from_collection_id(self.collection_id)
+        )
+
+    def test_get_active_datasets_key(self):
+        assert self.dataset.key == "tq:qdatasets"
+
+    def test_get_active_stages_key(self):
+        assert (
+            self.dataset.active_stages_key
+            == f"tq:qds:{self.collection_id}:active_stages"
+        )
+
+    def test_get_timestamp_keys(self):
+        assert self.dataset.start_key == f"tq:qdj:{self.collection_id}:start"
+        assert (
+            self.dataset.last_update_key == f"tq:qdj:{self.collection_id}:last_update"
+        )
+
+    def test_tasks_per_collection_keys(self):
+        assert self.dataset.finished_key == f"tq:qdj:{self.collection_id}:finished"
+        assert self.dataset.running_key == f"tq:qdj:{self.collection_id}:running"
+        assert self.dataset.pending_key == f"tq:qdj:{self.collection_id}:pending"
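The new tests pin down the dataset-level key names; as a companion, a hedged sketch of what a per-stage key test could look like (assuming `make_key` is importable from `servicelayer.cache` and `PREFIX` from `servicelayer.taskqueue`; the stage name `index` is hypothetical):

```python
from servicelayer.cache import make_key
from servicelayer.taskqueue import PREFIX


def test_stage_keys_sketch():
    # The flat per-stage keys written by add_task()/checkout_task()/mark_done():
    stage_key = make_key(PREFIX, "qds", "1", "index")
    assert stage_key == "tq:qds:1:index"
    assert make_key(stage_key, "pending") == "tq:qds:1:index:pending"
    assert make_key(stage_key, "running") == "tq:qds:1:index:running"
    assert make_key(stage_key, "finished") == "tq:qds:1:index:finished"
```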
