Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix bug where a new writer advances their token too quickly #16473

Merged
merged 10 commits into from
Oct 23, 2023
26 changes: 22 additions & 4 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,29 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
instance_name, self._persisted_upto_position
)

# We also ensure that we always return at least the
# `persisted_upto_position` for ourselves, so that when we notify
# other workers about our position we give them the max valid value
# here so that nothing waits for us to advance.
max_pos = max(
self._current_positions.values(), default=self._persisted_upto_position
)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# We want to return the maximum "current token" that we can for a
# writer, this helps ensure that streams progress as fast as
# possible.
pos = max(pos, self._persisted_upto_position)

if (
self._instance_name == instance_name
and self._in_flight_fetches
and self._unfinished_ids
):
# For our own instance when there's nothing in flight, it's safe
# to advance to the maximum persisted position we've seen (as we
# know that any new tokens we request will be greater).
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
max_pos = max(
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
self._current_positions.values(),
default=self._persisted_upto_position,
)
pos = max(pos, max_pos)

return self._return_factor * pos
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

def get_minimal_local_current_token(self) -> int:
Expand Down
Loading