-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Python/SQLAlchemy: Demonstrate support for
asyncpg
and psycopg
The `sqlalchemy-cratedb` package supports the vanilla HTTP-based transport using urllib3, and the standard PostgreSQL drivers `asyncpg` and `psycopg`.
- Loading branch information
Showing
6 changed files
with
549 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
""" | ||
About | ||
===== | ||
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy | ||
dialect, and exercise a few basic examples using the low-level table API, in | ||
asynchronous mode. | ||
Specific to the asynchronous mode of SQLAlchemy is the streaming of results: | ||
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()` | ||
> method that returns an `AsyncResult` object. This result object uses a server-side cursor | ||
> and provides an async/await API, such as an async iterator. | ||
> | ||
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core | ||
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used. | ||
The corresponding SQLAlchemy dialect identifiers are:: | ||
# PostgreSQL protocol on port 5432, using `asyncpg` | ||
crate+asyncpg://crate@localhost:5432/doc | ||
# PostgreSQL protocol on port 5432, using `psycopg` | ||
crate+psycopg://crate@localhost:5432/doc | ||
Synopsis | ||
======== | ||
:: | ||
# Run CrateDB | ||
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate | ||
# Use PostgreSQL protocol, with `asyncpg` | ||
python async_streaming.py asyncpg | ||
# Use PostgreSQL protocol, with asynchronous support of `psycopg` | ||
python async_streaming.py psycopg | ||
# Use with both variants | ||
python async_streaming.py asyncpg psycopg | ||
""" | ||
import asyncio | ||
import sys | ||
import typing as t | ||
from functools import lru_cache | ||
|
||
import sqlalchemy as sa | ||
from sqlalchemy.ext.asyncio import create_async_engine | ||
|
||
metadata = sa.MetaData() | ||
table = sa.Table( | ||
"t1", | ||
metadata, | ||
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False), | ||
sa.Column("name", sa.String), | ||
) | ||
|
||
|
||
class AsynchronousTableStreamingExample: | ||
""" | ||
Demonstrate reading streamed results when using the CrateDB SQLAlchemy | ||
dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. | ||
- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core | ||
- https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html | ||
""" | ||
|
||
def __init__(self, dsn: str): | ||
self.dsn = dsn | ||
|
||
@property | ||
@lru_cache | ||
def engine(self): | ||
""" | ||
Provide an SQLAlchemy engine object. | ||
""" | ||
return create_async_engine(self.dsn, echo=True) | ||
|
||
async def run(self): | ||
""" | ||
Run the whole recipe. | ||
""" | ||
await self.create_and_insert() | ||
await self.read_buffered() | ||
await self.read_streaming() | ||
|
||
async def create_and_insert(self): | ||
""" | ||
Create table schema, completely dropping it upfront, and insert a few records. | ||
""" | ||
# conn is an instance of AsyncConnection | ||
async with self.engine.begin() as conn: | ||
# to support SQLAlchemy DDL methods as well as legacy functions, the | ||
# AsyncConnection.run_sync() awaitable method will pass a "sync" | ||
# version of the AsyncConnection object to any synchronous method, | ||
# where synchronous IO calls will be transparently translated for | ||
# await. | ||
await conn.run_sync(metadata.drop_all, checkfirst=True) | ||
await conn.run_sync(metadata.create_all) | ||
|
||
# for normal statement execution, a traditional "await execute()" | ||
# pattern is used. | ||
await conn.execute( | ||
table.insert(), | ||
[{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}], | ||
) | ||
|
||
# CrateDB specifics to flush/synchronize the write operation. | ||
await conn.execute(sa.text("REFRESH TABLE t1;")) | ||
|
||
async def read_buffered(self): | ||
""" | ||
Read data from the database, in buffered mode. | ||
""" | ||
async with self.engine.connect() as conn: | ||
# the default result object is the | ||
# sqlalchemy.engine.Result object | ||
result = await conn.execute(table.select()) | ||
|
||
# the results are buffered so no await call is necessary | ||
# for this case. | ||
print(result.fetchall()) | ||
|
||
async def read_streaming(self): | ||
""" | ||
Read data from the database, in streaming mode. | ||
""" | ||
async with self.engine.connect() as conn: | ||
|
||
# for a streaming result that buffers only segments of the | ||
# result at time, the AsyncConnection.stream() method is used. | ||
# this returns a sqlalchemy.ext.asyncio.AsyncResult object. | ||
async_result = await conn.stream(table.select()) | ||
|
||
# this object supports async iteration and awaitable | ||
# versions of methods like .all(), fetchmany(), etc. | ||
async for row in async_result: | ||
print(row) | ||
|
||
|
||
async def run_example(dsn: str): | ||
example = AsynchronousTableStreamingExample(dsn) | ||
|
||
# Run a basic conversation. | ||
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`. | ||
await example.run() | ||
|
||
|
||
def run_drivers(drivers: t.List[str]): | ||
for driver in drivers: | ||
if driver == "asyncpg": | ||
dsn = "crate+asyncpg://crate@localhost:5432/doc" | ||
elif driver == "psycopg": | ||
dsn = "crate+psycopg://crate@localhost:5432/doc" | ||
else: | ||
raise ValueError(f"Unknown driver: {driver}") | ||
|
||
asyncio.run(run_example(dsn)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
drivers = sys.argv[1:] | ||
if not drivers: | ||
raise ValueError("Please select driver") | ||
run_drivers(drivers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
""" | ||
About | ||
===== | ||
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy | ||
dialect, and exercise a few basic examples using the low-level table API, in | ||
asynchronous mode. | ||
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used. | ||
The corresponding SQLAlchemy dialect identifiers are:: | ||
# PostgreSQL protocol on port 5432, using `asyncpg` | ||
crate+asyncpg://crate@localhost:5432/doc | ||
# PostgreSQL protocol on port 5432, using `psycopg` | ||
crate+psycopg://crate@localhost:5432/doc | ||
Synopsis | ||
======== | ||
:: | ||
# Run CrateDB | ||
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate | ||
# Use PostgreSQL protocol, with `asyncpg` | ||
python async_table.py asyncpg | ||
# Use PostgreSQL protocol, with asynchronous support of `psycopg` | ||
python async_table.py psycopg | ||
# Use with both variants | ||
python async_table.py asyncpg psycopg | ||
""" | ||
import asyncio | ||
import sys | ||
import typing as t | ||
from functools import lru_cache | ||
|
||
import sqlalchemy as sa | ||
from sqlalchemy.ext.asyncio import create_async_engine | ||
|
||
|
||
class AsynchronousTableExample: | ||
""" | ||
Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode, | ||
using the `asyncpg` and `psycopg` drivers. | ||
""" | ||
|
||
def __init__(self, dsn: str): | ||
self.dsn = dsn | ||
|
||
@property | ||
@lru_cache | ||
def engine(self): | ||
""" | ||
Provide an SQLAlchemy engine object. | ||
""" | ||
return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) | ||
|
||
@property | ||
@lru_cache | ||
def table(self): | ||
""" | ||
Provide an SQLAlchemy table object. | ||
""" | ||
metadata = sa.MetaData() | ||
return sa.Table( | ||
"testdrive", | ||
metadata, | ||
sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), | ||
sa.Column("y", sa.Integer), | ||
) | ||
|
||
async def conn_run_sync(self, func: t.Callable, *args, **kwargs): | ||
""" | ||
To support SQLAlchemy DDL methods as well as legacy functions, the | ||
AsyncConnection.run_sync() awaitable method will pass a "sync" | ||
version of the AsyncConnection object to any synchronous method, | ||
where synchronous IO calls will be transparently translated for | ||
await. | ||
https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html | ||
""" | ||
# `conn` is an instance of `AsyncConnection` | ||
async with self.engine.begin() as conn: | ||
return await conn.run_sync(func, *args, **kwargs) | ||
|
||
async def run(self): | ||
""" | ||
Run the whole recipe, returning the result from the "read" step. | ||
""" | ||
await self.create() | ||
await self.insert(sync=True) | ||
return await self.read() | ||
|
||
async def create(self): | ||
""" | ||
Create table schema, completely dropping it upfront. | ||
""" | ||
await self.conn_run_sync(self.table.drop, checkfirst=True) | ||
await self.conn_run_sync(self.table.create) | ||
|
||
async def insert(self, sync: bool = False): | ||
""" | ||
Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account. | ||
""" | ||
async with self.engine.begin() as conn: | ||
stmt = self.table.insert().values(x=1, y=42) | ||
await conn.execute(stmt) | ||
stmt = self.table.insert().values(x=2, y=42) | ||
await conn.execute(stmt) | ||
if sync and self.dsn.startswith("crate"): | ||
await conn.execute(sa.text("REFRESH TABLE testdrive;")) | ||
|
||
async def read(self): | ||
""" | ||
Read data from the database. | ||
""" | ||
async with self.engine.begin() as conn: | ||
cursor = await conn.execute(sa.text("SELECT * FROM testdrive;")) | ||
return cursor.fetchall() | ||
|
||
async def reflect(self): | ||
""" | ||
Reflect the table schema from the database. | ||
""" | ||
|
||
# Optionally enable tracing SQLAlchemy calls. | ||
# self.trace() | ||
|
||
def reflect(session): | ||
""" | ||
A function written in "synchronous" style that will be invoked | ||
within the asyncio event loop. | ||
The session object passed is a traditional orm.Session object with | ||
synchronous interface. | ||
https://docs.sqlalchemy.org/en/20/_modules/asyncio/greenlet_orm.html | ||
""" | ||
meta = sa.MetaData() | ||
reflected_table = sa.Table("testdrive", meta, autoload_with=session) | ||
print("Table information:") | ||
print(f"Table: {reflected_table}") | ||
print(f"Columns: {reflected_table.columns}") | ||
print(f"Constraints: {reflected_table.constraints}") | ||
print(f"Primary key: {reflected_table.primary_key}") | ||
|
||
return await self.conn_run_sync(reflect) | ||
|
||
@staticmethod | ||
def trace(): | ||
""" | ||
Trace execution flow through SQLAlchemy. | ||
pip install hunter | ||
""" | ||
from hunter import Q, trace | ||
|
||
constraint = Q(module_startswith="sqlalchemy") | ||
trace(constraint) | ||
|
||
|
||
async def run_example(dsn: str): | ||
example = AsynchronousTableExample(dsn) | ||
|
||
# Run a basic conversation. | ||
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`. | ||
result = await example.run() | ||
print(result) | ||
|
||
# Reflect the table schema. | ||
await example.reflect() | ||
|
||
|
||
def run_drivers(drivers: t.List[str]): | ||
for driver in drivers: | ||
if driver == "asyncpg": | ||
dsn = "crate+asyncpg://crate@localhost:5432/doc" | ||
elif driver == "psycopg": | ||
dsn = "crate+psycopg://crate@localhost:5432/doc" | ||
else: | ||
raise ValueError(f"Unknown driver: {driver}") | ||
|
||
asyncio.run(run_example(dsn)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
drivers = sys.argv[1:] | ||
if not drivers: | ||
raise ValueError("Please select driver") | ||
run_drivers(drivers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
click<9 | ||
colorlog<7 | ||
crate[sqlalchemy] | ||
dask==2023.12.1 | ||
pandas<2.2 | ||
sqlalchemy>=2,<2.1 | ||
sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async |
Oops, something went wrong.