Skip to content

Commit

Permalink
Add get_many helper to retrieve many tasks in a batch
Browse files Browse the repository at this point in the history
  • Loading branch information
jacopofar committed Apr 1, 2024
1 parent 9074f61 commit 66d3710
Showing 1 changed file with 55 additions and 1 deletion.
56 changes: 55 additions & 1 deletion postgrestq/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime

from uuid import uuid4, UUID
from typing import Optional, Tuple, Iterator, Dict, Any, Callable
from typing import Optional, Tuple, Iterator, Dict, Any, Callable, List

from psycopg import sql, connect

Expand Down Expand Up @@ -262,6 +262,60 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]:
conn.commit()
return task, task_id

def get_many(self, amount: int) -> List[
Tuple[Optional[Dict[str, Any]], Optional[UUID]]
]:
"""Same as get() but retrieves multiple tasks.
If there are less than `amount` tasks in the queue, it will return
what is available.
If no task is available it will return an empty list.
This is faster than multiple calls to get(), as it uses a single query.
Returns
-------
list of (task, task_id) :
The tasks and their IDs
"""
conn = self.conn

with conn.cursor() as cur:
cur.execute(
sql.SQL(
"""
UPDATE {}
SET processing = true,
started_at = current_timestamp
WHERE id IN (
SELECT id
FROM {}
WHERE completed_at IS NULL
AND processing = false
AND queue_name = %s
AND ttl > 0
AND can_start_at <= current_timestamp
ORDER BY can_start_at
FOR UPDATE SKIP LOCKED
LIMIT %s
)
RETURNING task, id;"""
).format(
sql.Identifier(self._table_name),
sql.Identifier(self._table_name),
),
(self._queue_name, amount),
)

ret = []
for row in cur.fetchall():
logger.info(f"Got task with id {row[1]}")
ret.append(row)
conn.commit()
return ret

def complete(self, task_id: Optional[UUID]) -> None:
"""Mark a task as completed.
Expand Down

0 comments on commit 66d3710

Please sign in to comment.