diff --git a/.codecov.yml b/.codecov.yml
index c9526802..0ea43a02 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -1,6 +1,9 @@
 coverage:
   precision: 2
   range: [95, 100]
+  status:
+    patch: false
+    project: false

 comment:
   layout: 'header, diff, flags, files, footer'
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8b7f0460..d382c822 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -49,7 +49,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu]
-        python-version: ['3.6', '3.7', '3.8', '3.9']
+        python-version: ['3.7', '3.8', '3.9', '3.10']

     env:
       PYTHON: ${{ matrix.python-version }}
diff --git a/.gitignore b/.gitignore
index d6092b95..e2d3e183 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ __pycache__/
 .vscode/
 .venv/
 /.auto-format
+/scratch/
diff --git a/HISTORY.rst b/HISTORY.rst
index be3cb434..9cd827a5 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,32 @@
 History
 -------

+v0.23a1 (2022-03-09)
+....................
+* Fix jobs timeout by @kiriusm2 in #248
+* Update ``index.rst`` by @Kludex in #266
+* Improve some docs wording by @johtso in #285
+* fix error when cron jobs were terminated by @tobymao in #273
+* add ``on_job_start`` and ``on_job_end`` hooks by @tobymao in #274
+* Update argument docstring definition by @sondrelg in #278
+* fix tests and uprev test dependencies, #288
+* Add link to WorkerSettings in documentation by @JonasKs in #279
+* Allow setting ``job_id`` on cron jobs by @JonasKs in #293
+* Fix docs typo by @johtso in #296
+* support aioredis v2 by @Yolley in #259
+* support python 3.10, #298
+
+v0.22 (2021-09-02)
+..................
+* fix package importing in example, #261, thanks @cdpath
+* restrict ``aioredis`` to ``<2.0.0`` (soon we'll support ``aioredis>=2.0.0``), #258, thanks @PaxPrz
+* auto setting version on release, 759fe03
+
+v0.21 (2021-07-06)
+..................
+* CI improvements #243
+* fix ``log_redis_info`` #255
+
 v0.20 (2021-04-26)
 ..................
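
Editor's note (not part of the diff): to make the new v0.23 worker features listed above concrete, here is a minimal sketch of a ``WorkerSettings`` using the ``on_job_start``/``on_job_end`` hooks and the new ``job_id`` argument to ``cron``. The ``nightly_cleanup`` coroutine and the print statements are illustrative assumptions, not arq API.

    from arq import cron

    async def nightly_cleanup(ctx):
        ...  # hypothetical cron task

    async def on_job_start(ctx):
        # called before each job runs; ctx includes 'job_id', 'job_try', 'score', etc.
        print('starting', ctx['job_id'])

    async def on_job_end(ctx):
        # called after the job body finishes, before the result is written
        print('finished', ctx['job_id'])

    class WorkerSettings:
        cron_jobs = [
            # a fixed job_id enforces uniqueness across schedules and workers
            cron(nightly_cleanup, hour=3, minute=0, job_id='nightly-cleanup'),
        ]
        on_job_start = on_job_start
        on_job_end = on_job_end
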
diff --git a/Makefile b/Makefile index 35fd24c5..0155352b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .DEFAULT_GOAL := all isort = isort arq tests -black = black -S -l 120 --target-version py37 arq tests +black = black arq tests .PHONY: install install: @@ -15,7 +15,7 @@ format: .PHONY: lint lint: - flake8 arq/ tests/ + flake8 --max-complexity 10 --max-line-length 120 --ignore E203,W503 arq/ tests/ $(isort) --check-only --df $(black) --check diff --git a/arq/cli.py b/arq/cli.py index 8cc03227..97f62992 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -43,7 +43,7 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: else: kwargs = {} if burst is None else {'burst': burst} if watch: - asyncio.get_event_loop().run_until_complete(watch_reload(watch, worker_settings_)) + asyncio.run(watch_reload(watch, worker_settings_)) else: run_worker(worker_settings_, **kwargs) diff --git a/arq/connections.py b/arq/connections.py index 770d86ba..7397f3c8 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -6,12 +6,13 @@ from datetime import datetime, timedelta from operator import attrgetter from typing import Any, Callable, Generator, List, Optional, Tuple, Union -from urllib.parse import urlparse +from urllib.parse import parse_qs, urlparse from uuid import uuid4 -import aioredis -from aioredis import MultiExecError, Redis from pydantic.validators import make_arbitrary_type_validator +from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.sentinel import Sentinel +from redis.exceptions import RedisError, WatchError from .constants import default_queue_name, job_key_prefix, result_key_prefix from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job @@ -40,6 +41,7 @@ class RedisSettings: host: Union[str, List[Tuple[str, int]]] = 'localhost' port: int = 6379 + unix_socket_path: Optional[str] = None database: int = 0 password: Optional[str] = None ssl: Union[bool, None, SSLContext] = None @@ -53,13 +55,19 @@ class RedisSettings: @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': conf = urlparse(dsn) - assert conf.scheme in {'redis', 'rediss'}, 'invalid DSN scheme' + assert conf.scheme in {'redis', 'rediss', 'unix'}, 'invalid DSN scheme' + if conf.query and 'db' in parse_qs(conf.query): + # e.q. redis://localhost:6379?db=1 + database = int(parse_qs(conf.query)['db'][0]) + else: + database = int(conf.path.lstrip('/')) if conf.path else 0 return RedisSettings( host=conf.hostname or 'localhost', port=conf.port or 6379, ssl=conf.scheme == 'rediss', password=conf.password, - database=int((conf.path or '0').strip('/')), + database=database, + unix_socket_path=conf.path if conf.scheme == 'unix' else None, ) def __repr__(self) -> str: @@ -70,20 +78,20 @@ def __repr__(self) -> str: expires_extra_ms = 86_400_000 -class ArqRedis(Redis): # type: ignore +class ArqRedis(Redis): # type: ignore[misc] """ - Thin subclass of ``aioredis.Redis`` which adds :func:`arq.connections.enqueue_job`. + Thin subclass of ``redis.asyncio.Redis`` which adds :func:`arq.connections.enqueue_job`. :param redis_settings: an instance of ``arq.connections.RedisSettings``. :param job_serializer: a function that serializes Python objects to bytes, defaults to pickle.dumps :param job_deserializer: a function that deserializes bytes into Python objects, defaults to pickle.loads :param default_queue_name: the default queue name to use, defaults to ``arq.queue``. - :param kwargs: keyword arguments directly passed to ``aioredis.Redis``. 
+ :param kwargs: keyword arguments directly passed to ``redis.asyncio.Redis``. """ def __init__( self, - pool_or_conn: Any, + pool_or_conn: Optional[ConnectionPool] = None, job_serializer: Optional[Serializer] = None, job_deserializer: Optional[Deserializer] = None, default_queue_name: str = default_queue_name, @@ -92,7 +100,9 @@ def __init__( self.job_serializer = job_serializer self.job_deserializer = job_deserializer self.default_queue_name = default_queue_name - super().__init__(pool_or_conn, **kwargs) + if pool_or_conn: + kwargs['connection_pool'] = pool_or_conn + super().__init__(**kwargs) async def enqueue_job( self, @@ -129,14 +139,10 @@ async def enqueue_job( defer_by_ms = to_ms(_defer_by) expires_ms = to_ms(_expires) - with await self as conn: - pipe = conn.pipeline() - pipe.unwatch() - pipe.watch(job_key) - job_exists = pipe.exists(job_key) - job_result_exists = pipe.exists(result_key_prefix + job_id) - await pipe.execute() - if await job_exists or await job_result_exists: + async with self.pipeline(transaction=True) as pipe: + await pipe.watch(job_key) + if any(await asyncio.gather(pipe.exists(job_key), pipe.exists(result_key_prefix + job_id))): + await pipe.reset() return None enqueue_time_ms = timestamp_ms() @@ -150,24 +156,22 @@ async def enqueue_job( expires_ms = expires_ms or score - enqueue_time_ms + expires_extra_ms job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) - tr = conn.multi_exec() - tr.psetex(job_key, expires_ms, job) - tr.zadd(_queue_name, score, job_id) + pipe.multi() + pipe.psetex(job_key, expires_ms, job) + pipe.zadd(_queue_name, {job_id: score}) try: - await tr.execute() - except MultiExecError: + await pipe.execute() + except WatchError: # job got enqueued since we checked 'job_exists' - # https://github.com/samuelcolvin/arq/issues/131, avoid warnings in log - await asyncio.gather(*tr._results, return_exceptions=True) return None return Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer) - async def _get_job_result(self, key: str) -> JobResult: - job_id = key[len(result_key_prefix) :] + async def _get_job_result(self, key: bytes) -> JobResult: + job_id = key[len(result_key_prefix) :].decode() job = Job(job_id, self, _deserializer=self.job_deserializer) r = await job.result_info() if r is None: - raise KeyError(f'job "{key}" not found') + raise KeyError(f'job "{key.decode()}" not found') r.job_id = job_id return r @@ -179,8 +183,8 @@ async def all_job_results(self) -> List[JobResult]: results = await asyncio.gather(*[self._get_job_result(k) for k in keys]) return sorted(results, key=attrgetter('enqueue_time')) - async def _get_job_def(self, job_id: str, score: int) -> JobDef: - v = await self.get(job_key_prefix + job_id, encoding=None) + async def _get_job_def(self, job_id: bytes, score: int) -> JobDef: + v = await self.get(job_key_prefix + job_id.decode()) jd = deserialize_job(v, deserializer=self.job_deserializer) jd.score = score return jd @@ -189,8 +193,8 @@ async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[Job """ Get information about queued, mostly useful when testing. 
""" - jobs = await self.zrange(queue_name, withscores=True) - return await asyncio.gather(*[self._get_job_def(job_id, score) for job_id, score in jobs]) + jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1) + return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs]) async def create_pool( @@ -204,8 +208,7 @@ async def create_pool( """ Create a new redis pool, retrying up to ``conn_retries`` times if the connection fails. - Similar to ``aioredis.create_redis_pool`` except it returns a :class:`arq.connections.ArqRedis` instance, - thus allowing job enqueuing. + Returns a :class:`arq.connections.ArqRedis` instance, thus allowing job enqueuing. """ settings: RedisSettings = RedisSettings() if settings_ is None else settings_ @@ -214,32 +217,34 @@ async def create_pool( ), "str provided for 'host' but 'sentinel' is true; list of sentinels expected" if settings.sentinel: - addr: Any = settings.host - async def pool_factory(*args: Any, **kwargs: Any) -> Redis: - client = await aioredis.sentinel.create_sentinel_pool(*args, ssl=settings.ssl, **kwargs) - return client.master_for(settings.sentinel_master) + def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: + client = Sentinel(*args, sentinels=settings.host, ssl=settings.ssl, **kwargs) + return client.master_for(settings.sentinel_master, redis_class=ArqRedis) else: pool_factory = functools.partial( - aioredis.create_pool, create_connection_timeout=settings.conn_timeout, ssl=settings.ssl + ArqRedis, + host=settings.host, + port=settings.port, + unix_socket_path=settings.unix_socket_path, + socket_connect_timeout=settings.conn_timeout, + ssl=settings.ssl, ) - addr = settings.host, settings.port try: - pool = await pool_factory(addr, db=settings.database, password=settings.password, encoding='utf8') - pool = ArqRedis( - pool, - job_serializer=job_serializer, - job_deserializer=job_deserializer, - default_queue_name=default_queue_name, - ) + pool = pool_factory(db=settings.database, password=settings.password, encoding='utf8') + pool.job_serializer = job_serializer + pool.job_deserializer = job_deserializer + pool.default_queue_name = default_queue_name + await pool.ping() - except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e: + except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e: if retry < settings.conn_retries: logger.warning( - 'redis connection error %s %s %s, %d retries remaining...', - addr, + 'redis connection error %s:%s %s %s, %d retries remaining...', + settings.host, + settings.port, e.__class__.__name__, e, settings.conn_retries - retry, @@ -264,14 +269,16 @@ async def pool_factory(*args: Any, **kwargs: Any) -> Redis: async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None: - with await redis as r: - info_server, info_memory, info_clients, key_count = await asyncio.gather( - r.info(section='Server'), r.info(section='Memory'), r.info(section='Clients'), r.dbsize(), - ) - - redis_version = info_server.get('server', {}).get('redis_version', '?') - mem_usage = info_memory.get('memory', {}).get('used_memory_human', '?') - clients_connected = info_clients.get('clients', {}).get('connected_clients', '?') + async with redis.pipeline(transaction=True) as pipe: + pipe.info(section='Server') + pipe.info(section='Memory') + pipe.info(section='Clients') + pipe.dbsize() + info_server, info_memory, info_clients, key_count = await pipe.execute() + + redis_version = info_server.get('redis_version', '?') + mem_usage = 
info_memory.get('used_memory_human', '?') + clients_connected = info_clients.get('connected_clients', '?') log_func( f'redis_version={redis_version} ' diff --git a/arq/cron.py b/arq/cron.py index c4c410b2..8dd15773 100644 --- a/arq/cron.py +++ b/arq/cron.py @@ -102,6 +102,7 @@ class CronJob: microsecond: int run_at_startup: bool unique: bool + job_id: Optional[str] timeout_s: Optional[float] keep_result_s: Optional[float] keep_result_forever: Optional[bool] @@ -137,6 +138,7 @@ def cron( microsecond: int = 123_456, run_at_startup: bool = False, unique: bool = True, + job_id: Optional[str] = None, timeout: Optional[SecondsTimedelta] = None, keep_result: Optional[float] = 0, keep_result_forever: Optional[bool] = False, @@ -159,7 +161,8 @@ def cron( :param microsecond: microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6 :param run_at_startup: whether to run as worker starts - :param unique: whether the job should be only be executed once at each time + :param unique: whether the job should only be executed once at each time (useful if you have multiple workers) + :param job_id: ID of the job, can be used to enforce job uniqueness, spanning multiple cron schedules :param timeout: job timeout :param keep_result: how long to keep the result for :param keep_result_forever: whether to keep results forever @@ -188,6 +191,7 @@ def cron( microsecond, run_at_startup, unique, + job_id, timeout, keep_result, keep_result_forever, diff --git a/arq/jobs.py b/arq/jobs.py index 18d86b6b..85d4f58c 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -7,7 +7,7 @@ from enum import Enum from typing import Any, Callable, Dict, Optional, Tuple -from aioredis import Redis +from redis.asyncio import Redis from .constants import abort_jobs_ss, default_queue_name, in_progress_key_prefix, job_key_prefix, result_key_prefix from .utils import ms_to_datetime, poll, timestamp_ms @@ -44,6 +44,10 @@ class JobDef: enqueue_time: datetime score: Optional[int] + def __post_init__(self) -> None: + if isinstance(self.score, float): + self.score = int(self.score) + @dataclass class JobResult(JobDef): @@ -110,7 +114,7 @@ async def info(self) -> Optional[JobDef]: """ info: Optional[JobDef] = await self.result_info() if not info: - v = await self._redis.get(job_key_prefix + self.job_id, encoding=None) + v = await self._redis.get(job_key_prefix + self.job_id) if v: info = deserialize_job(v, deserializer=self._deserializer) if info: @@ -122,7 +126,7 @@ async def result_info(self) -> Optional[JobResult]: Information about the job result if available, does not wait for the result. Does not raise an exception even if the job raised one. """ - v = await self._redis.get(result_key_prefix + self.job_id, encoding=None) + v = await self._redis.get(result_key_prefix + self.job_id) if v: return deserialize_result(v, deserializer=self._deserializer) else: @@ -151,7 +155,7 @@ async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0. 
:param poll_delay: how often to poll redis for the job result :return: True if the job aborted properly, False otherwise """ - await self._redis.zadd(abort_jobs_ss, timestamp_ms(), self.job_id) + await self._redis.zadd(abort_jobs_ss, {self.job_id: timestamp_ms()}) try: await self.result(timeout=timeout, poll_delay=poll_delay) except asyncio.CancelledError: @@ -179,7 +183,7 @@ def serialize_job( enqueue_time_ms: int, *, serializer: Optional[Serializer] = None, -) -> Optional[bytes]: +) -> bytes: data = {'t': job_try, 'f': function_name, 'a': args, 'k': kwargs, 'et': enqueue_time_ms} if serializer is None: serializer = pickle.dumps diff --git a/arq/version.py b/arq/version.py index 83522fc0..b47d1126 100644 --- a/arq/version.py +++ b/arq/version.py @@ -1,3 +1,5 @@ __all__ = ['VERSION'] -VERSION = 'dev' +# version is set automatically in CI before release, +# see https://gist.github.com/samuelcolvin/da2f521da5d2195fbfd65da3b8f58589 +VERSION = '0.0.dev0' diff --git a/arq/worker.py b/arq/worker.py index 680105c3..45ad4655 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -9,8 +9,8 @@ from time import time from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, cast -from aioredis import MultiExecError from pydantic.utils import import_string +from redis.exceptions import ResponseError, WatchError from arq.cron import CronJob from arq.jobs import Deserializer, JobResult, SerializationError, Serializer, deserialize_job_raw, serialize_result @@ -138,6 +138,8 @@ class Worker: :param burst: whether to stop the worker once all jobs have been run :param on_startup: coroutine function to run at startup :param on_shutdown: coroutine function to run at shutdown + :param on_job_start: coroutine function to run on job start + :param on_job_end: coroutine function to run on job end :param handle_signals: default true, register signal handlers, set to false when running inside other async framework :param max_jobs: maximum number of jobs to run at a time @@ -145,8 +147,8 @@ class Worker: :param keep_result: default duration to keep job results for :param keep_result_forever: whether to keep results forever :param poll_delay: duration between polling the queue for new jobs - :param queue_read_limit: the maximum number of jobs to pull from the queue each time it's polled; by default it - equals ``max_jobs`` + :param queue_read_limit: the maximum number of jobs to pull from the queue each time it's polled. By default it + equals ``max_jobs`` * 5, or 100; whichever is higher. 
:param max_tries: default maximum number of times to retry a job :param health_check_interval: how often to set the health check key :param health_check_key: redis key under which health check is set @@ -169,6 +171,8 @@ def __init__( burst: bool = False, on_startup: Optional['StartupShutdown'] = None, on_shutdown: Optional['StartupShutdown'] = None, + on_job_start: Optional['StartupShutdown'] = None, + on_job_end: Optional['StartupShutdown'] = None, handle_signals: bool = True, max_jobs: int = 10, job_timeout: 'SecondsTimedelta' = 300, @@ -202,6 +206,8 @@ def __init__( self.burst = burst self.on_startup = on_startup self.on_shutdown = on_shutdown + self.on_job_start = on_job_start + self.on_job_end = on_job_end self.sem = asyncio.BoundedSemaphore(max_jobs) self.job_timeout_s = to_seconds(job_timeout) self.keep_result_s = to_seconds(keep_result) @@ -331,7 +337,7 @@ async def _poll_iteration(self) -> None: async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs now = timestamp_ms() job_ids = await self.pool.zrangebyscore( - self.queue_name, offset=self._queue_read_offset, count=count, max=now + self.queue_name, min=float('-inf'), start=self._queue_read_offset, num=count, max=now ) await self.start_jobs(job_ids) @@ -351,14 +357,14 @@ async def _cancel_aborted_jobs(self) -> None: """ Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks. """ - with await self.pool as conn: - abort_job_ids, _ = await asyncio.gather( - conn.zrange(abort_jobs_ss), - conn.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age), - ) + async with self.pool.pipeline(transaction=True) as pipe: + pipe.zrange(abort_jobs_ss, start=0, end=-1) + pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')) + abort_job_ids, _ = await pipe.execute() aborted: Set[str] = set() - for job_id in abort_job_ids: + for job_id_bytes in abort_job_ids: + job_id = job_id_bytes.decode() try: task = self.job_tasks[job_id] except KeyError: @@ -371,36 +377,32 @@ async def _cancel_aborted_jobs(self) -> None: self.aborting_tasks.update(aborted) await self.pool.zrem(abort_jobs_ss, *aborted) - async def start_jobs(self, job_ids: List[str]) -> None: + async def start_jobs(self, job_ids: List[bytes]) -> None: """ For each job id, get the job definition, check it's not running and start it in a task """ - for job_id in job_ids: + for job_id_b in job_ids: await self.sem.acquire() + job_id = job_id_b.decode() in_progress_key = in_progress_key_prefix + job_id - with await self.pool as conn: - pipe = conn.pipeline() - pipe.unwatch() - pipe.watch(in_progress_key) - pipe.exists(in_progress_key) - pipe.zscore(self.queue_name, job_id) - _, _, ongoing_exists, score = await pipe.execute() + async with self.pool.pipeline(transaction=True) as pipe: + await pipe.watch(in_progress_key) + ongoing_exists = await pipe.exists(in_progress_key) + score = await pipe.zscore(self.queue_name, job_id) if ongoing_exists or not score: # job already started elsewhere, or already finished and removed from queue self.sem.release() logger.debug('job %s already running elsewhere', job_id) continue - tr = conn.multi_exec() - tr.setex(in_progress_key, self.in_progress_timeout_s, b'1') + pipe.multi() + pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1') try: - await tr.execute() - except MultiExecError: + await pipe.execute() + except (ResponseError, WatchError): # job already started elsewhere since we got 'existing' self.sem.release() 
logger.debug('multi-exec error, job %s already started elsewhere', job_id) - # https://github.com/samuelcolvin/arq/issues/131, avoid warnings in log - await asyncio.gather(*tr._results, return_exceptions=True) else: t = self.loop.create_task(self.run_job(job_id, score)) t.add_done_callback(lambda _: self.sem.release()) @@ -408,16 +410,16 @@ async def start_jobs(self, job_ids: List[str]) -> None: async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 start_ms = timestamp_ms() - coros = ( - self.pool.get(job_key_prefix + job_id, encoding=None), - self.pool.incr(retry_key_prefix + job_id), - self.pool.expire(retry_key_prefix + job_id, 88400), - ) - if self.allow_abort_jobs: - abort_job, v, job_try, _ = await asyncio.gather(self.pool.zrem(abort_jobs_ss, job_id), *coros) - else: - v, job_try, _ = await asyncio.gather(*coros) - abort_job = False + async with self.pool.pipeline(transaction=True) as pipe: + pipe.get(job_key_prefix + job_id) + pipe.incr(retry_key_prefix + job_id) + pipe.expire(retry_key_prefix + job_id, 88400) + if self.allow_abort_jobs: + pipe.zrem(abort_jobs_ss, job_id) + v, job_try, _, abort_job = await pipe.execute() + else: + v, job_try, _ = await pipe.execute() + abort_job = False function_name, enqueue_time_ms = '', 0 args: Tuple[Any, ...] = () @@ -509,6 +511,10 @@ async def job_failed(exc: BaseException) -> None: 'score': score, } ctx = {**self.ctx, **job_ctx} + + if self.on_job_start: + await self.on_job_start(ctx) + start_ms = timestamp_ms() success = False try: @@ -585,9 +591,18 @@ async def job_failed(exc: BaseException) -> None: serializer=self.job_serializer, ) + if self.on_job_end: + await self.on_job_end(ctx) + await asyncio.shield( self.finish_job( - job_id, finish, result_data, result_timeout_s, keep_result_forever, incr_score, keep_in_progress, + job_id, + finish, + result_data, + result_timeout_s, + keep_result_forever, + incr_score, + keep_in_progress, ) ) @@ -601,34 +616,33 @@ async def finish_job( incr_score: Optional[int], keep_in_progress: Optional[float], ) -> None: - with await self.pool as conn: - await conn.unwatch() - tr = conn.multi_exec() + async with self.pool.pipeline(transaction=True) as tr: delete_keys = [] in_progress_key = in_progress_key_prefix + job_id if keep_in_progress is None: delete_keys += [in_progress_key] else: - tr.expire(in_progress_key, keep_in_progress) + tr.pexpire(in_progress_key, to_ms(keep_in_progress)) if finish: if result_data: - expire = 0 if keep_result_forever else result_timeout_s - tr.set(result_key_prefix + job_id, result_data, expire=expire) + expire = None if keep_result_forever else result_timeout_s + tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id] tr.zrem(abort_jobs_ss, job_id) tr.zrem(self.queue_name, job_id) elif incr_score: tr.zincrby(self.queue_name, incr_score, job_id) - tr.delete(*delete_keys) + if delete_keys: + tr.delete(*delete_keys) await tr.execute() async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None: - with await self.pool as conn: - await conn.unwatch() - tr = conn.multi_exec() + async with self.pool.pipeline(transaction=True) as tr: tr.delete( - retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id, + retry_key_prefix + job_id, + in_progress_key_prefix + job_id, + job_key_prefix + job_id, ) tr.zrem(abort_jobs_ss, job_id) tr.zrem(self.queue_name, job_id) @@ -636,7 +650,7 @@ async def finish_failed_job(self, job_id: str, result_data: 
Optional[bytes]) -> keep_result = self.keep_result_forever or self.keep_result_s > 0 if result_data is not None and keep_result: # pragma: no branch expire = 0 if self.keep_result_forever else self.keep_result_s - tr.set(result_key_prefix + job_id, result_data, expire=expire) + tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) await tr.execute() async def heart_beat(self) -> None: @@ -665,7 +679,10 @@ async def run_cron(self, n: datetime, delay: float, num_windows: int = 2) -> Non # We queue up the cron if the next execution time is in the next # delay * num_windows (by default 0.5 * 2 = 1 second). if cron_job.next_run < this_hb_cutoff: - job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None + if cron_job.job_id: + job_id: Optional[str] = cron_job.job_id + else: + job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None job_futures.add( self.pool.enqueue_job( cron_job.name, _job_id=job_id, _queue_name=self.queue_name, _defer_until=cron_job.next_run @@ -686,7 +703,7 @@ async def record_health(self) -> None: f'{datetime.now():%b-%d %H:%M:%S} j_complete={self.jobs_complete} j_failed={self.jobs_failed} ' f'j_retried={self.jobs_retried} j_ongoing={pending_tasks} queued={queued}' ) - await self.pool.setex(self.health_check_key, self.health_check_interval + 1, info.encode()) + await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode()) log_suffix = info[info.index('j_complete=') :] if self._last_health_check_log and log_suffix != self._last_health_check_log: logger.info('recording health: %s', info) @@ -728,8 +745,7 @@ async def close(self) -> None: await self.pool.delete(self.health_check_key) if self.on_shutdown: await self.on_shutdown(self.ctx) - self.pool.close() - await self.pool.wait_closed() + await self.pool.close(close_connection_pool=True) self._pool = None def __repr__(self) -> str: @@ -770,8 +786,7 @@ async def async_check_health( else: logger.info('Health check successful: %s', data) r = 0 - redis.close() - await redis.wait_closed() + await redis.close(close_connection_pool=True) return r @@ -784,5 +799,4 @@ def check_health(settings_cls: 'WorkerSettingsType') -> int: redis_settings = cast(Optional[RedisSettings], cls_kwargs.get('redis_settings')) health_check_key = cast(Optional[str], cls_kwargs.get('health_check_key')) queue_name = cast(Optional[str], cls_kwargs.get('queue_name')) - loop = asyncio.get_event_loop() - return loop.run_until_complete(async_check_health(redis_settings, health_check_key, queue_name)) + return asyncio.run(async_check_health(redis_settings, health_check_key, queue_name)) diff --git a/docs/examples/main_demo.py b/docs/examples/main_demo.py index 3305a3db..21affee1 100644 --- a/docs/examples/main_demo.py +++ b/docs/examples/main_demo.py @@ -22,7 +22,8 @@ async def main(): await redis.enqueue_job('download_content', url) # WorkerSettings defines the settings to use when creating the work, -# it's used by the arq cli +# it's used by the arq cli. +# For a list of available settings, see https://arq-docs.helpmanual.io/#arq.worker.Worker class WorkerSettings: functions = [download_content] on_startup = startup diff --git a/docs/index.rst b/docs/index.rst index 46c38e0f..eb934def 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -15,7 +15,7 @@ Job queues and RPC in python with asyncio and redis. .. 
warning:: In ``v0.16`` *arq* was **COMPLETELY REWRITTEN** to use an entirely different approach to registering workers, - enqueueing jobs and processing jobs. You will need to either keep using ``v0.15`` or entirely rewrite you *arq* + enqueueing jobs and processing jobs. You will need to either keep using ``v0.15`` or entirely rewrite your *arq* integration to use ``v0.16``. See `here <./old/index.html>`_ for old docs. @@ -154,7 +154,7 @@ You can access job information, status and job results using the :class:`arq.job Retrying jobs and cancellation .............................. -As described above, when an arq work shuts down any going jobs are cancelled immediately +As described above, when an arq worker shuts down, any ongoing jobs are cancelled immediately (via vanilla ``task.cancel()``, so a ``CancelledError`` will be raised). You can see this by running a slow job (eg. add ``await asyncio.sleep(5)``) and hitting ``Ctrl+C`` once it's started. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..4abc562d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,60 @@ +[tool.pytest.ini_options] +testpaths = 'tests' +filterwarnings = ['error'] +asyncio_mode = 'auto' +timeout = 10 + +[tool.coverage.run] +source = ['src'] +branch = true + +[tool.coverage.report] +precision = 2 +exclude_lines = [ + 'pragma: no cover', + 'raise NotImplementedError', + 'raise NotImplemented', + 'if TYPE_CHECKING:', + '@overload', +] + +[tool.black] +color = true +line-length = 120 +target-version = ['py39'] +skip-string-normalization = true + +[tool.isort] +line_length = 120 +known_third_party = 'foxglove' +multi_line_output = 3 +include_trailing_comma = true +force_grid_wrap = 0 +combine_as_imports = true +color_output = true + +[tool.mypy] +show_error_codes = true +follow_imports = 'silent' +strict_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +disallow_any_generics = true +check_untyped_defs = true +no_implicit_reexport = true +warn_unused_configs = true +disallow_subclassing_any = true +disallow_incomplete_defs = true +disallow_untyped_decorators = true +disallow_untyped_calls = true + +# for strict mypy: (this is the tricky one :-)) +disallow_untyped_defs = true + +# remaining arguments from `mypy --strict` which cause errors +#no_implicit_optional = true +#warn_return_any = true + +[[tool.mypy.overrides]] +module = ['redis.asyncio.*', 'watchgod'] +ignore_missing_imports = true diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 12a7f168..00000000 --- a/setup.cfg +++ /dev/null @@ -1,59 +0,0 @@ -[tool:pytest] -testpaths = tests -timeout = 5 -filterwarnings = - error - ignore::DeprecationWarning:aioredis - -[flake8] -max-complexity = 10 -max-line-length = 120 -ignore = E203, W503 - -[coverage:run] -source = arq -branch = True - -[coverage:report] -precision = 2 -exclude_lines = - pragma: no cover - raise NotImplementedError - raise NotImplemented - if TYPE_CHECKING: - @overload - -[isort] -line_length=120 -known_third_party=pytest -multi_line_output=3 -include_trailing_comma=True -force_grid_wrap=0 -combine_as_imports=True - -[mypy] -follow_imports = silent -strict_optional = True -warn_redundant_casts = True -warn_unused_ignores = True -disallow_any_generics = True -check_untyped_defs = True -no_implicit_reexport = True -warn_unused_configs = True -disallow_subclassing_any = True -disallow_incomplete_defs = True -disallow_untyped_decorators = True -disallow_untyped_calls = True - -# for strict mypy: (this is the tricky one :-)) 
-disallow_untyped_defs = True - -# remaining arguments from `mypy --strict` which cause errors -;no_implicit_optional = True -;warn_return_any = True - -[mypy-aioredis] -ignore_missing_imports = true - -[mypy-watchgod] -ignore_missing_imports = true diff --git a/setup.py b/setup.py index 4bb91b72..5b4652de 100644 --- a/setup.py +++ b/setup.py @@ -31,10 +31,10 @@ 'Programming Language :: Python', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', 'Topic :: Internet', 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: System :: Clustering', @@ -55,7 +55,7 @@ arq=arq.cli:cli """, install_requires=[ - 'aioredis>=1.1.0,<2.0.0', + 'redis>=4.2.0rc3', 'click>=6.7', 'pydantic>=1', 'dataclasses>=0.6;python_version == "3.6"', diff --git a/tests/conftest.py b/tests/conftest.py index e071f857..f445288b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,42 +1,61 @@ import asyncio import functools +import os +import sys import msgpack import pytest -from aioredis import create_redis_pool +from redislite import Redis from arq.connections import ArqRedis, create_pool from arq.worker import Worker -@pytest.yield_fixture +@pytest.fixture(name='loop') +def _fix_loop(event_loop): + return event_loop + + +@pytest.fixture async def arq_redis(loop): - redis_ = await create_redis_pool( - ('localhost', 6379), encoding='utf8', loop=loop, commands_factory=ArqRedis, minsize=5 + redis_ = ArqRedis( + host='localhost', + port=6379, + encoding='utf-8', ) + await redis_.flushall() + yield redis_ - redis_.close() - await redis_.wait_closed() + + await redis_.close(close_connection_pool=True) -@pytest.yield_fixture +@pytest.fixture +async def unix_socket_path(loop): + db_pth = '/tmp/redis_test.db' + if os.path.exists(db_pth): + os.remove(db_pth) + rdb = Redis(db_pth) + yield rdb.socket_file + rdb.close() + + +@pytest.fixture async def arq_redis_msgpack(loop): - redis_ = await create_redis_pool( - ('localhost', 6379), - encoding='utf8', - loop=loop, - commands_factory=functools.partial( - ArqRedis, job_serializer=msgpack.packb, job_deserializer=functools.partial(msgpack.unpackb, raw=False) - ), + redis_ = ArqRedis( + host='localhost', + port=6379, + encoding='utf-8', + job_serializer=msgpack.packb, + job_deserializer=functools.partial(msgpack.unpackb, raw=False), ) await redis_.flushall() yield redis_ - redis_.close() - await redis_.wait_closed() + await redis_.close(close_connection_pool=True) -@pytest.yield_fixture +@pytest.fixture async def worker(arq_redis): worker_: Worker = None @@ -64,7 +83,23 @@ async def create_pool_(settings, *args, **kwargs): yield create_pool_ - for p in pools: - p.close() + await asyncio.gather(*[p.close(close_connection_pool=True) for p in pools]) + + +@pytest.fixture(name='cancel_remaining_task') +def fix_cancel_remaining_task(loop): + async def cancel_remaining_task(): + tasks = asyncio.all_tasks(loop) + cancelled = [] + for task in tasks: + # in repr works in 3.7 where get_coro() is not available + if 'cancel_remaining_task()' not in repr(task): + cancelled.append(task) + task.cancel() + if cancelled: + print(f'Cancelled {len(cancelled)} ongoing tasks', file=sys.stderr) + await asyncio.gather(*cancelled, return_exceptions=True) + + yield - await asyncio.gather(*[p.wait_closed() for p in pools]) + 
loop.run_until_complete(cancel_remaining_task()) diff --git a/tests/requirements-linting.txt b/tests/requirements-linting.txt index 92e22d95..105da8ea 100644 --- a/tests/requirements-linting.txt +++ b/tests/requirements-linting.txt @@ -1,8 +1,8 @@ -black==19.10b0 -flake8==3.7.9 -flake8-quotes==3 -isort==5.8.0 -mypy==0.812 -pycodestyle==2.5.0 -pyflakes==2.1.1 -twine==3.1.1 +black==21.12b0 +flake8==4.0.1 +flake8-quotes==3.3.1 +isort[colors]==5.10.1 +mypy==0.931 +pycodestyle==2.8.0 +pyflakes==2.4.0 +types_redis==4.1.17 diff --git a/tests/requirements-testing.txt b/tests/requirements-testing.txt index 420e112c..40220c43 100644 --- a/tests/requirements-testing.txt +++ b/tests/requirements-testing.txt @@ -1,10 +1,11 @@ -coverage==5.1 -msgpack==0.6.1 -pytest==5.3.5 -pytest-aiohttp==0.3.0 -pytest-cov==2.8.1 -pytest-mock==3 -pytest-sugar==0.9.2 -pytest-timeout==1.3.3 -pytest-toolbox==0.4 -twine==3.1.1 +coverage==6.3 +dirty-equals==0.1 +msgpack==1.0.3 +pytest==6.2.5 +pytest-asyncio==0.17.2 +pytest-cov==3.0.0 +pytest-mock==3.7.0 +pytest-sugar==0.9.4 +pytest-timeout==2.1.0 +git+https://github.com/tvd0x2a/redislite.git@m1-support +twine==3.7.1 diff --git a/tests/test_cli.py b/tests/test_cli.py index d44529c3..55ca6b8f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,4 @@ +import pytest from click.testing import CliRunner from arq.cli import cli @@ -19,14 +20,15 @@ def test_help(): assert result.output.startswith('Usage: arq [OPTIONS] WORKER_SETTINGS\n') -def test_run(): +def test_run(cancel_remaining_task, mocker, loop): + mocker.patch('asyncio.get_event_loop', lambda: loop) runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings']) assert result.exit_code == 0 assert 'Starting worker for 1 functions: foobar' in result.output -def test_check(): +def test_check(loop): runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--check']) assert result.exit_code == 1 @@ -37,7 +39,8 @@ async def mock_awatch(): yield [1] -def test_run_watch(mocker): +@pytest.mark.filterwarnings('ignore::DeprecationWarning') +def test_run_watch(mocker, cancel_remaining_task): mocker.patch('watchgod.awatch', return_value=mock_awatch()) runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests']) diff --git a/tests/test_cron.py b/tests/test_cron.py index 6f603e21..1919929b 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,3 +1,4 @@ +import asyncio import logging import re from datetime import datetime, timedelta @@ -5,6 +6,7 @@ import pytest +import arq from arq import Worker from arq.constants import in_progress_key_prefix from arq.cron import cron, next_cron @@ -142,3 +144,81 @@ async def test_repr(): async def test_str_function(): cj = cron('asyncio.sleep', hour=1, run_at_startup=True) assert str(cj).startswith('' @@ -69,12 +68,12 @@ async def test_set_health_check_key(arq_redis: ArqRedis, worker): await arq_redis.enqueue_job('foobar', _job_id='testing') worker: Worker = worker(functions=[func(foobar, keep_result=0)], health_check_key='arq:test:health-check') await worker.main() - assert sorted(await arq_redis.keys('*')) == ['arq:test:health-check'] + assert sorted(await arq_redis.keys('*')) == [b'arq:test:health-check'] -async def test_handle_sig(caplog): +async def test_handle_sig(caplog, arq_redis: ArqRedis): caplog.set_level(logging.INFO) - worker = Worker([foobar]) + worker = Worker([foobar], redis_pool=arq_redis) worker.main_task = MagicMock() worker.tasks = {0: 
MagicMock(done=MagicMock(return_value=True)), 1: MagicMock(done=MagicMock(return_value=False))} @@ -415,45 +414,41 @@ async def test_log_health_check(arq_redis: ArqRedis, worker, caplog): assert 'recording health' in caplog.text -async def test_remain_keys(arq_redis: ArqRedis, worker): - redis2 = await create_redis_pool(('localhost', 6379), encoding='utf8') - try: - await arq_redis.enqueue_job('foobar', _job_id='testing') - assert sorted(await redis2.keys('*')) == ['arq:job:testing', 'arq:queue'] - worker: Worker = worker(functions=[foobar]) - await worker.main() - assert sorted(await redis2.keys('*')) == ['arq:queue:health-check', 'arq:result:testing'] - await worker.close() - assert sorted(await redis2.keys('*')) == ['arq:result:testing'] - finally: - redis2.close() - await redis2.wait_closed() +async def test_remain_keys(arq_redis: ArqRedis, worker, create_pool): + redis2 = await create_pool(RedisSettings()) + await arq_redis.enqueue_job('foobar', _job_id='testing') + assert sorted(await redis2.keys('*')) == [b'arq:job:testing', b'arq:queue'] + worker: Worker = worker(functions=[foobar]) + await worker.main() + assert sorted(await redis2.keys('*')) == [b'arq:queue:health-check', b'arq:result:testing'] + await worker.close() + assert sorted(await redis2.keys('*')) == [b'arq:result:testing'] async def test_remain_keys_no_results(arq_redis: ArqRedis, worker): await arq_redis.enqueue_job('foobar', _job_id='testing') - assert sorted(await arq_redis.keys('*')) == ['arq:job:testing', 'arq:queue'] + assert sorted(await arq_redis.keys('*')) == [b'arq:job:testing', b'arq:queue'] worker: Worker = worker(functions=[func(foobar, keep_result=0)]) await worker.main() - assert sorted(await arq_redis.keys('*')) == ['arq:queue:health-check'] + assert sorted(await arq_redis.keys('*')) == [b'arq:queue:health-check'] async def test_remain_keys_keep_results_forever_in_function(arq_redis: ArqRedis, worker): await arq_redis.enqueue_job('foobar', _job_id='testing') - assert sorted(await arq_redis.keys('*')) == ['arq:job:testing', 'arq:queue'] + assert sorted(await arq_redis.keys('*')) == [b'arq:job:testing', b'arq:queue'] worker: Worker = worker(functions=[func(foobar, keep_result_forever=True)]) await worker.main() - assert sorted(await arq_redis.keys('*')) == ['arq:queue:health-check', 'arq:result:testing'] + assert sorted(await arq_redis.keys('*')) == [b'arq:queue:health-check', b'arq:result:testing'] ttl_result = await arq_redis.ttl('arq:result:testing') assert ttl_result == -1 async def test_remain_keys_keep_results_forever(arq_redis: ArqRedis, worker): await arq_redis.enqueue_job('foobar', _job_id='testing') - assert sorted(await arq_redis.keys('*')) == ['arq:job:testing', 'arq:queue'] + assert sorted(await arq_redis.keys('*')) == [b'arq:job:testing', b'arq:queue'] worker: Worker = worker(functions=[func(foobar)], keep_result_forever=True) await worker.main() - assert sorted(await arq_redis.keys('*')) == ['arq:queue:health-check', 'arq:result:testing'] + assert sorted(await arq_redis.keys('*')) == [b'arq:queue:health-check', b'arq:result:testing'] ttl_result = await arq_redis.ttl('arq:result:testing') assert ttl_result == -1 @@ -481,6 +476,16 @@ async def test_run_check_error2(arq_redis: ArqRedis, worker): assert len(exc_info.value.job_results) == 2 +async def test_keep_result_ms(arq_redis: ArqRedis, worker): + async def return_something(ctx): + return 1 + + await arq_redis.enqueue_job('return_something') + worker: Worker = worker(functions=[func(return_something, name='return_something')], 
keep_result=3600.15) + await worker.main() + assert (worker.jobs_complete, worker.jobs_failed, worker.jobs_retried) == (1, 0, 0) + + async def test_return_exception(arq_redis: ArqRedis, worker): async def return_error(ctx): return TypeError('xxx') @@ -507,7 +512,7 @@ async def test_error_success(arq_redis: ArqRedis, worker): async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog): caplog.set_level(logging.INFO) await arq_redis.enqueue_job('foobar') - await asyncio.gather(*[arq_redis.zadd(default_queue_name, 1, f'testing-{i}') for i in range(100)]) + await asyncio.gather(*[arq_redis.zadd(default_queue_name, {f'testing-{i}': 1}) for i in range(100)]) worker: Worker = worker(functions=[foobar]) assert worker.jobs_complete == 0 assert worker.jobs_failed == 0 @@ -727,16 +732,22 @@ async def foo(ctx, v): async def test_multi_exec(arq_redis: ArqRedis, worker, caplog): + c = 0 + async def foo(ctx, v): + nonlocal c + c += 1 return v + 1 caplog.set_level(logging.DEBUG, logger='arq.worker') await arq_redis.enqueue_job('foo', 1, _job_id='testing') worker: Worker = worker(functions=[func(foo, name='foo')]) - await asyncio.gather(*[worker.start_jobs(['testing']) for _ in range(5)]) + await asyncio.gather(*[worker.start_jobs([b'testing']) for _ in range(5)]) # debug(caplog.text) - assert 'multi-exec error, job testing already started elsewhere' in caplog.text - assert 'WatchVariableError' not in caplog.text + await worker.main() + assert c == 1 + # assert 'multi-exec error, job testing already started elsewhere' in caplog.text + # assert 'WatchVariableError' not in caplog.text async def test_abort_job(arq_redis: ArqRedis, worker, caplog, loop): @@ -748,7 +759,7 @@ async def wait_and_abort(job, delay=0.1): assert await job.abort() is True caplog.set_level(logging.INFO) - await arq_redis.zadd(abort_jobs_ss, int(1e9), b'foobar') + await arq_redis.zadd(abort_jobs_ss, {b'foobar': int(1e9)}) job = await arq_redis.enqueue_job('longfunc', _job_id='testing') worker: Worker = worker(functions=[func(longfunc, name='longfunc')], allow_abort_jobs=True, poll_delay=0.1) @@ -756,6 +767,7 @@ async def wait_and_abort(job, delay=0.1): assert worker.jobs_failed == 0 assert worker.jobs_retried == 0 await asyncio.gather(wait_and_abort(job), worker.main()) + await worker.main() assert worker.jobs_complete == 0 assert worker.jobs_failed == 1 assert worker.jobs_retried == 0 @@ -835,3 +847,36 @@ async def longfunc(ctx): assert worker.jobs_retried == 0 log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records)) assert 'X.XXs ! testing:longfunc failed, TimeoutError:' in log + + +async def test_on_job(arq_redis: ArqRedis, worker): + result = {'called': 0} + + async def on_start(ctx): + assert ctx['job_id'] == 'testing' + result['called'] += 1 + + async def on_end(ctx): + assert ctx['job_id'] == 'testing' + result['called'] += 1 + + async def test(ctx): + return + + await arq_redis.enqueue_job('func', _job_id='testing') + worker: Worker = worker( + functions=[func(test, name='func')], + on_job_start=on_start, + on_job_end=on_end, + job_timeout=0.2, + poll_delay=0.1, + ) + assert worker.jobs_complete == 0 + assert worker.jobs_failed == 0 + assert worker.jobs_retried == 0 + assert result['called'] == 0 + await worker.main() + assert worker.jobs_complete == 1 + assert worker.jobs_failed == 0 + assert worker.jobs_retried == 0 + assert result['called'] == 2
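
Editor's note (not part of the diff): a minimal sketch of client-side usage under the new redis-py backend, tying together ``RedisSettings.from_dsn`` (which now understands a ``?db=`` query string) and the single-call pool shutdown that replaces ``close()``/``wait_closed()``. The ``download_content`` task name is borrowed from the docs example and the DSN is an assumption.

    import asyncio

    from arq.connections import RedisSettings, create_pool

    async def main():
        # '?db=1' selects the database via the query string
        settings = RedisSettings.from_dsn('redis://localhost:6379?db=1')
        pool = await create_pool(settings)
        try:
            await pool.enqueue_job('download_content', 'https://example.com')
        finally:
            # redis-py has no wait_closed(); the pool is closed in one call
            await pool.close(close_connection_pool=True)

    asyncio.run(main())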