Skip to content

Commit

Permalink
Merge pull request #878 from procrastinate-org/issue-871-ewjoachim
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Dec 21, 2023
2 parents 6854f95 + 32f4db8 commit be3d302
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 25 deletions.
20 changes: 14 additions & 6 deletions procrastinate/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from typing import Optional


class ProcrastinateException(Exception):
Expand Down Expand Up @@ -32,12 +33,6 @@ class TaskAlreadyRegistered(ProcrastinateException):
"""


class JobError(ProcrastinateException):
"""
Job ended with an exception.
"""


class LoadFromPathError(ImportError, ProcrastinateException):
"""
App was not found at the provided path, or the loaded object is not an App.
Expand All @@ -54,6 +49,19 @@ def __init__(self, scheduled_at: datetime.datetime):
super().__init__()


class JobError(ProcrastinateException):
"""
Job ended with an exception.
"""

def __init__(
self, *args, retry_exception: Optional[JobRetry] = None, critical: bool = False
):
super().__init__(*args)
self.retry_exception = retry_exception
self.critical = critical


class AppNotOpen(ProcrastinateException):
"""
App was not open. Procrastinate App needs to be opened using:
Expand Down
10 changes: 7 additions & 3 deletions procrastinate/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class BaseRetryStrategy:
"""

