Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add regression test for matrix-org/synapse#16463 #681

Merged
merged 5 commits into from
Oct 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 290 additions & 0 deletions tests/csapi/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/internal/federation"
"github.com/matrix-org/complement/runtime"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/matrix-org/util"
)

// Observes "first bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673
Expand Down Expand Up @@ -374,6 +377,191 @@ func TestSync(t *testing.T) {
})
}

// This is a regression test for
// https://github.com/matrix-org/synapse/issues/16463
//
// We test this by having a local user (alice) and remote user (charlie) in a
// room. Charlie sends 50+ messages into the room without sending to Alice's
// server. Charlie then sends one more which get sent to Alice.
//
// Alice should observe that she receives some (though not all) of charlie's
// events, with the `limited` flag set.
func TestSyncTimelineGap(t *testing.T) {
runtime.SkipIf(t, runtime.Dendrite)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, do we know this fails on dendrite?

(As I mentioned in #synapse-dev, does this need a spec clarification at all?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, good Q. I can try removing it and seeing what happens

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it fails! Which brings me back to a question I asked somewhere else -- does this require spec clarifications?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, looks like it. The current spec says this about limited:

True if the number of events returned was limited by the limit on the filter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleTransactionRequests(nil, nil),
)
cancel := srv.Listen()
defer cancel()

charlie := srv.UserID("charlie")

roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
room := srv.MustJoinRoom(t, deployment, "hs1", roomID, charlie)

filterID := createFilter(t, alice, map[string]interface{}{
"room": map[string]interface{}{
"timeline": map[string]interface{}{
"limit": 20,
},
},
})
_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
t.Logf("Next batch %s", nextBatch)

alice.SendEventSynced(t, roomID, b.Event{
Type: "m.room.message",
Sender: alice.UserID,
Content: map[string]interface{}{
"body": "Hi from Alice!",
"msgtype": "m.text",
},
})

// Create 50 messages, but don't send them to Alice
var missingEvents []gomatrixserverlib.PDU
for i := 0; i < 50; i++ {
event := srv.MustCreateEvent(t, room, federation.Event{
Type: "m.room.message",
Sender: charlie,
Content: map[string]interface{}{
"body": "Remote message",
"msgtype": "m.text",
},
})
room.AddEvent(event)
missingEvents = append(missingEvents, event)
}

// Create one more event that we will send to Alice, which references the
// previous 50.
lastEvent := srv.MustCreateEvent(t, room, federation.Event{
Type: "m.room.message",
Sender: charlie,
Content: map[string]interface{}{
"body": "End",
"msgtype": "m.text",
},
})
room.AddEvent(lastEvent)

// Alice's HS will try and fill in the gap, so we need to respond to those
// requests.
respondToGetMissingEventsEndpoints(t, srv, room, missingEvents)

srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lastEvent.JSON()}, nil)

// We now test two different modes of /sync work. The first is when we are
// syncing when the server receives the `lastEvent` (and so, at least
// Synapse, will start sending down some events immediately). In this mode
// we may see alice's message, but charlie's messages should set the limited
// flag.
//
// The second mode is when we incremental sync *after* all the events have
// finished being persisted, and so we get only charlie's messages.
t.Run("incremental", func(t *testing.T) {
timelineSequence := make([]gjson.Result, 0)

t.Logf("Doing incremental syncs from %s", nextBatch)

// This just reads all timeline batches into `timelineSequence` until we see `lastEvent` come down
clokep marked this conversation as resolved.
Show resolved Hide resolved
alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch, Filter: filterID}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
t.Logf("next batch %s", topLevelSyncJSON.Get("next_batch").Str)

roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".timeline")
if !roomResult.Exists() {
return fmt.Errorf("No entry for room (%s)", roomID)
}

timelineSequence = append(timelineSequence, roomResult)

events := roomResult.Get("events")
if !events.Exists() || !events.IsArray() {
return fmt.Errorf("Invalid events entry (%s)", roomResult.Raw)
}

foundLastEvent := false
for _, ev := range events.Array() {
if ev.Get("event_id").Str == lastEvent.EventID() {
foundLastEvent = true
}
}

if !foundLastEvent {
return fmt.Errorf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
}

return nil
})

t.Logf("Got timeline sequence: %s", timelineSequence)

