Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🗃️ Data Retention #89

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions clientapi/routing/data_retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package routing

import (
"context"
"net/http"

"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

// DataRetentionRequest is the JSON body decoded by PostDataRetention. The
// retention policy itself is nested under the "data_retention" key.
type DataRetentionRequest struct {
// DataRetention is the policy to apply; PostDataRetention rejects the
// request when its MaxAge is not positive or its SpaceID is empty.
DataRetention api.PerformDataRetentionRequest `json:"data_retention"`
}

// PostDataRetention handles a data retention request. It is triggered by an
// application service job.
//
// The handler decodes the policy from the request body, resolves the rooms
// under the policy's space via rsAPI.QueryRoomsUnderSpace and then calls
// rsAPI.PerformDataRetention for every room of each category (teams,
// operations, DMs) that the policy selects. Categories are processed in that
// order and processing stops at the first room that fails; rooms handled
// before the failure are not rolled back by this function.
// For large spaces with many rooms this operation may take a considerable
// amount of time.
//
// Responses:
//   - 400 BadJSON when max_age is not positive or space_id is empty;
//   - 400 InvalidParam when space_id is not a valid room ID;
//   - 500 when the space lookup fails;
//   - util.ErrorResponse(err) when purging a room fails;
//   - 200 with an empty JSON object on success.
//
// NOTE(review): dr.Enabled is not consulted here — confirm that callers only
// submit policies that are meant to be enforced.
func PostDataRetention(
	req *http.Request,
	cfg *config.ClientAPI,
	rsAPI roomserverAPI.ClientRoomserverAPI,
) util.JSONResponse {
	var body DataRetentionRequest
	if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
		return *reqErr
	}
	dr := body.DataRetention

	if dr.MaxAge <= 0 || dr.SpaceID == "" {
		return util.JSONResponse{
			Code: http.StatusBadRequest,
			JSON: spec.BadJSON("missing max_age or space_id"),
		}
	}

	// Validate the roomID
	validRoomID, err := spec.NewRoomID(dr.SpaceID)
	if err != nil {
		return util.JSONResponse{
			Code: http.StatusBadRequest,
			JSON: spec.InvalidParam("space_id is invalid"),
		}
	}

	queryReq := api.QueryRoomsUnderSpaceRequest{
		SpaceID: validRoomID.String(),
	}

	var queryRes api.QueryRoomsUnderSpaceResponse
	if queryErr := rsAPI.QueryRoomsUnderSpace(req.Context(), &queryReq, &queryRes); queryErr != nil {
		util.GetLogger(req.Context()).WithError(queryErr).Error("rsAPI.QueryRoomsUnderSpace failed")
		return util.JSONResponse{
			Code: http.StatusInternalServerError,
			JSON: spec.InternalServerError{},
		}
	}

	// One entry per room category, in the order they are processed. A table
	// keeps the purge loop in a single place instead of three copies.
	targets := []struct {
		selected bool     // whether the policy asks for this category
		plural   string   // category name used in the progress log line
		singular string   // category name used in the failure log line
		roomIDs  []string // rooms of this category under the space
	}{
		{dr.Teams, "teams", "team", queryRes.Teams},
		{dr.Operations, "operations", "operation", queryRes.Operations},
		{dr.Dms, "dms", "dm", queryRes.DMs},
	}

	for _, target := range targets {
		if !target.selected {
			continue
		}
		logrus.Infof("Performing data retention on %s in space %s", target.plural, dr.SpaceID)
		for _, roomID := range target.roomIDs {
			// The purge runs on a background context rather than the request
			// context — presumably so that a client disconnect does not abort
			// a long-running purge; confirm this is intended.
			if purgeErr := rsAPI.PerformDataRetention(context.Background(), &dr, roomID); purgeErr != nil {
				logrus.WithError(purgeErr).WithFields(logrus.Fields{
					"space_id": dr.SpaceID,
				}).Errorf("Failed to perform data retention on %s with id %s", target.singular, roomID)
				return util.ErrorResponse(purgeErr)
			}
		}
	}

	return util.JSONResponse{
		Code: http.StatusOK,
		JSON: struct{}{},
	}
}
6 changes: 6 additions & 0 deletions clientapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,12 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)

