-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add regression test for matrix-org/synapse#16463 #681
Changes from 3 commits
417d15e
8627008
35b7bd7
6c917fb
c50964d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,9 @@ import ( | |
"github.com/matrix-org/complement/helpers" | ||
"github.com/matrix-org/complement/internal/federation" | ||
"github.com/matrix-org/complement/runtime" | ||
"github.com/matrix-org/gomatrixserverlib" | ||
"github.com/matrix-org/gomatrixserverlib/fclient" | ||
"github.com/matrix-org/util" | ||
) | ||
|
||
// Observes "first bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673 | ||
|
@@ -374,6 +377,191 @@ func TestSync(t *testing.T) { | |
}) | ||
} | ||
|
||
// This is a regression test for | ||
// https://github.com/matrix-org/synapse/issues/16463 | ||
// | ||
// We test this by having a local user (alice) and remote user (charlie) in a | ||
// room. Charlie sends 50+ messages into the room without sending to Alice's | ||
// server. Charlie then sends one more which get sent to Alice. | ||
// | ||
// Alice should observe that she receives some (though not all) of charlie's | ||
// events, with the `limited` flag set. | ||
func TestSyncTimelineGap(t *testing.T) { | ||
runtime.SkipIf(t, runtime.Dendrite) | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
deployment := complement.Deploy(t, 1) | ||
defer deployment.Destroy(t) | ||
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) | ||
|
||
srv := federation.NewServer(t, deployment, | ||
federation.HandleKeyRequests(), | ||
federation.HandleTransactionRequests(nil, nil), | ||
) | ||
cancel := srv.Listen() | ||
defer cancel() | ||
|
||
charlie := srv.UserID("charlie") | ||
|
||
roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) | ||
room := srv.MustJoinRoom(t, deployment, "hs1", roomID, charlie) | ||
|
||
filterID := createFilter(t, alice, map[string]interface{}{ | ||
"room": map[string]interface{}{ | ||
"timeline": map[string]interface{}{ | ||
"limit": 20, | ||
}, | ||
}, | ||
}) | ||
_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID}) | ||
t.Logf("Next batch %s", nextBatch) | ||
|
||
alice.SendEventSynced(t, roomID, b.Event{ | ||
Type: "m.room.message", | ||
Sender: alice.UserID, | ||
Content: map[string]interface{}{ | ||
"body": "Hi from Alice!", | ||
"msgtype": "m.text", | ||
}, | ||
}) | ||
|
||
// Create 50 messages, but don't send them to Alice | ||
var missingEvents []gomatrixserverlib.PDU | ||
for i := 0; i < 50; i++ { | ||
event := srv.MustCreateEvent(t, room, federation.Event{ | ||
Type: "m.room.message", | ||
Sender: charlie, | ||
Content: map[string]interface{}{ | ||
"body": "Remote message", | ||
"msgtype": "m.text", | ||
}, | ||
}) | ||
room.AddEvent(event) | ||
missingEvents = append(missingEvents, event) | ||
} | ||
|
||
// Create one more event that we will send to Alice, which references the | ||
// previous 50. | ||
lastEvent := srv.MustCreateEvent(t, room, federation.Event{ | ||
Type: "m.room.message", | ||
Sender: charlie, | ||
Content: map[string]interface{}{ | ||
"body": "End", | ||
"msgtype": "m.text", | ||
}, | ||
}) | ||
room.AddEvent(lastEvent) | ||
|
||
// Alice's HS will try and fill in the gap, so we need to respond to those | ||
// requests. | ||
respondToGetMissingEventsEndpoints(t, srv, room, missingEvents) | ||
|
||
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lastEvent.JSON()}, nil) | ||
|
||
// We now test two different modes of /sync work. The first is when we are | ||
// syncing when the server receives the `lastEvent` (and so, at least | ||
// Synapse, will start sending down some events immediately). In this mode | ||
// we may see alice's message, but charlie's messages should set the limited | ||
// flag. | ||
// | ||
// The second mode is when we incremental sync *after* all the events have | ||
// finished being persisted, and so we get only charlie's messages. | ||
t.Run("incremental", func(t *testing.T) { | ||
timelineSequence := make([]gjson.Result, 0) | ||
|
||
t.Logf("Doing incremental syncs from %s", nextBatch) | ||
|
||
// This just reads all timeline batches into `timelineSequence` until we see `lastEvent` come down | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch, Filter: filterID}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { | ||
t.Logf("next batch %s", topLevelSyncJSON.Get("next_batch").Str) | ||
|
||
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".timeline") | ||
if !roomResult.Exists() { | ||
return fmt.Errorf("No entry for room (%s)", roomID) | ||
} | ||
|
||
timelineSequence = append(timelineSequence, roomResult) | ||
|
||
events := roomResult.Get("events") | ||
if !events.Exists() || !events.IsArray() { | ||
return fmt.Errorf("Invalid events entry (%s)", roomResult.Raw) | ||
} | ||
|
||
foundLastEvent := false | ||
for _, ev := range events.Array() { | ||
if ev.Get("event_id").Str == lastEvent.EventID() { | ||
foundLastEvent = true | ||
} | ||
} | ||
|
||
if !foundLastEvent { | ||
return fmt.Errorf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw) | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
t.Logf("Got timeline sequence: %s", timelineSequence) | ||
|
||
// Check that we only see Alice's message from before the gap *before* | ||
// we seen any limited batches, and vice versa for Charlie's messages. | ||
Comment on lines
+504
to
+505
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not following this comment. Is "seen" supposed to be "see"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is mean to be "see". There are two valid scenarios: 1) we sync and see Alice's message, then we sync again and see charlie's messages with |
||
limited := false | ||
for _, section := range timelineSequence { | ||
limited = limited || section.Get("limited").Bool() | ||
events := section.Get("events").Array() | ||
for _, ev := range events { | ||
if limited { | ||
if ev.Get("sender").Str == alice.UserID { | ||
t.Fatalf("Got message from alice after limited flag") | ||
} | ||
} else { | ||
if ev.Get("sender").Str == charlie { | ||
t.Fatalf("Got message from remote without limited flag being set") | ||
} | ||
} | ||
} | ||
} | ||
|
||
if !limited { | ||
t.Fatalf("No timeline batch for the room was limited") | ||
} | ||
}) | ||
|
||
t.Run("full", func(t *testing.T) { | ||
// Wait until we see `lastEvent` come down sync implying that all events have been persisted | ||
// by alice's homeserver. | ||
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID())) | ||
|
||
// Now an incremental sync from before should return a limited batch for | ||
// the room, with just Charlie's messages. | ||
topLevelSyncJSON, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch, Filter: filterID}) | ||
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID)) | ||
if !roomResult.Exists() { | ||
t.Fatalf("No entry for room (%s)", roomID) | ||
} | ||
|
||
eventsJson := roomResult.Get("timeline.events") | ||
if !eventsJson.Exists() || !eventsJson.IsArray() { | ||
t.Fatalf("Invalid events entry (%s)", roomResult.Raw) | ||
} | ||
|
||
eventsArray := eventsJson.Array() | ||
|
||
if eventsArray[len(eventsArray)-1].Get("event_id").Str != lastEvent.EventID() { | ||
t.Fatalf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw) | ||
} | ||
|
||
if roomResult.Get("timeline.limited").Bool() == false { | ||
t.Fatalf("Timeline batch was not limited (%s)", roomResult.Raw) | ||
} | ||
|
||
for _, ev := range eventsArray { | ||
if ev.Get("sender").Str == alice.UserID { | ||
t.Fatalf("Found an event from alice in batch (%s)", roomResult.Raw) | ||
} | ||
} | ||
}) | ||
|
||
} | ||
|
||
// Test presence from people in 2 different rooms in incremental sync | ||
func TestPresenceSyncDifferentRooms(t *testing.T) { | ||
deployment := complement.Deploy(t, 1) | ||
|
@@ -561,3 +749,105 @@ func usersInPresenceEvents(t *testing.T, presence gjson.Result, users []string) | |
t.Fatalf("expected %d presence events, got %d: %+v", len(users), foundCounter, presenceEvents) | ||
} | ||
} | ||
|
||
func eventIDsFromEvents(he []gomatrixserverlib.PDU) []string { | ||
eventIDs := make([]string, len(he)) | ||
for i := range he { | ||
eventIDs[i] = he[i].EventID() | ||
} | ||
return eventIDs | ||
} | ||
|
||
// Helper method to respond to federation APIs associated with trying to get missing events. | ||
func respondToGetMissingEventsEndpoints(t *testing.T, srv *federation.Server, room *federation.ServerRoom, missingEvents []gomatrixserverlib.PDU) { | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
srv.Mux().HandleFunc( | ||
"/_matrix/federation/v1/state_ids/{roomID}", | ||
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { | ||
t.Logf("Got /state_ids for %s", pathParams["roomID"]) | ||
if pathParams["roomID"] != room.RoomID { | ||
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID) | ||
return util.JSONResponse{ | ||
Code: 400, | ||
JSON: "wrong room", | ||
} | ||
} | ||
|
||
roomState := room.AllCurrentState() | ||
return util.JSONResponse{ | ||
Code: 200, | ||
JSON: map[string]interface{}{ | ||
"pdu_ids": eventIDsFromEvents(roomState), | ||
"auth_chain_ids": eventIDsFromEvents(room.AuthChainForEvents(roomState)), | ||
}, | ||
} | ||
})).Methods("GET") | ||
|
||
srv.Mux().HandleFunc( | ||
"/_matrix/federation/v1/state/{roomID}", | ||
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { | ||
t.Logf("Got /state for %s", pathParams["roomID"]) | ||
if pathParams["roomID"] != room.RoomID { | ||
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID) | ||
return util.JSONResponse{ | ||
Code: 400, | ||
JSON: "wrong room", | ||
} | ||
} | ||
|
||
roomState := room.AllCurrentState() | ||
return util.JSONResponse{ | ||
Code: 200, | ||
JSON: map[string]interface{}{ | ||
"pdus": roomState, | ||
"auth_chain": room.AuthChainForEvents(roomState), | ||
}, | ||
} | ||
})).Methods("GET") | ||
|
||
srv.Mux().HandleFunc( | ||
"/_matrix/federation/v1/event/{eventID}", | ||
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { | ||
t.Logf("Got /event for %s", pathParams["eventID"]) | ||
|
||
for _, ev := range missingEvents { | ||
if ev.EventID() == pathParams["eventID"] { | ||
t.Logf("Returning event %s", pathParams["eventID"]) | ||
return util.JSONResponse{ | ||
Code: 200, | ||
JSON: map[string]interface{}{ | ||
"origin": srv.ServerName(), | ||
"origin_server_ts": 0, | ||
"pdus": []json.RawMessage{ev.JSON()}, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
t.Logf("No event found") | ||
return util.JSONResponse{ | ||
Code: 404, | ||
JSON: map[string]interface{}{}, | ||
} | ||
})).Methods("GET") | ||
|
||
srv.Mux().HandleFunc( | ||
"/_matrix/federation/v1/get_missing_events/{roomID}", | ||
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { | ||
t.Logf("Got /get_missing_events for %s", pathParams["roomID"]) | ||
if pathParams["roomID"] != room.RoomID { | ||
t.Errorf("Received /get_missing_events for the wrong room: %s", room.RoomID) | ||
return util.JSONResponse{ | ||
Code: 400, | ||
JSON: "wrong room", | ||
} | ||
} | ||
|
||
return util.JSONResponse{ | ||
Code: 200, | ||
JSON: map[string]interface{}{ | ||
"events": missingEvents[len(missingEvents)-10:], | ||
}, | ||
} | ||
}), | ||
).Methods("POST") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, do we know this fails on dendrite?
(As I mentioned in #synapse-dev, does this need a spec clarification at all?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Err, good Q. I can try removing it and seeing what happens
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it fails! Which brings me back to a question I asked somewhere else -- does this require spec clarifications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, looks like it. The current spec says this about
limited
:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
matrix-org/matrix-spec#1664