update documentation
Sourour Benzarti committed Jul 23, 2024
1 parent 996c28a commit ce54414
Showing 2 changed files with 10 additions and 10 deletions.
14 changes: 7 additions & 7 deletions README.md
```diff
@@ -114,17 +114,16 @@ It uses row level locks of postgres to mimic the atomic pop and atomic push of redis
 UPDATE task_queue
-SET processing = true,
-    deadline =
-        current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL)
+SET started_at = current_timestamp
 WHERE id = (
     SELECT id
     FROM task_queue
     WHERE completed_at IS NULL
-        AND processing = false
+        AND started_at IS NULL
         AND queue_name = <your_queue_name>
         AND ttl > 0
-    ORDER BY created_at
+        AND can_start_at <= current_timestamp
+    ORDER BY can_start_at
     FOR UPDATE SKIP LOCKED
     LIMIT 1
 )
```
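For context on what the new query does at runtime, here is a minimal sketch of a worker claiming one task with it. This is not part of the commit: it assumes psycopg2, a placeholder DSN and queue name, and adds a `RETURNING id` clause for illustration; postgrestq's own code may do this differently.

```python
# Minimal sketch (not from the commit): a worker claiming one task with the
# updated query. Assumes psycopg2 and the column names shown above; the
# RETURNING clause and the DSN/queue name are illustrative only.
import psycopg2

CLAIM_TASK = """
UPDATE task_queue
SET started_at = current_timestamp
WHERE id = (
    SELECT id
    FROM task_queue
    WHERE completed_at IS NULL
      AND started_at IS NULL
      AND queue_name = %(queue)s
      AND ttl > 0
      AND can_start_at <= current_timestamp
    ORDER BY can_start_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id
"""

with psycopg2.connect("dbname=tasks") as conn, conn.cursor() as cur:
    cur.execute(CLAIM_TASK, {"queue": "my_queue"})
    row = cur.fetchone()               # None when no task is ready
    task_id = row[0] if row else None
```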
@@ -137,10 +136,11 @@ Let's say two workers try to get a new task at the same time, assuming that they

```diff
 SELECT id
 FROM task_queue
 WHERE completed_at IS NULL
-    AND processing = false
+    AND started_at IS NULL
     AND queue_name = <your_queue_name>
     AND ttl > 0
-ORDER BY created_at
+    AND can_start_at <= current_timestamp
+ORDER BY can_start_at
```

The first worker locks the row with the `FOR UPDATE` clause until its update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. But since the first worker already updated it, the row no longer matches the subquery, so the subquery returns no id and the second worker's update affects zero rows (effectively `WHERE id = NULL`).
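To make the race concrete, the sketch below (again not part of the commit, assuming psycopg2 and placeholder DSN/queue names) runs the selecting query from two connections at once; with `SKIP LOCKED` the second connection simply takes the next ready task instead of blocking or grabbing the same one.

```python
# Sketch (not from the commit): two workers running the selecting query at the
# same time. Assumes psycopg2; "dbname=tasks" and "my_queue" are placeholders.
import psycopg2

PICK_NEXT = """
SELECT id
FROM task_queue
WHERE completed_at IS NULL
  AND started_at IS NULL
  AND queue_name = %s
  AND ttl > 0
  AND can_start_at <= current_timestamp
ORDER BY can_start_at
FOR UPDATE SKIP LOCKED
LIMIT 1
"""

worker_1 = psycopg2.connect("dbname=tasks")
worker_2 = psycopg2.connect("dbname=tasks")
cur_1, cur_2 = worker_1.cursor(), worker_2.cursor()

cur_1.execute(PICK_NEXT, ("my_queue",))  # worker 1 locks the first ready task
cur_2.execute(PICK_NEXT, ("my_queue",))  # worker 2 skips that row, locks the next

print(cur_1.fetchone())  # e.g. the id of task A
print(cur_2.fetchone())  # a different id, or None -- never blocks, never task A

worker_1.rollback()      # release the locks; a real worker would commit its UPDATE
worker_2.rollback()
```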
6 changes: 3 additions & 3 deletions postgrestq/task_queue.py
@@ -273,7 +273,7 @@ def get(self) -> Tuple[

        """Get a task from the task queue (non-blocking).

        This statement marks the next available task in the queue as
-       "processing" and returns its ID and task details. The query
+       started (being processed) and returns its ID and task details. The query
        uses a FOR UPDATE SKIP LOCKED clause to lock the selected
        task so that other workers can't select the same task simultaneously.
@@ -291,7 +291,7 @@

        >>> taskqueue.complete(task_id)

        After some time (i.e. `lease_timeout`) tasks expire and are
-       marked as not processing and the TTL is decreased by
+       marked as not being processed and the TTL is decreased by
        one. If TTL is still > 0 the task will be retried.

        Note, this method is non-blocking, i.e. it returns immediately
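Put together, the documented behaviour suggests a worker loop roughly like the sketch below. It is hypothetical: the import path, the `TaskQueue` constructor arguments, and the exact shape of the tuple returned by `get()` are assumptions here, so check `task_queue.py` for the real signatures.

```python
# Hypothetical worker loop built around the documented behaviour of get() and
# complete(). The import path, constructor arguments and the exact shape of
# get()'s return tuple are assumptions -- check task_queue.py for the real API.
import time

from postgrestq.task_queue import TaskQueue  # import path assumed


def process(task):
    """Placeholder for whatever work the task requires."""
    print("processing", task)


queue = TaskQueue(                 # arguments are illustrative only
    dsn="postgresql://localhost/tasks",
    queue_name="my_queue",
    lease_timeout=60,              # seconds before a running task expires
)

while True:
    task_id, task, *rest = queue.get()    # non-blocking, returns immediately
    if task_id is None:                   # nothing ready yet: back off a bit
        time.sleep(1)
        continue
    process(task)
    queue.complete(task_id)               # must happen within lease_timeout,
                                          # or the task expires, its TTL drops
                                          # by one and it gets retried
```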
@@ -525,7 +525,7 @@ def get_updated_expired_task(

    ) -> Tuple[Optional[str], Optional[int]]:
        """
        Given the id of an expired task, it tries to reschedule it by
-       marking it as not processing, resetting the deadline
+       marking it as not started, resetting the deadline
        and decreasing TTL by one. It returns None if the task is
        already updated (or being updated) by another worker.
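Conceptually, the rescheduling described in this docstring boils down to a single guarded `UPDATE`. The statement below is only a sketch of that idea, not the library's actual SQL: it reuses the column names from the README diff above and leaves out the deadline handling.

```python
# Conceptual sketch (not postgrestq's actual SQL) of the reschedule step:
# clear started_at and spend one TTL, but only if no other worker has already
# reset the task or is holding a lock on it right now.
RESCHEDULE_EXPIRED = """
UPDATE task_queue
SET started_at = NULL,
    ttl = ttl - 1
WHERE id = (
    SELECT id
    FROM task_queue
    WHERE id = %(task_id)s
      AND started_at IS NOT NULL   -- already reset by another worker: no match
      AND completed_at IS NULL
    FOR UPDATE SKIP LOCKED         -- currently locked by another worker: no match
)
RETURNING id, ttl
"""
# Getting no row back from this statement corresponds to the "returns None"
# case described in the docstring.
```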