v3mux.Handle("/rooms/{roomID}/dataRetention",
httputil.MakeAuthAPI("data_retention", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return PostDataRetention(req, cfg, rsAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)

synapseAdminRouter.Handle("/admin/v1/event_reports",
httputil.MakeAdminAPI("admin_report_events", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
from := parseUint64OrDefault(req.URL.Query().Get("from"), 0)
Expand Down
7 changes: 7 additions & 0 deletions federationapi/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,10 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
return nil
})
}

// DataRetentionInRoom is the federation database's data retention hook for a
// room. It is currently a placeholder: it runs an empty function through the
// database writer and modifies nothing.
func (d *Database) DataRetentionInRoom(ctx context.Context, roomID string) error {
	noop := func(_ *sql.Tx) error {
		// TODO: Implement this to support federation
		return nil
	}
	return d.Writer.Do(d.DB, nil, noop)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ require (
modernc.org/sqlite v1.29.5
)

require github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect

require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/RoaringBitmap/roaring v1.2.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions roomserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ type ClientRoomserverAPI interface {

QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
//! GlobeKeeper Customization
QueryRoomsUnderSpace(ctx context.Context, req *QueryRoomsUnderSpaceRequest, res *QueryRoomsUnderSpaceResponse) error
QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error)
QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error
// QueryKnownUsers returns a list of users that we know about from our joined rooms.
Expand All @@ -247,6 +249,8 @@ type ClientRoomserverAPI interface {
PerformAdminEvacuateRoom(ctx context.Context, roomID string) (affected []string, err error)
PerformAdminEvacuateUser(ctx context.Context, userID string) (affected []string, err error)
PerformAdminPurgeRoom(ctx context.Context, roomID string) error
//! GlobeKeeper Customization
PerformDataRetention(ctx context.Context, dr *PerformDataRetentionRequest, roomID string) error
PerformAdminDownloadState(ctx context.Context, roomID, userID string, serverName spec.ServerName) error
PerformPeek(ctx context.Context, req *PerformPeekRequest) (roomID string, err error)
PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error
Expand Down
9 changes: 9 additions & 0 deletions roomserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
OutputTypeRetirePeek OutputType = "retire_peek"
// OutputTypePurgeRoom indicates the event is an OutputPurgeRoom
OutputTypePurgeRoom OutputType = "purge_room"
// OutputTypeDataRetention indicates the event is an OutputDataRetention
OutputTypeDataRetention OutputType = "data_retention"
)

// An OutputEvent is an entry in the roomserver output kafka log.
Expand All @@ -84,6 +86,8 @@ type OutputEvent struct {
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
// The content of the event with type OutputPurgeRoom
PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"`
// The content of the event with type OutputDataRetention
DataRetentionInRoom *OutputDataRetention `json:"data_retention,omitempty"`
}

// Type of the OutputNewRoomEvent.
Expand Down Expand Up @@ -269,3 +273,8 @@ type OutputRetirePeek struct {
type OutputPurgeRoom struct {
RoomID string
}

// OutputDataRetention is the payload of an output event of type
// OutputTypeDataRetention. The fields carry no JSON tags, so they are encoded
// under their Go names.
type OutputDataRetention struct {
// RoomID is the room the data retention was performed on.
RoomID string
// DR is the data retention policy that was applied to the room.
DR *PerformDataRetentionRequest
}
9 changes: 9 additions & 0 deletions roomserver/api/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ type PerformForgetRequest struct {
}

type PerformForgetResponse struct{}

// PerformDataRetentionRequest describes a data retention policy for the rooms
// under a space.
type PerformDataRetentionRequest struct {
// SpaceID is the room ID of the space whose rooms the policy applies to.
SpaceID string `json:"space_id"`
// Enabled presumably states whether the policy is active.
// NOTE(review): verify which caller, if any, honours this flag.
Enabled bool `json:"enabled"`
// MaxAge is the retention window; it must be positive for a request to be
// accepted. Presumably expressed in milliseconds — TODO confirm the unit.
// NOTE(review): as an int32, a millisecond value tops out at roughly 24.8 days.
MaxAge int32 `json:"max_age"`
// Teams selects the team rooms under the space.
Teams bool `json:"teams"`
// Operations selects the operation rooms under the space.
Operations bool `json:"operations"`
// Dms selects the direct-message rooms under the space.
Dms bool `json:"dms"`
}
11 changes: 11 additions & 0 deletions roomserver/api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ type QueryServerBannedFromRoomResponse struct {
Banned bool `json:"banned"`
}

// ! GlobeKeeper Customization
// QueryRoomsUnderSpaceRequest is the request for QueryRoomsUnderSpace.
type QueryRoomsUnderSpaceRequest struct {
// SpaceID is the ID of the space whose rooms should be looked up.
SpaceID string `json:"space_id"`
}

// QueryRoomsUnderSpaceResponse is the response for QueryRoomsUnderSpace. It
// lists the rooms found under the space, grouped by room type.
type QueryRoomsUnderSpaceResponse struct {
// DMs holds the direct-message rooms.
DMs []string `json:"dm_rooms"`
// Operations holds the operation rooms.
Operations []string `json:"operation_rooms"`
// Teams holds the team rooms.
Teams []string `json:"team_rooms"`
}

type QueryAdminEventReportsResponse struct {
ID int64 `json:"id"`
Score int64 `json:"score"`
Expand Down
30 changes: 30 additions & 0 deletions roomserver/internal/perform/perform_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@ func (r *Admin) PerformAdminPurgeRoom(
})
}

// PerformDataRetention applies the data retention policy dr to a single room.
// It removes the room's stale encrypted chat messages from the roomserver
// database and then produces an OutputTypeDataRetention event so that other
// components can act on the same policy. An error is returned, before anything
// is modified, when roomID is not a well-formed room ID.
func (r *Admin) PerformDataRetention(
	ctx context.Context,
	dr *api.PerformDataRetentionRequest,
	roomID string,
) error {
	// Validate we actually got a room ID and nothing else
	_, _, err := gomatrixserverlib.SplitID('!', roomID)
	if err != nil {
		return err
	}

	logger := logrus.WithField("room_id", roomID)

	logger.Warn("Performing data retention on room from roomserver")
	if err = r.DB.DataRetentionInRoom(ctx, dr, roomID); err != nil {
		logger.WithError(err).Warn("Failed to perform data retention on room from roomserver")
		return err
	}
	logger.Warn("Performed data retention on room from roomserver, informing other components")

	// Tell downstream consumers which room was purged and under which policy.
	output := api.OutputEvent{
		Type: api.OutputTypeDataRetention,
		DataRetentionInRoom: &api.OutputDataRetention{
			RoomID: roomID,
			DR:     dr,
		},
	}
	return r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{output})
}

func (r *Admin) PerformAdminDownloadState(
ctx context.Context,
roomID, userID string, serverName spec.ServerName,
Expand Down
15 changes: 15 additions & 0 deletions roomserver/internal/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,21 @@ func (r *Queryer) QueryMembershipsForRoom(
return nil
}

// QueryRoomsUnderSpace implements api.RoomserverInternalAPI
// It copies the DM, operation and team rooms that the database reports for
// request.SpaceID into response and hands back the database error, if any.
func (r *Queryer) QueryRoomsUnderSpace(
	ctx context.Context,
	request *api.QueryRoomsUnderSpaceRequest,
	response *api.QueryRoomsUnderSpaceResponse,
) error {
	dms, operations, teams, err := r.DB.QueryRoomsUnderSpace(ctx, request.SpaceID)
	response.DMs, response.Operations, response.Teams = dms, operations, teams
	return err
}

// QueryServerJoinedToRoom implements api.RoomserverInternalAPI
func (r *Queryer) QueryServerJoinedToRoom(
ctx context.Context,
Expand Down
5 changes: 5 additions & 0 deletions roomserver/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type Database interface {
// joinOnly is set to true.
// Returns an error if there was a problem talking to the database.
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error)
//! GlobeKeeper Customization
// QueryRoomsUnderSpace looks up the rooms under the given space and returns them grouped by type (DMs, operations and teams).
QueryRoomsUnderSpace(ctx context.Context, spaceID string) (dms, operations, teams []string, err error)
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
// not found.
// Returns an error if the retrieval went wrong.
Expand Down Expand Up @@ -177,6 +180,8 @@ type Database interface {
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]gomatrixserverlib.PDU, error)
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
PurgeRoom(ctx context.Context, roomID string) error
//! GlobeKeeper Customization
DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error

// GetMembershipForHistoryVisibility queries the membership events for the given eventIDs.
Expand Down
102 changes: 102 additions & 0 deletions roomserver/storage/postgres/data_retention_statements.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"database/sql"

"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
)

// dataRetentionStaleEventNIDsSQL selects the NIDs of the m.room.encrypted
// events in room $1 whose origin_server_ts lies more than $2 milliseconds in
// the past. It is the shared sub-select of every data retention statement, so
// all of them agree on which events count as stale.
//
// The sub-select joins roomserver_events with roomserver_event_json; it only
// finds an event while the event still has a row in BOTH tables.
const dataRetentionStaleEventNIDsSQL = "" +
	" SELECT re.event_nid" +
	" FROM roomserver_events AS re" +
	" JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" +
	" WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" +
	" AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) < EXTRACT(EPOCH FROM NOW()) * 1000 - $2" +
	" AND re.room_nid = $1"

// dataRetentionPreviousEventsSQL strips the stale event NIDs from the
// event_nids arrays in roomserver_previous_events.
//
// NOTE(review): with UPDATE ... FROM, PostgreSQL applies at most one joined
// row per target row, so a row that references several stale events loses
// only one of them per execution — confirm whether that is acceptable.
const dataRetentionPreviousEventsSQL = "" +
	"UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" +
	dataRetentionStaleEventNIDsSQL +
	") AS subquery" +
	" WHERE event_nids @> ARRAY[subquery.event_nid]"

// dataRetentionStateBlockEntriesSQL strips the stale event NIDs from the
// event_nids arrays in roomserver_state_block. The UPDATE ... FROM caveat
// described on dataRetentionPreviousEventsSQL applies here as well.
const dataRetentionStateBlockEntriesSQL = "" +
	"UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" +
	dataRetentionStaleEventNIDsSQL +
	") AS subquery" +
	" WHERE event_nids @> ARRAY[subquery.event_nid]"

// dataRetentionEventsSQL deletes the stale events from roomserver_event_json
// and roomserver_events in a single statement. The stale set is computed from
// a join of exactly these two tables, so deleting from them in two separate
// statements would leave the second one with nothing to match; the
// data-modifying CTE lets both deletes work from the same snapshot.
const dataRetentionEventsSQL = "" +
	"WITH stale AS (" +
	dataRetentionStaleEventNIDsSQL +
	")," +
	" purged_json AS (" +
	" DELETE FROM roomserver_event_json WHERE event_nid IN (SELECT event_nid FROM stale)" +
	")" +
	" DELETE FROM roomserver_events WHERE event_nid IN (SELECT event_nid FROM stale)"

// dataRetentionStatements holds the prepared statements used to purge stale
// encrypted events from a room.
type dataRetentionStatements struct {
	dataRetentionPreviousEventsStmt    *sql.Stmt
	dataRetentionStateBlockEntriesStmt *sql.Stmt
	dataRetentionEventsStmt            *sql.Stmt
}

// PrepareDataRetentionStatements prepares the data retention statements
// against db. The returned statements value is non-nil even when preparing
// fails, in which case the error is returned alongside it.
func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) {
	s := &dataRetentionStatements{}

	return s, sqlutil.StatementList{
		{&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL},
		{&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL},
		{&s.dataRetentionEventsStmt, dataRetentionEventsSQL},
	}.Prepare(db)
}

// DataRetentionInRoom purges the m.room.encrypted events of the room roomNID
// that are older than dr.MaxAge milliseconds, running every statement inside
// txn. dr must not be nil. roomID is accepted for interface compatibility and
// is not used by the queries.
//
// It returns the first statement error encountered; undoing earlier
// statements is left to the caller's handling of txn.
func (s *dataRetentionStatements) DataRetentionInRoom(
	ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string,
) error {
	// Order matters: every statement identifies stale events through the
	// events/event_json join, so references to them are scrubbed first and the
	// events themselves are deleted last.
	ordered := []*sql.Stmt{
		s.dataRetentionPreviousEventsStmt,
		s.dataRetentionStateBlockEntriesStmt,
		s.dataRetentionEventsStmt,
	}
	for _, stmt := range ordered {
		// $1 is the room NID, $2 the maximum event age.
		if _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID, dr.MaxAge); err != nil {
			return err
		}
	}
	return nil
}
Loading
Loading