From 645419cc99a69dda183a3d985d62037bd8cf5260 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2023 12:28:19 +0100 Subject: [PATCH 01/10] Fix bug where a new writer advances their token too quickly When starting a new writer (for e.g. persisting events), the `MultiWriterIdGenerator` doesn't have a minimum token for it as there are no rows matching that new writer in the DB. This results in the the first stream ID it acquired being announced as persisted *before* it actually finishes persisting, if another writer gets and persists a subsequent stream ID. This is due to the logic of setting the minimum persisted position to the minimum known position of across all writers, and the new writer starts off not being considered. --- synapse/storage/util/id_generators.py | 8 ++++ tests/storage/test_id_generators.py | 59 +++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index d2c874b9a8d7..5c18bc4765f5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -545,6 +545,14 @@ def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int: if instance == self._instance_name: self._current_positions[instance] = stream_id + if self._writers: + # If we have explicit writers then make sure that each instance has + # a position. + for writer in self._writers: + self._current_positions.setdefault( + writer, self._persisted_upto_position + ) + cur.close() def _load_next_id_txn(self, txn: Cursor) -> int: diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 9174fb096470..b286822f044f 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -259,8 +259,9 @@ def test_empty(self) -> None: id_gen = self._create_id_generator() - # The table is empty so we expect an empty map for positions - self.assertEqual(id_gen.get_positions(), {}) + # The table is empty so we expect the map for positions to have a dummy + # minimum value. + self.assertEqual(id_gen.get_positions(), {"master": 1}) def test_single_instance(self) -> None: """Test that reads and writes from a single process are handled @@ -398,6 +399,56 @@ async def _get_next_async2() -> None: second_id_gen.advance("first", 8) self.assertEqual(second_id_gen.get_positions(), {"first": 8, "second": 9}) + def test_multi_instance_empty_row(self) -> None: + """Test that reads and writes from multiple processes are handled + correctly, when one of the writers starts without any rows. + """ + # Insert some rows for two out of three of the ID gens. + self._insert_rows("first", 3) + self._insert_rows("second", 4) + + first_id_gen = self._create_id_generator( + "first", writers=["first", "second", "third"] + ) + second_id_gen = self._create_id_generator( + "second", writers=["first", "second", "third"] + ) + third_id_gen = self._create_id_generator( + "third", writers=["first", "second", "third"] + ) + + self.assertEqual( + first_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} + ) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) + self.assertEqual(first_id_gen.get_current_token_for_writer("third"), 7) + + self.assertEqual( + second_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} + ) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) + self.assertEqual(second_id_gen.get_current_token_for_writer("third"), 7) + + # Try allocating a new ID gen and check that we only see position + # advanced after we leave the context manager. + + async def _get_next_async() -> None: + async with third_id_gen.get_next() as stream_id: + self.assertEqual(stream_id, 8) + + self.assertEqual( + third_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} + ) + self.assertEqual(third_id_gen.get_persisted_upto_position(), 7) + + self.get_success(_get_next_async()) + + self.assertEqual( + third_id_gen.get_positions(), {"first": 3, "second": 7, "third": 8} + ) + def test_get_next_txn(self) -> None: """Test that the `get_next_txn` function works correctly.""" @@ -712,8 +763,8 @@ async def _get_next_async() -> None: self.get_success(_get_next_async()) - self.assertEqual(id_gen_1.get_positions(), {"first": -1}) - self.assertEqual(id_gen_2.get_positions(), {"first": -1}) + self.assertEqual(id_gen_1.get_positions(), {"first": -1, "second": -1}) + self.assertEqual(id_gen_2.get_positions(), {"first": -1, "second": -1}) self.assertEqual(id_gen_1.get_persisted_upto_position(), -1) self.assertEqual(id_gen_2.get_persisted_upto_position(), -1) From b4c859e2c6224398d40d6a96e8428e0a3bc3e4c1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2023 12:33:47 +0100 Subject: [PATCH 02/10] Newsfile --- changelog.d/16473.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16473.bugfix diff --git a/changelog.d/16473.bugfix b/changelog.d/16473.bugfix new file mode 100644 index 000000000000..4f4a0380cd77 --- /dev/null +++ b/changelog.d/16473.bugfix @@ -0,0 +1 @@ +Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`. From 8fb85401386f0980334581dbd296af8b8e80c6af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2023 14:00:21 +0100 Subject: [PATCH 03/10] Fix sending out POSITIONs when our token advances without update Broke in #14820 --- synapse/storage/util/id_generators.py | 9 ++++++++- tests/storage/test_id_generators.py | 17 +++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 5c18bc4765f5..4dc04de35e48 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -710,10 +710,17 @@ def get_current_token_for_writer(self, instance_name: str) -> int: # persisted up to position. This stops Synapse from doing a full table # scan when a new writer announces itself over replication. with self._lock: - return self._return_factor * self._current_positions.get( + pos = self._current_positions.get( instance_name, self._persisted_upto_position ) + # We also ensure that we always return at least the + # `persisted_upto_position` for ourselves, so that when we notify + # other workers about our position we give them the max valid value + # here so that nothing waits for us to advance. + pos = max(pos, self._persisted_upto_position) + return self._return_factor * pos + def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index b286822f044f..012babc35b2a 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -350,15 +350,12 @@ def test_multi_instance(self) -> None: first_id_gen = self._create_id_generator("first", writers=["first", "second"]) second_id_gen = self._create_id_generator("second", writers=["first", "second"]) - # The first ID gen will notice that it can advance its token to 7 as it - # has no in progress writes... self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) - # ... but the second ID gen doesn't know that. self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) # Try allocating a new ID gen and check that we only see position @@ -420,14 +417,14 @@ def test_multi_instance_empty_row(self) -> None: self.assertEqual( first_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} ) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(first_id_gen.get_current_token_for_writer("third"), 7) self.assertEqual( second_id_gen.get_positions(), {"first": 3, "second": 7, "third": 7} ) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("third"), 7) @@ -873,11 +870,11 @@ def test_load_existing_stream(self) -> None: second_id_gen = self._create_id_generator("second", writers=["first", "second"]) self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 6}) - self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3) - self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 6) + self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7) + self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(first_id_gen.get_persisted_upto_position(), 7) self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) - self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3) + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) self.assertEqual(second_id_gen.get_persisted_upto_position(), 7) From dbbb141ca4143d927e090409f9aec0769e807dd1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Oct 2023 09:33:11 +0100 Subject: [PATCH 04/10] For replication HTTP requests, only wait for minimal position --- synapse/replication/http/_base.py | 2 +- synapse/replication/tcp/streams/_base.py | 129 +++++++++++------- synapse/replication/tcp/streams/events.py | 8 +- synapse/replication/tcp/streams/federation.py | 15 +- .../replication/tcp/streams/partial_state.py | 10 +- synapse/storage/util/id_generators.py | 18 +++ tests/storage/test_id_generators.py | 13 ++ 7 files changed, 134 insertions(+), 61 deletions(-) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 63cf24a14d94..7476839db584 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -238,7 +238,7 @@ async def send_request( data[_STREAM_POSITION_KEY] = { "streams": { - stream.NAME: stream.current_token(local_instance_name) + stream.NAME: stream.minimal_local_current_token() for stream in streams }, "instance_name": local_instance_name, diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index c6088a0f99f8..5c4d228f3d7c 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -33,6 +33,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer + from synapse.storage.util.id_generators import AbstractStreamIdGenerator logger = logging.getLogger(__name__) @@ -107,22 +108,10 @@ def parse_row(cls, row: StreamRow) -> Any: def __init__( self, local_instance_name: str, - current_token_function: Callable[[str], Token], update_function: UpdateFunction, ): """Instantiate a Stream - `current_token_function` and `update_function` are callbacks which - should be implemented by subclasses. - - `current_token_function` takes an instance name, which is a writer to - the stream, and returns the position in the stream of the writer (as - viewed from the current process). On the writer process this is where - the writer has successfully written up to, whereas on other processes - this is the position which we have received updates up to over - replication. (Note that most streams have a single writer and so their - implementations ignore the instance name passed in). - `update_function` is called to get updates for this stream between a pair of stream tokens. See the `UpdateFunction` type definition for more info. @@ -133,12 +122,28 @@ def __init__( update_function: callback go get stream updates, as above """ self.local_instance_name = local_instance_name - self.current_token = current_token_function self.update_function = update_function # The token from which we last asked for updates self.last_token = self.current_token(self.local_instance_name) + def current_token(self, instance_name: str) -> Token: + """This takes an instance name, which is a writer to + the stream, and returns the position in the stream of the writer (as + viewed from the current process). + """ + # We can't make this an abstract class as it makes mypy unhappy. + raise NotImplementedError() + + def minimal_local_current_token(self) -> Token: + """Tries to return a minimal current token for the local instance, + i.e. for writers this would be the last successful write. + + If local instance is not a writer (or has written yet) then falls back + to returning the normal "current token". + """ + raise NotImplementedError() + def discard_updates_and_advance(self) -> None: """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. @@ -190,6 +195,25 @@ async def get_updates_since( return updates, upto_token, limited +class _StreamFromIdGen(Stream): + """Helper class for simple streams that use a stream ID generator""" + + def __init__( + self, + local_instance_name: str, + update_function: UpdateFunction, + stream_id_gen: "AbstractStreamIdGenerator", + ): + self._stream_id_gen = stream_id_gen + super().__init__(local_instance_name, update_function) + + def current_token(self, instance_name: str) -> Token: + return self._stream_id_gen.get_current_token_for_writer(instance_name) + + def minimal_local_current_token(self) -> Token: + return self._stream_id_gen.get_minimal_local_current_token() + + def current_token_without_instance( current_token: Callable[[], int] ) -> Callable[[str], int]: @@ -242,17 +266,21 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - self._current_token, self.store.get_all_new_backfill_event_rows, ) - def _current_token(self, instance_name: str) -> int: + def current_token(self, instance_name: str) -> Token: # The backfill stream over replication operates on *positive* numbers, # which means we need to negate it. return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name) + def minimal_local_current_token(self) -> Token: + # The backfill stream over replication operates on *positive* numbers, + # which means we need to negate it. + return -self.store._backfill_id_gen.get_minimal_local_current_token() -class PresenceStream(Stream): + +class PresenceStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) class PresenceStreamRow: user_id: str @@ -283,9 +311,7 @@ def __init__(self, hs: "HomeServer"): update_function = make_http_update_function(hs, self.NAME) super().__init__( - hs.get_instance_name(), - current_token_without_instance(store.get_current_presence_token), - update_function, + hs.get_instance_name(), update_function, store._presence_id_gen ) @@ -305,13 +331,18 @@ class PresenceFederationStreamRow: ROW_TYPE = PresenceFederationStreamRow def __init__(self, hs: "HomeServer"): - federation_queue = hs.get_presence_handler().get_federation_queue() + self._federation_queue = hs.get_presence_handler().get_federation_queue() super().__init__( hs.get_instance_name(), - federation_queue.get_current_token, - federation_queue.get_replication_rows, + self._federation_queue.get_replication_rows, ) + def current_token(self, instance_name: str) -> Token: + return self._federation_queue.get_current_token(instance_name) + + def minimal_local_current_token(self) -> Token: + return self._federation_queue.get_current_token(self.local_instance_name) + class TypingStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -341,20 +372,25 @@ def __init__(self, hs: "HomeServer"): update_function: Callable[ [str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]] ] = typing_writer_handler.get_all_typing_updates - current_token_function = typing_writer_handler.get_current_token + self.current_token_function = typing_writer_handler.get_current_token else: # Query the typing writer process update_function = make_http_update_function(hs, self.NAME) - current_token_function = hs.get_typing_handler().get_current_token + self.current_token_function = hs.get_typing_handler().get_current_token super().__init__( hs.get_instance_name(), - current_token_without_instance(current_token_function), update_function, ) + def current_token(self, instance_name: str) -> Token: + return self.current_token_function() + + def minimal_local_current_token(self) -> Token: + return self.current_token_function() -class ReceiptsStream(Stream): + +class ReceiptsStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) class ReceiptsStreamRow: room_id: str @@ -371,12 +407,12 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - current_token_without_instance(store.get_max_receipt_stream_id), store.get_all_updated_receipts, + store._receipts_id_gen, ) -class PushRulesStream(Stream): +class PushRulesStream(_StreamFromIdGen): """A user has changed their push rules""" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -387,20 +423,16 @@ class PushRulesStreamRow: ROW_TYPE = PushRulesStreamRow def __init__(self, hs: "HomeServer"): - self.store = hs.get_datastores().main + store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - self._current_token, - self.store.get_all_push_rule_updates, + store.get_all_push_rule_updates, + store._push_rules_stream_id_gen, ) - def _current_token(self, instance_name: str) -> int: - push_rules_token = self.store.get_max_push_rules_stream_id() - return push_rules_token - -class PushersStream(Stream): +class PushersStream(_StreamFromIdGen): """A user has added/changed/removed a pusher""" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -418,8 +450,8 @@ def __init__(self, hs: "HomeServer"): super().__init__( hs.get_instance_name(), - current_token_without_instance(store.get_pushers_stream_token), store.get_all_updated_pushers_rows, + store._pushers_id_gen, ) @@ -447,15 +479,20 @@ class CachesStreamRow: ROW_TYPE = CachesStreamRow def __init__(self, hs: "HomeServer"): - store = hs.get_datastores().main + self.store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - store.get_cache_stream_token_for_writer, - store.get_all_updated_caches, + self.store.get_all_updated_caches, ) + def current_token(self, instance_name: str) -> Token: + return self.store.get_cache_stream_token_for_writer(instance_name) + + def minimal_local_current_token(self) -> Token: + return self.current_token(self.local_instance_name) + -class DeviceListsStream(Stream): +class DeviceListsStream(_StreamFromIdGen): """Either a user has updated their devices or a remote server needs to be told about a device update. """ @@ -473,8 +510,8 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - current_token_without_instance(self.store.get_device_stream_token), self._update_function, + self.store._device_list_id_gen, ) async def _update_function( @@ -525,7 +562,7 @@ async def _update_function( return updates, upper_limit_token, devices_limited or signatures_limited -class ToDeviceStream(Stream): +class ToDeviceStream(_StreamFromIdGen): """New to_device messages for a client""" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -539,12 +576,12 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - current_token_without_instance(store.get_to_device_stream_token), store.get_all_new_device_messages, + store._device_inbox_id_gen, ) -class AccountDataStream(Stream): +class AccountDataStream(_StreamFromIdGen): """Global or per room account data was changed""" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -560,8 +597,8 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - current_token_without_instance(self.store.get_max_account_data_stream_id), self._update_function, + self.store._account_data_id_gen, ) async def _update_function( diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index ad9b76071383..cad6d7e09aac 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -18,10 +18,10 @@ import attr from synapse.replication.tcp.streams._base import ( - Stream, StreamRow, StreamUpdateResult, Token, + _StreamFromIdGen, ) if TYPE_CHECKING: @@ -119,7 +119,7 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow): TypeToRow = {Row.TypeId: Row for Row in _EventRows} -class EventsStream(Stream): +class EventsStream(_StreamFromIdGen): """We received a new event, or an event went from being an outlier to not""" NAME = "events" @@ -127,9 +127,7 @@ class EventsStream(Stream): def __init__(self, hs: "HomeServer"): self._store = hs.get_datastores().main super().__init__( - hs.get_instance_name(), - self._store._stream_id_gen.get_current_token_for_writer, - self._update_function, + hs.get_instance_name(), self._update_function, self._store._stream_id_gen ) async def _update_function( diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 4046bdec6931..7f5af5852c1c 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -18,6 +18,7 @@ from synapse.replication.tcp.streams._base import ( Stream, + Token, current_token_without_instance, make_http_update_function, ) @@ -47,7 +48,7 @@ def __init__(self, hs: "HomeServer"): # will be a real FederationSender, which has stubs for current_token and # get_replication_rows.) federation_sender = hs.get_federation_sender() - current_token = current_token_without_instance( + self.current_token_func = current_token_without_instance( federation_sender.get_current_token ) update_function: Callable[ @@ -57,15 +58,21 @@ def __init__(self, hs: "HomeServer"): elif hs.should_send_federation(): # federation sender: Query master process update_function = make_http_update_function(hs, self.NAME) - current_token = self._stub_current_token + self.current_token_func = self._stub_current_token else: # other worker: stub out the update function (we're not interested in # any updates so when we get a POSITION we do nothing) update_function = self._stub_update_function - current_token = self._stub_current_token + self.current_token_func = self._stub_current_token - super().__init__(hs.get_instance_name(), current_token, update_function) + super().__init__(hs.get_instance_name(), update_function) + + def current_token(self, instance_name: str) -> Token: + return self.current_token_func(instance_name) + + def minimal_local_current_token(self) -> Token: + return self.current_token(self.local_instance_name) @staticmethod def _stub_current_token(instance_name: str) -> int: diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index a8ce5ffd7289..ad181d7e9331 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -15,7 +15,7 @@ import attr -from synapse.replication.tcp.streams import Stream +from synapse.replication.tcp.streams._base import _StreamFromIdGen if TYPE_CHECKING: from synapse.server import HomeServer @@ -27,7 +27,7 @@ class UnPartialStatedRoomStreamRow: room_id: str -class UnPartialStatedRoomStream(Stream): +class UnPartialStatedRoomStream(_StreamFromIdGen): """ Stream to notify about rooms becoming un-partial-stated; that is, when the background sync finishes such that we now have full state for @@ -41,8 +41,8 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - store.get_un_partial_stated_rooms_token, store.get_un_partial_stated_rooms_from_stream, + store._un_partial_stated_rooms_stream_id_gen, ) @@ -56,7 +56,7 @@ class UnPartialStatedEventStreamRow: rejection_status_changed: bool -class UnPartialStatedEventStream(Stream): +class UnPartialStatedEventStream(_StreamFromIdGen): """ Stream to notify about events becoming un-partial-stated. """ @@ -68,6 +68,6 @@ def __init__(self, hs: "HomeServer"): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - store.get_un_partial_stated_events_token, store.get_un_partial_stated_events_from_stream, + store._un_partial_stated_events_stream_id_gen, ) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 4dc04de35e48..9876ec252f54 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -133,6 +133,15 @@ def get_current_token_for_writer(self, instance_name: str) -> int: """ raise NotImplementedError() + @abc.abstractmethod + def get_minimal_local_current_token(self) -> int: + """Tries to return a minimal current token for the local instance, + i.e. for writers this would be the last successful write. + + If local instance is not a writer (or has written yet) then falls back + to returning the normal "current token". + """ + @abc.abstractmethod def get_next(self) -> AsyncContextManager[int]: """ @@ -312,6 +321,9 @@ def get_current_token(self) -> int: def get_current_token_for_writer(self, instance_name: str) -> int: return self.get_current_token() + def get_minimal_local_current_token(self) -> int: + return self.get_current_token() + class MultiWriterIdGenerator(AbstractStreamIdGenerator): """Generates and tracks stream IDs for a stream with multiple writers. @@ -721,6 +733,12 @@ def get_current_token_for_writer(self, instance_name: str) -> int: pos = max(pos, self._persisted_upto_position) return self._return_factor * pos + def get_minimal_local_current_token(self) -> int: + with self._lock: + return self._return_factor * self._current_positions.get( + self._instance_name, self._persisted_upto_position + ) + def get_positions(self) -> Dict[str, int]: """Get a copy of the current positon map. diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 012babc35b2a..855eab6ac0d6 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -648,6 +648,19 @@ def _insert(txn: Cursor) -> None: with self.assertRaises(IncorrectDatabaseSetup): self._create_id_generator("first") + def test_minimal_local_token(self) -> None: + self._insert_rows("first", 3) + self._insert_rows("second", 4) + + first_id_gen = self._create_id_generator("first", writers=["first", "second"]) + second_id_gen = self._create_id_generator("second", writers=["first", "second"]) + + self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7}) + self.assertEqual(first_id_gen.get_minimal_local_current_token(), 3) + + self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) + self.assertEqual(second_id_gen.get_minimal_local_current_token(), 7) + class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs.""" From 79fb6fe9e7b88828e5cbd9b42198afe3053eb8dd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Oct 2023 09:41:10 +0100 Subject: [PATCH 05/10] Return maximal current token --- synapse/storage/util/id_generators.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9876ec252f54..46bb753bb015 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -726,11 +726,29 @@ def get_current_token_for_writer(self, instance_name: str) -> int: instance_name, self._persisted_upto_position ) - # We also ensure that we always return at least the - # `persisted_upto_position` for ourselves, so that when we notify - # other workers about our position we give them the max valid value - # here so that nothing waits for us to advance. + max_pos = max( + self._current_positions.values(), default=self._persisted_upto_position + ) + + # We want to return the maximum "current token" that we can for a + # writer, this helps ensure that streams progress as fast as + # possible. pos = max(pos, self._persisted_upto_position) + + if ( + self._instance_name == instance_name + and self._in_flight_fetches + and self._unfinished_ids + ): + # For our own instance when there's nothing in flight, it's safe + # to advance to the maximum persisted position we've seen (as we + # know that any new tokens we request will be greater). + max_pos = max( + self._current_positions.values(), + default=self._persisted_upto_position, + ) + pos = max(pos, max_pos) + return self._return_factor * pos def get_minimal_local_current_token(self) -> int: From 56e7fb3d82cb3526d26e621754a053da8f28afc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Oct 2023 10:39:40 +0100 Subject: [PATCH 06/10] Tidy up get current token for writer --- synapse/storage/util/id_generators.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 46bb753bb015..7911d1841f96 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -726,10 +726,6 @@ def get_current_token_for_writer(self, instance_name: str) -> int: instance_name, self._persisted_upto_position ) - max_pos = max( - self._current_positions.values(), default=self._persisted_upto_position - ) - # We want to return the maximum "current token" that we can for a # writer, this helps ensure that streams progress as fast as # possible. @@ -737,17 +733,17 @@ def get_current_token_for_writer(self, instance_name: str) -> int: if ( self._instance_name == instance_name - and self._in_flight_fetches - and self._unfinished_ids + and not self._in_flight_fetches + and not self._unfinished_ids ): # For our own instance when there's nothing in flight, it's safe # to advance to the maximum persisted position we've seen (as we # know that any new tokens we request will be greater). - max_pos = max( + max_pos_of_all_writers = max( self._current_positions.values(), default=self._persisted_upto_position, ) - pos = max(pos, max_pos) + pos = max(pos, max_pos_of_all_writers) return self._return_factor * pos From 8f4390346e527aa7e1060d245103e77be2dd7a30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Oct 2023 12:48:21 +0100 Subject: [PATCH 07/10] Ensure stream position never goes backwards, and add test --- synapse/storage/util/id_generators.py | 43 ++++++++++++++-------- tests/storage/test_id_generators.py | 51 +++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 7911d1841f96..043e35ee5e2b 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -420,6 +420,11 @@ def __init__( # The maximum stream ID that we have seen been allocated across any writer. self._max_seen_allocated_stream_id = 1 + # The maximum position of the local instance. This can be higher than + # the corresponding position in `current_positions` table when there are + # no active writes in progress. + self._max_position_of_local_instance = self._max_seen_allocated_stream_id + self._sequence_gen = PostgresSequenceGenerator(sequence_name) # We check that the table and sequence haven't diverged. @@ -439,6 +444,16 @@ def __init__( self._current_positions.values(), default=1 ) + # For the case where `stream_positions` is not up to date, + # `_persisted_upto_position` may be higher. + self._max_seen_allocated_stream_id = max( + self._max_seen_allocated_stream_id, self._persisted_upto_position + ) + + # Bump our local maximum position now that we've loaded things from the + # DB. + self._max_position_of_local_instance = self._max_seen_allocated_stream_id + if not writers: # If there have been no explicit writers given then any instance can # write to the stream. In which case, let's pre-seed our own @@ -708,6 +723,7 @@ def _mark_id_as_finished(self, next_id: int) -> None: if new_cur: curr = self._current_positions.get(self._instance_name, 0) self._current_positions[self._instance_name] = max(curr, new_cur) + self._max_position_of_local_instance = max(curr, new_cur) self._add_persisted_position(next_id) @@ -722,6 +738,9 @@ def get_current_token_for_writer(self, instance_name: str) -> int: # persisted up to position. This stops Synapse from doing a full table # scan when a new writer announces itself over replication. with self._lock: + if self._instance_name == instance_name: + return self._return_factor * self._max_position_of_local_instance + pos = self._current_positions.get( instance_name, self._persisted_upto_position ) @@ -731,20 +750,6 @@ def get_current_token_for_writer(self, instance_name: str) -> int: # possible. pos = max(pos, self._persisted_upto_position) - if ( - self._instance_name == instance_name - and not self._in_flight_fetches - and not self._unfinished_ids - ): - # For our own instance when there's nothing in flight, it's safe - # to advance to the maximum persisted position we've seen (as we - # know that any new tokens we request will be greater). - max_pos_of_all_writers = max( - self._current_positions.values(), - default=self._persisted_upto_position, - ) - pos = max(pos, max_pos_of_all_writers) - return self._return_factor * pos def get_minimal_local_current_token(self) -> int: @@ -821,6 +826,16 @@ def _add_persisted_position(self, new_id: int) -> None: self._persisted_upto_position = max(min_curr, self._persisted_upto_position) + # Advance our local max position. + self._max_position_of_local_instance = max( + self._max_position_of_local_instance, self._persisted_upto_position + ) + + if not self._unfinished_ids and not self._in_flight_fetches: + # If we don't have anything in flight, it's safe to advance to the + # max seen stream ID. + self._max_position_of_local_instance = self._max_seen_allocated_stream_id + # We now iterate through the seen positions, discarding those that are # less than the current min positions, and incrementing the min position # if its exactly one greater. diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 855eab6ac0d6..e35f13247686 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -661,6 +661,57 @@ def test_minimal_local_token(self) -> None: self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7}) self.assertEqual(second_id_gen.get_minimal_local_current_token(), 7) + def test_current_token_gap(self) -> None: + """Test that getting the current token for a writer returns the maximal + token when there are no writes. + """ + self._insert_rows("first", 3) + self._insert_rows("second", 4) + + first_id_gen = self._create_id_generator( + "first", writers=["first", "second", "third"] + ) + second_id_gen = self._create_id_generator( + "second", writers=["first", "second", "third"] + ) + + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 7) + self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7) + self.assertEqual(second_id_gen.get_current_token(), 7) + + # Check that the first ID gen advancing causes the second ID gen to + # advance (as it has nothing in flight). + + async def _get_next_async() -> None: + async with first_id_gen.get_next_mult(2): + pass + + self.get_success(_get_next_async()) + second_id_gen.advance("first", 9) + + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 9) + self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 9) + self.assertEqual(second_id_gen.get_current_token(), 7) + + # Check that the first ID gen advancing doesn't advance the second ID + # gen when it has stuff in flight. + self.get_success(_get_next_async()) + + ctxmgr = second_id_gen.get_next() + self.get_success(ctxmgr.__aenter__()) + + second_id_gen.advance("first", 11) + + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 11) + self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 9) + self.assertEqual(second_id_gen.get_current_token(), 7) + + self.get_success(ctxmgr.__aexit__(None, None, None)) + + self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 11) + self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 12) + self.assertEqual(second_id_gen.get_current_token(), 7) + class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs.""" From 2d2a38399ffba2d2c226e15994abc901a294565b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Oct 2023 13:12:38 +0100 Subject: [PATCH 08/10] Make it obvious that _max_position_of_local_instance can't got backwards --- synapse/storage/util/id_generators.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 043e35ee5e2b..9c3eafb562a5 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -723,7 +723,9 @@ def _mark_id_as_finished(self, next_id: int) -> None: if new_cur: curr = self._current_positions.get(self._instance_name, 0) self._current_positions[self._instance_name] = max(curr, new_cur) - self._max_position_of_local_instance = max(curr, new_cur) + self._max_position_of_local_instance = max( + curr, new_cur, self._max_position_of_local_instance + ) self._add_persisted_position(next_id) @@ -834,7 +836,9 @@ def _add_persisted_position(self, new_id: int) -> None: if not self._unfinished_ids and not self._in_flight_fetches: # If we don't have anything in flight, it's safe to advance to the # max seen stream ID. - self._max_position_of_local_instance = self._max_seen_allocated_stream_id + self._max_position_of_local_instance = max( + self._max_seen_allocated_stream_id, self._max_position_of_local_instance + ) # We now iterate through the seen positions, discarding those that are # less than the current min positions, and incrementing the min position From 3d6ea22df675b18b0d7c920b3e7b306a9b764db5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Oct 2023 13:32:02 +0100 Subject: [PATCH 09/10] Fix ambiguous comment --- tests/storage/test_id_generators.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index e35f13247686..fd53b0644c12 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -680,7 +680,7 @@ def test_current_token_gap(self) -> None: self.assertEqual(second_id_gen.get_current_token(), 7) # Check that the first ID gen advancing causes the second ID gen to - # advance (as it has nothing in flight). + # advance (as the second ID gen has nothing in flight). async def _get_next_async() -> None: async with first_id_gen.get_next_mult(2): @@ -694,7 +694,7 @@ async def _get_next_async() -> None: self.assertEqual(second_id_gen.get_current_token(), 7) # Check that the first ID gen advancing doesn't advance the second ID - # gen when it has stuff in flight. + # gen when the second ID gen has stuff in flight. self.get_success(_get_next_async()) ctxmgr = second_id_gen.get_next() From 2ac56c72a366d66dd18ba48568dbba156330913c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Oct 2023 14:36:23 +0100 Subject: [PATCH 10/10] Update docs about current position --- docs/development/synapse_architecture/streams.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/development/synapse_architecture/streams.md b/docs/development/synapse_architecture/streams.md index bee0b8a8c0a2..67d92acfa1fc 100644 --- a/docs/development/synapse_architecture/streams.md +++ b/docs/development/synapse_architecture/streams.md @@ -51,17 +51,24 @@ will be inserted with that ID. For any given stream reader (including writers themselves), we may define a per-writer current stream ID: -> The current stream ID _for a writer W_ is the largest stream ID such that +> A current stream ID _for a writer W_ is the largest stream ID such that > all transactions added by W with equal or smaller ID have completed. Similarly, there is a "linear" notion of current stream ID: -> The "linear" current stream ID is the largest stream ID such that +> A "linear" current stream ID is the largest stream ID such that > all facts (added by any writer) with equal or smaller ID have completed. Because different stream readers A and B learn about new facts at different times, A and B may disagree about current stream IDs. Put differently: we should think of stream readers as being independent of each other, proceeding through a stream of facts at different rates. +The above definition does not give a unique current stream ID, in fact there can +be a range of current stream IDs. Synapse uses both the minimum and maximum IDs +for different purposes. Most often the maximum is used, as its generally +beneficial for workers to advance their IDs as soon as possible. However, the +minimum is used in situations where e.g. another worker is going to wait until +the stream advances past a position. + **NB.** For both senses of "current", that if a writer opens a transaction that never completes, the current stream ID will never advance beyond that writer's last written stream ID. For single-writer streams, the per-writer current ID and the linear current ID are the same. @@ -114,7 +121,7 @@ Writers need to track: - track their current position (i.e. its own per-writer stream ID). - their facts currently awaiting completion. -At startup, +At startup, - the current position of that writer can be found by querying the database (which suggests that facts need to be written to the database atomically, in a transaction); and - there are no facts awaiting completion.