From aafeb8c0b91dabc9fb26c55d6a4aaf7c50b49745 Mon Sep 17 00:00:00 2001 From: Daniel Aloni Date: Thu, 1 Aug 2024 19:58:37 +0300 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=A8=20Initial=20template=20for=20data?= =?UTF-8?q?=20retention=20API.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clientapi/routing/data_retention.go | 121 ++++++++++++++++++++++++++++ clientapi/routing/routing.go | 6 ++ go.mod | 5 ++ go.sum | 4 + 4 files changed, 136 insertions(+) create mode 100644 clientapi/routing/data_retention.go diff --git a/clientapi/routing/data_retention.go b/clientapi/routing/data_retention.go new file mode 100644 index 0000000000..588cc214fc --- /dev/null +++ b/clientapi/routing/data_retention.go @@ -0,0 +1,121 @@ +package routing + +import ( + "net/http" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/matrix-org/util" +) + +type DataRetentionRequest struct { + DataRetentions DataRetention `json:"data_retentions"` +} + +type DataRetention struct { + SpaceID string `json:"space_id"` + Enabled bool `json:"enabled"` + MaxAge int32 `json:"max_age,required"` + Teams bool `json:"teams"` + Operations bool `json:"operations"` + Dms bool `json:"dms"` +} + +// Triggred by an application service job. +// Purges stale data according to data retention policy provided in the request body. +// For large spaces with many rooms this operation may take a considerable amount of time. +func PostDataRetention( + req *http.Request, + cfg *config.ClientAPI, + deviceAPI *api.Device, + userAPI api.ClientUserAPI, +) util.JSONResponse { + var body DataRetentionRequest + if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { + return *reqErr + } + + if body.DataRetentions.MaxAge <= 0 || body.DataRetentions.SpaceID == "" { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.BadJSON("missing max_age or space_id"), + } + } + + // TODO: Fetch dms, operators and teams under the provided space. + // WITH room_ids AS ( + // SELECT DISTINCT + // (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + // FROM roomserver_event_json + // WHERE event_json LIKE '%"state_key":"$1"%' + // AND event_json LIKE '%"type":"m.space.parent"%' + // ), + // dm_rooms AS ( + // SELECT + // ARRAY_AGG(DISTINCT r.room_id) AS dm_array + // FROM roomserver_event_json e + // CROSS JOIN LATERAL ( + // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + // ) AS r + // WHERE e.event_json LIKE '%"is_direct":true%' + // AND r.room_id = ANY ( + // SELECT room_id FROM room_ids + // ) + // ), + // operation_rooms AS ( + // SELECT + // ARRAY_AGG(DISTINCT r.room_id) AS operation_array + // FROM roomserver_event_json e + // CROSS JOIN LATERAL ( + // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + // ) AS r + // WHERE e.event_json LIKE '%"type":"connect.operation"%' + // AND r.room_id = ANY ( + // SELECT room_id FROM room_ids + // ) + // ), + // team_rooms AS ( + // SELECT + // ARRAY_AGG(DISTINCT r.room_id) AS team_array + // FROM roomserver_event_json e + // CROSS JOIN LATERAL ( + // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + // ) AS r + // WHERE r.room_id = ANY ( + // SELECT room_id FROM room_ids + // ) + // AND r.room_id NOT IN ( + // SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms + // ) + // AND r.room_id NOT IN ( + // SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms + // ) + // ) + // SELECT + // dm_rooms.dm_array, + // operation_rooms.operation_array, + // team_rooms.team_array + // FROM + // dm_rooms, + // operation_rooms, + // team_rooms; + + if body.DataRetentions.Teams { + // TODO: Iterate and purge stale data from teams + } + + if body.DataRetentions.Operations { + // TODO: Iterate and purge stale data from operations + } + + if body.DataRetentions.Dms { + // TODO: Iterate and purge stale data from dms + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index dc587cd556..292d2b2e3c 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -1547,6 +1547,12 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) + v3mux.Handle("/rooms/{roomID}/dataRetention", + httputil.MakeAuthAPI("data_retention", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + return PostDataRetention(req, cfg, device, userAPI) + }), + ).Methods(http.MethodPost, http.MethodOptions) + synapseAdminRouter.Handle("/admin/v1/event_reports", httputil.MakeAdminAPI("admin_report_events", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { from := parseUint64OrDefault(req.URL.Query().Get("from"), 0) diff --git a/go.mod b/go.mod index 66bf0d1881..20fd944e24 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,10 @@ require ( require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect + github.com/go-co-op/gocron v1.25.0 +) + +require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/RoaringBitmap/roaring v1.2.3 // indirect @@ -115,6 +119,7 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.29.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect diff --git a/go.sum b/go.sum index 971617a84a..451330a812 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,8 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD50WnA= github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= +github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ldap/ldap/v3 v3.4.6 h1:ert95MdbiG7aWo/oPYp9btL3KJlMPKnP58r09rI8T+A= @@ -259,6 +261,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= From 216950d495b9539aee337e78fd914c987848652d Mon Sep 17 00:00:00 2001 From: Daniel Aloni Date: Tue, 6 Aug 2024 11:11:40 +0300 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=97=82=EF=B8=8F=20DB=20functionality?= =?UTF-8?q?=20to=20fetch=20rooms=20under=20space=20sorted=20by=20room=20ty?= =?UTF-8?q?pes.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clientapi/routing/data_retention.go | 106 +++++++----------- clientapi/routing/routing.go | 2 +- roomserver/api/api.go | 2 + roomserver/api/query.go | 11 ++ roomserver/internal/query/query.go | 15 +++ roomserver/storage/interface.go | 3 + .../storage/postgres/event_json_table.go | 91 ++++++++++++++- roomserver/storage/shared/storage.go | 6 + .../storage/sqlite3/event_json_table.go | 93 ++++++++++++++- roomserver/storage/tables/interface.go | 1 + 10 files changed, 261 insertions(+), 69 deletions(-) diff --git a/clientapi/routing/data_retention.go b/clientapi/routing/data_retention.go index 588cc214fc..794919c685 100644 --- a/clientapi/routing/data_retention.go +++ b/clientapi/routing/data_retention.go @@ -1,11 +1,13 @@ package routing import ( + "context" "net/http" "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/roomserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" ) @@ -29,8 +31,7 @@ type DataRetention struct { func PostDataRetention( req *http.Request, cfg *config.ClientAPI, - deviceAPI *api.Device, - userAPI api.ClientUserAPI, + rsAPI roomserverAPI.ClientRoomserverAPI, ) util.JSONResponse { var body DataRetentionRequest if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { @@ -44,74 +45,53 @@ func PostDataRetention( } } - // TODO: Fetch dms, operators and teams under the provided space. - // WITH room_ids AS ( - // SELECT DISTINCT - // (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id - // FROM roomserver_event_json - // WHERE event_json LIKE '%"state_key":"$1"%' - // AND event_json LIKE '%"type":"m.space.parent"%' - // ), - // dm_rooms AS ( - // SELECT - // ARRAY_AGG(DISTINCT r.room_id) AS dm_array - // FROM roomserver_event_json e - // CROSS JOIN LATERAL ( - // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id - // ) AS r - // WHERE e.event_json LIKE '%"is_direct":true%' - // AND r.room_id = ANY ( - // SELECT room_id FROM room_ids - // ) - // ), - // operation_rooms AS ( - // SELECT - // ARRAY_AGG(DISTINCT r.room_id) AS operation_array - // FROM roomserver_event_json e - // CROSS JOIN LATERAL ( - // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id - // ) AS r - // WHERE e.event_json LIKE '%"type":"connect.operation"%' - // AND r.room_id = ANY ( - // SELECT room_id FROM room_ids - // ) - // ), - // team_rooms AS ( - // SELECT - // ARRAY_AGG(DISTINCT r.room_id) AS team_array - // FROM roomserver_event_json e - // CROSS JOIN LATERAL ( - // SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id - // ) AS r - // WHERE r.room_id = ANY ( - // SELECT room_id FROM room_ids - // ) - // AND r.room_id NOT IN ( - // SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms - // ) - // AND r.room_id NOT IN ( - // SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms - // ) - // ) - // SELECT - // dm_rooms.dm_array, - // operation_rooms.operation_array, - // team_rooms.team_array - // FROM - // dm_rooms, - // operation_rooms, - // team_rooms; + // Validate the roomID + validRoomID, err := spec.NewRoomID(body.DataRetentions.SpaceID) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.InvalidParam("space_id is invalid"), + } + } + + queryReq := api.QueryRoomsUnderSpaceRequest{ + SpaceID: validRoomID.String(), + } + + var queryRes api.QueryRoomsUnderSpaceResponse + if queryErr := rsAPI.QueryRoomsUnderSpace(req.Context(), &queryReq, &queryRes); queryErr != nil { + util.GetLogger(req.Context()).WithError(queryErr).Error("rsAPI.QueryRoomsUnderSpace failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } if body.DataRetentions.Teams { - // TODO: Iterate and purge stale data from teams + // TODO: Replace with PerformDataRetention once it's implemented + for _, roomId := range queryRes.Teams { + if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + return util.ErrorResponse(err) + } + } } if body.DataRetentions.Operations { - // TODO: Iterate and purge stale data from operations + for _, roomId := range queryRes.Operations { + // TODO: Replace with PerformDataRetention once it's implemented + if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + return util.ErrorResponse(err) + } + } } if body.DataRetentions.Dms { - // TODO: Iterate and purge stale data from dms + for _, roomId := range queryRes.DMs { + // TODO: Replace with PerformDataRetention once it's implemented + if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + return util.ErrorResponse(err) + } + } } return util.JSONResponse{ diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 292d2b2e3c..0749dbe545 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -1549,7 +1549,7 @@ func Setup( v3mux.Handle("/rooms/{roomID}/dataRetention", httputil.MakeAuthAPI("data_retention", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { - return PostDataRetention(req, cfg, device, userAPI) + return PostDataRetention(req, cfg, rsAPI) }), ).Methods(http.MethodPost, http.MethodOptions) diff --git a/roomserver/api/api.go b/roomserver/api/api.go index b2b3192447..850b302a58 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -231,6 +231,8 @@ type ClientRoomserverAPI interface { QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error + //! GlobeKeeper Customization + QueryRoomsUnderSpace(ctx context.Context, req *QueryRoomsUnderSpaceRequest, res *QueryRoomsUnderSpaceResponse) error QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error) QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error // QueryKnownUsers returns a list of users that we know about from our joined rooms. diff --git a/roomserver/api/query.go b/roomserver/api/query.go index c4c019f99a..1d8ac38c9b 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -346,6 +346,17 @@ type QueryServerBannedFromRoomResponse struct { Banned bool `json:"banned"` } +// ! GlobeKeeper Customization +type QueryRoomsUnderSpaceRequest struct { + SpaceID string `json:"space_id"` +} + +type QueryRoomsUnderSpaceResponse struct { + DMs []string `json:"dm_rooms"` + Operations []string `json:"operation_rooms"` + Teams []string `json:"team_rooms"` +} + type QueryAdminEventReportsResponse struct { ID int64 `json:"id"` Score int64 `json:"score"` diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 886d004920..aa901de610 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -493,6 +493,21 @@ func (r *Queryer) QueryMembershipsForRoom( return nil } +// QueryRoomsUnderSpace implements api.RoomserverInternalAPI +func (r *Queryer) QueryRoomsUnderSpace( + ctx context.Context, + request *api.QueryRoomsUnderSpaceRequest, + response *api.QueryRoomsUnderSpaceResponse, +) error { + var err error + response.DMs, response.Operations, response.Teams, err = r.DB.QueryRoomsUnderSpace(ctx, request.SpaceID) + if err != nil { + return err + } + + return nil +} + // QueryServerJoinedToRoom implements api.RoomserverInternalAPI func (r *Queryer) QueryServerJoinedToRoom( ctx context.Context, diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index ab105e6f9b..9595081ee4 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -140,6 +140,9 @@ type Database interface { // joinOnly is set to true. // Returns an error if there was a problem talking to the database. GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error) + //! GlobeKeeper Customization + // QueryRoomsUnderSpace looks up rooms under the given space and returns all of them sorted by their type (DMs, operations and teams). + QueryRoomsUnderSpace(ctx context.Context, spaceID string) (dms, operations, teams []string, err error) // EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was // not found. // Returns an error if the retrieval went wrong. diff --git a/roomserver/storage/postgres/event_json_table.go b/roomserver/storage/postgres/event_json_table.go index 5f069ca10a..7454ca0794 100644 --- a/roomserver/storage/postgres/event_json_table.go +++ b/roomserver/storage/postgres/event_json_table.go @@ -54,9 +54,68 @@ const bulkSelectEventJSONSQL = "" + " WHERE event_nid = ANY($1)" + " ORDER BY event_nid ASC" +const selectRoomsUnderSpaceSQL = "" + + "WITH room_ids AS (" + + " SELECT DISTINCT" + + " (REGEXP_MATCHES(event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " FROM roomserver_event_json" + + " WHERE event_json LIKE '%\"state_key\":\"$1\"%'" + + " AND event_json LIKE '%\"type\":\"m.space.parent\"%'" + + ")," + + "dm_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS dm_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE e.event_json LIKE '%\"is_direct\":true%'" + + " AND r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + ")," + + "operation_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS operation_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE e.event_json LIKE '%\"type\":\"connect.operation\"%'" + + " AND r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + ")," + + "team_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS team_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + " AND r.room_id NOT IN (" + + " SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms" + + " )" + + " AND r.room_id NOT IN (" + + " SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms" + + " )" + + ")" + + "SELECT" + + " dm_rooms.dm_array," + + " operation_rooms.operation_array," + + " team_rooms.team_array" + + "FROM" + + " dm_rooms," + + " operation_rooms," + + " team_rooms;" + type eventJSONStatements struct { - insertEventJSONStmt *sql.Stmt - bulkSelectEventJSONStmt *sql.Stmt + insertEventJSONStmt *sql.Stmt + bulkSelectEventJSONStmt *sql.Stmt + selectRoomsUnderSpaceSQLStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { @@ -70,6 +129,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, + {&s.selectRoomsUnderSpaceSQLStmt, selectRoomsUnderSpaceSQL}, }.Prepare(db) } @@ -107,3 +167,30 @@ func (s *eventJSONStatements) BulkSelectEventJSON( } return results[:i], rows.Err() } + +func (s *eventJSONStatements) SelectRoomsUnderSpace( + ctx context.Context, txn *sql.Tx, spaceID string, +) ([]string, []string, []string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomsUnderSpaceSQLStmt) + rows, err := stmt.QueryContext(ctx, spaceID) + if err != nil { + return nil, nil, nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomsUnderSpaceSQL: rows.close() failed") + + var ( + Dms []string + Operations []string + Teams []string + ) + + if err := rows.Scan(&Dms, &Operations, &Teams); err != nil { + return nil, nil, nil, err + } + + if err := rows.Err(); err != nil { + return nil, nil, nil, err + } + + return Dms, Operations, Teams, nil +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 0ed617d6c7..1c9ec75bbb 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -522,6 +522,12 @@ func (d *Database) GetMembershipEventNIDsForRoom( return d.getMembershipEventNIDsForRoom(ctx, nil, roomNID, joinOnly, localOnly) } +func (d *Database) QueryRoomsUnderSpace( + ctx context.Context, spaceId string, +) ([]string, []string, []string, error) { + return d.EventJSONTable.SelectRoomsUnderSpace(ctx, nil, spaceId) +} + func (d *Database) getMembershipEventNIDsForRoom( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, joinOnly bool, localOnly bool, ) ([]types.EventNID, error) { diff --git a/roomserver/storage/sqlite3/event_json_table.go b/roomserver/storage/sqlite3/event_json_table.go index 325951c7fb..42c2179b1c 100644 --- a/roomserver/storage/sqlite3/event_json_table.go +++ b/roomserver/storage/sqlite3/event_json_table.go @@ -46,10 +46,69 @@ const bulkSelectEventJSONSQL = ` ORDER BY event_nid ASC ` +const selectRoomsUnderSpaceSQL = "" + + "WITH room_ids AS (" + + " SELECT DISTINCT" + + " (REGEXP_MATCHES(event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " FROM roomserver_event_json" + + " WHERE event_json LIKE '%\"state_key\":\"$1\"%'" + + " AND event_json LIKE '%\"type\":\"m.space.parent\"%'" + + ")," + + "dm_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS dm_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE e.event_json LIKE '%\"is_direct\":true%'" + + " AND r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + ")," + + "operation_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS operation_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE e.event_json LIKE '%\"type\":\"connect.operation\"%'" + + " AND r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + ")," + + "team_rooms AS (" + + " SELECT" + + " ARRAY_AGG(DISTINCT r.room_id) AS team_array" + + " FROM roomserver_event_json e" + + " CROSS JOIN LATERAL (" + + " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + + " ) AS r" + + " WHERE r.room_id = ANY (" + + " SELECT room_id FROM room_ids" + + " )" + + " AND r.room_id NOT IN (" + + " SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms" + + " )" + + " AND r.room_id NOT IN (" + + " SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms" + + " )" + + ")" + + "SELECT" + + " dm_rooms.dm_array," + + " operation_rooms.operation_array," + + " team_rooms.team_array" + + "FROM" + + " dm_rooms," + + " operation_rooms," + + " team_rooms;" + type eventJSONStatements struct { - db *sql.DB - insertEventJSONStmt *sql.Stmt - bulkSelectEventJSONStmt *sql.Stmt + db *sql.DB + insertEventJSONStmt *sql.Stmt + bulkSelectEventJSONStmt *sql.Stmt + selectRoomsUnderSpaceSQLStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { @@ -65,6 +124,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, + {&s.selectRoomsUnderSpaceSQLStmt, selectRoomsUnderSpaceSQL}, }.Prepare(db) } @@ -111,3 +171,30 @@ func (s *eventJSONStatements) BulkSelectEventJSON( } return results[:i], rows.Err() } + +func (s *eventJSONStatements) SelectRoomsUnderSpace( + ctx context.Context, txn *sql.Tx, spaceID string, +) ([]string, []string, []string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomsUnderSpaceSQLStmt) + rows, err := stmt.QueryContext(ctx, spaceID) + if err != nil { + return nil, nil, nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomsUnderSpaceSQL: rows.close() failed") + + var ( + Dms []string + Operations []string + Teams []string + ) + + if err := rows.Scan(&Dms, &Operations, &Teams); err != nil { + return nil, nil, nil, err + } + + if err := rows.Err(); err != nil { + return nil, nil, nil, err + } + + return Dms, Operations, Teams, nil +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 02f6992c4f..8243fbeee1 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -25,6 +25,7 @@ type EventJSON interface { // Insert the event JSON. On conflict, replace the event JSON with the new value (for redactions). InsertEventJSON(ctx context.Context, tx *sql.Tx, eventNID types.EventNID, eventJSON []byte) error BulkSelectEventJSON(ctx context.Context, tx *sql.Tx, eventNIDs []types.EventNID) ([]EventJSONPair, error) + SelectRoomsUnderSpace(ctx context.Context, txn *sql.Tx, spaceID string) ([]string, []string, []string, error) } type EventTypes interface { From 48008a22671d518f79d21697589a27dd2ae0cded Mon Sep 17 00:00:00 2001 From: Daniel Aloni Date: Tue, 6 Aug 2024 17:36:27 +0300 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=97=82=EF=B8=8F=20DB=20functionality?= =?UTF-8?q?=20to=20execute=20the=20data=20retention.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clientapi/routing/data_retention.go | 44 ++++---- federationapi/storage/shared/storage.go | 7 ++ go.mod | 6 +- go.sum | 4 - roomserver/api/api.go | 2 + roomserver/api/output.go | 9 ++ roomserver/api/perform.go | 9 ++ roomserver/internal/perform/perform_admin.go | 30 ++++++ roomserver/storage/interface.go | 2 + .../postgres/data_retention_statements.go | 102 ++++++++++++++++++ roomserver/storage/shared/storage.go | 17 +++ .../sqlite3/data_retention_statements.go | 102 ++++++++++++++++++ roomserver/storage/tables/interface.go | 6 ++ 13 files changed, 310 insertions(+), 30 deletions(-) create mode 100644 roomserver/storage/postgres/data_retention_statements.go create mode 100644 roomserver/storage/sqlite3/data_retention_statements.go diff --git a/clientapi/routing/data_retention.go b/clientapi/routing/data_retention.go index 794919c685..c46fc46cca 100644 --- a/clientapi/routing/data_retention.go +++ b/clientapi/routing/data_retention.go @@ -10,19 +10,11 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) type DataRetentionRequest struct { - DataRetentions DataRetention `json:"data_retentions"` -} - -type DataRetention struct { - SpaceID string `json:"space_id"` - Enabled bool `json:"enabled"` - MaxAge int32 `json:"max_age,required"` - Teams bool `json:"teams"` - Operations bool `json:"operations"` - Dms bool `json:"dms"` + DataRetention api.PerformDataRetentionRequest `json:"data_retention"` } // Triggred by an application service job. @@ -37,8 +29,9 @@ func PostDataRetention( if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { return *reqErr } + dr := body.DataRetention - if body.DataRetentions.MaxAge <= 0 || body.DataRetentions.SpaceID == "" { + if dr.MaxAge <= 0 || dr.SpaceID == "" { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: spec.BadJSON("missing max_age or space_id"), @@ -46,7 +39,7 @@ func PostDataRetention( } // Validate the roomID - validRoomID, err := spec.NewRoomID(body.DataRetentions.SpaceID) + validRoomID, err := spec.NewRoomID(dr.SpaceID) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, @@ -67,28 +60,37 @@ func PostDataRetention( } } - if body.DataRetentions.Teams { - // TODO: Replace with PerformDataRetention once it's implemented + if dr.Teams { + logrus.Infof("Performing data retention on teams in space %s", dr.SpaceID) for _, roomId := range queryRes.Teams { - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on team with id %s", roomId) return util.ErrorResponse(err) } } } - if body.DataRetentions.Operations { + if dr.Operations { + logrus.Infof("Performing data retention on operations in space %s", dr.SpaceID) for _, roomId := range queryRes.Operations { - // TODO: Replace with PerformDataRetention once it's implemented - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on operation with id %s", roomId) return util.ErrorResponse(err) } } } - if body.DataRetentions.Dms { + if dr.Dms { + logrus.Infof("Performing data retention on dms in space %s", dr.SpaceID) for _, roomId := range queryRes.DMs { - // TODO: Replace with PerformDataRetention once it's implemented - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on dm with id %s", roomId) return util.ErrorResponse(err) } } diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index 8c73967c6f..1ac920c226 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -389,3 +389,10 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { return nil }) } + +func (d *Database) DataRetentionInRoom(ctx context.Context, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // TODO: Implement this to support federation + return nil + }) +} diff --git a/go.mod b/go.mod index 20fd944e24..df576e4cfe 100644 --- a/go.mod +++ b/go.mod @@ -55,10 +55,7 @@ require ( modernc.org/sqlite v1.29.5 ) -require ( - github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/go-co-op/gocron v1.25.0 -) +require github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect @@ -119,7 +116,6 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.29.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect diff --git a/go.sum b/go.sum index 451330a812..971617a84a 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,6 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD50WnA= github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= -github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= -github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ldap/ldap/v3 v3.4.6 h1:ert95MdbiG7aWo/oPYp9btL3KJlMPKnP58r09rI8T+A= @@ -261,8 +259,6 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 850b302a58..21a068a03e 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -249,6 +249,8 @@ type ClientRoomserverAPI interface { PerformAdminEvacuateRoom(ctx context.Context, roomID string) (affected []string, err error) PerformAdminEvacuateUser(ctx context.Context, userID string) (affected []string, err error) PerformAdminPurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + PerformDataRetention(ctx context.Context, dr *PerformDataRetentionRequest, roomID string) error PerformAdminDownloadState(ctx context.Context, roomID, userID string, serverName spec.ServerName) error PerformPeek(ctx context.Context, req *PerformPeekRequest) (roomID string, err error) PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 852b64206d..227680756a 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -59,6 +59,8 @@ const ( OutputTypeRetirePeek OutputType = "retire_peek" // OutputTypePurgeRoom indicates the event is an OutputPurgeRoom OutputTypePurgeRoom OutputType = "purge_room" + // OutputTypeDataRetention indicates the event is an OutputDataRetention + OutputTypeDataRetention OutputType = "data_retention" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -84,6 +86,8 @@ type OutputEvent struct { RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"` // The content of the event with type OutputPurgeRoom PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"` + // The content of the event with type OutputDataRetention + DataRetentionInRoom *OutputDataRetention `json:"data_retention,omitempty"` } // Type of the OutputNewRoomEvent. @@ -269,3 +273,8 @@ type OutputRetirePeek struct { type OutputPurgeRoom struct { RoomID string } + +type OutputDataRetention struct { + RoomID string + DR *PerformDataRetentionRequest +} diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index d6caec08c0..9c3145832b 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -171,3 +171,12 @@ type PerformForgetRequest struct { } type PerformForgetResponse struct{} + +type PerformDataRetentionRequest struct { + SpaceID string `json:"space_id"` + Enabled bool `json:"enabled"` + MaxAge int32 `json:"max_age"` + Teams bool `json:"teams"` + Operations bool `json:"operations"` + Dms bool `json:"dms"` +} diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 1b88172343..84426bb75c 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -222,6 +222,36 @@ func (r *Admin) PerformAdminPurgeRoom( }) } +// PerformDataRetention removes all stale encrypted chat messages from the given room according to the data retention policy. +func (r *Admin) PerformDataRetention( + ctx context.Context, + dr *api.PerformDataRetentionRequest, + roomID string, +) error { + // Validate we actually got a room ID and nothing else + if _, _, err := gomatrixserverlib.SplitID('!', roomID); err != nil { + return err + } + + logrus.WithField("room_id", roomID).Warn("Performing data retention on room from roomserver") + if err := r.DB.DataRetentionInRoom(ctx, dr, roomID); err != nil { + logrus.WithField("room_id", roomID).WithError(err).Warn("Failed to perform data retention on room from roomserver") + return err + } + + logrus.WithField("room_id", roomID).Warn("Performed data retention on room from roomserver, informing other components") + + return r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ + { + Type: api.OutputTypeDataRetention, + DataRetentionInRoom: &api.OutputDataRetention{ + RoomID: roomID, + DR: dr, + }, + }, + }) +} + func (r *Admin) PerformAdminDownloadState( ctx context.Context, roomID, userID string, serverName spec.ServerName, diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 9595081ee4..a3fd96c6c6 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -180,6 +180,8 @@ type Database interface { GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]gomatrixserverlib.PDU, error) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) PurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error // GetMembershipForHistoryVisibility queries the membership events for the given eventIDs. diff --git a/roomserver/storage/postgres/data_retention_statements.go b/roomserver/storage/postgres/data_retention_statements.go new file mode 100644 index 0000000000..dc47c7eeac --- /dev/null +++ b/roomserver/storage/postgres/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, +) error { + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 1c9ec75bbb..e7a2ce7268 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -46,6 +46,7 @@ type Database struct { MembershipTable tables.Membership PublishedTable tables.Published Purge tables.Purge + DataRetention tables.DataRetention UserRoomKeyTable tables.UserRoomKeys GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error) } @@ -1686,6 +1687,22 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { }) } +// DataRetentionInRoom removes all stale encrypted chat messages within a given room from the roomserver. +// For large rooms this operation may take a considerable amount of time. +func (d *Database) DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // Remove `ForUpdate` to execute without locking the records (might be needed since data retention is a long running operation) + roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID) + if err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("room %s does not exist", roomID) + } + return fmt.Errorf("failed to lock the room: %w", err) + } + return d.DataRetention.DataRetentionInRoom(ctx, txn, dr, roomNID, roomID) + }) +} + func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { diff --git a/roomserver/storage/sqlite3/data_retention_statements.go b/roomserver/storage/sqlite3/data_retention_statements.go new file mode 100644 index 0000000000..1c14a94419 --- /dev/null +++ b/roomserver/storage/sqlite3/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string, +) error { + + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 8243fbeee1..e4bbd65dcd 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -215,6 +215,12 @@ type Purge interface { ) error } +type DataRetention interface { + DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, + ) error +} + type UserRoomKeys interface { // InsertUserRoomPrivatePublicKey inserts the given private key as well as the public key for it. This should be used // when creating keys locally. From 1609a80632a24bfba05c35f2788c5499ff6feeb4 Mon Sep 17 00:00:00 2001 From: Daniel Aloni Date: Tue, 6 Aug 2024 17:49:13 +0300 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=90=9B=20Fix=20sql=20strings=20format?= =?UTF-8?q?ting.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/postgres/event_json_table.go | 112 +++++++++--------- .../storage/sqlite3/event_json_table.go | 112 +++++++++--------- 2 files changed, 110 insertions(+), 114 deletions(-) diff --git a/roomserver/storage/postgres/event_json_table.go b/roomserver/storage/postgres/event_json_table.go index 7454ca0794..312fcaa378 100644 --- a/roomserver/storage/postgres/event_json_table.go +++ b/roomserver/storage/postgres/event_json_table.go @@ -54,63 +54,61 @@ const bulkSelectEventJSONSQL = "" + " WHERE event_nid = ANY($1)" + " ORDER BY event_nid ASC" -const selectRoomsUnderSpaceSQL = "" + - "WITH room_ids AS (" + - " SELECT DISTINCT" + - " (REGEXP_MATCHES(event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " FROM roomserver_event_json" + - " WHERE event_json LIKE '%\"state_key\":\"$1\"%'" + - " AND event_json LIKE '%\"type\":\"m.space.parent\"%'" + - ")," + - "dm_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS dm_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE e.event_json LIKE '%\"is_direct\":true%'" + - " AND r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - ")," + - "operation_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS operation_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE e.event_json LIKE '%\"type\":\"connect.operation\"%'" + - " AND r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - ")," + - "team_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS team_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - " AND r.room_id NOT IN (" + - " SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms" + - " )" + - " AND r.room_id NOT IN (" + - " SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms" + - " )" + - ")" + - "SELECT" + - " dm_rooms.dm_array," + - " operation_rooms.operation_array," + - " team_rooms.team_array" + - "FROM" + - " dm_rooms," + - " operation_rooms," + - " team_rooms;" +const selectRoomsUnderSpaceSQL = ` + WITH room_ids AS ( + SELECT DISTINCT + (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + FROM roomserver_event_json + WHERE event_json LIKE '%"state_key":"$1"%' + AND event_json LIKE '%"type":"m.space.parent"%' + ), + dm_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS dm_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"is_direct":true%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + operation_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS operation_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"type":"connect.operation"%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + team_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS team_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE r.room_id = ANY ( + SELECT room_id FROM room_ids) + AND r.room_id NOT IN ( + SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms) + AND r.room_id NOT IN ( + SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms) + ) + SELECT + dm_rooms.dm_array, + operation_rooms.operation_array, + team_rooms.team_array + FROM + dm_rooms, + operation_rooms, + team_rooms +` type eventJSONStatements struct { insertEventJSONStmt *sql.Stmt diff --git a/roomserver/storage/sqlite3/event_json_table.go b/roomserver/storage/sqlite3/event_json_table.go index 42c2179b1c..0ecf53ca21 100644 --- a/roomserver/storage/sqlite3/event_json_table.go +++ b/roomserver/storage/sqlite3/event_json_table.go @@ -46,63 +46,61 @@ const bulkSelectEventJSONSQL = ` ORDER BY event_nid ASC ` -const selectRoomsUnderSpaceSQL = "" + - "WITH room_ids AS (" + - " SELECT DISTINCT" + - " (REGEXP_MATCHES(event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " FROM roomserver_event_json" + - " WHERE event_json LIKE '%\"state_key\":\"$1\"%'" + - " AND event_json LIKE '%\"type\":\"m.space.parent\"%'" + - ")," + - "dm_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS dm_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE e.event_json LIKE '%\"is_direct\":true%'" + - " AND r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - ")," + - "operation_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS operation_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE e.event_json LIKE '%\"type\":\"connect.operation\"%'" + - " AND r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - ")," + - "team_rooms AS (" + - " SELECT" + - " ARRAY_AGG(DISTINCT r.room_id) AS team_array" + - " FROM roomserver_event_json e" + - " CROSS JOIN LATERAL (" + - " SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" + - " ) AS r" + - " WHERE r.room_id = ANY (" + - " SELECT room_id FROM room_ids" + - " )" + - " AND r.room_id NOT IN (" + - " SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms" + - " )" + - " AND r.room_id NOT IN (" + - " SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms" + - " )" + - ")" + - "SELECT" + - " dm_rooms.dm_array," + - " operation_rooms.operation_array," + - " team_rooms.team_array" + - "FROM" + - " dm_rooms," + - " operation_rooms," + - " team_rooms;" +const selectRoomsUnderSpaceSQL = ` + WITH room_ids AS ( + SELECT DISTINCT + (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + FROM roomserver_event_json + WHERE event_json LIKE '%"state_key":"$1"%' + AND event_json LIKE '%"type":"m.space.parent"%' + ), + dm_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS dm_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"is_direct":true%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + operation_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS operation_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE e.event_json LIKE '%"type":"connect.operation"%' + AND r.room_id = ANY ( + SELECT room_id FROM room_ids + ) + ), + team_rooms AS ( + SELECT + ARRAY_AGG(DISTINCT r.room_id) AS team_array + FROM roomserver_event_json e + CROSS JOIN LATERAL ( + SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id + ) AS r + WHERE r.room_id = ANY ( + SELECT room_id FROM room_ids) + AND r.room_id NOT IN ( + SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms) + AND r.room_id NOT IN ( + SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms) + ) + SELECT + dm_rooms.dm_array, + operation_rooms.operation_array, + team_rooms.team_array + FROM + dm_rooms, + operation_rooms, + team_rooms +` type eventJSONStatements struct { db *sql.DB