From b1bbe6ddedecee69cac6f5aa1f44ac61ad396c49 Mon Sep 17 00:00:00 2001 From: Aleksandr Shtaub <91491081+aleksandr-shtaub@users.noreply.github.com> Date: Fri, 24 Nov 2023 15:59:21 +0200 Subject: [PATCH 1/9] Add psycopg3 dependency --- .pre-commit-config.yaml | 1 + poetry.lock | 43 +++++++++++++++++++++++++++++++++++++++-- pyproject.toml | 1 + 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0644ea1ae..53e474ed0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,6 +33,7 @@ repos: additional_dependencies: - attrs==21.4.0 - click==8.0.3 + - psycopg[pool]==3.1.13 - sqlalchemy[mypy]==1.4.29 - types-croniter==1.0.4 - types-psycopg2==2.9.5 diff --git a/poetry.lock b/poetry.lock index 56eede212..c24ae9a7e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aiopg" @@ -823,6 +823,45 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "psycopg" +version = "3.1.13" +description = "PostgreSQL database adapter for Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "psycopg-3.1.13-py3-none-any.whl", hash = "sha256:1253010894cfb64e2da4556d4eff5f05e45cafee641f64e02453be849c8f7687"}, + {file = "psycopg-3.1.13.tar.gz", hash = "sha256:e6d047ce16950651d6e26c7c19ca57cc42e1d4841b58729f691244baeee46e30"}, +] + +[package.dependencies] +"backports.zoneinfo" = {version = ">=0.2.0", markers = "python_version < \"3.9\""} +psycopg-pool = {version = "*", optional = true, markers = "extra == \"pool\""} +typing-extensions = ">=4.1" +tzdata = {version = "*", markers = "sys_platform == \"win32\""} + +[package.extras] +binary = ["psycopg-binary (==3.1.13)"] +c = ["psycopg-c (==3.1.13)"] +dev = ["black (>=23.1.0)", "dnspython (>=2.1)", "flake8 (>=4.0)", "mypy (>=1.4.1)", "types-setuptools (>=57.4)", "wheel (>=0.37)"] +docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"] +pool = ["psycopg-pool"] +test = ["anyio (>=3.6.2,<4.0)", "mypy (>=1.4.1)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"] + +[[package]] +name = "psycopg-pool" +version = "3.2.0" +description = "Connection Pool for Psycopg" +optional = false +python-versions = ">=3.8" +files = [ + {file = "psycopg-pool-3.2.0.tar.gz", hash = "sha256:2e857bb6c120d012dba240e30e5dff839d2d69daf3e962127ce6b8e40594170e"}, + {file = "psycopg_pool-3.2.0-py3-none-any.whl", hash = "sha256:73371d4e795d9363c7b496cbb2dfce94ee8fbf2dcdc384d0a937d1d9d8bdd08d"}, +] + +[package.dependencies] +typing-extensions = ">=3.10" + [[package]] name = "psycopg2-binary" version = "2.9.9" @@ -1560,4 +1599,4 @@ sqlalchemy = ["sqlalchemy"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "d2c40fc7e18261f4e477c332b925fdb0f890d1358bb07ada337509b0ae163c85" +content-hash = "6d9f268cd5bd88ae5d2816f87323fef8756b1fcba3461534b07fe78c852af565" diff --git a/pyproject.toml b/pyproject.toml index ed56840e9..4b856135c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ psycopg2-binary = "*" python-dateutil = "*" sqlalchemy = { version = "^2.0", optional = true } typing-extensions = { version = "*", python = "<3.8" } +psycopg = {extras = ["pool"], version = "^3.1.13"} [tool.poetry.extras] django = ["django"] 
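For context before the connector lands: the psycopg[pool] dependency added above is consumed by the async connector introduced in the following patches (added as Psycopg3Connector in patch 2 and renamed PsycopgConnector later in the series). Below is a minimal, hypothetical usage sketch of that connector, using only the methods the patches themselves define; the connection string is illustrative only, and constructor keyword arguments are simply forwarded to psycopg_pool.AsyncConnectionPool.

    import asyncio

    from procrastinate import PsycopgConnector  # called Psycopg3Connector before the rename

    async def main() -> None:
        # Keyword arguments are passed through to psycopg_pool.AsyncConnectionPool.
        connector = PsycopgConnector(conninfo="postgresql://localhost/procrastinate")
        await connector.open_async()
        try:
            # The pool is configured with dict_row, so results come back as dicts.
            row = await connector.execute_query_one_async("SELECT 1 AS answer")
            print(row)  # {'answer': 1}
        finally:
            await connector.close_async()

    asyncio.run(main())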
From c303eceb5423ffa81ee770a1767a5fb50be9596e Mon Sep 17 00:00:00 2001 From: Aleksandr Shtaub <91491081+aleksandr-shtaub@users.noreply.github.com> Date: Mon, 27 Nov 2023 00:08:57 +0200 Subject: [PATCH 2/9] Implement psycopg3 connector --- procrastinate/__init__.py | 2 + procrastinate/psycopg3_connector.py | 291 +++++++++++++++++++ tests/conftest.py | 21 ++ tests/integration/test_psycopg3_connector.py | 202 +++++++++++++ tests/unit/test_psycopg3_connector.py | 167 +++++++++++ 5 files changed, 683 insertions(+) create mode 100644 procrastinate/psycopg3_connector.py create mode 100644 tests/integration/test_psycopg3_connector.py create mode 100644 tests/unit/test_psycopg3_connector.py diff --git a/procrastinate/__init__.py b/procrastinate/__init__.py index e55fe5172..510449494 100644 --- a/procrastinate/__init__.py +++ b/procrastinate/__init__.py @@ -5,6 +5,7 @@ from procrastinate.connector import BaseConnector from procrastinate.job_context import JobContext from procrastinate.psycopg2_connector import Psycopg2Connector +from procrastinate.psycopg3_connector import Psycopg3Connector from procrastinate.retry import BaseRetryStrategy, RetryStrategy __all__ = [ @@ -15,6 +16,7 @@ "BaseRetryStrategy", "AiopgConnector", "Psycopg2Connector", + "Psycopg3Connector", "RetryStrategy", ] diff --git a/procrastinate/psycopg3_connector.py b/procrastinate/psycopg3_connector.py new file mode 100644 index 000000000..fed3de72d --- /dev/null +++ b/procrastinate/psycopg3_connector.py @@ -0,0 +1,291 @@ +import asyncio +import functools +import logging +import re +from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional + +import psycopg +import psycopg.errors +import psycopg.sql +import psycopg.types.json +import psycopg_pool +from psycopg.rows import DictRow, dict_row + +from procrastinate import connector, exceptions, sql + +logger = logging.getLogger(__name__) + +LISTEN_TIMEOUT = 30.0 + +CoroutineFunction = Callable[..., Coroutine] + + +def wrap_exceptions(coro: CoroutineFunction) -> CoroutineFunction: + """ + Wrap psycopg3 errors as connector exceptions. + + This decorator is expected to be used on coroutine functions only. + """ + + @functools.wraps(coro) + async def wrapped(*args, **kwargs): + try: + return await coro(*args, **kwargs) + except psycopg.errors.UniqueViolation as exc: + raise exceptions.UniqueViolation(constraint_name=exc.diag.constraint_name) + except psycopg.Error as exc: + raise exceptions.ConnectorException from exc + + # Attaching a custom attribute to ease testability and make the + # decorator more introspectable + wrapped._exceptions_wrapped = True # type: ignore + return wrapped + + +def wrap_query_exceptions(coro: CoroutineFunction) -> CoroutineFunction: + """ + Detect "admin shutdown" errors and retry a number of times. + + This is to handle the case where the database connection (obtained from the pool) + was actually closed by the server. In this case, pyscopg3 raises an AdminShutdown + exception when the connection is used for issuing a query. What we do is retry when + an AdminShutdown is raised, and until the maximum number of retries is reached. + + The number of retries is set to the pool maximum size plus one, to handle the case + where the connections we have in the pool were all closed on the server side. 
+ """ + + @functools.wraps(coro) + async def wrapped(*args, **kwargs): + final_exc = None + try: + max_tries = args[0]._pool.max_size + 1 + except Exception: + max_tries = 1 + for _ in range(max_tries): + try: + return await coro(*args, **kwargs) + except psycopg.errors.OperationalError as exc: + if "server closed the connection unexpectedly" in str(exc): + final_exc = exc + continue + raise exc + raise exceptions.ConnectorException( + f"Could not get a valid connection after {max_tries} tries" + ) from final_exc + + return wrapped + + +PERCENT_PATTERN = re.compile(r"%(?![\(s])") + + +class Psycopg3Connector(connector.BaseAsyncConnector): + def __init__( + self, + *, + json_dumps: Optional[Callable] = None, + json_loads: Optional[Callable] = None, + **kwargs: Any, + ): + """ + Asynchronous connector based on a ``psycopg_pool.AsyncConnectionPool``. + + The pool connection parameters can be provided here. Alternatively, an already + existing ``psycopg_pool.AsyncConnectionPool`` can be provided in the + ``App.open_async``, via the ``pool`` parameter. + + All other arguments than ``json_dumps`` and ``json_loads`` are passed to + :py:func:`AsyncConnectionPool` (see psycopg3 documentation__), with default + values that may differ from those of ``psycopg3`` (see a partial list of + parameters below). + + .. _psycopg3 doc: https://www.psycopg.org/psycopg3/docs/basic/adapt.html#json-adaptation + .. __: https://www.psycopg.org/psycopg3/docs/api/pool.html + #psycopg_pool.AsyncConnectionPool + + Parameters + ---------- + json_dumps : + The JSON dumps function to use for serializing job arguments. Defaults to + the function used by psycopg3. See the `psycopg3 doc`_. + json_loads : + The JSON loads function to use for deserializing job arguments. Defaults + to the function used by psycopg3. See the `psycopg3 doc`_. Unused if the + pool is externally created and set into the connector through the + ``App.open_async`` method. + min_size : int + Passed to psycopg3, default set to 1 (same as aiopg). + max_size : int + Passed to psycopg3, default set to 10 (same as aiopg). + conninfo : ``Optional[str]`` + Passed to psycopg3. Default is "" instead of None, which means if no + argument is passed, it will connect to localhost:5432 instead of a + Unix-domain local socket file. + """ + self.json_dumps = json_dumps + self.json_loads = json_loads + self._pool: Optional[psycopg_pool.AsyncConnectionPool] = None + self._pool_args = self._adapt_pool_args(kwargs, json_loads) + self._pool_externally_set = False + + @staticmethod + def _adapt_pool_args( + pool_args: Dict[str, Any], json_loads: Optional[Callable] + ) -> Dict[str, Any]: + """ + Adapt the pool args for ``psycopg3``, using sensible defaults for Procrastinate. + """ + base_configure = pool_args.pop("configure", None) + + @wrap_exceptions + async def configure(connection: psycopg.AsyncConnection[DictRow]): + if base_configure: + await base_configure(connection) + if json_loads: + psycopg.types.json.set_json_loads(json_loads, connection) + + return { + "conninfo": "", + "min_size": 1, + "max_size": 10, + "kwargs": { + "row_factory": dict_row, + }, + "configure": configure, + "open": False, + **pool_args, + } + + async def open_async( + self, pool: Optional[psycopg_pool.AsyncConnectionPool] = None + ) -> None: + """ + Instantiate the pool. + + pool : + Optional pool. Procrastinate can use an existing pool. Connection parameters + passed in the constructor will be ignored. 
+ """ + if self._pool: + return + + if pool: + self._pool_externally_set = True + self._pool = pool + else: + self._pool = await self._create_pool(self._pool_args) + + # ensure pool is open + await self._pool.open() # type: ignore + + @staticmethod + @wrap_exceptions + async def _create_pool( + pool_args: Dict[str, Any] + ) -> psycopg_pool.AsyncConnectionPool: + return psycopg_pool.AsyncConnectionPool(**pool_args) + + @wrap_exceptions + async def close_async(self) -> None: + """ + Close the pool and awaits all connections to be released. + """ + if not self._pool or self._pool_externally_set: + return + + await self._pool.close() + self._pool = None + + @property + def pool( + self, + ) -> psycopg_pool.AsyncConnectionPool[psycopg.AsyncConnection[DictRow]]: + if self._pool is None: # Set by open + raise exceptions.AppNotOpen + return self._pool + + def _wrap_json(self, arguments: Dict[str, Any]): + return { + key: psycopg.types.json.Jsonb(value, dumps=self.json_dumps) + if isinstance(value, dict) + else value + for key, value in arguments.items() + } + + @wrap_exceptions + @wrap_query_exceptions + async def execute_query_async(self, query: str, **arguments: Any) -> None: + async with self.pool.connection() as connection: + async with connection.cursor() as cursor: + await cursor.execute(query, self._wrap_json(arguments)) + + @wrap_exceptions + @wrap_query_exceptions + async def execute_query_one_async( + self, query: str, **arguments: Any + ) -> Optional[DictRow]: + async with self.pool.connection() as connection: + async with connection.cursor() as cursor: + await cursor.execute(query, self._wrap_json(arguments)) + return await cursor.fetchone() + + @wrap_exceptions + @wrap_query_exceptions + async def execute_query_all_async( + self, query: str, **arguments: Any + ) -> List[DictRow]: + async with self.pool.connection() as connection: + async with connection.cursor() as cursor: + await cursor.execute(query, self._wrap_json(arguments)) + return await cursor.fetchall() + + @wrap_exceptions + async def listen_notify( + self, event: asyncio.Event, channels: Iterable[str] + ) -> None: + # We need to acquire a dedicated connection, and use the listen + # query + if self.pool.max_size == 1: + logger.warning( + "Listen/Notify capabilities disabled because maximum pool size" + "is set to 1", + extra={"action": "listen_notify_disabled"}, + ) + return + + query_template = psycopg.sql.SQL(sql.queries["listen_queue"]) + + while True: + async with self.pool.connection() as connection: + # autocommit is required for async connection notifies + await connection.set_autocommit(True) + + for channel_name in channels: + query = query_template.format( + channel_name=psycopg.sql.Identifier(channel_name) + ) + await connection.execute(query) + + event.set() + + await self._loop_notify(event=event, connection=connection) + + @wrap_exceptions + async def _loop_notify( + self, + event: asyncio.Event, + connection: psycopg.AsyncConnection, + ) -> None: + while True: + if connection.closed: + return + try: + notifies = connection.notifies() + async for _ in notifies: + event.set() + except psycopg.OperationalError: + continue + + def __del__(self): + pass diff --git a/tests/conftest.py b/tests/conftest.py index eb82b094b..39f5705a6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,11 +13,13 @@ import pytest from psycopg2 import sql from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT +from psycopg.conninfo import make_conninfo from procrastinate import aiopg_connector as aiopg_connector_module from 
procrastinate import app as app_module from procrastinate import blueprints, builtin_tasks, jobs from procrastinate import psycopg2_connector as psycopg2_connector_module +from procrastinate import psycopg3_connector as psycopg3_connector_module from procrastinate import schema, testing from procrastinate.contrib.sqlalchemy import ( psycopg2_connector as sqlalchemy_psycopg2_connector_module, @@ -102,6 +104,13 @@ def connection_params(setup_db, db_factory): yield {"dsn": "", "dbname": "procrastinate_test"} +@pytest.fixture +def psycopg3_connection_params(setup_db, db_factory): + db_factory(dbname="procrastinate_test", template=setup_db) + + yield {"conninfo": make_conninfo(dbname="procrastinate_test")} + + @pytest.fixture def sqlalchemy_engine_dsn(setup_db, db_factory): db_factory(dbname="procrastinate_test", template=setup_db) @@ -120,6 +129,11 @@ async def not_opened_aiopg_connector(connection_params): yield aiopg_connector_module.AiopgConnector(**connection_params) +@pytest.fixture +async def not_opened_psycopg3_connector(psycopg3_connection_params): + yield psycopg3_connector_module.Psycopg3Connector(**psycopg3_connection_params) + + @pytest.fixture def not_opened_psycopg2_connector(connection_params): yield psycopg2_connector_module.Psycopg2Connector(**connection_params) @@ -139,6 +153,13 @@ async def aiopg_connector(not_opened_aiopg_connector): await not_opened_aiopg_connector.close_async() +@pytest.fixture +async def psycopg3_connector(not_opened_psycopg3_connector): + await not_opened_psycopg3_connector.open_async() + yield not_opened_psycopg3_connector + await not_opened_psycopg3_connector.close_async() + + @pytest.fixture def psycopg2_connector(not_opened_psycopg2_connector): not_opened_psycopg2_connector.open() diff --git a/tests/integration/test_psycopg3_connector.py b/tests/integration/test_psycopg3_connector.py new file mode 100644 index 000000000..ce53f24a3 --- /dev/null +++ b/tests/integration/test_psycopg3_connector.py @@ -0,0 +1,202 @@ +import asyncio +import functools +import json + +import attr +import pytest + +from procrastinate import psycopg3_connector + + +@pytest.fixture +async def psycopg3_connector_factory(psycopg3_connection_params): + connectors = [] + + async def _(**kwargs): + json_dumps = kwargs.pop("json_dumps", None) + json_loads = kwargs.pop("json_loads", None) + psycopg3_connection_params.update(kwargs) + connector = psycopg3_connector.Psycopg3Connector( + json_dumps=json_dumps, json_loads=json_loads, **psycopg3_connection_params + ) + connectors.append(connector) + await connector.open_async() + return connector + + yield _ + for connector in connectors: + await connector.close_async() + + +async def test_adapt_pool_args_configure(mocker): + called = [] + + async def configure(connection): + called.append(connection) + + args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( + pool_args={"configure": configure}, json_loads=None + ) + + assert args["configure"] is not configure + + connection = mocker.Mock(_pool=None) + await args["configure"](connection) + + assert called == [connection] + + +@pytest.mark.parametrize( + "method_name, expected", + [ + ("execute_query_one_async", {"json": {"a": "a", "b": "foo"}}), + ("execute_query_all_async", [{"json": {"a": "a", "b": "foo"}}]), + ], +) +async def test_execute_query_json_dumps( + psycopg3_connector_factory, mocker, method_name, expected +): + class NotJSONSerializableByDefault: + pass + + def encode(obj): + if isinstance(obj, NotJSONSerializableByDefault): + return "foo" + raise TypeError() + + 
query = "SELECT %(arg)s::jsonb as json" + arg = {"a": "a", "b": NotJSONSerializableByDefault()} + json_dumps = functools.partial(json.dumps, default=encode) + connector = await psycopg3_connector_factory(json_dumps=json_dumps) + method = getattr(connector, method_name) + + result = await method(query, arg=arg) + assert result == expected + + +async def test_json_loads(psycopg3_connector_factory, mocker): + @attr.dataclass + class Param: + p: int + + def decode(dct): + if "b" in dct: + dct["b"] = Param(p=dct["b"]) + return dct + + json_loads = functools.partial(json.loads, object_hook=decode) + + query = "SELECT %(arg)s::jsonb as json" + arg = {"a": 1, "b": 2} + connector = await psycopg3_connector_factory(json_loads=json_loads) + + result = await connector.execute_query_one_async(query, arg=arg) + assert result["json"] == {"a": 1, "b": Param(p=2)} + + +async def test_execute_query(psycopg3_connector): + assert ( + await psycopg3_connector.execute_query_async( + "COMMENT ON TABLE \"procrastinate_jobs\" IS 'foo' " + ) + is None + ) + result = await psycopg3_connector.execute_query_one_async( + "SELECT obj_description('public.procrastinate_jobs'::regclass)" + ) + assert result == {"obj_description": "foo"} + + result = await psycopg3_connector.execute_query_all_async( + "SELECT obj_description('public.procrastinate_jobs'::regclass)" + ) + assert result == [{"obj_description": "foo"}] + + +async def test_execute_query_interpolate(psycopg3_connector): + result = await psycopg3_connector.execute_query_one_async( + "SELECT %(foo)s as foo;", foo="bar" + ) + assert result == {"foo": "bar"} + + +@pytest.mark.filterwarnings("error::ResourceWarning") +async def test_execute_query_simultaneous(psycopg3_connector): + # two coroutines doing execute_query_async simultaneously + # + # the test may fail if the connector fails to properly parallelize connections + + async def query(): + await psycopg3_connector.execute_query_async("SELECT 1") + + try: + await asyncio.gather(query(), query()) + except ResourceWarning: + pytest.fail("ResourceWarning") + + +async def test_close_async(psycopg3_connector): + await psycopg3_connector.execute_query_async("SELECT 1") + pool = psycopg3_connector._pool + await psycopg3_connector.close_async() + assert pool.closed is True + assert psycopg3_connector._pool is None + + +async def test_listen_notify(psycopg3_connector): + channel = "somechannel" + event = asyncio.Event() + + task = asyncio.ensure_future( + psycopg3_connector.listen_notify(channels=[channel], event=event) + ) + try: + await event.wait() + event.clear() + await psycopg3_connector.execute_query_async(f"""NOTIFY "{channel}" """) + await asyncio.wait_for(event.wait(), timeout=1) + except asyncio.TimeoutError: + pytest.fail("Notify not received within 1 sec") + finally: + task.cancel() + + +async def test_loop_notify_stop_when_connection_closed(psycopg3_connector): + # We want to make sure that the when the connection is closed, the loop end. + event = asyncio.Event() + await psycopg3_connector.open_async() + async with psycopg3_connector._pool.connection() as connection: + coro = psycopg3_connector._loop_notify(event=event, connection=connection) + + await psycopg3_connector._pool.close() + assert connection.closed + + try: + await asyncio.wait_for(coro, 1) + except asyncio.TimeoutError: + pytest.fail("Failed to detect that connection was closed and stop") + + +async def test_loop_notify_timeout(psycopg3_connector): + # We want to make sure that when the listen starts, we don't listen forever. 
If the + # connection closes, we eventually finish the coroutine. + event = asyncio.Event() + await psycopg3_connector.open_async() + async with psycopg3_connector._pool.connection() as connection: + task = asyncio.ensure_future( + psycopg3_connector._loop_notify(event=event, connection=connection) + ) + assert not task.done() + + await psycopg3_connector._pool.close() + assert connection.closed + + try: + await asyncio.wait_for(task, 0.1) + except asyncio.TimeoutError: + pytest.fail("Failed to detect that connection was closed and stop") + + assert not event.is_set() + + +async def test_destructor(): + ... diff --git a/tests/unit/test_psycopg3_connector.py b/tests/unit/test_psycopg3_connector.py new file mode 100644 index 000000000..b53fb6070 --- /dev/null +++ b/tests/unit/test_psycopg3_connector.py @@ -0,0 +1,167 @@ +import psycopg +import pytest + +from procrastinate import exceptions, psycopg3_connector + + +@pytest.fixture +def connector(): + return psycopg3_connector.Psycopg3Connector() + + +async def test_adapt_pool_args_configure(mocker): + called = [] + + async def configure(connection): + called.append(connection) + + args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( + pool_args={"configure": configure}, json_loads=None + ) + + assert args["configure"] is not configure + + connection = mocker.Mock(_pool=None) + await args["configure"](connection) + + assert called == [connection] + + +async def test_wrap_exceptions_wraps(): + @psycopg3_connector.wrap_exceptions + async def corofunc(): + raise psycopg.DatabaseError + + coro = corofunc() + + with pytest.raises(exceptions.ConnectorException): + await coro + + +async def test_wrap_exceptions_success(): + @psycopg3_connector.wrap_exceptions + async def corofunc(a, b): + return a, b + + assert await corofunc(1, 2) == (1, 2) + + +@pytest.mark.parametrize( + "max_size, expected_calls_count", + [ + pytest.param(5, 6, id="Valid max_size"), + pytest.param("5", 1, id="Invalid max_size"), + ], +) +async def test_wrap_query_exceptions_reached_max_tries( + mocker, max_size, expected_calls_count +): + called = [] + + @psycopg3_connector.wrap_query_exceptions + async def corofunc(connector): + called.append(True) + raise psycopg.errors.OperationalError( + "server closed the connection unexpectedly" + ) + + connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=max_size)) + coro = corofunc(connector) + + with pytest.raises(exceptions.ConnectorException) as excinfo: + await coro + + assert len(called) == expected_calls_count + assert ( + str(excinfo.value) + == f"Could not get a valid connection after {expected_calls_count} tries" + ) + + +@pytest.mark.parametrize( + "exception_class", [Exception, psycopg.errors.OperationalError] +) +async def test_wrap_query_exceptions_unhandled_exception(mocker, exception_class): + called = [] + + @psycopg3_connector.wrap_query_exceptions + async def corofunc(connector): + called.append(True) + raise exception_class("foo") + + connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=5)) + coro = corofunc(connector) + + with pytest.raises(exception_class): + await coro + + assert len(called) == 1 + + +async def test_wrap_query_exceptions_success(mocker): + called = [] + + @psycopg3_connector.wrap_query_exceptions + async def corofunc(connector, a, b): + if len(called) < 2: + called.append(True) + raise psycopg.errors.OperationalError( + "server closed the connection unexpectedly" + ) + return a, b + + connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=5)) + + assert await corofunc(connector, 1, 2) 
== (1, 2) + assert len(called) == 2 + + +@pytest.mark.parametrize( + "method_name", + [ + "_create_pool", + "close_async", + "execute_query_async", + "execute_query_one_async", + "execute_query_all_async", + "listen_notify", + ], +) +def test_wrap_exceptions_applied(method_name, connector): + assert getattr(connector, method_name)._exceptions_wrapped is True + + +async def test_listen_notify_pool_one_connection(mocker, caplog, connector): + pool = mocker.AsyncMock(max_size=1) + await connector.open_async(pool) + caplog.clear() + + await connector.listen_notify(None, None) + + assert {e.action for e in caplog.records} == {"listen_notify_disabled"} + + +async def test_open_async_no_pool_specified(mocker, connector): + mocker.patch.object(connector, "_create_pool", return_value=mocker.AsyncMock()) + + await connector.open_async() + + assert connector._create_pool.call_count == 1 + assert connector._pool.open.await_count == 1 + + +async def test_open_async_pool_argument_specified(mocker, connector): + mocker.patch.object(connector, "_create_pool") + pool = mocker.AsyncMock() + + await connector.open_async(pool) + + assert connector._pool_externally_set is True + assert connector._create_pool.call_count == 0 + assert connector._pool.open.await_count == 1 + assert connector._pool == pool + + +def test_get_pool(connector): + with pytest.raises(exceptions.AppNotOpen): + _ = connector.pool From d8e60d2d79f41c0aa492d6863d98e96c5013cc03 Mon Sep 17 00:00:00 2001 From: Aleksandr Shtaub <91491081+aleksandr-shtaub@users.noreply.github.com> Date: Mon, 27 Nov 2023 00:19:23 +0200 Subject: [PATCH 3/9] Add basic Psycopg3Connector docs --- docs/howto/connector.rst | 2 ++ docs/reference.rst | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/howto/connector.rst b/docs/howto/connector.rst index 693bd0552..f0462a321 100644 --- a/docs/howto/connector.rst +++ b/docs/howto/connector.rst @@ -43,6 +43,7 @@ You can use other `aiopg connection arguments`_ (which are the same as .. _`aiopg connection arguments`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect .. _`psycopg2 connection arguments`: http://initd.org/psycopg/docs/module.html#psycopg2.connect +.. _`psycopg3 connection arguments`: https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool Other arguments --------------- @@ -61,3 +62,4 @@ Procrastinate currently provides 2 connectors: - `Psycopg2Connector`: This connector is specialized for synchronous calls only, and should only be used to configure your app for synchronous multi-threaded applications that need to :term:`defer` tasks synchronously (see `discussion-sync-defer`). +- `Psycopg3Connector`: Asynchronous connector based on the next generation of psycopg. diff --git a/docs/reference.rst b/docs/reference.rst index 74162aac2..726aa37a1 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -15,6 +15,8 @@ Connectors .. autoclass:: procrastinate.Psycopg2Connector +.. autoclass:: procrastinate.Psycopg3Connector + .. 
autoclass:: procrastinate.testing.InMemoryConnector :members: reset From 55c710a76d7729420f79a41671f30f9dcda54062 Mon Sep 17 00:00:00 2001 From: Aleksandr Shtaub <91491081+aleksandr-shtaub@users.noreply.github.com> Date: Tue, 28 Nov 2023 10:28:24 +0200 Subject: [PATCH 4/9] Backport changes from ewjoachim's branch to aleksandr-shtaub's branch Co-Authored-by: Aleksandr Shtaub <91491081+aleksandr-shtaub@users.noreply.github.com> Co-Authored-by: Joachim Jablon --- procrastinate/exceptions.py | 6 + procrastinate/psycopg3_connector.py | 119 ++++++++++++------- tests/integration/test_psycopg3_connector.py | 2 +- tests/unit/test_psycopg3_connector.py | 3 +- 4 files changed, 82 insertions(+), 48 deletions(-) diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py index f03489a30..03757899a 100644 --- a/procrastinate/exceptions.py +++ b/procrastinate/exceptions.py @@ -90,6 +90,12 @@ def __init__(self, *args, constraint_name: str): self.constraint_name = constraint_name +class NoResult(ConnectorException): + """ + No result was returned by the database query. + """ + + class MissingApp(ProcrastinateException): """ Missing app. This most probably happened because procrastinate needs an diff --git a/procrastinate/psycopg3_connector.py b/procrastinate/psycopg3_connector.py index fed3de72d..f8c719089 100644 --- a/procrastinate/psycopg3_connector.py +++ b/procrastinate/psycopg3_connector.py @@ -2,7 +2,16 @@ import functools import logging import re -from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional +from typing import ( + Any, + Callable, + Coroutine, + Dict, + Iterable, + List, + LiteralString, + Optional, +) import psycopg import psycopg.errors @@ -89,31 +98,35 @@ def __init__( **kwargs: Any, ): """ - Asynchronous connector based on a ``psycopg_pool.AsyncConnectionPool``. + Create a PostgreSQL connector using psycopg. The connector uses an + ``psycopg_pool.AsyncConnectionPool``, which is created internally, or + set into the connector by calling `App.open_async`. - The pool connection parameters can be provided here. Alternatively, an already - existing ``psycopg_pool.AsyncConnectionPool`` can be provided in the - ``App.open_async``, via the ``pool`` parameter. + Note that if you want to use a ``psycopg_pool.AsyncNullConnectionPool``, + you will need to initialize it yourself and pass it to the connector + through the ``App.open_async`` method. - All other arguments than ``json_dumps`` and ``json_loads`` are passed to - :py:func:`AsyncConnectionPool` (see psycopg3 documentation__), with default - values that may differ from those of ``psycopg3`` (see a partial list of - parameters below). + All other arguments than ``json_dumps`` and ``json_loads`` are passed + to ``psycopg_pool.AsyncConnectionPool`` (see psycopg documentation__). + + ``json_dumps`` and ``json_loads`` are used to configure new connections + created by the pool with ``psycopg.types.json.set_json_dumps`` and + ``psycopg.types.json.set_json_loads``. - .. _psycopg3 doc: https://www.psycopg.org/psycopg3/docs/basic/adapt.html#json-adaptation .. __: https://www.psycopg.org/psycopg3/docs/api/pool.html #psycopg_pool.AsyncConnectionPool Parameters ---------- json_dumps : - The JSON dumps function to use for serializing job arguments. Defaults to - the function used by psycopg3. See the `psycopg3 doc`_. + A function to serialize JSON objects to a string. If not provided, + JSON objects will be serialized using psycopg's default JSON + serializer. 
json_loads : - The JSON loads function to use for deserializing job arguments. Defaults - to the function used by psycopg3. See the `psycopg3 doc`_. Unused if the - pool is externally created and set into the connector through the - ``App.open_async`` method. + A function to deserialize JSON objects from a string. If not + provided, JSON objects will be deserialized using psycopg's default + JSON deserializer. + min_size : int Passed to psycopg3, default set to 1 (same as aiopg). max_size : int @@ -123,15 +136,17 @@ def __init__( argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file. """ - self.json_dumps = json_dumps - self.json_loads = json_loads self._pool: Optional[psycopg_pool.AsyncConnectionPool] = None - self._pool_args = self._adapt_pool_args(kwargs, json_loads) + self.json_dumps = json_dumps self._pool_externally_set = False + self._pool_args = self._adapt_pool_args(kwargs, json_loads, json_dumps) + self.json_loads = json_loads @staticmethod def _adapt_pool_args( - pool_args: Dict[str, Any], json_loads: Optional[Callable] + pool_args: Dict[str, Any], + json_loads: Optional[Callable], + json_dumps: Optional[Callable], ) -> Dict[str, Any]: """ Adapt the pool args for ``psycopg3``, using sensible defaults for Procrastinate. @@ -142,9 +157,13 @@ def _adapt_pool_args( async def configure(connection: psycopg.AsyncConnection[DictRow]): if base_configure: await base_configure(connection) + if json_loads: psycopg.types.json.set_json_loads(json_loads, connection) + if json_dumps: + psycopg.types.json.set_json_dumps(json_dumps, connection) + return { "conninfo": "", "min_size": 1, @@ -153,10 +172,17 @@ async def configure(connection: psycopg.AsyncConnection[DictRow]): "row_factory": dict_row, }, "configure": configure, - "open": False, **pool_args, } + @property + def pool( + self, + ) -> psycopg_pool.AsyncConnectionPool[psycopg.AsyncConnection[DictRow]]: + if self._pool is None: # Set by open_async + raise exceptions.AppNotOpen + return self._pool + async def open_async( self, pool: Optional[psycopg_pool.AsyncConnectionPool] = None ) -> None: @@ -176,15 +202,23 @@ async def open_async( else: self._pool = await self._create_pool(self._pool_args) - # ensure pool is open - await self._pool.open() # type: ignore + await self._pool.open(wait=True) # type: ignore @staticmethod @wrap_exceptions async def _create_pool( pool_args: Dict[str, Any] ) -> psycopg_pool.AsyncConnectionPool: - return psycopg_pool.AsyncConnectionPool(**pool_args) + return psycopg_pool.AsyncConnectionPool( + **pool_args, + # Not specifying open=False raises a warning and will be deprecated. + # It makes sense, as we can't really make async I/Os in a constructor. + open=False, + # Enables a check that will ensure the connections returned when + # using the pool are still alive. If they have been closed by the + # database, they will be seamlessly replaced by a new connection. 
+ check=psycopg_pool.AsyncConnectionPool.check_connection, + ) @wrap_exceptions async def close_async(self) -> None: @@ -197,43 +231,37 @@ async def close_async(self) -> None: await self._pool.close() self._pool = None - @property - def pool( - self, - ) -> psycopg_pool.AsyncConnectionPool[psycopg.AsyncConnection[DictRow]]: - if self._pool is None: # Set by open - raise exceptions.AppNotOpen - return self._pool - def _wrap_json(self, arguments: Dict[str, Any]): return { - key: psycopg.types.json.Jsonb(value, dumps=self.json_dumps) - if isinstance(value, dict) - else value + key: psycopg.types.json.Jsonb(value) if isinstance(value, dict) else value for key, value in arguments.items() } @wrap_exceptions @wrap_query_exceptions - async def execute_query_async(self, query: str, **arguments: Any) -> None: + async def execute_query_async(self, query: LiteralString, **arguments: Any) -> None: async with self.pool.connection() as connection: - async with connection.cursor() as cursor: - await cursor.execute(query, self._wrap_json(arguments)) + await connection.execute(query, self._wrap_json(arguments)) @wrap_exceptions @wrap_query_exceptions async def execute_query_one_async( - self, query: str, **arguments: Any - ) -> Optional[DictRow]: + self, query: LiteralString, **arguments: Any + ) -> DictRow: async with self.pool.connection() as connection: async with connection.cursor() as cursor: await cursor.execute(query, self._wrap_json(arguments)) - return await cursor.fetchone() + + result = await cursor.fetchone() + + if result is None: + raise exceptions.NoResult + return result @wrap_exceptions @wrap_query_exceptions async def execute_query_all_async( - self, query: str, **arguments: Any + self, query: LiteralString, **arguments: Any ) -> List[DictRow]: async with self.pool.connection() as connection: async with connection.cursor() as cursor: @@ -266,9 +294,8 @@ async def listen_notify( channel_name=psycopg.sql.Identifier(channel_name) ) await connection.execute(query) - + # Initial set() lets caller know that we're ready to listen event.set() - await self._loop_notify(event=event, connection=connection) @wrap_exceptions @@ -277,6 +304,8 @@ async def _loop_notify( event: asyncio.Event, connection: psycopg.AsyncConnection, ) -> None: + # We'll leave this loop with a CancelledError, when we get cancelled + while True: if connection.closed: return @@ -285,7 +314,7 @@ async def _loop_notify( async for _ in notifies: event.set() except psycopg.OperationalError: - continue + break def __del__(self): pass diff --git a/tests/integration/test_psycopg3_connector.py b/tests/integration/test_psycopg3_connector.py index ce53f24a3..b130de965 100644 --- a/tests/integration/test_psycopg3_connector.py +++ b/tests/integration/test_psycopg3_connector.py @@ -35,7 +35,7 @@ async def configure(connection): called.append(connection) args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( - pool_args={"configure": configure}, json_loads=None + pool_args={"configure": configure}, json_loads=None, json_dumps=None ) assert args["configure"] is not configure diff --git a/tests/unit/test_psycopg3_connector.py b/tests/unit/test_psycopg3_connector.py index b53fb6070..c742da407 100644 --- a/tests/unit/test_psycopg3_connector.py +++ b/tests/unit/test_psycopg3_connector.py @@ -16,7 +16,7 @@ async def configure(connection): called.append(connection) args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( - pool_args={"configure": configure}, json_loads=None + pool_args={"configure": configure}, json_loads=None, 
json_dumps=None ) assert args["configure"] is not configure @@ -158,7 +158,6 @@ async def test_open_async_pool_argument_specified(mocker, connector): assert connector._pool_externally_set is True assert connector._create_pool.call_count == 0 - assert connector._pool.open.await_count == 1 assert connector._pool == pool From 84b240d5a9669f02a34bfa5c2047683ce6861a02 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 21:32:02 +0100 Subject: [PATCH 5/9] Fix import --- procrastinate/psycopg3_connector.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/procrastinate/psycopg3_connector.py b/procrastinate/psycopg3_connector.py index f8c719089..522e26d93 100644 --- a/procrastinate/psycopg3_connector.py +++ b/procrastinate/psycopg3_connector.py @@ -2,16 +2,7 @@ import functools import logging import re -from typing import ( - Any, - Callable, - Coroutine, - Dict, - Iterable, - List, - LiteralString, - Optional, -) +from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional import psycopg import psycopg.errors @@ -19,6 +10,7 @@ import psycopg.types.json import psycopg_pool from psycopg.rows import DictRow, dict_row +from typing_extensions import LiteralString from procrastinate import connector, exceptions, sql From 3dbed0947024487d5358030769395ebf83ff81c1 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 22:25:08 +0100 Subject: [PATCH 6/9] Psycopg3 is just called "Psycopg" --- docs/howto/connector.rst | 1 - docs/reference.rst | 2 +- procrastinate/__init__.py | 4 +- ...opg3_connector.py => psycopg_connector.py} | 12 ++-- tests/conftest.py | 16 ++--- ...connector.py => test_psycopg_connector.py} | 72 +++++++++---------- ...connector.py => test_psycopg_connector.py} | 16 ++--- 7 files changed, 61 insertions(+), 62 deletions(-) rename procrastinate/{psycopg3_connector.py => psycopg_connector.py} (96%) rename tests/integration/{test_psycopg3_connector.py => test_psycopg_connector.py} (65%) rename tests/unit/{test_psycopg3_connector.py => test_psycopg_connector.py} (91%) diff --git a/docs/howto/connector.rst b/docs/howto/connector.rst index f0462a321..fc9a0a5bd 100644 --- a/docs/howto/connector.rst +++ b/docs/howto/connector.rst @@ -43,7 +43,6 @@ You can use other `aiopg connection arguments`_ (which are the same as .. _`aiopg connection arguments`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect .. _`psycopg2 connection arguments`: http://initd.org/psycopg/docs/module.html#psycopg2.connect -.. _`psycopg3 connection arguments`: https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool Other arguments --------------- diff --git a/docs/reference.rst b/docs/reference.rst index 726aa37a1..9829bd1fb 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -15,7 +15,7 @@ Connectors .. autoclass:: procrastinate.Psycopg2Connector -.. autoclass:: procrastinate.Psycopg3Connector +.. autoclass:: procrastinate.PsycopgConnector .. 
autoclass:: procrastinate.testing.InMemoryConnector :members: reset diff --git a/procrastinate/__init__.py b/procrastinate/__init__.py index 510449494..93fe1879d 100644 --- a/procrastinate/__init__.py +++ b/procrastinate/__init__.py @@ -5,7 +5,7 @@ from procrastinate.connector import BaseConnector from procrastinate.job_context import JobContext from procrastinate.psycopg2_connector import Psycopg2Connector -from procrastinate.psycopg3_connector import Psycopg3Connector +from procrastinate.psycopg_connector import PsycopgConnector from procrastinate.retry import BaseRetryStrategy, RetryStrategy __all__ = [ @@ -16,7 +16,7 @@ "BaseRetryStrategy", "AiopgConnector", "Psycopg2Connector", - "Psycopg3Connector", + "PsycopgConnector", "RetryStrategy", ] diff --git a/procrastinate/psycopg3_connector.py b/procrastinate/psycopg_connector.py similarity index 96% rename from procrastinate/psycopg3_connector.py rename to procrastinate/psycopg_connector.py index 522e26d93..ac6254f4d 100644 --- a/procrastinate/psycopg3_connector.py +++ b/procrastinate/psycopg_connector.py @@ -23,7 +23,7 @@ def wrap_exceptions(coro: CoroutineFunction) -> CoroutineFunction: """ - Wrap psycopg3 errors as connector exceptions. + Wrap psycopg errors as connector exceptions. This decorator is expected to be used on coroutine functions only. """ @@ -81,7 +81,7 @@ async def wrapped(*args, **kwargs): PERCENT_PATTERN = re.compile(r"%(?![\(s])") -class Psycopg3Connector(connector.BaseAsyncConnector): +class PsycopgConnector(connector.BaseAsyncConnector): def __init__( self, *, @@ -120,11 +120,11 @@ def __init__( JSON deserializer. min_size : int - Passed to psycopg3, default set to 1 (same as aiopg). + Passed to psycopg, default set to 1 (same as aiopg). max_size : int - Passed to psycopg3, default set to 10 (same as aiopg). + Passed to psycopg, default set to 10 (same as aiopg). conninfo : ``Optional[str]`` - Passed to psycopg3. Default is "" instead of None, which means if no + Passed to psycopg. Default is "" instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file. """ @@ -141,7 +141,7 @@ def _adapt_pool_args( json_dumps: Optional[Callable], ) -> Dict[str, Any]: """ - Adapt the pool args for ``psycopg3``, using sensible defaults for Procrastinate. + Adapt the pool args for ``psycopg``, using sensible defaults for Procrastinate. 
""" base_configure = pool_args.pop("configure", None) diff --git a/tests/conftest.py b/tests/conftest.py index 39f5705a6..c62240896 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,7 @@ from procrastinate import app as app_module from procrastinate import blueprints, builtin_tasks, jobs from procrastinate import psycopg2_connector as psycopg2_connector_module -from procrastinate import psycopg3_connector as psycopg3_connector_module +from procrastinate import psycopg_connector as psycopg_connector_module from procrastinate import schema, testing from procrastinate.contrib.sqlalchemy import ( psycopg2_connector as sqlalchemy_psycopg2_connector_module, @@ -105,7 +105,7 @@ def connection_params(setup_db, db_factory): @pytest.fixture -def psycopg3_connection_params(setup_db, db_factory): +def psycopg_connection_params(setup_db, db_factory): db_factory(dbname="procrastinate_test", template=setup_db) yield {"conninfo": make_conninfo(dbname="procrastinate_test")} @@ -130,8 +130,8 @@ async def not_opened_aiopg_connector(connection_params): @pytest.fixture -async def not_opened_psycopg3_connector(psycopg3_connection_params): - yield psycopg3_connector_module.Psycopg3Connector(**psycopg3_connection_params) +async def not_opened_psycopg_connector(psycopg_connection_params): + yield psycopg_connector_module.PsycopgConnector(**psycopg_connection_params) @pytest.fixture @@ -154,10 +154,10 @@ async def aiopg_connector(not_opened_aiopg_connector): @pytest.fixture -async def psycopg3_connector(not_opened_psycopg3_connector): - await not_opened_psycopg3_connector.open_async() - yield not_opened_psycopg3_connector - await not_opened_psycopg3_connector.close_async() +async def psycopg_connector(not_opened_psycopg_connector): + await not_opened_psycopg_connector.open_async() + yield not_opened_psycopg_connector + await not_opened_psycopg_connector.close_async() @pytest.fixture diff --git a/tests/integration/test_psycopg3_connector.py b/tests/integration/test_psycopg_connector.py similarity index 65% rename from tests/integration/test_psycopg3_connector.py rename to tests/integration/test_psycopg_connector.py index b130de965..28c157ed9 100644 --- a/tests/integration/test_psycopg3_connector.py +++ b/tests/integration/test_psycopg_connector.py @@ -5,19 +5,19 @@ import attr import pytest -from procrastinate import psycopg3_connector +from procrastinate import psycopg_connector @pytest.fixture -async def psycopg3_connector_factory(psycopg3_connection_params): +async def psycopg_connector_factory(psycopg_connection_params): connectors = [] async def _(**kwargs): json_dumps = kwargs.pop("json_dumps", None) json_loads = kwargs.pop("json_loads", None) - psycopg3_connection_params.update(kwargs) - connector = psycopg3_connector.Psycopg3Connector( - json_dumps=json_dumps, json_loads=json_loads, **psycopg3_connection_params + psycopg_connection_params.update(kwargs) + connector = psycopg_connector.PsycopgConnector( + json_dumps=json_dumps, json_loads=json_loads, **psycopg_connection_params ) connectors.append(connector) await connector.open_async() @@ -34,7 +34,7 @@ async def test_adapt_pool_args_configure(mocker): async def configure(connection): called.append(connection) - args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( + args = psycopg_connector.PsycopgConnector._adapt_pool_args( pool_args={"configure": configure}, json_loads=None, json_dumps=None ) @@ -54,7 +54,7 @@ async def configure(connection): ], ) async def test_execute_query_json_dumps( - psycopg3_connector_factory, mocker, 
method_name, expected + psycopg_connector_factory, mocker, method_name, expected ): class NotJSONSerializableByDefault: pass @@ -67,14 +67,14 @@ def encode(obj): query = "SELECT %(arg)s::jsonb as json" arg = {"a": "a", "b": NotJSONSerializableByDefault()} json_dumps = functools.partial(json.dumps, default=encode) - connector = await psycopg3_connector_factory(json_dumps=json_dumps) + connector = await psycopg_connector_factory(json_dumps=json_dumps) method = getattr(connector, method_name) result = await method(query, arg=arg) assert result == expected -async def test_json_loads(psycopg3_connector_factory, mocker): +async def test_json_loads(psycopg_connector_factory, mocker): @attr.dataclass class Param: p: int @@ -88,45 +88,45 @@ def decode(dct): query = "SELECT %(arg)s::jsonb as json" arg = {"a": 1, "b": 2} - connector = await psycopg3_connector_factory(json_loads=json_loads) + connector = await psycopg_connector_factory(json_loads=json_loads) result = await connector.execute_query_one_async(query, arg=arg) assert result["json"] == {"a": 1, "b": Param(p=2)} -async def test_execute_query(psycopg3_connector): +async def test_execute_query(psycopg_connector): assert ( - await psycopg3_connector.execute_query_async( + await psycopg_connector.execute_query_async( "COMMENT ON TABLE \"procrastinate_jobs\" IS 'foo' " ) is None ) - result = await psycopg3_connector.execute_query_one_async( + result = await psycopg_connector.execute_query_one_async( "SELECT obj_description('public.procrastinate_jobs'::regclass)" ) assert result == {"obj_description": "foo"} - result = await psycopg3_connector.execute_query_all_async( + result = await psycopg_connector.execute_query_all_async( "SELECT obj_description('public.procrastinate_jobs'::regclass)" ) assert result == [{"obj_description": "foo"}] -async def test_execute_query_interpolate(psycopg3_connector): - result = await psycopg3_connector.execute_query_one_async( +async def test_execute_query_interpolate(psycopg_connector): + result = await psycopg_connector.execute_query_one_async( "SELECT %(foo)s as foo;", foo="bar" ) assert result == {"foo": "bar"} @pytest.mark.filterwarnings("error::ResourceWarning") -async def test_execute_query_simultaneous(psycopg3_connector): +async def test_execute_query_simultaneous(psycopg_connector): # two coroutines doing execute_query_async simultaneously # # the test may fail if the connector fails to properly parallelize connections async def query(): - await psycopg3_connector.execute_query_async("SELECT 1") + await psycopg_connector.execute_query_async("SELECT 1") try: await asyncio.gather(query(), query()) @@ -134,25 +134,25 @@ async def query(): pytest.fail("ResourceWarning") -async def test_close_async(psycopg3_connector): - await psycopg3_connector.execute_query_async("SELECT 1") - pool = psycopg3_connector._pool - await psycopg3_connector.close_async() +async def test_close_async(psycopg_connector): + await psycopg_connector.execute_query_async("SELECT 1") + pool = psycopg_connector._pool + await psycopg_connector.close_async() assert pool.closed is True - assert psycopg3_connector._pool is None + assert psycopg_connector._pool is None -async def test_listen_notify(psycopg3_connector): +async def test_listen_notify(psycopg_connector): channel = "somechannel" event = asyncio.Event() task = asyncio.ensure_future( - psycopg3_connector.listen_notify(channels=[channel], event=event) + psycopg_connector.listen_notify(channels=[channel], event=event) ) try: await event.wait() event.clear() - await 
psycopg3_connector.execute_query_async(f"""NOTIFY "{channel}" """) + await psycopg_connector.execute_query_async(f"""NOTIFY "{channel}" """) await asyncio.wait_for(event.wait(), timeout=1) except asyncio.TimeoutError: pytest.fail("Notify not received within 1 sec") @@ -160,14 +160,14 @@ async def test_listen_notify(psycopg3_connector): task.cancel() -async def test_loop_notify_stop_when_connection_closed(psycopg3_connector): +async def test_loop_notify_stop_when_connection_closed(psycopg_connector): # We want to make sure that the when the connection is closed, the loop end. event = asyncio.Event() - await psycopg3_connector.open_async() - async with psycopg3_connector._pool.connection() as connection: - coro = psycopg3_connector._loop_notify(event=event, connection=connection) + await psycopg_connector.open_async() + async with psycopg_connector._pool.connection() as connection: + coro = psycopg_connector._loop_notify(event=event, connection=connection) - await psycopg3_connector._pool.close() + await psycopg_connector._pool.close() assert connection.closed try: @@ -176,18 +176,18 @@ async def test_loop_notify_stop_when_connection_closed(psycopg3_connector): pytest.fail("Failed to detect that connection was closed and stop") -async def test_loop_notify_timeout(psycopg3_connector): +async def test_loop_notify_timeout(psycopg_connector): # We want to make sure that when the listen starts, we don't listen forever. If the # connection closes, we eventually finish the coroutine. event = asyncio.Event() - await psycopg3_connector.open_async() - async with psycopg3_connector._pool.connection() as connection: + await psycopg_connector.open_async() + async with psycopg_connector._pool.connection() as connection: task = asyncio.ensure_future( - psycopg3_connector._loop_notify(event=event, connection=connection) + psycopg_connector._loop_notify(event=event, connection=connection) ) assert not task.done() - await psycopg3_connector._pool.close() + await psycopg_connector._pool.close() assert connection.closed try: diff --git a/tests/unit/test_psycopg3_connector.py b/tests/unit/test_psycopg_connector.py similarity index 91% rename from tests/unit/test_psycopg3_connector.py rename to tests/unit/test_psycopg_connector.py index c742da407..bc9eae232 100644 --- a/tests/unit/test_psycopg3_connector.py +++ b/tests/unit/test_psycopg_connector.py @@ -1,12 +1,12 @@ import psycopg import pytest -from procrastinate import exceptions, psycopg3_connector +from procrastinate import exceptions, psycopg_connector @pytest.fixture def connector(): - return psycopg3_connector.Psycopg3Connector() + return psycopg_connector.PsycopgConnector() async def test_adapt_pool_args_configure(mocker): @@ -15,7 +15,7 @@ async def test_adapt_pool_args_configure(mocker): async def configure(connection): called.append(connection) - args = psycopg3_connector.Psycopg3Connector._adapt_pool_args( + args = psycopg_connector.PsycopgConnector._adapt_pool_args( pool_args={"configure": configure}, json_loads=None, json_dumps=None ) @@ -28,7 +28,7 @@ async def configure(connection): async def test_wrap_exceptions_wraps(): - @psycopg3_connector.wrap_exceptions + @psycopg_connector.wrap_exceptions async def corofunc(): raise psycopg.DatabaseError @@ -39,7 +39,7 @@ async def corofunc(): async def test_wrap_exceptions_success(): - @psycopg3_connector.wrap_exceptions + @psycopg_connector.wrap_exceptions async def corofunc(a, b): return a, b @@ -58,7 +58,7 @@ async def test_wrap_query_exceptions_reached_max_tries( ): called = [] - 
@psycopg3_connector.wrap_query_exceptions + @psycopg_connector.wrap_query_exceptions async def corofunc(connector): called.append(True) raise psycopg.errors.OperationalError( @@ -84,7 +84,7 @@ async def corofunc(connector): async def test_wrap_query_exceptions_unhandled_exception(mocker, exception_class): called = [] - @psycopg3_connector.wrap_query_exceptions + @psycopg_connector.wrap_query_exceptions async def corofunc(connector): called.append(True) raise exception_class("foo") @@ -101,7 +101,7 @@ async def corofunc(connector): async def test_wrap_query_exceptions_success(mocker): called = [] - @psycopg3_connector.wrap_query_exceptions + @psycopg_connector.wrap_query_exceptions async def corofunc(connector, a, b): if len(called) < 2: called.append(True) From dc5dc66ff7c170e5eb7c78ec0e8a94c5cf46435c Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 22:39:00 +0100 Subject: [PATCH 7/9] Documentation details --- docs/howto/connector.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/howto/connector.rst b/docs/howto/connector.rst index fc9a0a5bd..54ebc32f9 100644 --- a/docs/howto/connector.rst +++ b/docs/howto/connector.rst @@ -61,4 +61,10 @@ Procrastinate currently provides 2 connectors: - `Psycopg2Connector`: This connector is specialized for synchronous calls only, and should only be used to configure your app for synchronous multi-threaded applications that need to :term:`defer` tasks synchronously (see `discussion-sync-defer`). -- `Psycopg3Connector`: Asynchronous connector based on the next generation of psycopg. +- `PsycopgConnector`: Asynchronous connector based on psycopg v3. It has been + less tested than the other connectors, though it's expected to work fine, + possible even better than aiopg. It might replace the `AiopgConnector` and + `Psycopg2Connector` in the future. Until `#753`_ is merged, support is + experimental. + +.. _`#753`: https://github.com/procrastinate-org/procrastinate/pull/753 From 7c4cd2d7143c74af0f8c181c5bc4fe2f9d108c70 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 22:46:31 +0100 Subject: [PATCH 8/9] wrap_query_exception isn't necessary anymore for psycopg3 --- procrastinate/psycopg_connector.py | 42 ----------------- tests/unit/test_psycopg_connector.py | 70 ---------------------------- 2 files changed, 112 deletions(-) diff --git a/procrastinate/psycopg_connector.py b/procrastinate/psycopg_connector.py index ac6254f4d..dca1dac4d 100644 --- a/procrastinate/psycopg_connector.py +++ b/procrastinate/psycopg_connector.py @@ -1,7 +1,6 @@ import asyncio import functools import logging -import re from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional import psycopg @@ -43,44 +42,6 @@ async def wrapped(*args, **kwargs): return wrapped -def wrap_query_exceptions(coro: CoroutineFunction) -> CoroutineFunction: - """ - Detect "admin shutdown" errors and retry a number of times. - - This is to handle the case where the database connection (obtained from the pool) - was actually closed by the server. In this case, pyscopg3 raises an AdminShutdown - exception when the connection is used for issuing a query. What we do is retry when - an AdminShutdown is raised, and until the maximum number of retries is reached. - - The number of retries is set to the pool maximum size plus one, to handle the case - where the connections we have in the pool were all closed on the server side. 
- """ - - @functools.wraps(coro) - async def wrapped(*args, **kwargs): - final_exc = None - try: - max_tries = args[0]._pool.max_size + 1 - except Exception: - max_tries = 1 - for _ in range(max_tries): - try: - return await coro(*args, **kwargs) - except psycopg.errors.OperationalError as exc: - if "server closed the connection unexpectedly" in str(exc): - final_exc = exc - continue - raise exc - raise exceptions.ConnectorException( - f"Could not get a valid connection after {max_tries} tries" - ) from final_exc - - return wrapped - - -PERCENT_PATTERN = re.compile(r"%(?![\(s])") - - class PsycopgConnector(connector.BaseAsyncConnector): def __init__( self, @@ -230,13 +191,11 @@ def _wrap_json(self, arguments: Dict[str, Any]): } @wrap_exceptions - @wrap_query_exceptions async def execute_query_async(self, query: LiteralString, **arguments: Any) -> None: async with self.pool.connection() as connection: await connection.execute(query, self._wrap_json(arguments)) @wrap_exceptions - @wrap_query_exceptions async def execute_query_one_async( self, query: LiteralString, **arguments: Any ) -> DictRow: @@ -251,7 +210,6 @@ async def execute_query_one_async( return result @wrap_exceptions - @wrap_query_exceptions async def execute_query_all_async( self, query: LiteralString, **arguments: Any ) -> List[DictRow]: diff --git a/tests/unit/test_psycopg_connector.py b/tests/unit/test_psycopg_connector.py index bc9eae232..3b11300e3 100644 --- a/tests/unit/test_psycopg_connector.py +++ b/tests/unit/test_psycopg_connector.py @@ -46,76 +46,6 @@ async def corofunc(a, b): assert await corofunc(1, 2) == (1, 2) -@pytest.mark.parametrize( - "max_size, expected_calls_count", - [ - pytest.param(5, 6, id="Valid max_size"), - pytest.param("5", 1, id="Invalid max_size"), - ], -) -async def test_wrap_query_exceptions_reached_max_tries( - mocker, max_size, expected_calls_count -): - called = [] - - @psycopg_connector.wrap_query_exceptions - async def corofunc(connector): - called.append(True) - raise psycopg.errors.OperationalError( - "server closed the connection unexpectedly" - ) - - connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=max_size)) - coro = corofunc(connector) - - with pytest.raises(exceptions.ConnectorException) as excinfo: - await coro - - assert len(called) == expected_calls_count - assert ( - str(excinfo.value) - == f"Could not get a valid connection after {expected_calls_count} tries" - ) - - -@pytest.mark.parametrize( - "exception_class", [Exception, psycopg.errors.OperationalError] -) -async def test_wrap_query_exceptions_unhandled_exception(mocker, exception_class): - called = [] - - @psycopg_connector.wrap_query_exceptions - async def corofunc(connector): - called.append(True) - raise exception_class("foo") - - connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=5)) - coro = corofunc(connector) - - with pytest.raises(exception_class): - await coro - - assert len(called) == 1 - - -async def test_wrap_query_exceptions_success(mocker): - called = [] - - @psycopg_connector.wrap_query_exceptions - async def corofunc(connector, a, b): - if len(called) < 2: - called.append(True) - raise psycopg.errors.OperationalError( - "server closed the connection unexpectedly" - ) - return a, b - - connector = mocker.Mock(_pool=mocker.AsyncMock(max_size=5)) - - assert await corofunc(connector, 1, 2) == (1, 2) - assert len(called) == 2 - - @pytest.mark.parametrize( "method_name", [ From 3a3095fc270bc15d6009c22a8cd49d570619c82e Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 22:52:25 +0100 
Subject: [PATCH 9/9] sql module produces LiteralStrings, not strings --- procrastinate/sql/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/procrastinate/sql/__init__.py b/procrastinate/sql/__init__.py index 73c002540..10539224d 100644 --- a/procrastinate/sql/__init__.py +++ b/procrastinate/sql/__init__.py @@ -1,6 +1,8 @@ import re import sys -from typing import Dict +from typing import Dict, cast + +from typing_extensions import LiteralString # https://github.com/pypa/twine/pull/551 if sys.version_info[:2] < (3, 9): # coverage: exclude @@ -11,7 +13,7 @@ QUERIES_REGEX = re.compile(r"(?:\n|^)-- ([a-z0-9_]+) --\n(?:-- .+\n)*", re.MULTILINE) -def parse_query_file(query_file: str) -> Dict["str", "str"]: +def parse_query_file(query_file: str) -> Dict["str", LiteralString]: split = iter(QUERIES_REGEX.split(query_file)) next(split) # Consume the header of the file result = {} @@ -19,13 +21,16 @@ def parse_query_file(query_file: str) -> Dict["str", "str"]: while True: key = next(split) value = next(split).strip() - result[key] = value + # procrastinate takes full responsibility for the queries, we + # can safely vouch for them being as safe as if they were + # defined in the code itself. + result[key] = cast(LiteralString, value) except StopIteration: pass return result -def get_queries() -> Dict["str", "str"]: +def get_queries() -> Dict["str", LiteralString]: return parse_query_file( (importlib_resources.files("procrastinate.sql") / "queries.sql").read_text( encoding="utf-8"