From 85e5f2dc252b866d67c8da2ddbfdb84974db1807 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 26 Oct 2023 15:11:24 -0400 Subject: [PATCH] Add a new module API to update user presence state. (#16544) This adds a module API which allows a module to update a user's presence state/status message. This is useful for controlling presence from an external system. To fully control presence from the module the presence.enabled config parameter gains a new state of "untracked" which disables internal tracking of presence changes via user actions, etc. Only updates from the module will be persisted and sent down sync properly). --- changelog.d/16544.feature | 1 + .../configuration/config_documentation.md | 7 ++ synapse/config/server.py | 11 +- synapse/federation/federation_server.py | 2 +- synapse/federation/sender/__init__.py | 2 +- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/presence.py | 78 +++++++----- synapse/handlers/sync.py | 2 +- synapse/module_api/__init__.py | 33 ++++++ synapse/rest/client/presence.py | 6 +- tests/handlers/test_presence.py | 111 ++++++++++++++++-- tests/rest/client/test_presence.py | 19 ++- 12 files changed, 221 insertions(+), 53 deletions(-) create mode 100644 changelog.d/16544.feature diff --git a/changelog.d/16544.feature b/changelog.d/16544.feature new file mode 100644 index 000000000000..92bf701be649 --- /dev/null +++ b/changelog.d/16544.feature @@ -0,0 +1 @@ +Add a new module API for controller presence. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 6cc83c1cd03e..a1ca5fa98c22 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -230,6 +230,13 @@ Example configuration: presence: enabled: false ``` + +`enabled` can also be set to a special value of "untracked" which ignores updates +received via clients and federation, while still accepting updates from the +[module API](../../modules/index.md). + +*The "untracked" option was added in Synapse 1.96.0.* + --- ### `require_auth_for_profile_requests` diff --git a/synapse/config/server.py b/synapse/config/server.py index 72d30da30082..f9e18d2053d3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -368,9 +368,14 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # Whether to enable user presence. presence_config = config.get("presence") or {} - self.use_presence = presence_config.get("enabled") - if self.use_presence is None: - self.use_presence = config.get("use_presence", True) + presence_enabled = presence_config.get("enabled") + if presence_enabled is None: + presence_enabled = config.get("use_presence", True) + + # Whether presence is enabled *at all*. + self.presence_enabled = bool(presence_enabled) + # Whether to internally track presence, requires that presence is enabled, + self.track_presence = self.presence_enabled and presence_enabled != "untracked" # Custom presence router module # This is the legacy way of configuring it (the config should now be put in the modules section) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6ac8d1609585..3b27925517a9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1395,7 +1395,7 @@ def register_instances_for_edu( self._edu_type_to_instance[edu_type] = instance_names async def on_edu(self, edu_type: str, origin: str, content: dict) -> None: - if not self.config.server.use_presence and edu_type == EduTypes.PRESENCE: + if not self.config.server.track_presence and edu_type == EduTypes.PRESENCE: return # Check if we have a handler on this instance diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 7b6b1da090b1..7980d1a322c6 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -844,7 +844,7 @@ async def send_presence_to_destinations( destinations (list[str]) """ - if not states or not self.hs.config.server.use_presence: + if not states or not self.hs.config.server.track_presence: # No-op if presence is disabled. return diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index b1d8be866f6c..4727efcdbafd 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -439,7 +439,7 @@ async def _room_initial_sync_joined( async def get_presence() -> List[JsonDict]: # If presence is disabled, return an empty list - if not self.hs.config.server.use_presence: + if not self.hs.config.server.presence_enabled: return [] states = await presence_handler.get_states( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index dfc0b9db070f..202beee73802 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -192,7 +192,8 @@ def __init__(self, hs: "HomeServer"): self.state = hs.get_state_handler() self.is_mine_id = hs.is_mine_id - self._presence_enabled = hs.config.server.use_presence + self._presence_enabled = hs.config.server.presence_enabled + self._track_presence = hs.config.server.track_presence self._federation = None if hs.should_send_federation(): @@ -512,7 +513,7 @@ def __init__(self, hs: "HomeServer"): ) async def _on_shutdown(self) -> None: - if self._presence_enabled: + if self._track_presence: self.hs.get_replication_command_handler().send_command( ClearUserSyncsCommand(self.instance_id) ) @@ -524,7 +525,7 @@ def send_user_sync( is_syncing: bool, last_sync_ms: int, ) -> None: - if self._presence_enabled: + if self._track_presence: self.hs.get_replication_command_handler().send_user_sync( self.instance_id, user_id, device_id, is_syncing, last_sync_ms ) @@ -571,7 +572,7 @@ async def user_syncing( Called by the sync and events servlets to record that a user has connected to this worker and is waiting for some events. """ - if not affect_presence or not self._presence_enabled: + if not affect_presence or not self._track_presence: return _NullContextManager() # Note that this causes last_active_ts to be incremented which is not @@ -702,8 +703,8 @@ async def set_state( user_id = target_user.to_string() - # If presence is disabled, no-op - if not self._presence_enabled: + # If tracking of presence is disabled, no-op + if not self._track_presence: return # Proxy request to instance that writes presence @@ -723,7 +724,7 @@ async def bump_presence_active_time( with the app. """ # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return # Proxy request to instance that writes presence @@ -760,7 +761,7 @@ def __init__(self, hs: "HomeServer"): ] = {} now = self.clock.time_msec() - if self._presence_enabled: + if self._track_presence: for state in self.user_to_current_state.values(): # Create a psuedo-device to properly handle time outs. This will # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. @@ -831,7 +832,7 @@ def __init__(self, hs: "HomeServer"): self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") - if self._presence_enabled: + if self._track_presence: # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. @@ -839,6 +840,9 @@ def __init__(self, hs: "HomeServer"): 30, self.clock.looping_call, self._handle_timeouts, 5000 ) + # Presence information is persisted, whether or not it is being tracked + # internally. + if self._presence_enabled: self.clock.call_later( 60, self.clock.looping_call, @@ -854,7 +858,7 @@ def __init__(self, hs: "HomeServer"): ) # Used to handle sending of presence to newly joined users/servers - if self._presence_enabled: + if self._track_presence: self.notifier.add_replication_callback(self.notify_new_event) # Presence is best effort and quickly heals itself, so lets just always @@ -905,7 +909,9 @@ async def _persist_unpersisted_changes(self) -> None: ) async def _update_states( - self, new_states: Iterable[UserPresenceState], force_notify: bool = False + self, + new_states: Iterable[UserPresenceState], + force_notify: bool = False, ) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state @@ -943,7 +949,7 @@ async def _update_states( for new_state in new_states: user_id = new_state.user_id - # Its fine to not hit the database here, as the only thing not in + # It's fine to not hit the database here, as the only thing not in # the current state cache are OFFLINE states, where the only field # of interest is last_active which is safe enough to assume is 0 # here. @@ -957,6 +963,9 @@ async def _update_states( is_mine=self.is_mine_id(user_id), wheel_timer=self.wheel_timer, now=now, + # When overriding disabled presence, don't kick off all the + # wheel timers. + persist=not self._track_presence, ) if force_notify: @@ -1072,7 +1081,7 @@ async def bump_presence_active_time( with the app. """ # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return user_id = user.to_string() @@ -1124,7 +1133,7 @@ async def user_syncing( client that is being used by a user. presence_state: The presence state indicated in the sync request """ - if not affect_presence or not self._presence_enabled: + if not affect_presence or not self._track_presence: return _NullContextManager() curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) @@ -1284,7 +1293,7 @@ async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: async def incoming_presence(self, origin: str, content: JsonDict) -> None: """Called when we receive a `m.presence` EDU from a remote server.""" - if not self._presence_enabled: + if not self._track_presence: return now = self.clock.time_msec() @@ -1359,7 +1368,7 @@ async def set_state( raise SynapseError(400, "Invalid presence state") # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return user_id = target_user.to_string() @@ -2118,6 +2127,7 @@ def handle_update( is_mine: bool, wheel_timer: WheelTimer, now: int, + persist: bool, ) -> Tuple[UserPresenceState, bool, bool]: """Given a presence update: 1. Add any appropriate timers. @@ -2129,6 +2139,8 @@ def handle_update( is_mine: Whether the user is ours wheel_timer now: Time now in ms + persist: True if this state should persist until another update occurs. + Skips insertion into wheel timers. Returns: 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: @@ -2146,14 +2158,15 @@ def handle_update( if is_mine: if new_state.state == PresenceState.ONLINE: # Idle timer - wheel_timer.insert( - now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER - ) + if not persist: + wheel_timer.insert( + now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER + ) active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY new_state = new_state.copy_and_replace(currently_active=active) - if active: + if active and not persist: wheel_timer.insert( now=now, obj=user_id, @@ -2162,11 +2175,12 @@ def handle_update( if new_state.state != PresenceState.OFFLINE: # User has stopped syncing - wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, - ) + if not persist: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, + ) last_federate = new_state.last_federation_update_ts if now - last_federate > FEDERATION_PING_INTERVAL: @@ -2174,7 +2188,7 @@ def handle_update( new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True - if new_state.state == PresenceState.BUSY: + if new_state.state == PresenceState.BUSY and not persist: wheel_timer.insert( now=now, obj=user_id, @@ -2182,11 +2196,13 @@ def handle_update( ) else: - wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, - ) + # An update for a remote user was received. + if not persist: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, + ) # Check whether the change was something worth notifying about if should_notify(prev_state, new_state, is_mine): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f75c1548ca00..2f1bc5a01561 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1517,7 +1517,7 @@ async def generate_sync_result( # Presence data is included if the server has it enabled and not filtered out. include_presence_data = bool( - self.hs_config.server.use_presence + self.hs_config.server.presence_enabled and not sync_config.filter_collection.blocks_all_presence() ) # Device list updates are sent if a since token is provided. diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 0786d2063565..09ea6bdecb79 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -23,6 +23,7 @@ Generator, Iterable, List, + Mapping, Optional, Tuple, TypeVar, @@ -39,6 +40,7 @@ from synapse.api import errors from synapse.api.errors import SynapseError +from synapse.api.presence import UserPresenceState from synapse.config import ConfigError from synapse.events import EventBase from synapse.events.presence_router import ( @@ -1184,6 +1186,37 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: presence_events, [destination] ) + async def set_presence_for_users( + self, users: Mapping[str, Tuple[str, Optional[str]]] + ) -> None: + """ + Update the internal presence state of users. + + This can be used for either local or remote users. + + Note that this method can only be run on the process that is configured to write to the + presence stream. By default, this is the main process. + + Added in Synapse v1.96.0. + """ + + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + + from synapse.handlers.presence import PresenceHandler + + assert isinstance(presence_handler, PresenceHandler) + + states = await presence_handler.current_state_for_users(users.keys()) + for user_id, (state, status_msg) in users.items(): + prev_state = states.setdefault(user_id, UserPresenceState.default(user_id)) + states[user_id] = prev_state.copy_and_replace( + state=state, status_msg=status_msg + ) + + await presence_handler._update_states(states.values(), force_notify=True) + def looping_background_call( self, f: Callable, diff --git a/synapse/rest/client/presence.py b/synapse/rest/client/presence.py index d578faa96984..054a391f2630 100644 --- a/synapse/rest/client/presence.py +++ b/synapse/rest/client/presence.py @@ -42,15 +42,13 @@ def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.auth = hs.get_auth() - self._use_presence = hs.config.server.use_presence - async def on_GET( self, request: SynapseRequest, user_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) user = UserID.from_string(user_id) - if not self._use_presence: + if not self.hs.config.server.presence_enabled: return 200, {"presence": "offline"} if requester.user != user: @@ -96,7 +94,7 @@ async def on_PUT( except Exception: raise SynapseError(400, "Unable to parse state") - if self._use_presence: + if self.hs.config.server.track_presence: await self.presence_handler.set_state(user, requester.device_id, state) return 200, {} diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 41c8c44e0241..173b14521a15 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools from typing import Optional, cast from unittest.mock import Mock, call @@ -33,6 +33,7 @@ IDLE_TIMER, LAST_ACTIVE_GRANULARITY, SYNC_ONLINE_TIMEOUT, + PresenceHandler, handle_timeout, handle_update, ) @@ -66,7 +67,12 @@ def test_offline_to_online(self) -> None: ) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertTrue(persist_and_notify) @@ -108,7 +114,12 @@ def test_online_to_online(self) -> None: ) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertFalse(persist_and_notify) @@ -153,7 +164,12 @@ def test_online_to_online_last_active_noop(self) -> None: ) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertFalse(persist_and_notify) @@ -196,7 +212,12 @@ def test_online_to_online_last_active(self) -> None: new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertTrue(persist_and_notify) @@ -231,7 +252,12 @@ def test_remote_ping_timer(self) -> None: new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=False, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertFalse(persist_and_notify) @@ -265,7 +291,12 @@ def test_online_to_offline(self) -> None: new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertTrue(persist_and_notify) @@ -287,7 +318,12 @@ def test_online_to_idle(self) -> None: new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE) state, persist_and_notify, federation_ping = handle_update( - prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=False, ) self.assertTrue(persist_and_notify) @@ -347,6 +383,41 @@ def test_persisting_presence_updates(self) -> None: # They should be identical. self.assertEqual(presence_states_compare, db_presence_states) + @parameterized.expand( + itertools.permutations( + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ), + 2, + ) + ) + def test_override(self, initial_state: str, final_state: str) -> None: + """Overridden statuses should not go into the wheel timer.""" + wheel_timer = Mock() + user_id = "@foo:bar" + now = 5000000 + + prev_state = UserPresenceState.default(user_id) + prev_state = prev_state.copy_and_replace( + state=initial_state, last_active_ts=now, currently_active=True + ) + + new_state = prev_state.copy_and_replace(state=final_state, last_active_ts=now) + + handle_update( + prev_state, + new_state, + is_mine=True, + wheel_timer=wheel_timer, + now=now, + persist=True, + ) + + wheel_timer.insert.assert_not_called() + class PresenceTimeoutTestCase(unittest.TestCase): """Tests different timers and that the timer does not change `status_msg` of user.""" @@ -738,7 +809,6 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.presence_handler = hs.get_presence_handler() - self.clock = hs.get_clock() def test_external_process_timeout(self) -> None: """Test that if an external process doesn't update the records for a while @@ -1471,6 +1541,29 @@ def _set_presencestate_with_status_msg( self.assertEqual(new_state.state, state) self.assertEqual(new_state.status_msg, status_msg) + @unittest.override_config({"presence": {"enabled": "untracked"}}) + def test_untracked_does_not_idle(self) -> None: + """Untracked presence should not idle.""" + + # Mark user as online, this needs to reach into internals in order to + # bypass checks. + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) + assert isinstance(self.presence_handler, PresenceHandler) + self.get_success( + self.presence_handler._update_states( + [state.copy_and_replace(state=PresenceState.ONLINE)] + ) + ) + + # Ensure the update took. + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) + self.assertEqual(state.state, PresenceState.ONLINE) + + # The timeout should not fire and the state should be the same. + self.reactor.advance(SYNC_ONLINE_TIMEOUT) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) + self.assertEqual(state.state, PresenceState.ONLINE) + class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/rest/client/test_presence.py b/tests/rest/client/test_presence.py index 66b387cea37e..4e89107e54cf 100644 --- a/tests/rest/client/test_presence.py +++ b/tests/rest/client/test_presence.py @@ -50,7 +50,7 @@ def test_put_presence(self) -> None: PUT to the status endpoint with use_presence enabled will call set_state on the presence handler. """ - self.hs.config.server.use_presence = True + self.hs.config.server.presence_enabled = True body = {"presence": "here", "status_msg": "beep boop"} channel = self.make_request( @@ -63,7 +63,22 @@ def test_put_presence(self) -> None: @unittest.override_config({"use_presence": False}) def test_put_presence_disabled(self) -> None: """ - PUT to the status endpoint with use_presence disabled will NOT call + PUT to the status endpoint with presence disabled will NOT call + set_state on the presence handler. + """ + + body = {"presence": "here", "status_msg": "beep boop"} + channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + + self.assertEqual(channel.code, HTTPStatus.OK) + self.assertEqual(self.presence_handler.set_state.call_count, 0) + + @unittest.override_config({"presence": {"enabled": "untracked"}}) + def test_put_presence_untracked(self) -> None: + """ + PUT to the status endpoint with presence untracked will NOT call set_state on the presence handler. """