Log worker retry count and retry count exhaustion (#113)
* More specific log messages for retry and giving up

* Convert to pytest style. Add a test for failing retries

* formatting

* linter

* Add retry count to structlog output
stchris authored Oct 9, 2023
1 parent b657e3b commit e1c102a
Showing 3 changed files with 71 additions and 38 deletions.
2 changes: 2 additions & 0 deletions servicelayer/logs.py
@@ -7,6 +7,7 @@
from structlog.contextvars import clear_contextvars, bind_contextvars

from servicelayer import settings
from servicelayer.util import unpack_int

LOG_FORMAT_TEXT = "TEXT"
LOG_FORMAT_JSON = "JSON"
@@ -84,6 +85,7 @@ def apply_task_context(task, **kwargs):
dataset=task.job.dataset.name,
start_time=time.time(),
trace_id=str(uuid.uuid4()),
retry=unpack_int(task.context.get("retries")),
**kwargs
)

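Note: because `apply_task_context` binds the retry count through structlog's context variables, every later log entry for that task carries it automatically. Below is a minimal standalone sketch of that mechanism, assuming a plain structlog configuration; the values and setup are stand-ins for illustration, not servicelayer's actual logging config.

```python
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars, merge_contextvars

# Merge context variables into every event dict and render as JSON.
structlog.configure(
    processors=[merge_contextvars, structlog.processors.JSONRenderer()]
)
log = structlog.get_logger()

clear_contextvars()
# Roughly what apply_task_context() does: bind per-task metadata once...
bind_contextvars(dataset="test", retry=2)
# ...so every later log call carries it without repeating it at the call site.
log.warning("Queueing failed task for retry #3/3...")
# Prints something like:
# {"dataset": "test", "retry": 2, "event": "Queueing failed task for retry #3/3..."}
```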
11 changes: 9 additions & 2 deletions servicelayer/worker.py
@@ -75,9 +75,16 @@ def init_internal(self):
def retry(self, task):
retries = unpack_int(task.context.get("retries"))
if retries < settings.WORKER_RETRY:
log.warning("Queue failed task for re-try...")
task.context["retries"] = retries + 1
retry_count = retries + 1
log.warning(
f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..." # noqa
)
task.context["retries"] = retry_count
task.stage.queue(task.payload, task.context)
else:
log.warning(
f"Failed task, exhausted retry count of {settings.WORKER_RETRY}"
)

def process(self, blocking=True, interval=INTERVAL):
retries = 0
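To make the new behaviour concrete: with a retry limit of 3 (the value the tests below assume for `settings.WORKER_RETRY`), failing the same task repeatedly produces one numbered warning per attempt and a final exhaustion message. Here is a simplified, self-contained sketch of the counting logic, not the actual `Worker` class:

```python
WORKER_RETRY = 3  # assumed limit, matching the expectations in the tests below

def retry(context: dict) -> None:
    """Bump the retry counter or give up, mirroring the shape of Worker.retry()."""
    retries = int(context.get("retries") or 0)
    if retries < WORKER_RETRY:
        retry_count = retries + 1
        print(f"Queueing failed task for retry #{retry_count}/{WORKER_RETRY}...")
        context["retries"] = retry_count
    else:
        print(f"Failed task, exhausted retry count of {WORKER_RETRY}")

context = {}
for _ in range(4):
    retry(context)
# Attempts 1-3 print "Queueing failed task for retry #N/3...";
# the fourth call prints "Failed task, exhausted retry count of 3".
```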
96 changes: 60 additions & 36 deletions tests/test_worker.py
@@ -1,4 +1,4 @@
from unittest import TestCase
import logging
import pytest

from servicelayer.cache import get_fakeredis
@@ -17,39 +17,63 @@ def handle(self, task):
self.test_done += 1


class WorkerTest(TestCase):
def test_run(self):
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
class NoOpWorker(worker.Worker):
def handle(self, task):
pass


def test_run():
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)


def test_fails(caplog):
caplog.set_level(logging.DEBUG)
conn = get_fakeredis()
operation = "fails"
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})

worker = NoOpWorker(conn=conn, stages=[operation])
worker.sync()
for _ in range(4):
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)

log_messages = [r.msg for r in caplog.records]
assert "Queueing failed task for retry #1/3..." in log_messages
assert "Queueing failed task for retry #2/3..." in log_messages
assert "Queueing failed task for retry #3/3..." in log_messages
assert "Failed task, exhausted retry count of 3" in log_messages
