From 908b90835cb8aadb9fb361d0e6279d0b9f19f0d2 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 11 Sep 2023 06:04:15 -0500 Subject: [PATCH 1/5] Change isolation level on upsert transcation for receipts_graph to READ_COMMITTED from REPEATABLE_READ --- synapse/storage/databases/main/receipts.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e4d10ff250d1..3a455f011b05 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -804,6 +804,9 @@ async def insert_receipt( event_ids, thread_id, data, + # Use READ_COMMITTED to avoid 'could not serialize access due to concurrent + # update' Postgres errors which lead to rollbacks and re-dos. + isolation_level=IsolationLevel.READ_COMMITTED, ) max_persisted_id = self._receipts_id_gen.get_current_token() From 5d3e1ebafc2107222bcad15383ee99ee9e9051eb Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 11 Sep 2023 06:13:58 -0500 Subject: [PATCH 2/5] changelog --- changelog.d/16299.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16299.misc diff --git a/changelog.d/16299.misc b/changelog.d/16299.misc new file mode 100644 index 000000000000..b9dce360dd2e --- /dev/null +++ b/changelog.d/16299.misc @@ -0,0 +1 @@ +Use a stricter isolation level on `receipts_graph` Postgres transactions to stop error messages. From 07234cbb4186cf868e5ec5b8d6c22bb7621fe331 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 12 Sep 2023 05:56:02 -0500 Subject: [PATCH 3/5] Allow passing where_clause through a simple_upsert into simple_upsert_txn --- synapse/storage/database.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6c5fcdcec37d..697bc5651c91 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1193,6 +1193,7 @@ async def simple_upsert( keyvalues: Dict[str, Any], values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, + where_clause: Optional[str] = None, desc: str = "simple_upsert", ) -> bool: """Insert a row with values + insertion_values; on conflict, update with values. @@ -1243,6 +1244,7 @@ async def simple_upsert( keyvalues: The unique key columns and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting + where_clause: An index predicate to apply to the upsert. desc: description of the transaction, for logging and metrics Returns: Returns True if a row was inserted or updated (i.e. if `values` is @@ -1263,6 +1265,7 @@ async def simple_upsert( keyvalues, values, insertion_values, + where_clause, db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: From 49648153ed35d026e72d9288f016885c93f7a70c Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 12 Sep 2023 06:55:10 -0500 Subject: [PATCH 4/5] Mutate a runInteraction->simple_upsert_txn into a simple_upsert for _insert_graph_receipt --- synapse/storage/databases/main/receipts.py | 26 ++++++++-------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3a455f011b05..a074c439895e 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -795,27 +795,21 @@ async def insert_receipt( now - event_ts, ) - await self.db_pool.runInteraction( - "insert_graph_receipt", - self._insert_graph_receipt_txn, + await self._insert_graph_receipt( room_id, receipt_type, user_id, event_ids, thread_id, data, - # Use READ_COMMITTED to avoid 'could not serialize access due to concurrent - # update' Postgres errors which lead to rollbacks and re-dos. - isolation_level=IsolationLevel.READ_COMMITTED, ) max_persisted_id = self._receipts_id_gen.get_current_token() return stream_id, max_persisted_id - def _insert_graph_receipt_txn( + async def _insert_graph_receipt( self, - txn: LoggingTransaction, room_id: str, receipt_type: str, user_id: str, @@ -825,13 +819,6 @@ def _insert_graph_receipt_txn( ) -> None: assert self._can_write_to_receipts - txn.call_after( - self._get_receipts_for_user_with_orderings.invalidate, - (user_id, receipt_type), - ) - # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - keyvalues = { "room_id": room_id, "receipt_type": receipt_type, @@ -843,8 +830,8 @@ def _insert_graph_receipt_txn( else: keyvalues["thread_id"] = thread_id - self.db_pool.simple_upsert_txn( - txn, + await self.db_pool.simple_upsert( + desc="insert_graph_receipt", table="receipts_graph", keyvalues=keyvalues, values={ @@ -854,6 +841,11 @@ def _insert_graph_receipt_txn( where_clause=where_clause, ) + self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) + + # FIXME: This shouldn't invalidate the whole cache + self._get_linearized_receipts_for_room.invalidate((room_id,)) + class ReceiptsBackgroundUpdateStore(SQLBaseStore): POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering" From 5d85f66028c5ab555896f9504439aff12cc7058e Mon Sep 17 00:00:00 2001 From: Jason Little Date: Thu, 14 Sep 2023 05:53:01 -0500 Subject: [PATCH 5/5] Update changelog --- changelog.d/16299.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/16299.misc b/changelog.d/16299.misc index b9dce360dd2e..d4546691518d 100644 --- a/changelog.d/16299.misc +++ b/changelog.d/16299.misc @@ -1 +1 @@ -Use a stricter isolation level on `receipts_graph` Postgres transactions to stop error messages. +Refactor `receipts_graph` Postgres transactions to stop error messages.