This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce replication traffic due to reflected cache stream POSITION #16557

Merged 3 commits on Oct 27, 2023
1 change: 1 addition & 0 deletions changelog.d/16557.bugfix
@@ -0,0 +1 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
9 changes: 8 additions & 1 deletion synapse/replication/tcp/resource.py
@@ -27,7 +27,7 @@
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
-from synapse.replication.tcp.streams._base import StreamRow, Token
+from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
from synapse.util.metrics import Measure

if TYPE_CHECKING:
@@ -205,6 +205,13 @@ async def _run_notifier_loop(self) -> None:
# send, so we send a `POSITION` to inform other
# workers of the updated position.

+# We skip this for the `caches` stream as a) it
+# generates a lot of traffic as every worker would
+# echo each write, and b) nothing cares if a given
+# worker's caches stream position lags.
Member:
Does this mean that workers don't need to track other workers' cache stream positions?

Member Author:
The difference is that nothing uses the global "persisted upto position" for the caches stream: everything that uses the caches stream only cares about each individual stream position, not any sort of "combined" position. The reason we care about that for other streams is that some code still treats all streams as if they could be described by a single integer (e.g. the federation and application service sending code).

Member:
global "persisted upto position" for the caches stream

Just to refresh my memory -- this is the minimum persisted position across all workers, correct?

everything that uses the caches streams only care about each streams position

Because the caches stream gets blasted to everyone and they just clear their caches but we don't really care about tracking it? Do we even care about each stream's position in it?

Member Author:
(Note I've just pushed a commit that changes things a bit)

global "persisted upto position" for the caches stream

Just to refresh my memory -- this is the minimum persisted position across all workers, correct?

Yup.

everything that uses the caches streams only care about each streams position

Because the caches stream gets blasted to everyone and they just clear their caches but we don't really care about tracking it? Do we even care about each stream's position in it?

There are two usages of the caches stream:

  1. Reliably receiving updates about cache invalidations that happened on other workers. We use the stream position here purely to detect when gaps happened.
  2. We sometimes want to wait for the caches stream to get to a particular position (this where we do a HTTP request and we want to wait for the action to replicate to the requesting worker after receiving a response).

+if stream.NAME == CachesStream.NAME:
+    continue

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
# This can happen if we sent out an RDATA for