diff --git a/clientapi/routing/data_retention.go b/clientapi/routing/data_retention.go new file mode 100644 index 0000000000..c46fc46cca --- /dev/null +++ b/clientapi/routing/data_retention.go @@ -0,0 +1,103 @@ +package routing + +import ( + "context" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/roomserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +type DataRetentionRequest struct { + DataRetention api.PerformDataRetentionRequest `json:"data_retention"` +} + +// Triggred by an application service job. +// Purges stale data according to data retention policy provided in the request body. +// For large spaces with many rooms this operation may take a considerable amount of time. +func PostDataRetention( + req *http.Request, + cfg *config.ClientAPI, + rsAPI roomserverAPI.ClientRoomserverAPI, +) util.JSONResponse { + var body DataRetentionRequest + if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { + return *reqErr + } + dr := body.DataRetention + + if dr.MaxAge <= 0 || dr.SpaceID == "" { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.BadJSON("missing max_age or space_id"), + } + } + + // Validate the roomID + validRoomID, err := spec.NewRoomID(dr.SpaceID) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.InvalidParam("space_id is invalid"), + } + } + + queryReq := api.QueryRoomsUnderSpaceRequest{ + SpaceID: validRoomID.String(), + } + + var queryRes api.QueryRoomsUnderSpaceResponse + if queryErr := rsAPI.QueryRoomsUnderSpace(req.Context(), &queryReq, &queryRes); queryErr != nil { + util.GetLogger(req.Context()).WithError(queryErr).Error("rsAPI.QueryRoomsUnderSpace failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + + if dr.Teams { + logrus.Infof("Performing data retention on teams in space %s", dr.SpaceID) + for _, roomId := range queryRes.Teams { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on team with id %s", roomId) + return util.ErrorResponse(err) + } + } + } + + if dr.Operations { + logrus.Infof("Performing data retention on operations in space %s", dr.SpaceID) + for _, roomId := range queryRes.Operations { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on operation with id %s", roomId) + return util.ErrorResponse(err) + } + } + } + + if dr.Dms { + logrus.Infof("Performing data retention on dms in space %s", dr.SpaceID) + for _, roomId := range queryRes.DMs { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on dm with id %s", roomId) + return util.ErrorResponse(err) + } + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index dc587cd556..0749dbe545 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -1547,6 +1547,12 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) + v3mux.Handle("/rooms/{roomID}/dataRetention", + httputil.MakeAuthAPI("data_retention", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return PostDataRetention(req, cfg, rsAPI) + }), + ).Methods(http.MethodPost, http.MethodOptions) + synapseAdminRouter.Handle("/admin/v1/event_reports", httputil.MakeAdminAPI("admin_report_events", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { from := parseUint64OrDefault(req.URL.Query().Get("from"), 0) diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index 8c73967c6f..1ac920c226 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -389,3 +389,10 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { return nil }) } + +func (d *Database) DataRetentionInRoom(ctx context.Context, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // TODO: Implement this to support federation + return nil + }) +} diff --git a/go.mod b/go.mod index 66bf0d1881..df576e4cfe 100644 --- a/go.mod +++ b/go.mod @@ -55,8 +55,9 @@ require ( modernc.org/sqlite v1.29.5 ) +require github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect + require ( - github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/RoaringBitmap/roaring v1.2.3 // indirect diff --git a/roomserver/api/api.go b/roomserver/api/api.go index b2b3192447..21a068a03e 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -231,6 +231,8 @@ type ClientRoomserverAPI interface { QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error + //! GlobeKeeper Customization + QueryRoomsUnderSpace(ctx context.Context, req *QueryRoomsUnderSpaceRequest, res *QueryRoomsUnderSpaceResponse) error QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error) QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error // QueryKnownUsers returns a list of users that we know about from our joined rooms. @@ -247,6 +249,8 @@ type ClientRoomserverAPI interface { PerformAdminEvacuateRoom(ctx context.Context, roomID string) (affected []string, err error) PerformAdminEvacuateUser(ctx context.Context, userID string) (affected []string, err error) PerformAdminPurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + PerformDataRetention(ctx context.Context, dr *PerformDataRetentionRequest, roomID string) error PerformAdminDownloadState(ctx context.Context, roomID, userID string, serverName spec.ServerName) error PerformPeek(ctx context.Context, req *PerformPeekRequest) (roomID string, err error) PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 852b64206d..227680756a 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -59,6 +59,8 @@ const ( OutputTypeRetirePeek OutputType = "retire_peek" // OutputTypePurgeRoom indicates the event is an OutputPurgeRoom OutputTypePurgeRoom OutputType = "purge_room" + // OutputTypeDataRetention indicates the event is an OutputDataRetention + OutputTypeDataRetention OutputType = "data_retention" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -84,6 +86,8 @@ type OutputEvent struct { RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"` // The content of the event with type OutputPurgeRoom PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"` + // The content of the event with type OutputDataRetention + DataRetentionInRoom *OutputDataRetention `json:"data_retention,omitempty"` } // Type of the OutputNewRoomEvent. @@ -269,3 +273,8 @@ type OutputRetirePeek struct { type OutputPurgeRoom struct { RoomID string } + +type OutputDataRetention struct { + RoomID string + DR *PerformDataRetentionRequest +} diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index d6caec08c0..9c3145832b 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -171,3 +171,12 @@ type PerformForgetRequest struct { } type PerformForgetResponse struct{} + +type PerformDataRetentionRequest struct { + SpaceID string `json:"space_id"` + Enabled bool `json:"enabled"` + MaxAge int32 `json:"max_age"` + Teams bool `json:"teams"` + Operations bool `json:"operations"` + Dms bool `json:"dms"` +} diff --git a/roomserver/api/query.go b/roomserver/api/query.go index c4c019f99a..1d8ac38c9b 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -346,6 +346,17 @@ type QueryServerBannedFromRoomResponse struct { Banned bool `json:"banned"` } +// ! GlobeKeeper Customization +type QueryRoomsUnderSpaceRequest struct { + SpaceID string `json:"space_id"` +} + +type QueryRoomsUnderSpaceResponse struct { + DMs []string `json:"dm_rooms"` + Operations []string `json:"operation_rooms"` + Teams []string `json:"team_rooms"` +} + type QueryAdminEventReportsResponse struct { ID int64 `json:"id"` Score int64 `json:"score"` diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 1b88172343..84426bb75c 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -222,6 +222,36 @@ func (r *Admin) PerformAdminPurgeRoom( }) } +// PerformDataRetention removes all stale encrypted chat messages from the given room according to the data retention policy. +func (r *Admin) PerformDataRetention( + ctx context.Context, + dr *api.PerformDataRetentionRequest, + roomID string, +) error { + // Validate we actually got a room ID and nothing else + if _, _, err := gomatrixserverlib.SplitID('!', roomID); err != nil { + return err + } + + logrus.WithField("room_id", roomID).Warn("Performing data retention on room from roomserver") + if err := r.DB.DataRetentionInRoom(ctx, dr, roomID); err != nil { + logrus.WithField("room_id", roomID).WithError(err).Warn("Failed to perform data retention on room from roomserver") + return err + } + + logrus.WithField("room_id", roomID).Warn("Performed data retention on room from roomserver, informing other components") + + return r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ + { + Type: api.OutputTypeDataRetention, + DataRetentionInRoom: &api.OutputDataRetention{ + RoomID: roomID, + DR: dr, + }, + }, + }) +} + func (r *Admin) PerformAdminDownloadState( ctx context.Context, roomID, userID string, serverName spec.ServerName, diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 886d004920..aa901de610 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -493,6 +493,21 @@ func (r *Queryer) QueryMembershipsForRoom( return nil } +// QueryRoomsUnderSpace implements api.RoomserverInternalAPI +func (r *Queryer) QueryRoomsUnderSpace( + ctx context.Context, + request *api.QueryRoomsUnderSpaceRequest, + response *api.QueryRoomsUnderSpaceResponse, +) error { + var err error + response.DMs, response.Operations, response.Teams, err = r.DB.QueryRoomsUnderSpace(ctx, request.SpaceID) + if err != nil { + return err + } + + return nil +} + // QueryServerJoinedToRoom implements api.RoomserverInternalAPI func (r *Queryer) QueryServerJoinedToRoom( ctx context.Context, diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index ab105e6f9b..a3fd96c6c6 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -140,6 +140,9 @@ type Database interface { // joinOnly is set to true. // Returns an error if there was a problem talking to the database. GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error) + //! GlobeKeeper Customization + // QueryRoomsUnderSpace looks up rooms under the given space and returns all of them sorted by their type (DMs, operations and teams). + QueryRoomsUnderSpace(ctx context.Context, spaceID string) (dms, operations, teams []string, err error) // EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was // not found. // Returns an error if the retrieval went wrong. @@ -177,6 +180,8 @@ type Database interface { GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]gomatrixserverlib.PDU, error) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) PurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error // GetMembershipForHistoryVisibility queries the membership events for the given eventIDs. diff --git a/roomserver/storage/postgres/data_retention_statements.go b/roomserver/storage/postgres/data_retention_statements.go new file mode 100644 index 0000000000..dc47c7eeac --- /dev/null +++ b/roomserver/storage/postgres/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, +) error { + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/postgres/event_json_table.go b/roomserver/storage/postgres/event_json_table.go index 5f069ca10a..312fcaa378 100644 --- a/roomserver/storage/postgres/event_json_table.go +++ b/roomserver/storage/postgres/event_json_table.go @@ -54,9 +54,66 @@ const bulkSelectEventJSONSQL = "" + " WHERE event_nid = ANY($1)" + " ORDER BY event_nid ASC" +const selectRoomsUnderSpaceSQL = ` + WITH room_ids AS ( + SELECT DISTINCT + (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + FROM roomserver_event_json + WHERE event_json LIKE '%"state_key":"$1"%' + AND event_json LIKE '%"type":"m.space.parent"%' + ), + dm_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS dm_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"is_direct":true%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + operation_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS operation_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"type":"connect.operation"%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + team_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS team_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE r.room_id = ANY ( + SELECT room_id FROM room_ids) + AND r.room_id NOT IN ( + SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms) + AND r.room_id NOT IN ( + SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms) + ) + SELECT + dm_rooms.dm_array, + operation_rooms.operation_array, + team_rooms.team_array + FROM + dm_rooms, + operation_rooms, + team_rooms +` + type eventJSONStatements struct { - insertEventJSONStmt *sql.Stmt - bulkSelectEventJSONStmt *sql.Stmt + insertEventJSONStmt *sql.Stmt + bulkSelectEventJSONStmt *sql.Stmt + selectRoomsUnderSpaceSQLStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { @@ -70,6 +127,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, + {&s.selectRoomsUnderSpaceSQLStmt, selectRoomsUnderSpaceSQL}, }.Prepare(db) } @@ -107,3 +165,30 @@ func (s *eventJSONStatements) BulkSelectEventJSON( } return results[:i], rows.Err() } + +func (s *eventJSONStatements) SelectRoomsUnderSpace( + ctx context.Context, txn *sql.Tx, spaceID string, +) ([]string, []string, []string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomsUnderSpaceSQLStmt) + rows, err := stmt.QueryContext(ctx, spaceID) + if err != nil { + return nil, nil, nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomsUnderSpaceSQL: rows.close() failed") + + var ( + Dms []string + Operations []string + Teams []string + ) + + if err := rows.Scan(&Dms, &Operations, &Teams); err != nil { + return nil, nil, nil, err + } + + if err := rows.Err(); err != nil { + return nil, nil, nil, err + } + + return Dms, Operations, Teams, nil +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 0ed617d6c7..e7a2ce7268 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -46,6 +46,7 @@ type Database struct { MembershipTable tables.Membership PublishedTable tables.Published Purge tables.Purge + DataRetention tables.DataRetention UserRoomKeyTable tables.UserRoomKeys GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error) } @@ -522,6 +523,12 @@ func (d *Database) GetMembershipEventNIDsForRoom( return d.getMembershipEventNIDsForRoom(ctx, nil, roomNID, joinOnly, localOnly) } +func (d *Database) QueryRoomsUnderSpace( + ctx context.Context, spaceId string, +) ([]string, []string, []string, error) { + return d.EventJSONTable.SelectRoomsUnderSpace(ctx, nil, spaceId) +} + func (d *Database) getMembershipEventNIDsForRoom( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, joinOnly bool, localOnly bool, ) ([]types.EventNID, error) { @@ -1680,6 +1687,22 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { }) } +// DataRetentionInRoom removes all stale encrypted chat messages within a given room from the roomserver. +// For large rooms this operation may take a considerable amount of time. +func (d *Database) DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // Remove `ForUpdate` to execute without locking the records (might be needed since data retention is a long running operation) + roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID) + if err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("room %s does not exist", roomID) + } + return fmt.Errorf("failed to lock the room: %w", err) + } + return d.DataRetention.DataRetentionInRoom(ctx, txn, dr, roomNID, roomID) + }) +} + func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { diff --git a/roomserver/storage/sqlite3/data_retention_statements.go b/roomserver/storage/sqlite3/data_retention_statements.go new file mode 100644 index 0000000000..1c14a94419 --- /dev/null +++ b/roomserver/storage/sqlite3/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string, +) error { + + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/sqlite3/event_json_table.go b/roomserver/storage/sqlite3/event_json_table.go index 325951c7fb..0ecf53ca21 100644 --- a/roomserver/storage/sqlite3/event_json_table.go +++ b/roomserver/storage/sqlite3/event_json_table.go @@ -46,10 +46,67 @@ const bulkSelectEventJSONSQL = ` ORDER BY event_nid ASC ` +const selectRoomsUnderSpaceSQL = ` + WITH room_ids AS ( + SELECT DISTINCT + (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + FROM roomserver_event_json + WHERE event_json LIKE '%"state_key":"$1"%' + AND event_json LIKE '%"type":"m.space.parent"%' + ), + dm_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS dm_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"is_direct":true%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + operation_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS operation_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"type":"connect.operation"%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + team_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS team_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE r.room_id = ANY ( + SELECT room_id FROM room_ids) + AND r.room_id NOT IN ( + SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms) + AND r.room_id NOT IN ( + SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms) + ) + SELECT + dm_rooms.dm_array, + operation_rooms.operation_array, + team_rooms.team_array + FROM + dm_rooms, + operation_rooms, + team_rooms +` + type eventJSONStatements struct { - db *sql.DB - insertEventJSONStmt *sql.Stmt - bulkSelectEventJSONStmt *sql.Stmt + db *sql.DB + insertEventJSONStmt *sql.Stmt + bulkSelectEventJSONStmt *sql.Stmt + selectRoomsUnderSpaceSQLStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { @@ -65,6 +122,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, + {&s.selectRoomsUnderSpaceSQLStmt, selectRoomsUnderSpaceSQL}, }.Prepare(db) } @@ -111,3 +169,30 @@ func (s *eventJSONStatements) BulkSelectEventJSON( } return results[:i], rows.Err() } + +func (s *eventJSONStatements) SelectRoomsUnderSpace( + ctx context.Context, txn *sql.Tx, spaceID string, +) ([]string, []string, []string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomsUnderSpaceSQLStmt) + rows, err := stmt.QueryContext(ctx, spaceID) + if err != nil { + return nil, nil, nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomsUnderSpaceSQL: rows.close() failed") + + var ( + Dms []string + Operations []string + Teams []string + ) + + if err := rows.Scan(&Dms, &Operations, &Teams); err != nil { + return nil, nil, nil, err + } + + if err := rows.Err(); err != nil { + return nil, nil, nil, err + } + + return Dms, Operations, Teams, nil +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 02f6992c4f..e4bbd65dcd 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -25,6 +25,7 @@ type EventJSON interface { // Insert the event JSON. On conflict, replace the event JSON with the new value (for redactions). InsertEventJSON(ctx context.Context, tx *sql.Tx, eventNID types.EventNID, eventJSON []byte) error BulkSelectEventJSON(ctx context.Context, tx *sql.Tx, eventNIDs []types.EventNID) ([]EventJSONPair, error) + SelectRoomsUnderSpace(ctx context.Context, txn *sql.Tx, spaceID string) ([]string, []string, []string, error) } type EventTypes interface { @@ -214,6 +215,12 @@ type Purge interface { ) error } +type DataRetention interface { + DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, + ) error +} + type UserRoomKeys interface { // InsertUserRoomPrivatePublicKey inserts the given private key as well as the public key for it. This should be used // when creating keys locally.