Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert _insert_graph_receipts_txn to simple_upsert #16299

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16299.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use a stricter isolation level on `receipts_graph` Postgres transactions to stop error messages.
3 changes: 3 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,7 @@ async def simple_upsert(
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
desc: str = "simple_upsert",
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
Expand Down Expand Up @@ -1243,6 +1244,7 @@ async def simple_upsert(
keyvalues: The unique key columns and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
desc: description of the transaction, for logging and metrics
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
Expand All @@ -1263,6 +1265,7 @@ async def simple_upsert(
keyvalues,
values,
insertion_values,
where_clause,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
Expand Down
23 changes: 9 additions & 14 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,7 @@ async def insert_receipt(
now - event_ts,
)

await self.db_pool.runInteraction(
"insert_graph_receipt",
self._insert_graph_receipt_txn,
await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
Expand All @@ -810,9 +808,8 @@ async def insert_receipt(

return stream_id, max_persisted_id

def _insert_graph_receipt_txn(
async def _insert_graph_receipt(
self,
txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
Expand All @@ -822,13 +819,6 @@ def _insert_graph_receipt_txn(
) -> None:
assert self._can_write_to_receipts

txn.call_after(
self._get_receipts_for_user_with_orderings.invalidate,
(user_id, receipt_type),
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
Expand All @@ -840,8 +830,8 @@ def _insert_graph_receipt_txn(
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
await self.db_pool.simple_upsert(
clokep marked this conversation as resolved.
Show resolved Hide resolved
desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
Expand All @@ -851,6 +841,11 @@ def _insert_graph_receipt_txn(
where_clause=where_clause,
)

self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))

# FIXME: This shouldn't invalidate the whole cache
self._get_linearized_receipts_for_room.invalidate((room_id,))


class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
Expand Down
Loading