Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log worker retry count and retry count exhaustion #113

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions servicelayer/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from structlog.contextvars import clear_contextvars, bind_contextvars

from servicelayer import settings
from servicelayer.util import unpack_int

LOG_FORMAT_TEXT = "TEXT"
LOG_FORMAT_JSON = "JSON"
Expand Down Expand Up @@ -84,6 +85,7 @@ def apply_task_context(task, **kwargs):
dataset=task.job.dataset.name,
start_time=time.time(),
trace_id=str(uuid.uuid4()),
retry=unpack_int(task.context.get("retries")),
**kwargs
)

Expand Down
11 changes: 9 additions & 2 deletions servicelayer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,16 @@ def init_internal(self):
def retry(self, task):
retries = unpack_int(task.context.get("retries"))
if retries < settings.WORKER_RETRY:
log.warning("Queue failed task for re-try...")
task.context["retries"] = retries + 1
retry_count = retries + 1
log.warning(
f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..." # noqa
)
task.context["retries"] = retry_count
task.stage.queue(task.payload, task.context)
else:
log.warning(
f"Failed task, exhausted retry count of {settings.WORKER_RETRY}"
)

def process(self, blocking=True, interval=INTERVAL):
retries = 0
Expand Down
96 changes: 60 additions & 36 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest import TestCase
import logging
import pytest

from servicelayer.cache import get_fakeredis
Expand All @@ -17,39 +17,63 @@ def handle(self, task):
self.test_done += 1


class WorkerTest(TestCase):
def test_run(self):
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
class NoOpWorker(worker.Worker):
def handle(self, task):
pass


def test_run():
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)


def test_fails(caplog):
caplog.set_level(logging.DEBUG)
conn = get_fakeredis()
operation = "fails"
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})

worker = NoOpWorker(conn=conn, stages=[operation])
worker.sync()
for _ in range(4):
worker.retry(task)
worker.run(blocking=False)
stchris marked this conversation as resolved.
Show resolved Hide resolved
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)

log_messages = [r.msg for r in caplog.records]
assert "Queueing failed task for retry #1/3..." in log_messages
assert "Queueing failed task for retry #2/3..." in log_messages
assert "Queueing failed task for retry #3/3..." in log_messages
assert "Failed task, exhausted retry count of 3" in log_messages
Loading