// Check that we only see Alice's message from before the gap *before*
// we seen any limited batches, and vice versa for Charlie's messages.
Comment on lines +504 to +505
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following this comment. Is "seen" supposed to be "see"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is mean to be "see". There are two valid scenarios: 1) we sync and see Alice's message, then we sync again and see charlie's messages with limited set to true, or 2) we just see charlie's messages with limited set to true

limited := false
for _, section := range timelineSequence {
limited = limited || section.Get("limited").Bool()
events := section.Get("events").Array()
for _, ev := range events {
if limited {
if ev.Get("sender").Str == alice.UserID {
t.Fatalf("Got message from alice after limited flag")
}
} else {
if ev.Get("sender").Str == charlie {
t.Fatalf("Got message from remote without limited flag being set")
}
}
}
}

if !limited {
t.Fatalf("No timeline batch for the room was limited")
}
})

t.Run("full", func(t *testing.T) {
// Wait until we see `lastEvent` come down sync implying that all events have been persisted
// by alice's homeserver.
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID()))

// Now an incremental sync from before should return a limited batch for
// the room, with just Charlie's messages.
topLevelSyncJSON, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch, Filter: filterID})
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID))
if !roomResult.Exists() {
t.Fatalf("No entry for room (%s)", roomID)
}

eventsJson := roomResult.Get("timeline.events")
if !eventsJson.Exists() || !eventsJson.IsArray() {
t.Fatalf("Invalid events entry (%s)", roomResult.Raw)
}

eventsArray := eventsJson.Array()

if eventsArray[len(eventsArray)-1].Get("event_id").Str != lastEvent.EventID() {
t.Fatalf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
}

if roomResult.Get("timeline.limited").Bool() == false {
t.Fatalf("Timeline batch was not limited (%s)", roomResult.Raw)
}

for _, ev := range eventsArray {
if ev.Get("sender").Str == alice.UserID {
t.Fatalf("Found an event from alice in batch (%s)", roomResult.Raw)
}
}
})

}

// Test presence from people in 2 different rooms in incremental sync
func TestPresenceSyncDifferentRooms(t *testing.T) {
deployment := complement.Deploy(t, 1)
Expand Down Expand Up @@ -561,3 +749,105 @@ func usersInPresenceEvents(t *testing.T, presence gjson.Result, users []string)
t.Fatalf("expected %d presence events, got %d: %+v", len(users), foundCounter, presenceEvents)
}
}

func eventIDsFromEvents(he []gomatrixserverlib.PDU) []string {
eventIDs := make([]string, len(he))
for i := range he {
eventIDs[i] = he[i].EventID()
}
return eventIDs
}

// Helper method to respond to federation APIs associated with trying to get missing events.
func respondToGetMissingEventsEndpoints(t *testing.T, srv *federation.Server, room *federation.ServerRoom, missingEvents []gomatrixserverlib.PDU) {
clokep marked this conversation as resolved.
Show resolved Hide resolved
srv.Mux().HandleFunc(
"/_matrix/federation/v1/state_ids/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /state_ids for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

roomState := room.AllCurrentState()
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"pdu_ids": eventIDsFromEvents(roomState),
"auth_chain_ids": eventIDsFromEvents(room.AuthChainForEvents(roomState)),
},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/state/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /state for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

roomState := room.AllCurrentState()
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"pdus": roomState,
"auth_chain": room.AuthChainForEvents(roomState),
},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/event/{eventID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /event for %s", pathParams["eventID"])

for _, ev := range missingEvents {
if ev.EventID() == pathParams["eventID"] {
t.Logf("Returning event %s", pathParams["eventID"])
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"origin": srv.ServerName(),
"origin_server_ts": 0,
"pdus": []json.RawMessage{ev.JSON()},
},
}
}
}

t.Logf("No event found")
return util.JSONResponse{
Code: 404,
JSON: map[string]interface{}{},
}
})).Methods("GET")

srv.Mux().HandleFunc(
"/_matrix/federation/v1/get_missing_events/{roomID}",
srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
t.Logf("Got /get_missing_events for %s", pathParams["roomID"])
if pathParams["roomID"] != room.RoomID {
t.Errorf("Received /get_missing_events for the wrong room: %s", room.RoomID)
return util.JSONResponse{
Code: 400,
JSON: "wrong room",
}
}

return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{
"events": missingEvents[len(missingEvents)-10:],
},
}
}),
).Methods("POST")
}
Loading