diff --git a/docs/spanner/advanced-session-pool-topics.rst b/docs/spanner/advanced-session-pool-topics.rst
new file mode 100644
index 0000000000000..b8b4e8c9253a5
--- /dev/null
+++ b/docs/spanner/advanced-session-pool-topics.rst
@@ -0,0 +1,98 @@
+Advanced Session Pool Topics
+============================
+
+Custom Session Pool Implementations
+-----------------------------------
+
+You can supply your own pool implementation, which must satisfy the
+contract laid out in
+:class:`~google.cloud.spanner.pool.AbstractSessionPool`:
+
+.. code-block:: python
+
+    from google.cloud.spanner.pool import AbstractSessionPool
+
+    class MyCustomPool(AbstractSessionPool):
+
+        def __init__(self, custom_param):
+            super(MyCustomPool, self).__init__()
+            self.custom_param = custom_param
+
+        def bind(self, database):
+            ...
+
+        def get(self, read_only=False):
+            ...
+
+        def put(self, session, discard_if_full=True):
+            ...
+
+    pool = MyCustomPool(custom_param=42)
+    database = instance.database(DATABASE_NAME, pool=pool)
+
+Lowering latency for read / query operations
+--------------------------------------------
+
+Some applications may need to minimize latency for read operations,
+particularly the overhead of making an API request to create or refresh a
+session.  :class:`~google.cloud.spanner.pool.PingingPool` is designed for such
+applications, which need to configure a background thread to do the work of
+keeping the sessions fresh.
+
+Create an instance of :class:`~google.cloud.spanner.pool.PingingPool`:
+
+.. code-block:: python
+
+    from google.cloud.spanner import Client
+    from google.cloud.spanner.pool import PingingPool
+
+    client = Client()
+    instance = client.instance(INSTANCE_NAME)
+    pool = PingingPool(size=10, default_timeout=5, ping_interval=300)
+    database = instance.database(DATABASE_NAME, pool=pool)
+
+Set up a background thread to ping the pool's sessions, keeping them
+from becoming stale:
+
+.. code-block:: python
+
+    import threading
+
+    background = threading.Thread(target=pool.ping, name='ping-pool')
+    background.daemon = True
+    background.start()
+
+Lowering latency for mixed read-write operations
+------------------------------------------------
+
+Some applications may need to minimize latency for read-write operations,
+particularly the overhead of making an API request to create or refresh a
+session or to begin a session's transaction.
+:class:`~google.cloud.spanner.pool.TransactionPingingPool` is designed for
+such applications, which need to configure a background thread to do the work
+of keeping the sessions fresh and starting their transactions after use.
+
+Create an instance of
+:class:`~google.cloud.spanner.pool.TransactionPingingPool`:
+
+.. code-block:: python
+
+    from google.cloud.spanner import Client
+    from google.cloud.spanner.pool import TransactionPingingPool
+
+    client = Client()
+    instance = client.instance(INSTANCE_NAME)
+    pool = TransactionPingingPool(size=10, default_timeout=5, ping_interval=300)
+    database = instance.database(DATABASE_NAME, pool=pool)
+
+Set up a background thread to ping the pool's sessions, keeping them
+from becoming stale, and ensuring that each session has a new transaction
+started before it is used:
+
+.. code-block:: python
+
+    import threading
+
+    background = threading.Thread(target=pool.ping, name='ping-pool')
+    background.daemon = True
+    background.start()
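+
+With the ping thread running, the usual
+:meth:`~google.cloud.spanner.database.Database.run_in_transaction` pattern
+(see :doc:`database-usage`) draws its sessions from this pool, so the
+transactions it runs can begin without an extra round trip.  The snippet
+below is a minimal sketch only; ``QUERY`` and ``do_something_with`` are
+illustrative placeholders, not part of the API:
+
+.. code-block:: python
+
+    def unit_of_work(transaction):
+        # The session backing this transaction was checked out of the
+        # pool above, already pinged and with a transaction begun.
+        rows = list(transaction.execute_sql(QUERY))
+        do_something_with(rows)
+
+    database.run_in_transaction(unit_of_work)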
diff --git a/docs/spanner/database-usage.rst b/docs/spanner/database-usage.rst
index aecd1ab12ccc3..529010c8443fc 100644
--- a/docs/spanner/database-usage.rst
+++ b/docs/spanner/database-usage.rst
@@ -117,8 +117,141 @@ method:
 
   :meth:`~google.cloud.spanner.instance.Operation.finished` will result in
   an :exc`ValueError` being raised.
 
+Non-Admin Database Usage
+========================
 
-Next Step
----------
+Use a Snapshot to Read / Query the Database
+-------------------------------------------
 
-Next, learn about :doc:`session-crud-usage`.
+A snapshot represents a read-only point-in-time view of the database.
+
+Calling :meth:`~google.cloud.spanner.database.Database.snapshot` with
+no arguments creates a snapshot with strong concurrency:
+
+.. code:: python
+
+    with database.snapshot() as snapshot:
+        do_something_with(snapshot)
+
+See :class:`~google.cloud.spanner.snapshot.Snapshot` for the other options
+which can be passed.
+
+.. note::
+
+    :meth:`~google.cloud.spanner.database.Database.snapshot` returns an
+    object intended to be used as a Python context manager (i.e., as the
+    target of a ``with`` statement).  Use the instance, and any result
+    sets returned by its ``read`` or ``execute_sql`` methods, only inside
+    the block created by the ``with`` statement.
+
+See :doc:`snapshot-usage` for more complete examples of snapshot usage.
+
+Use a Batch to Modify Rows in the Database
+------------------------------------------
+
+A batch represents a bundled set of insert/upsert/update/delete operations
+on the rows of tables in the database.
+
+.. code:: python
+
+    with database.batch() as batch:
+        batch.insert_or_update(table, columns, rows)
+        batch.delete(table, keyset_to_delete)
+
+.. note::
+
+    :meth:`~google.cloud.spanner.database.Database.batch` returns an
+    object intended to be used as a Python context manager (i.e., as the
+    target of a ``with`` statement).  It applies any changes made inside
+    the block of its ``with`` statement when exiting the block, unless an
+    exception is raised within the block.  Use the batch only inside
+    the block created by the ``with`` statement.
+
+See :doc:`batch-usage` for more complete examples of batch usage.
+
+Use a Transaction to Query / Modify Rows in the Database
+--------------------------------------------------------
+
+A transaction represents the union of a "strong" snapshot and a batch:
+it allows ``read`` and ``execute_sql`` operations, and accumulates
+insert/upsert/update/delete operations.
+
+Because other applications may be performing concurrent updates which
+would invalidate the reads / queries, the work done by a transaction needs
+to be bundled as a retryable "unit of work" function, which takes the
+transaction as a required argument:
+
+.. code:: python
+
+    def unit_of_work(transaction):
+        result = transaction.execute_sql(QUERY)
+
+        for emp_id, hours, pay in _compute_pay(result):
+            transaction.insert_or_update(
+                table='monthly_hours',
+                columns=['employee_id', 'month', 'hours', 'pay'],
+                values=[emp_id, month_start, hours, pay])
+
+    database.run_in_transaction(unit_of_work)
+
+.. note::
+
+    :meth:`~google.cloud.spanner.database.Database.run_in_transaction`
+    commits the transaction automatically if the "unit of work" function
+    returns without raising an exception.
+
+.. note::
+
+    :meth:`~google.cloud.spanner.database.Database.run_in_transaction`
+    retries the "unit of work" function if the read / query operations
+    or the commit are aborted due to concurrent updates.
+
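+Positional and keyword arguments passed to
+:meth:`~google.cloud.spanner.database.Database.run_in_transaction` after the
+function itself are forwarded along to the "unit of work" function.  The
+sketch below is illustrative only; the month parameters and the elided body
+are assumptions, not part of the API:
+
+.. code:: python
+
+    import datetime
+
+    def unit_of_work(transaction, month_start, month_end):
+        """Roll up hours for the given month (body elided)."""
+        ...
+
+    database.run_in_transaction(
+        unit_of_work,
+        month_start=datetime.date(2016, 12, 1),
+        month_end=datetime.date(2016, 12, 31))
+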
+See :doc:`transaction-usage` for more complete examples of transaction usage.
+
+Configuring a session pool for a database
+-----------------------------------------
+
+Under the covers, the ``snapshot``, ``batch``, and ``run_in_transaction``
+methods use a pool of :class:`~google.cloud.spanner.session.Session` objects
+to manage their communication with the back-end.  You can configure
+one of the pools manually to control the number of sessions, timeouts, etc.,
+and then pass it to the :class:`~google.cloud.spanner.database.Database`
+constructor:
+
+.. code-block:: python
+
+    from google.cloud.spanner import Client
+    from google.cloud.spanner import FixedSizePool
+    client = Client()
+    instance = client.instance(INSTANCE_NAME)
+    pool = FixedSizePool(size=10, default_timeout=5)
+    database = instance.database(DATABASE_NAME, pool=pool)
+
+Note that creating a database object with a pool presumes that the database
+already exists, as the pool may need to pre-create sessions (rather than
+creating them on demand, as the default implementation does).
+
+You can supply your own pool implementation, which must satisfy the
+contract laid out in :class:`~google.cloud.spanner.pool.AbstractSessionPool`:
+
+.. code-block:: python
+
+    from google.cloud.spanner.pool import AbstractSessionPool
+
+    class MyCustomPool(AbstractSessionPool):
+
+        def __init__(self, custom_param):
+            super(MyCustomPool, self).__init__()
+            self.custom_param = custom_param
+
+        def get(self, read_only=False):
+            ...
+
+        def put(self, session, discard_if_full=True):
+            ...
+
+    pool = MyCustomPool(custom_param=42)
+    database = instance.database(DATABASE_NAME, pool=pool)
+
+See :doc:`advanced-session-pool-topics` for more advanced coverage of
+session pools.
diff --git a/docs/spanner/session-crud-usage.rst b/docs/spanner/session-crud-usage.rst
deleted file mode 100644
index e0734bee10665..0000000000000
--- a/docs/spanner/session-crud-usage.rst
+++ /dev/null
@@ -1,80 +0,0 @@
-Session Creation / Deletion
-===========================
-
-Outside of the admin APIs, all work with actual table data in a database
-occurs in the context of a session.
-
-
-Session Factory
----------------
-
-To create a :class:`~google.cloud.spanner.session.Session` object:
-
-.. code:: python
-
-    session = database.session()
-
-
-Create a new Session
---------------------
-
-After creating the session object, use its
-:meth:`~google.cloud.spanner.session.Session.create` method to
-trigger its creation on the server:
-
-.. code:: python
-
-    session.create()
-
-
-Test for the existence of a Session
------------------------------------
-
-After creating the session object, use its
-:meth:`~google.cloud.spanner.session.Session.exists` method to determine
-whether the session still exists on the server:
-
-.. code:: python
-
-    assert session.exists()
-
-
-Delete a Session
-----------------
-
-Once done with the session object, use its
-:meth:`~google.cloud.spanner.session.Session.delete` method to free up
-its resources on the server:
-
-..
code:: python - - session.delete() - - -Using a Session as a Context Manager ------------------------------------- - -Rather than calling the Session's -:meth:`~google.cloud.spanner.session.Session.create` and -:meth:`~google.cloud.spanner.session.Session.delete` methods directly, -you can use the session as a Python context manager: - -.. code:: python - - with database.session() as session: - - assert session.exists() - # perform session operations here - -.. note:: - - At the beginning of the ``with`` block, the session's - :meth:`~google.cloud.spanner.session.Session.create` method is called. - At the end of the ``with`` block, the session's - :meth:`~google.cloud.spanner.session.Session.delete` method is called. - - -Next Step ---------- - -Next, learn about :doc:`session-implicit-txn-usage`. diff --git a/docs/spanner/session-implicit-txn-usage.rst b/docs/spanner/session-implicit-txn-usage.rst deleted file mode 100644 index 5c7d3025f5662..0000000000000 --- a/docs/spanner/session-implicit-txn-usage.rst +++ /dev/null @@ -1,54 +0,0 @@ -Implicit Transactions -##################### - -The following operations on a session to not require creating an explicit -:class:`~google.cloud.spanner.snapshot.Snapshot` or -:class:`~google.cloud.spanner.transaction.Transaction`. - - -Read Table Data ---------------- - -Read data for selected rows from a table in the session's database. Calls -the ``Read`` API, which returns all rows specified in ``key_set``, or else -fails if the result set is too large, - -.. code:: python - - result = session.read( - table='table-name', columns=['first_name', 'last_name', 'age'], - key_set=['phred@example.com', 'bharney@example.com']) - - for row in result.rows: - print(row) - -.. note:: - - If streaming a chunk fails due to a "resumable" error, - :meth:`Session.read` retries the ``StreamingRead`` API reqeust, - passing the ``resume_token`` from the last partial result streamed. - - -Execute a SQL Select Statement ------------------------------- - -Read data from a query against tables in the session's database. Calls -the ``ExecuteSql`` API, which returns all rows matching the query, or else -fails if the result set is too large, - -.. code:: python - - QUERY = ( - 'SELECT e.first_name, e.last_name, p.telephone ' - 'FROM employees as e, phones as p ' - 'WHERE p.employee_id == e.employee_id') - result = session.execute_sql(QUERY) - - for row in result.rows: - print(row) - - -Next Step ---------- - -Next, learn about :doc:`batch-usage`. diff --git a/docs/spanner/session-pool-usage.rst b/docs/spanner/session-pool-usage.rst deleted file mode 100644 index 883bb6d720b2f..0000000000000 --- a/docs/spanner/session-pool-usage.rst +++ /dev/null @@ -1,198 +0,0 @@ -Session Pools -############# - -In order to minimize the latency of session creation, you can set up a -session pool on your database. For instance, to use a pool which does *not* -block when exhausted, and which pings each session at checkout: - -Configuring a session pool for a database ------------------------------------------ - -.. code-block:: python - - from google.cloud.spanner import Client - from google.cloud.spanner import FixedSizePool - client = Client() - instance = client.instance(INSTANCE_NAME) - database = instance.database(DATABASE_NAME) - pool = FixedSizePool(database, size=10, default_timeout=5) - -Note that creating the pool presumes that its database already exists, as -it may need to pre-create sessions (rather than creating them on demand). 
- -You can supply your own pool implementation, which must satisfy the -contract laid out in -:class:`~google.cloud.spanner.session.AbstractSessionPool`: - -.. code-block:: python - - from google.cloud.spanner import AbstractSessionPool - - class MyCustomPool(AbstractSessionPool): - - def __init__(self, database, custom_param): - super(MyCustomPool, self).__init__(database) - self.custom_param = custom_param - - def get(self, read_only=False): - ... - - def put(self, session, discard_if_full=True): - ... - - database = instance.database(DATABASE_NAME, pool=pool) - pool = MyCustomPool(database, custom_param=42) - - -Checking out sessions from the pool ------------------------------------ - -No matter what kind of pool you create for the database, you can check out -a session from the pool, rather than creating it manually. The -:meth:`~google.cloud.spanner.session.AbstractSessionPool.session` method -returns an object designed to be used as a context manager, checking the -session out from the pool and returning it automatically: - -.. code-block:: python - - with pool.session() as session: - - snapshot = session.snapshot() - - result = snapshot.read( - table='table-name', columns=['first_name', 'last_name', 'age'], - key_set=['phred@example.com', 'bharney@example.com']) - - for row in result.rows: - print(row) - -Some pool implementations may allow additional keyword arguments when checked -out: - -.. code-block:: python - - with pool.session(read_only=True) as session: - - snapshot = session.snapshot() - - result = snapshot.read( - table='table-name', columns=['first_name', 'last_name', 'age'], - key_set=['phred@example.com', 'bharney@example.com']) - - for row in result.rows: - print(row) - - -Lowering latency for read / query operations --------------------------------------------- - -Some applications may need to minimize latency for read operations, including -particularly the overhead of making an API request to create or refresh a -session. :class:`~google.cloud.spanner.pool.PingingPool` is designed for such -applications, which need to configure a background thread to do the work of -keeping the sessions fresh. - -Create an instance of :class:`~google.cloud.spanner.pool.PingingPool`: - -.. code-block:: python - - from google.cloud.spanner import Client - from google.cloud.spanner import PingingPool - - client = Client() - instance = client.instance(INSTANCE_NAME) - pool = PingingPool(size=10, default_timeout=5, ping_interval=300) - database = instance.database(DATABASE_NAME, pool=pool) - -Set up a background thread to ping the pool's session, keeping them -from becoming stale: - -.. code-block:: python - - import threading - - background = threading.Thread(target=pool.ping, name='ping-pool') - background.daemon = True - background.start() - -``database.execute_sql()`` is a shortcut, which checks out a session, creates a -snapshot, and uses the snapshot to execute a query: - -.. code-block:: python - - QUERY = """\ - SELECT first_name, last_name, age FROM table-name - WHERE email in ["phred@example.com", "bharney@example.com"] - """ - result = database.execute_sql(QUERY) - - for row in result: - do_something_with(row) - - -Lowering latency for mixed read-write operations ------------------------------------------------- - -Some applications may need to minimize latency for read write operations, -including particularly the overhead of making an API request to create or -refresh a session or to begin a session's transaction. 
-:class:`~google.cloud.spanner.pool.TransactionPingingPool` is designed for -such applications, which need to configure a background thread to do the work -of keeping the sessions fresh and starting their transactions after use. - -Create an instance of -:class:`~google.cloud.spanner.pool.TransactionPingingPool`: - -.. code-block:: python - - from google.cloud.spanner import Client - from google.cloud.spanner import TransactionPingingPool - - client = Client() - instance = client.instance(INSTANCE_NAME) - pool = TransactionPingingPool(size=10, default_timeout=5, ping_interval=300) - database = instance.database(DATABASE_NAME, pool=pool) - -Set up a background thread to ping the pool's session, keeping them -from becoming stale, and ensuring that each session has a new transaction -started before it is used: - -.. code-block:: python - - import threading - - background = threading.Thread(target=pool.ping, name='ping-pool') - background.daemon = True - background.start() - -``database.run_in_transaction()`` is a shortcut: it checks out a session -and uses it to perform a set of read and write operations inside the context -of a transaction, retrying if aborted. The application must supply a callback -function, which is passed a transaction (plus any additional parameters -passed), and does its work using that transaction. - -.. code-block:: python - - import datetime - - QUERY = """\ - SELECT employee_id, sum(hours) FROM daily_hours - WHERE start_date >= %s AND end_date < %s - GROUP BY employee_id id ORDER BY employee_id id""" - - def unit_of_work(transaction, month_start, month_end): - """Compute rolled-up hours for a given month.""" - query = QUERY % (month_start.isoformat(), - (month_end + datetime.timedelta(1)).isoformat()) - row_iter = transaction.execute_sql(query) - - for emp_id, hours, pay in _compute_pay(row_iter): - transaction.insert_or_update( - table='monthly_hours', - columns=['employee_id', 'month', 'hours', 'pay'], - values=[emp_id, month_start, hours, pay]) - - database.run_in_transaction( - unit_of_work, - month_start=datetime.date(2016, 12, 1), - month_end.date(2016, 12, 31)) diff --git a/docs/spanner/usage.rst b/docs/spanner/usage.rst index 2d61fbaed9c7f..762ec3894b034 100644 --- a/docs/spanner/usage.rst +++ b/docs/spanner/usage.rst @@ -8,12 +8,10 @@ Spanner client-usage instance-usage database-usage - session-crud-usage - session-implicit-txn-usage - session-pool-usage batch-usage snapshot-usage transaction-usage + advanced-session-pool-topics client-api instance-api diff --git a/spanner/google/cloud/spanner/database.py b/spanner/google/cloud/spanner/database.py index 40dcc471d1c4e..728acadc61373 100644 --- a/spanner/google/cloud/spanner/database.py +++ b/spanner/google/cloud/spanner/database.py @@ -315,6 +315,36 @@ def session(self): """ return Session(self) + def snapshot(self, **kw): + """Return an object which wraps a snapshot. + + The wrapper *must* be used as a context manager, with the snapshot + as the value returned by the wrapper. + + See + https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly + + :type kw: dict + :param kw: + Passed through to + :class:`~google.cloud.spanner.snapshot.Snapshot` constructor. + + :rtype: :class:`~google.cloud.spanner.database.SnapshotCheckout` + :returns: new wrapper + """ + return SnapshotCheckout(self, **kw) + + def batch(self): + """Return an object which wraps a batch. 
+ + The wrapper *must* be used as a context manager, with the batch + as the value returned by the wrapper. + + :rtype: :class:`~google.cloud.spanner.database.BatchCheckout` + :returns: new wrapper + """ + return BatchCheckout(self) + def run_in_transaction(self, func, *args, **kw): """Perform a unit of work in a transaction, retrying on abort. @@ -349,36 +379,6 @@ def run_in_transaction(self, func, *args, **kw): finally: self._local.transaction_running = False - def batch(self): - """Return an object which wraps a batch. - - The wrapper *must* be used as a context manager, with the batch - as the value returned by the wrapper. - - :rtype: :class:`~google.cloud.spanner.database.BatchCheckout` - :returns: new wrapper - """ - return BatchCheckout(self) - - def snapshot(self, **kw): - """Return an object which wraps a snapshot. - - The wrapper *must* be used as a context manager, with the snapshot - as the value returned by the wrapper. - - See - https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly - - :type kw: dict - :param kw: - Passed through to - :class:`~google.cloud.spanner.snapshot.Snapshot` constructor. - - :rtype: :class:`~google.cloud.spanner.database.SnapshotCheckout` - :returns: new wrapper - """ - return SnapshotCheckout(self, **kw) - class BatchCheckout(object): """Context manager for using a batch from a database. diff --git a/spanner/tests/unit/test_database.py b/spanner/tests/unit/test_database.py index c812176499dd2..851fec4a2175a 100644 --- a/spanner/tests/unit/test_database.py +++ b/spanner/tests/unit/test_database.py @@ -621,6 +621,55 @@ def test_session_factory(self): self.assertIs(session.session_id, None) self.assertIs(session._database, database) + def test_snapshot_defaults(self): + from google.cloud.spanner.database import SnapshotCheckout + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + pool = _Pool() + session = _Session() + pool.put(session) + database = self._make_one(self.DATABASE_ID, instance, pool=pool) + + checkout = database.snapshot() + self.assertIsInstance(checkout, SnapshotCheckout) + self.assertIs(checkout._database, database) + self.assertEqual(checkout._kw, {}) + + def test_snapshot_w_read_timestamp_and_multi_use(self): + import datetime + from google.cloud._helpers import UTC + from google.cloud.spanner.database import SnapshotCheckout + + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + pool = _Pool() + session = _Session() + pool.put(session) + database = self._make_one(self.DATABASE_ID, instance, pool=pool) + + checkout = database.snapshot(read_timestamp=now, multi_use=True) + + self.assertIsInstance(checkout, SnapshotCheckout) + self.assertIs(checkout._database, database) + self.assertEqual( + checkout._kw, {'read_timestamp': now, 'multi_use': True}) + + def test_batch(self): + from google.cloud.spanner.database import BatchCheckout + + client = _Client() + instance = _Instance(self.INSTANCE_NAME, client=client) + pool = _Pool() + session = _Session() + pool.put(session) + database = self._make_one(self.DATABASE_ID, instance, pool=pool) + + checkout = database.batch() + self.assertIsInstance(checkout, BatchCheckout) + self.assertIs(checkout._database, database) + def test_run_in_transaction_wo_args(self): import datetime @@ -686,55 +735,6 @@ def nested_unit_of_work(): database.run_in_transaction(nested_unit_of_work) self.assertEqual(inner.call_count, 0) - def 
test_batch(self): - from google.cloud.spanner.database import BatchCheckout - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.batch() - self.assertIsInstance(checkout, BatchCheckout) - self.assertIs(checkout._database, database) - - def test_snapshot_defaults(self): - from google.cloud.spanner.database import SnapshotCheckout - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.snapshot() - self.assertIsInstance(checkout, SnapshotCheckout) - self.assertIs(checkout._database, database) - self.assertEqual(checkout._kw, {}) - - def test_snapshot_w_read_timestamp_and_multi_use(self): - import datetime - from google.cloud._helpers import UTC - from google.cloud.spanner.database import SnapshotCheckout - - now = datetime.datetime.utcnow().replace(tzinfo=UTC) - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - pool = _Pool() - session = _Session() - pool.put(session) - database = self._make_one(self.DATABASE_ID, instance, pool=pool) - - checkout = database.snapshot(read_timestamp=now, multi_use=True) - - self.assertIsInstance(checkout, SnapshotCheckout) - self.assertIs(checkout._database, database) - self.assertEqual( - checkout._kw, {'read_timestamp': now, 'multi_use': True}) - class TestBatchCheckout(_BaseTest):