diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 5c18bc4765f5..4dc04de35e48 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -710,10 +710,17 @@ def get_current_token_for_writer(self, instance_name: str) -> int: # persisted up to position. This stops Synapse from doing a full table # scan when a new writer announces itself over replication. with self._lock: - return self._return_factor * self._current_positions.get( + pos = self._current_positions.get( instance_name, self._persisted_upto_position ) + # We also ensure that we always return at least the + # `persisted_upto_position` for ourselves, so that when we notify + # other workers about our position we give them the max valid value + # here so that nothing waits for us to advance. + pos = max(pos, self._persisted_upto_position) + return self._return_factor * pos + def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index b286822f044f..012babc35b2a 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -350,15 +350,12 @@ def test_multi_instance(self) -> None: first_id_gen = self._create_id_generator("first", writers=["first", "second"]) second_id_gen = self._create_id_generator("second", writers=["first", "second"]) - # The first ID gen will notice that it can advance its token to 7 as it - # has no in progress writes... self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) - # ... but the second ID gen doesn't know that. self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) # Try allocating a new ID gen and check that we only see position @@ -420,14 +417,14 @@ def test_multi_instance_empty_row(self) -> None: self.assertEqual( first_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} ) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("third"), 7) self.assertEqual( second_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} ) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("third"), 7) @@ -873,11 +870,11 @@ def test_load_existing_stream(self) -> None: second_id_gen = self._create_id_generator("second", writers=["first", "second"]) self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 6}) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) - self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 6) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) + self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(first_id_gen.get_persisted_upto_position(), 7) self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(second_id_gen.get_persisted_upto_position(), 7)