def get_retry_exception(
self, exception: Exception, attempts: int
self, exception: BaseException, attempts: int
) -> Optional[exceptions.JobRetry]:
schedule_in = self.get_schedule_in(exception=exception, attempts=attempts)
if schedule_in is None:
Expand All @@ -26,7 +26,9 @@ def get_retry_exception(
schedule_at = utils.utcnow() + datetime.timedelta(seconds=schedule_in)
return exceptions.JobRetry(schedule_at.replace(microsecond=0))

def get_schedule_in(self, *, exception: Exception, attempts: int) -> Optional[int]:
def get_schedule_in(
self, *, exception: BaseException, attempts: int
) -> Optional[int]:
"""
Parameters
----------
Expand Down Expand Up @@ -81,7 +83,9 @@ class RetryStrategy(BaseRetryStrategy):
exponential_wait: int = 0
retry_exceptions: Optional[Iterable[Type[Exception]]] = None

def get_schedule_in(self, *, exception: Exception, attempts: int) -> Optional[int]:
def get_schedule_in(
self, *, exception: BaseException, attempts: int
) -> Optional[int]:
if self.max_attempts and attempts >= self.max_attempts:
return None
# isinstance's 2nd param must be a tuple, not an arbitrary iterable
Expand Down
2 changes: 1 addition & 1 deletion procrastinate/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def configure(
)

def get_retry_exception(
self, exception: Exception, job: jobs.Job
self, exception: BaseException, job: jobs.Job
) -> Optional[exceptions.JobRetry]:
if not self.retry_strategy:
return None
Expand Down
25 changes: 18 additions & 7 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,13 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None:
try:
await self.run_job(job=job, worker_id=worker_id)
status = jobs.Status.SUCCEEDED
except exceptions.JobRetry as e:
retry_at = e.scheduled_at
except exceptions.JobError:
except exceptions.JobError as e:
status = jobs.Status.FAILED
if e.retry_exception:
retry_at = e.retry_exception.scheduled_at
if e.critical and e.__cause__:
raise e.__cause__

except exceptions.TaskNotFound as exc:
status = jobs.Status.FAILED
self.logger.exception(
Expand Down Expand Up @@ -221,10 +224,16 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None:
f"Starting job {job.call_string}",
extra=context.log_extra(action="start_job"),
)
exc_info: Union[bool, Exception]
job_args = []
if task.pass_context:
job_args.append(context)

# Initialise logging variables
task_result = None
log_title = "Error"
log_action = "job_error"
log_level = logging.ERROR
exc_info: Union[bool, BaseException] = False
try:
task_result = task(*job_args, **job.task_kwargs)
if asyncio.iscoroutine(task_result):
Expand All @@ -236,20 +245,22 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None:
extra=context.log_extra(action="concurrent_sync_task"),
)

except Exception as e:
except BaseException as e:
task_result = None
log_title = "Error"
log_action = "job_error"
log_level = logging.ERROR
exc_info = e
critical = not isinstance(e, Exception)

retry_exception = task.get_retry_exception(exception=e, job=job)
if retry_exception:
log_title = "Error, to retry"
log_action = "job_error_retry"
log_level = logging.INFO
raise retry_exception from e
raise exceptions.JobError() from e
raise exceptions.JobError(
retry_exception=retry_exception, critical=critical
) from e

else:
log_title = "Success"
Expand Down
98 changes: 90 additions & 8 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ async def coro(*args, **kwargs):

scheduled_at = conftest.aware_datetime(2000, 1, 1)
test_worker.run_job = mocker.Mock(
side_effect=exceptions.JobRetry(scheduled_at=scheduled_at)
side_effect=exceptions.JobError(
retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at)
)
)
job = job_factory(id=1)
await test_worker.job_manager.defer_job_async(job)
Expand All @@ -137,6 +139,58 @@ async def coro(*args, **kwargs):
test_worker.run_job.assert_called_with(job=job, worker_id=0)
assert connector.jobs[1]["status"] == "todo"
assert connector.jobs[1]["scheduled_at"] == scheduled_at
assert connector.jobs[1]["attempts"] == 1


async def test_process_job_retry_failed_job_critical(
mocker, test_worker, job_factory, connector
):
class TestException(BaseException):
pass

job_exception = exceptions.JobError(critical=True)
job_exception.__cause__ = TestException()

test_worker.run_job = mocker.Mock(side_effect=job_exception)
job = job_factory(id=1)
await test_worker.job_manager.defer_job_async(job)

# Exceptions that extend BaseException should be re-raised after the failed job
# is scheduled for retry (if retry is applicable).
with pytest.raises(TestException):
await test_worker.process_job(job=job, worker_id=0)

test_worker.run_job.assert_called_with(job=job, worker_id=0)
assert connector.jobs[1]["status"] == "failed"
assert connector.jobs[1]["scheduled_at"] is None
assert connector.jobs[1]["attempts"] == 1


async def test_process_job_retry_failed_job_retry_critical(
mocker, test_worker, job_factory, connector
):
class TestException(BaseException):
pass

scheduled_at = conftest.aware_datetime(2000, 1, 1)
job_exception = exceptions.JobError(
critical=True, retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at)
)
job_exception.__cause__ = TestException()

test_worker.run_job = mocker.Mock(side_effect=job_exception)
job = job_factory(id=1)
await test_worker.job_manager.defer_job_async(job)

# Exceptions that extend BaseException should be re-raised after the failed job
# is scheduled for retry (if retry is applicable).
with pytest.raises(TestException):
await test_worker.process_job(job=job, worker_id=0)

test_worker.run_job.assert_called_with(job=job, worker_id=0)
assert connector.jobs[1]["status"] == "todo"
assert connector.jobs[1]["scheduled_at"] == scheduled_at
assert connector.jobs[1]["attempts"] == 1


async def test_run_job(app):
Expand Down Expand Up @@ -248,11 +302,11 @@ def task():
async def test_run_job_error(app, caplog):
caplog.set_level("INFO")

def job(a, b): # pylint: disable=unused-argument
def job_func(a, b): # pylint: disable=unused-argument
raise ValueError("nope")

task = tasks.Task(job, blueprint=app, queue="yay", name="job")
task.func = job
task = tasks.Task(job_func, blueprint=app, queue="yay", name="job")
task.func = job_func

app.tasks = {"job": task}

Expand Down Expand Up @@ -280,14 +334,40 @@ def job(a, b): # pylint: disable=unused-argument
)


async def test_run_job_critical_error(app, caplog):
caplog.set_level("INFO")

def job_func(a, b): # pylint: disable=unused-argument
raise BaseException("nope")

task = tasks.Task(job_func, blueprint=app, queue="yay", name="job")
task.func = job_func

app.tasks = {"job": task}

job = jobs.Job(
id=16,
task_kwargs={"a": 9, "b": 3},
lock="sherlock",
queueing_lock="houba",
task_name="job",
queue="yay",
)
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)

assert exc_info.value.critical is True


async def test_run_job_retry(app, caplog):
caplog.set_level("INFO")

def job(a, b): # pylint: disable=unused-argument
def job_func(a, b): # pylint: disable=unused-argument
raise ValueError("nope")

task = tasks.Task(job, blueprint=app, queue="yay", name="job", retry=True)
task.func = job
task = tasks.Task(job_func, blueprint=app, queue="yay", name="job", retry=True)
task.func = job_func

app.tasks = {"job": task}

Expand All @@ -300,9 +380,11 @@ def job(a, b): # pylint: disable=unused-argument
queue="yay",
)
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobRetry):
with pytest.raises(exceptions.JobError) as exc_info:
await test_worker.run_job(job=job, worker_id=3)

assert isinstance(exc_info.value.retry_exception, exceptions.JobRetry)

assert (
len(
[
Expand Down

0 comments on commit be3d302

Please sign in to comment.