From ce544142f8947be3477c6e59452f324efe377079 Mon Sep 17 00:00:00 2001 From: Sourour Benzarti Date: Tue, 23 Jul 2024 17:47:53 +0200 Subject: [PATCH] update documentation --- README.md | 14 +++++++------- postgrestq/task_queue.py | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 8309ccc..32896b5 100644 --- a/README.md +++ b/README.md @@ -114,17 +114,16 @@ It uses row level locks of postgres to mimic the atomic pop and atomic push of r ```sql UPDATE task_queue -SET processing = true, - deadline = - current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL) +SET started_at = current_timestamp WHERE id = ( SELECT id FROM task_queue WHERE completed_at IS NULL - AND processing = false + AND started_at IS NULL AND queue_name = <your_queue_name> AND ttl > 0 - ORDER BY created_at + AND can_start_at <= current_timestamp + ORDER BY can_start_at FOR UPDATE SKIP LOCKED LIMIT 1 ) @@ -137,10 +136,11 @@ Let's say two workers try to get a new task at the same time, assuming that they SELECT id FROM task_queue WHERE completed_at IS NULL - AND processing = false + AND started_at IS NULL AND queue_name = <your_queue_name> AND ttl > 0 -ORDER BY created_at + AND can_start_at <= current_timestamp +ORDER BY can_start_at ``` The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`. diff --git a/postgrestq/task_queue.py b/postgrestq/task_queue.py index 329e985..192a764 100644 --- a/postgrestq/task_queue.py +++ b/postgrestq/task_queue.py @@ -273,7 +273,7 @@ def get(self) -> Tuple[ """Get a task from the task queue (non-blocking). 
This statement marks the next available task in the queue as - "processing" and returns its ID and task details. The query + started (being processed) and returns its ID and task details. The query uses a FOR UPDATE SKIP LOCKED clause to lock the selected task so that other workers can't select the same task simultaneously. @@ -291,7 +291,7 @@ def get(self) -> Tuple[ >>> taskqueue.complete(task_id) After some time (i.e. `lease_timeout`) tasks expire and are - marked as not processing and the TTL is decreased by + marked as not being processed and the TTL is decreased by one. If TTL is still > 0 the task will be retried. Note, this method is non-blocking, i.e. it returns immediately @@ -525,7 +525,7 @@ def get_updated_expired_task( ) -> Tuple[Optional[str], Optional[int]]: """ Given the id of an expired task, it tries to reschedule it by - marking it as not processing, resetting the deadline + marking it as not started, resetting the deadline and decreasing TTL by one. It returns None if the task is already updated (or being updated) by another worker.