From 417d15ee955162c8dbee59902eb10914d1138c9b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Oct 2023 15:27:23 +0100 Subject: [PATCH 1/5] Add regression test for matrix-org/synpase#16463 --- tests/csapi/sync_test.go | 287 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index 5b0ffb18..79e4dd2d 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -14,6 +14,9 @@ import ( "github.com/matrix-org/complement/helpers" "github.com/matrix-org/complement/internal/federation" "github.com/matrix-org/complement/runtime" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" + "github.com/matrix-org/util" ) // Observes "first bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673 @@ -374,6 +377,188 @@ func TestSync(t *testing.T) { }) } +// This is a regression test for +// https://github.com/matrix-org/synapse/issues/16463 +// +// We test this by having a local user (alice) and remote user (charlie) in a +// room. Charlie sends 50+ messages into the room without sending to Alice's +// server. Charlie then sends one more which get sent to Alice. +// +// Alice should observer that she receives some (though not all) of charlie's +// events, with the `limited` flag set. +func TestSyncTimelineGap(t *testing.T) { + runtime.SkipIf(t, runtime.Dendrite) + deployment := complement.Deploy(t, 1) + defer deployment.Destroy(t) + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) + + srv := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandleTransactionRequests(nil, nil), + ) + cancel := srv.Listen() + defer cancel() + + charlie := srv.UserID("charlie") + + roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + room := srv.MustJoinRoom(t, deployment, "hs1", roomID, charlie) + + filterID := createFilter(t, alice, map[string]interface{}{ + "room": map[string]interface{}{ + "timeline": map[string]interface{}{ + "limit": 20, + }, + }, + }) + _, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID}) + t.Logf("Next batch %s", nextBatch) + + alice.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Sender: alice.UserID, + Content: map[string]interface{}{ + "body": "Hi from Alice!", + "msgtype": "m.text", + }, + }) + + // Create 50 messages, but don't send them to Alice + var missingEvents []gomatrixserverlib.PDU + for i := 0; i < 50; i++ { + event := srv.MustCreateEvent(t, room, federation.Event{ + Type: "m.room.message", + Sender: charlie, + Content: map[string]interface{}{ + "body": "Remote message", + "msgtype": "m.text", + }, + }) + room.AddEvent(event) + missingEvents = append(missingEvents, event) + } + + // Create one more event that we will send to Alice, which references the + // previous 50. + lastEvent := srv.MustCreateEvent(t, room, federation.Event{ + Type: "m.room.message", + Sender: charlie, + Content: map[string]interface{}{ + "body": "End", + "msgtype": "m.text", + }, + }) + room.AddEvent(lastEvent) + + // Alice's HS will try and fill in the gap, so we need to respond to those + // requests. + respondToGetMissingEventsEndpoints(t, srv, room, missingEvents) + + srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lastEvent.JSON()}, nil) + + // We now test two different modes of /sync work. The first is when we are + // syncing when the server receives the `lastEvent` (and so, at least + // Synapse, will start sending down some events immediately). + // + // The second mode is when we sync *after* all the events have finished + // being persisted, and so we get everything in one chunk. + t.Run("incremental", func(t *testing.T) { + timelineSequence := make([]gjson.Result, 0) + + t.Logf("Doing incremental syncs from %s", nextBatch) + + // This just reads all timeline batches into `timelineSequence` until we see `lastEvent` come down + alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch, Filter: filterID}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + t.Logf("next batch %s", topLevelSyncJSON.Get("next_batch").Str) + + roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".timeline") + if !roomResult.Exists() { + return fmt.Errorf("No entry for room (%s)", roomID) + } + + timelineSequence = append(timelineSequence, roomResult) + + events := roomResult.Get("events") + if !events.Exists() || !events.IsArray() { + return fmt.Errorf("Invalid events entry (%s)", roomResult.Raw) + } + + foundLastEvent := false + for _, ev := range events.Array() { + if ev.Get("event_id").Str == lastEvent.EventID() { + foundLastEvent = true + } + } + + if !foundLastEvent { + return fmt.Errorf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw) + } + + return nil + }) + + t.Logf("Got timeline sequence: %s", timelineSequence) + + // Check that we only see Alice's message from before the gap *before* + // we seen any limited batches, and vice versa for Charlie's messages. + limited := false + for _, section := range timelineSequence { + limited = limited || section.Get("limited").Bool() + events := section.Get("events").Array() + for _, ev := range events { + if limited { + if ev.Get("Sender").Str == alice.UserID { + t.Fatalf("Got message from alice after limited flag") + } + } else { + if ev.Get("Sender").Str == charlie { + t.Fatalf("Got message from remote without limited flag being set") + } + } + } + } + + if !limited { + t.Fatalf("No timeline batch for the room was limited") + } + }) + + t.Run("full", func(t *testing.T) { + // Wait until we see `lastEvent` come down sync + alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID())) + + // Now an incremental sync from before should return a limited batch for + // the room, with just Charlie's messages. + topLevelSyncJSON, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch, Filter: filterID}) + roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID)) + if !roomResult.Exists() { + t.Fatalf("No entry for room (%s)", roomID) + } + + eventsJson := roomResult.Get("timeline.events") + if !eventsJson.Exists() || !eventsJson.IsArray() { + t.Fatalf("Invalid events entry (%s)", roomResult.Raw) + } + + eventsArray := eventsJson.Array() + + if eventsArray[len(eventsArray)-1].Get("event_id").Str != lastEvent.EventID() { + t.Fatalf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw) + } + + if roomResult.Get("timeline.limited").Bool() == false { + t.Fatalf("Timeline batch was not limited (%s)", roomResult.Raw) + } + + for _, ev := range eventsArray { + if ev.Get("sender").Str == alice.UserID { + t.Fatalf("Found an event from alice in batch (%s)", roomResult.Raw) + } + } + }) + +} + // Test presence from people in 2 different rooms in incremental sync func TestPresenceSyncDifferentRooms(t *testing.T) { deployment := complement.Deploy(t, 1) @@ -561,3 +746,105 @@ func usersInPresenceEvents(t *testing.T, presence gjson.Result, users []string) t.Fatalf("expected %d presence events, got %d: %+v", len(users), foundCounter, presenceEvents) } } + +func eventIDsFromEvents(he []gomatrixserverlib.PDU) []string { + eventIDs := make([]string, len(he)) + for i := range he { + eventIDs[i] = he[i].EventID() + } + return eventIDs +} + +// Helper method to respond to federation APIs associated with trying to get missing events. +func respondToGetMissingEventsEndpoints(t *testing.T, srv *federation.Server, room *federation.ServerRoom, missingEvents []gomatrixserverlib.PDU) { + srv.Mux().HandleFunc( + "/_matrix/federation/v1/state_ids/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /state_ids for %s", pathParams["roomID"]) + if pathParams["roomID"] != room.RoomID { + t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + roomState := room.AllCurrentState() + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "pdu_ids": eventIDsFromEvents(roomState), + "auth_chain_ids": eventIDsFromEvents(room.AuthChainForEvents(roomState)), + }, + } + })).Methods("GET") + + srv.Mux().HandleFunc( + "/_matrix/federation/v1/state/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /state for %s", pathParams["roomID"]) + if pathParams["roomID"] != room.RoomID { + t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + roomState := room.AllCurrentState() + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "pdus": roomState, + "auth_chain": room.AuthChainForEvents(roomState), + }, + } + })).Methods("GET") + + srv.Mux().HandleFunc( + "/_matrix/federation/v1/event/{eventID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /event for %s", pathParams["eventID"]) + + for _, ev := range missingEvents { + if ev.EventID() == pathParams["eventID"] { + t.Logf("Returning event %s", pathParams["eventID"]) + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "origin": srv.ServerName(), + "origin_server_ts": 0, + "pdus": []json.RawMessage{ev.JSON()}, + }, + } + } + } + + t.Logf("No event found") + return util.JSONResponse{ + Code: 404, + JSON: map[string]interface{}{}, + } + })).Methods("GET") + + srv.Mux().HandleFunc( + "/_matrix/federation/v1/get_missing_events/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /get_missing_events for %s", pathParams["roomID"]) + if pathParams["roomID"] != room.RoomID { + t.Errorf("Received /get_missing_events for the wrong room: %s", room.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "events": missingEvents[len(missingEvents)-10:], + }, + } + }), + ).Methods("POST") +} From 86270085b0414bce4f3be9137d435b3490df91c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Oct 2023 15:29:51 +0100 Subject: [PATCH 2/5] Fix typo --- tests/csapi/sync_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index 79e4dd2d..5099aff7 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -507,11 +507,11 @@ func TestSyncTimelineGap(t *testing.T) { events := section.Get("events").Array() for _, ev := range events { if limited { - if ev.Get("Sender").Str == alice.UserID { + if ev.Get("sender").Str == alice.UserID { t.Fatalf("Got message from alice after limited flag") } } else { - if ev.Get("Sender").Str == charlie { + if ev.Get("sender").Str == charlie { t.Fatalf("Got message from remote without limited flag being set") } } From 35b7bd722b60ec482e3ad0a84fe02eb5a17627bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2023 15:38:22 +0100 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Patrick Cloke --- tests/csapi/sync_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index 5099aff7..dc6a2d3d 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -384,7 +384,7 @@ func TestSync(t *testing.T) { // room. Charlie sends 50+ messages into the room without sending to Alice's // server. Charlie then sends one more which get sent to Alice. // -// Alice should observer that she receives some (though not all) of charlie's +// Alice should observe that she receives some (though not all) of charlie's // events, with the `limited` flag set. func TestSyncTimelineGap(t *testing.T) { runtime.SkipIf(t, runtime.Dendrite) @@ -458,10 +458,12 @@ func TestSyncTimelineGap(t *testing.T) { // We now test two different modes of /sync work. The first is when we are // syncing when the server receives the `lastEvent` (and so, at least - // Synapse, will start sending down some events immediately). + // Synapse, will start sending down some events immediately). In this mode + // we may see alice's message, but charlie's messages should set the limited + // flag. // - // The second mode is when we sync *after* all the events have finished - // being persisted, and so we get everything in one chunk. + // The second mode is when we incremental sync *after* all the events have + // finished being persisted, and so we get only charlie's messages. t.Run("incremental", func(t *testing.T) { timelineSequence := make([]gjson.Result, 0) @@ -524,7 +526,8 @@ func TestSyncTimelineGap(t *testing.T) { }) t.Run("full", func(t *testing.T) { - // Wait until we see `lastEvent` come down sync + // Wait until we see `lastEvent` come down sync implying that all events have been persisted + // by alice's homeserver. alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID())) // Now an incremental sync from before should return a limited batch for From 6c917fb234a5c8b9188a4c0b7b9ccaf3c905f1c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2023 15:38:58 +0100 Subject: [PATCH 4/5] Try running on dendrite --- tests/csapi/sync_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index dc6a2d3d..a6144d5d 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -387,7 +387,6 @@ func TestSync(t *testing.T) { // Alice should observe that she receives some (though not all) of charlie's // events, with the `limited` flag set. func TestSyncTimelineGap(t *testing.T) { - runtime.SkipIf(t, runtime.Dendrite) deployment := complement.Deploy(t, 1) defer deployment.Destroy(t) alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) From c50964d71ed5cc18f61d4dd1fe608c246d87f39b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2023 15:47:56 +0100 Subject: [PATCH 5/5] Revert "Try running on dendrite" This reverts commit 6c917fb234a5c8b9188a4c0b7b9ccaf3c905f1c9. --- tests/csapi/sync_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/csapi/sync_test.go b/tests/csapi/sync_test.go index a6144d5d..dc6a2d3d 100644 --- a/tests/csapi/sync_test.go +++ b/tests/csapi/sync_test.go @@ -387,6 +387,7 @@ func TestSync(t *testing.T) { // Alice should observe that she receives some (though not all) of charlie's // events, with the `limited` flag set. func TestSyncTimelineGap(t *testing.T) { + runtime.SkipIf(t, runtime.Dendrite) deployment := complement.Deploy(t, 1) defer deployment.Destroy(t) alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})