diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9876ec252f54..46bb753bb015 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -726,11 +726,29 @@ def get_current_token_for_writer(self, instance_name: str) -> int: instance_name, self._persisted_upto_position ) - # We also ensure that we always return at least the - # `persisted_upto_position` for ourselves, so that when we notify - # other workers about our position we give them the max valid value - # here so that nothing waits for us to advance. + max_pos = max( + self._current_positions.values(), default=self._persisted_upto_position + ) + + # We want to return the maximum "current token" that we can for a + # writer, this helps ensure that streams progress as fast as + # possible. pos = max(pos, self._persisted_upto_position) + + if ( + self._instance_name == instance_name + and self._in_flight_fetches + and self._unfinished_ids + ): + # For our own instance when there's nothing in flight, it's safe + # to advance to the maximum persisted position we've seen (as we + # know that any new tokens we request will be greater). + max_pos = max( + self._current_positions.values(), + default=self._persisted_upto_position, + ) + pos = max(pos, max_pos) + return self._return_factor * pos def get_minimal_local_current_token(self) -> int: