From 709b7363fe13705f37389cce3ab21f0285f359c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sun, 1 Sep 2024 11:25:39 +0100 Subject: [PATCH] Sliding sync: use new DB tables (#17630) Based on https://github.com/element-hq/synapse/pull/17629 Utilizing the new sliding sync tables added in https://github.com/element-hq/synapse/pull/17512 for fast acquisition of rooms for the user and filtering/sorting. --------- Co-authored-by: Eric Eastwood --- changelog.d/17630.misc | 1 + synapse/handlers/sliding_sync/room_lists.py | 562 +++++++++++++++++- synapse/storage/_base.py | 4 + synapse/storage/background_updates.py | 21 +- synapse/storage/databases/main/cache.py | 4 + .../databases/main/events_bg_updates.py | 11 + synapse/storage/databases/main/roommember.py | 55 +- synapse/storage/roommember.py | 13 + .../sliding_sync/test_connection_tracking.py | 16 +- .../test_extension_account_data.py | 16 + .../sliding_sync/test_extension_e2ee.py | 16 + .../sliding_sync/test_extension_receipts.py | 16 + .../sliding_sync/test_extension_to_device.py | 15 + .../sliding_sync/test_extension_typing.py | 16 + .../client/sliding_sync/test_extensions.py | 16 +- .../sliding_sync/test_room_subscriptions.py | 16 + .../client/sliding_sync/test_rooms_invites.py | 16 + .../client/sliding_sync/test_rooms_meta.py | 16 + .../sliding_sync/test_rooms_required_state.py | 16 +- .../sliding_sync/test_rooms_timeline.py | 16 + .../client/sliding_sync/test_sliding_sync.py | 33 + 21 files changed, 877 insertions(+), 18 deletions(-) create mode 100644 changelog.d/17630.misc diff --git a/changelog.d/17630.misc b/changelog.d/17630.misc new file mode 100644 index 00000000000..ed1bf6bd558 --- /dev/null +++ b/changelog.d/17630.misc @@ -0,0 +1 @@ +Use new database tables for sliding sync. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index f41180fedab..12b7958c6f6 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -18,6 +18,7 @@ from itertools import chain from typing import ( TYPE_CHECKING, + AbstractSet, Any, Dict, List, @@ -47,7 +48,7 @@ Sentinel as StateSentinel, ) from synapse.storage.databases.main.stream import CurrentStateDeltaMembership -from synapse.storage.roommember import RoomsForUser +from synapse.storage.roommember import RoomsForUser, RoomsForUserSlidingSync from synapse.types import ( MutableStateMap, PersistedEventPosition, @@ -143,7 +144,10 @@ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": def filter_membership_for_sync( - *, user_id: str, room_membership_for_user: _RoomMembershipForUser + *, + user_id: str, + room_membership_for_user: Union[_RoomMembershipForUser, RoomsForUserSlidingSync], + newly_left: bool, ) -> bool: """ Returns True if the membership event should be included in the sync response, @@ -156,7 +160,6 @@ def filter_membership_for_sync( membership = room_membership_for_user.membership sender = room_membership_for_user.sender - newly_left = room_membership_for_user.newly_left # We want to allow everything except rooms the user has left unless `newly_left` # because we want everything that's *still* relevant to the user. We include @@ -198,6 +201,310 @@ async def compute_interested_rooms( ) -> SlidingSyncInterestedRooms: """Fetch the set of rooms that match the request""" + if await self.store.have_finished_sliding_sync_background_jobs(): + return await self._compute_interested_rooms_new_tables( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + else: + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + return await self._compute_interested_rooms_fallback( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + + @trace + async def _compute_interested_rooms_new_tables( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Implementation of `compute_interested_rooms` using new sliding sync db tables.""" + user_id = sync_config.user.to_string() + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we can display and need to fetch more info about + relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() + + room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( + user_id + ) + + changes = await self._get_rewind_changes_to_current_membership_to_token( + sync_config.user, room_membership_for_user_map, to_token=to_token + ) + if changes: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id, change in changes.items(): + if change is None: + # Remove rooms that the user joined after the `to_token` + room_membership_for_user_map.pop(room_id) + continue + + existing_room = room_membership_for_user_map.get(room_id) + if existing_room is not None: + # Update room membership events to the point in time of the `to_token` + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the current state of the room though + room_type=existing_room.room_type, + is_encrypted=existing_room.is_encrypted, + ) + else: + # This can happen if we get "state reset" out of the room + # after the `to_token`. In other words, there is no membership + # for the room after the `to_token` but we see membership in + # the token range. + + # Get the state at the time. Note that room type never changes, + # so we can just get current room type + room_type = await self.store.get_room_type(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, to_token.room_key + ) + + # Add back rooms that the user was state-reset out of after `to_token` + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + room_type=room_type, + is_encrypted=is_encrypted, + ) + + newly_joined_room_ids, newly_left_room_map = ( + await self._get_newly_joined_and_left_rooms( + user_id, from_token=from_token, to_token=to_token + ) + ) + dm_room_ids = await self._get_dm_rooms_for_user(user_id) + + # Handle state resets in the from -> to token range. + state_reset_rooms = ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ) + if state_reset_rooms: + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id in ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ): + # Get the state at the time. Note that room type never changes, + # so we can just get current room type + room_type = await self.store.get_room_type(room_id) + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, newly_left_room_map[room_id].to_room_stream_token() + ) + + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=None, + membership=Membership.LEAVE, + event_id=None, + event_pos=newly_left_room_map[room_id], + room_version_id=await self.store.get_room_version_id(room_id), + room_type=room_type, + is_encrypted=is_encrypted, + ) + + if sync_config.lists: + sync_room_map = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in room_membership_for_user_map.items() + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_membership_for_user, + newly_left=room_id in newly_left_room_map, + ) + } + with start_active_span("assemble_sliding_window_lists"): + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms_using_tables( + user_id, + sync_room_map, + list_config.filters, + to_token, + dm_room_ids, + ) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = ( + await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + ) + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if not partial_state_room_map.get(room_id) + } + + all_rooms.update(filtered_sync_room_map) + + # Sort the list + sorted_room_info = await self.sort_rooms_using_tables( + filtered_sync_room_map, to_token + ) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get( + room_id + ) + if existing_room_sync_config is not None: + room_sync_config = existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + + relevant_room_map[room_id] = room_sync_config + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_info), + ops=ops, + ) + + if sync_config.room_subscriptions: + with start_active_span("assemble_room_subscriptions"): + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = await self.store.is_partial_state_room_batched( + sync_config.room_subscriptions.keys() + ) + + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + if room_id not in room_membership_for_user_map: + # TODO: Handle rooms the user isn't in. + continue + + all_rooms.add(room_id) + + # Take the superset of the `RoomSyncConfig` for each room. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + if partial_state_room_map.get(room_id): + continue + + all_rooms.add(room_id) + + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + ) + + relevant_room_map[room_id] = room_sync_config + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map={ + # FIXME: Ideally we wouldn't have to create a new + # `_RoomMembershipForUser` here and instead just return + # `newly_joined_room_ids` directly, to save CPU time. + room_id: _RoomMembershipForUser( + room_id=room_id, + event_id=membership_info.event_id, + event_pos=membership_info.event_pos, + sender=membership_info.sender, + membership=membership_info.membership, + newly_joined=room_id in newly_joined_room_ids, + newly_left=room_id in newly_left_room_map, + is_dm=room_id in dm_room_ids, + ) + for room_id, membership_info in room_membership_for_user_map.items() + }, + ) + + async def _compute_interested_rooms_fallback( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Fallback code when the database background updates haven't completed yet.""" + room_membership_for_user_map = ( await self.get_room_membership_for_user_at_to_token( sync_config.user, to_token, from_token @@ -239,7 +546,7 @@ async def compute_interested_rooms( ) # Since creating the `RoomSyncConfig` takes some work, let's just do it - # once and make a copy whenever we need it. + # once. room_sync_config = RoomSyncConfig.from_room_config(list_config) # Exclude partially-stated rooms if we must wait for the room to be @@ -359,6 +666,29 @@ async def compute_interested_rooms( relevant_room_map[room_id] = room_sync_config + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_room_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map=room_membership_for_user_map, + ) + + async def _filter_relevant_room_to_send( + self, + previous_connection_state: PerConnectionState, + from_token: Optional[StreamToken], + relevant_room_map: Dict[str, RoomSyncConfig], + ) -> Dict[str, RoomSyncConfig]: + """Filters the `relevant_room_map` down to those rooms that may have + updates we need to fetch and return.""" + # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map @@ -418,19 +748,13 @@ async def compute_interested_rooms( if room_id in rooms_should_send } - return SlidingSyncInterestedRooms( - lists=lists, - relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, - all_rooms=all_rooms, - room_membership_for_user_map=room_membership_for_user_map, - ) + return relevant_rooms_to_send_map @trace async def _get_rewind_changes_to_current_membership_to_token( self, user: UserID, - rooms_for_user: Mapping[str, RoomsForUser], + rooms_for_user: Mapping[str, Union[RoomsForUser, RoomsForUserSlidingSync]], to_token: StreamToken, ) -> Mapping[str, Optional[RoomsForUser]]: """ @@ -806,7 +1130,7 @@ async def _get_newly_joined_and_left_rooms( async def _get_dm_rooms_for_user( self, user_id: str, - ) -> StrCollection: + ) -> AbstractSet[str]: """Get the set of DM rooms for the user.""" # We're using global account data (`m.direct`) instead of checking for @@ -872,6 +1196,7 @@ async def filter_rooms_relevant_for_sync( if filter_membership_for_sync( user_id=user_id, room_membership_for_user=room_membership_for_user, + newly_left=room_membership_for_user.newly_left, ) } @@ -1364,6 +1689,174 @@ async def filter_rooms( # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + @trace + async def filter_rooms_using_tables( + self, + user_id: str, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, + dm_room_ids: AbstractSet[str], + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + filters: Filters to apply + to_token: We filter based on the state of the room at this token + dm_room_ids: Set of room IDs which are DMs + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. + """ + + filtered_room_id_set = set(sync_room_map.keys()) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + with start_active_span("filters.is_dm"): + if filters.is_dm: + # Intersect with the DM room set + filtered_room_id_set &= dm_room_ids + else: + # Remove DMs + filtered_room_id_set -= dm_room_ids + + if filters.spaces is not None: + with start_active_span("filters.spaces"): + raise NotImplementedError() + + # Filter for encrypted rooms + if filters.is_encrypted is not None: + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if sync_room_map[room_id].is_encrypted == filters.is_encrypted + } + + # Filter for rooms that the user has been invited to + if filters.is_invite is not None: + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) + + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type. + if filters.room_types is not None or filters.not_room_types is not None: + with start_active_span("filters.room_types"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_type = sync_room_map[room_id].room_type + + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) + + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) + + if filters.room_name_like is not None: + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + raise NotImplementedError() + + # Assemble a new sync room map but only with the `filtered_room_id_set` + return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + + @trace + async def sort_rooms_using_tables( + self, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + to_token: StreamToken, + ) -> List[RoomsForUserSlidingSync]: + """ + Sort by `stream_ordering` of the last event that the user should see in the + room. `stream_ordering` is unique so we get a stable sort. + + Args: + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + to_token: We sort based on the events in the room at this token (<= `to_token`) + + Returns: + A sorted list of room IDs by `stream_ordering` along with membership information. + """ + + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} + + for room_id, room_for_user in sync_room_map.items(): + if room_for_user.membership != Membership.JOIN: + # If the user has left/been invited/knocked/been banned from a + # room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point + # in invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + + # For fully-joined rooms, we find the latest activity at/before the + # `to_token`. + joined_room_positions = ( + await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + [ + room_id + for room_id, room_for_user in sync_room_map.items() + if room_for_user.membership == Membership.JOIN + ], + to_token.room_key, + ) + ) + + last_activity_in_room_map.update(joined_room_positions) + + return sorted( + sync_room_map.values(), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: last_activity_in_room_map[room_info.room_id], + # We want descending order + reverse=True, + ) + @trace async def sort_rooms( self, @@ -1420,3 +1913,46 @@ async def sort_rooms( # We want descending order reverse=True, ) + + async def get_is_encrypted_for_room_at_token( + self, room_id: str, to_token: RoomStreamToken + ) -> bool: + """Get if the room is encrypted at the time.""" + + # Fetch the current encryption state + state_ids = await self.store.get_partial_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.RoomEncryption, "")]) + ) + encryption_event_id = state_ids.get((EventTypes.RoomEncryption, "")) + + # Now roll back the state by looking at the state deltas between + # to_token and now. + deltas = await self.store.get_current_state_deltas_for_room( + room_id, + from_token=to_token, + to_token=self.store.get_room_max_token(), + ) + + for delta in deltas: + if delta.event_type != EventTypes.RoomEncryption: + continue + + # Found the first change, we look at the previous event ID to get + # the state at the to token. + + if delta.prev_event_id is None: + # There is no prev event, so no encryption state event, so room is not encrypted + return False + + encryption_event_id = delta.prev_event_id + break + + # We didn't find an encryption state, room isn't encrypted + if encryption_event_id is None: + return False + + # We found encryption state, check if content has a non-null algorithm + encrypted_event = await self.store.get_event(encryption_event_id) + algorithm = encrypted_event.content.get(EventContentFields.ENCRYPTION_ALGORITHM) + + return algorithm is not None diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1ac85ad66d3..d22160b85c5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -126,6 +126,9 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache( "_get_rooms_for_local_user_where_membership_is_inner", (user_id,) ) + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", (user_id,) + ) # Purge other caches based on room state. self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) @@ -160,6 +163,7 @@ def _invalidate_state_caches_all(self, room_id: str) -> None: self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_room_type", (room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (room_id,)) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index f4732940702..efe42380360 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -44,7 +44,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import Clock, json_encoder from . import engines @@ -487,6 +487,25 @@ async def has_completed_background_update(self, update_name: str) -> bool: return not update_exists + async def have_completed_background_updates( + self, update_names: StrCollection + ) -> bool: + """Return the name of background updates that have not yet been + completed""" + if self._all_done: + return True + + rows = await self.db_pool.simple_select_many_batch( + table="background_updates", + column="update_name", + iterable=update_names, + retcols=("update_name",), + desc="get_uncompleted_background_updates", + ) + + # If we find any rows then we've not completed the update. + return not bool(rows) + async def do_next_background_update(self, sleep: bool = True) -> bool: """Does some amount of work on the next queued background update diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 246d2acc2f3..b0e30daee5a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -346,6 +346,9 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache( "_get_rooms_for_local_user_where_membership_is_inner", (state_key,) ) + self._attempt_to_invalidate_cache( + "get_sliding_sync_rooms_for_user", (state_key,) + ) self._attempt_to_invalidate_cache( "did_forget", @@ -417,6 +420,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None: self._attempt_to_invalidate_cache( "_get_rooms_for_local_user_where_membership_is_inner", None ) + self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None) self._attempt_to_invalidate_cache("did_forget", None) self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None) self._attempt_to_invalidate_cache("get_references_for_event", None) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e819364a164..b227e057730 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2342,6 +2342,17 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: return len(memberships_to_update_rows) + async def have_finished_sliding_sync_background_jobs(self) -> bool: + """Return if its safe to use the sliding sync membership tables.""" + + return await self.db_pool.updates.have_completed_background_updates( + ( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE, + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, + ) + ) + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 722686d4b81..57b9b95c281 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -53,7 +53,12 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import Sqlite3Engine -from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser +from synapse.storage.roommember import ( + MemberSummary, + ProfileInfo, + RoomsForUser, + RoomsForUserSlidingSync, +) from synapse.types import ( JsonDict, PersistedEventPosition, @@ -1377,6 +1382,54 @@ async def update_room_forgetter_stream_pos(self, stream_id: int) -> None: desc="room_forgetter_stream_pos", ) + @cached(iterable=True, max_entries=10000) + async def get_sliding_sync_rooms_for_user( + self, + user_id: str, + ) -> Mapping[str, RoomsForUserSlidingSync]: + """Get all the rooms for a user to handle a sliding sync request. + + Ignores forgotten rooms and rooms that the user has been kicked from. + + Returns: + Map from room ID to membership info + """ + + def get_sliding_sync_rooms_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted) + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') + WHERE user_id = ? + AND m.forgotten = 0 + """ + txn.execute(sql, (user_id,)) + return { + row[0]: RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + room_type=row[7], + is_encrypted=row[8], + ) + for row in txn + } + + return await self.db_pool.runInteraction( + "get_sliding_sync_rooms_for_user", + get_sliding_sync_rooms_for_user_txn, + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 80c9630867e..09213627ecd 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -39,6 +39,19 @@ class RoomsForUser: room_version_id: str +@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) +class RoomsForUserSlidingSync: + room_id: str + sender: Optional[str] + membership: str + event_id: Optional[str] + event_pos: PersistedEventPosition + room_version_id: str + + room_type: Optional[str] + is_encrypted: bool + + @attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True) class GetRoomsForUserWithStreamOrdering: room_id: str diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 6863c32f7c1..436bd4466cb 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -28,6 +28,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): """ Test connection tracking in the Sliding Sync API. @@ -44,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: """Test that we only get state updates in incremental sync for rooms we've already seen (LIVE). diff --git a/tests/rest/client/sliding_sync/test_extension_account_data.py b/tests/rest/client/sliding_sync/test_extension_account_data.py index 3482a5f8878..65a6adf4af3 100644 --- a/tests/rest/client/sliding_sync/test_extension_account_data.py +++ b/tests/rest/client/sliding_sync/test_extension_account_data.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): """Tests for the account_data sliding sync extension""" @@ -43,6 +57,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the account_data extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_e2ee.py b/tests/rest/client/sliding_sync/test_extension_e2ee.py index 320f8c788f4..2ff66877966 100644 --- a/tests/rest/client/sliding_sync/test_extension_e2ee.py +++ b/tests/rest/client/sliding_sync/test_extension_e2ee.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): """Tests for the e2ee sliding sync extension""" @@ -42,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.e2e_keys_handler = hs.get_e2e_keys_handler() + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling e2ee extension works during an intitial sync, even if there diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index e842349ed26..90b035dd75b 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): """Tests for the receipts sliding sync extension""" @@ -42,6 +56,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the receipts extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extension_to_device.py b/tests/rest/client/sliding_sync/test_extension_to_device.py index f8500812ea7..5ba2443089a 100644 --- a/tests/rest/client/sliding_sync/test_extension_to_device.py +++ b/tests/rest/client/sliding_sync/test_extension_to_device.py @@ -14,6 +14,8 @@ import logging from typing import List +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -40,6 +54,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) def _assert_to_device_response( self, response_body: JsonDict, expected_messages: List[JsonDict] diff --git a/tests/rest/client/sliding_sync/test_extension_typing.py b/tests/rest/client/sliding_sync/test_extension_typing.py index 7f523e0f106..0a0f5aff1a2 100644 --- a/tests/rest/client/sliding_sync/test_extension_typing.py +++ b/tests/rest/client/sliding_sync/test_extension_typing.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): """Tests for the typing notification sliding sync extension""" @@ -41,6 +55,8 @@ class SlidingSyncTypingExtensionTestCase(SlidingSyncBase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main + super().prepare(reactor, clock, hs) + def test_no_data_initial_sync(self) -> None: """ Test that enabling the typing extension works during an intitial sync, diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py index ae823d5415d..32478467aa7 100644 --- a/tests/rest/client/sliding_sync/test_extensions.py +++ b/tests/rest/client/sliding_sync/test_extensions.py @@ -14,7 +14,7 @@ import logging from typing import Literal -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncExtensionsTestCase(SlidingSyncBase): """ Test general extensions behavior in the Sliding Sync API. Each extension has their @@ -49,6 +61,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + # Any extensions that use `lists`/`rooms` should be tested here @parameterized.expand([("account_data",), ("receipts",), ("typing",)]) def test_extensions_lists_rooms_relevant_rooms( diff --git a/tests/rest/client/sliding_sync/test_room_subscriptions.py b/tests/rest/client/sliding_sync/test_room_subscriptions.py index cc17b0b3543..e81d2518399 100644 --- a/tests/rest/client/sliding_sync/test_room_subscriptions.py +++ b/tests/rest/client/sliding_sync/test_room_subscriptions.py @@ -14,6 +14,8 @@ import logging from http import HTTPStatus +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomSubscriptionsTestCase(SlidingSyncBase): """ Test `room_subscriptions` in the Sliding Sync API. @@ -43,6 +57,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_room_subscriptions_with_join_membership(self) -> None: """ Test `room_subscriptions` with a joined room should give us timeline and current diff --git a/tests/rest/client/sliding_sync/test_rooms_invites.py b/tests/rest/client/sliding_sync/test_rooms_invites.py index f08ffaf6743..f6f45c2500e 100644 --- a/tests/rest/client/sliding_sync/test_rooms_invites.py +++ b/tests/rest/client/sliding_sync/test_rooms_invites.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -27,6 +29,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsInvitesTestCase(SlidingSyncBase): """ Test to make sure the `rooms` response looks good for invites in the Sliding Sync API. @@ -49,6 +63,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_invite_shared_history_initial_sync(self) -> None: """ Test that `rooms` we are invited to have some stripped `invite_state` during an diff --git a/tests/rest/client/sliding_sync/test_rooms_meta.py b/tests/rest/client/sliding_sync/test_rooms_meta.py index 690912133a6..71542923da1 100644 --- a/tests/rest/client/sliding_sync/test_rooms_meta.py +++ b/tests/rest/client/sliding_sync/test_rooms_meta.py @@ -13,6 +13,8 @@ # import logging +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsMetaTestCase(SlidingSyncBase): """ Test rooms meta info like name, avatar, joined_count, invited_count, is_dm, @@ -49,6 +63,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: assert persistence is not None self.persistence = persistence + super().prepare(reactor, clock, hs) + def test_rooms_meta_when_joined(self) -> None: """ Test that the `rooms` `name` and `avatar` are included in the response and diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 498c921cbdd..436ae684da8 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -13,7 +13,7 @@ # import logging -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor @@ -30,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): """ Test `rooms.required_state` in the Sliding Sync API. @@ -46,6 +58,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def test_rooms_no_required_state(self) -> None: """ Empty `rooms.required_state` should not return any state events in the room diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index eeac0d6aa99..e56fb58012c 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -14,6 +14,8 @@ import logging from typing import List, Optional +from parameterized import parameterized_class + from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin @@ -28,6 +30,18 @@ logger = logging.getLogger(__name__) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): """ Test `rooms.timeline` in the Sliding Sync API. @@ -44,6 +58,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + super().prepare(reactor, clock, hs) + def _assertListEqual( self, actual_items: StrSequence, diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index cb7638c5ba6..1dcc15b0829 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -13,7 +13,9 @@ # import logging from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple +from unittest.mock import AsyncMock +from parameterized import parameterized_class from typing_extensions import assert_never from twisted.test.proto_helpers import MemoryReactor @@ -47,8 +49,25 @@ class SlidingSyncBase(unittest.HomeserverTestCase): """Base class for sliding sync test cases""" + # Flag as to whether to use the new sliding sync tables or not + # + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + use_new_tables: bool = True + sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + hs.get_datastores().main.have_finished_sliding_sync_background_jobs = AsyncMock( # type: ignore[method-assign] + return_value=self.use_new_tables + ) + def default_config(self) -> JsonDict: config = super().default_config() # Enable sliding sync @@ -203,6 +222,18 @@ async def _on_new_acivity( ) +# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the +# foreground update for +# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by +# https://github.com/element-hq/synapse/issues/17623) +@parameterized_class( + ("use_new_tables",), + [ + (True,), + (False,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}", +) class SlidingSyncTestCase(SlidingSyncBase): """ Tests regarding MSC3575 Sliding Sync `/sync` endpoint. @@ -226,6 +257,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + super().prepare(reactor, clock, hs) + def _add_new_dm_to_global_account_data( self, source_user_id: str, target_user_id: str, target_room_id: str ) -> None: