Skip to content

Commit

Permalink
Reorder peer deletion when deleteing a user (#1191)
Browse files Browse the repository at this point in the history
  • Loading branch information
braginini authored Oct 3, 2023
1 parent e26ec0b commit 35bc493
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 90 deletions.
62 changes: 24 additions & 38 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,22 @@ func (am *DefaultAccountManager) UpdatePeer(accountID, userID string, update *Pe
// deletePeers will delete all specified peers and send updates to the remote peers. Don't call without acquiring account lock
func (am *DefaultAccountManager) deletePeers(account *Account, peerIDs []string, userID string) error {

var peerError error
// the first loop is needed to ensure all peers present under the account before modifying, otherwise
// we might have some inconsistencies
peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {

peer := account.GetPeer(peerID)
if peer == nil {
peerError = status.Errorf(status.NotFound, "peer %s not found", peerID)
goto save
return status.Errorf(status.NotFound, "peer %s not found", peerID)
}
peers = append(peers, peer)
}

account.DeletePeer(peerID)

peerError = am.peersUpdateManager.SendUpdate(peer.ID,
// the 2nd loop performs the actual modification
for _, peer := range peers {
account.DeletePeer(peer.ID)
am.peersUpdateManager.SendUpdate(peer.ID,
&UpdateMessage{
Update: &proto.SyncResponse{
// fill those field for backward compatibility
Expand All @@ -402,36 +406,11 @@ func (am *DefaultAccountManager) deletePeers(account *Account, peerIDs []string,
},
},
})
if peerError != nil {
goto save
}

am.peersUpdateManager.CloseChannel(peerID)
am.peersUpdateManager.CloseChannel(peer.ID)
am.storeEvent(userID, peer.ID, account.Id, activity.PeerRemovedByUser, peer.EventMeta(am.GetDNSDomain()))
}

save:
err := am.Store.SaveAccount(account)
if err != nil {
if peerError != nil {
log.Errorf("account save error: %s", err)
return peerError
} else {
return err
}
}

err = am.updateAccountPeers(account)
if err != nil {
if peerError != nil {
log.Errorf("update account peers error: %s", err)
return peerError
} else {
return err
}
}

return peerError
return nil
}

// DeletePeer removes peer from the account by its IP
Expand All @@ -444,7 +423,17 @@ func (am *DefaultAccountManager) DeletePeer(accountID, peerID, userID string) er
return err
}

return am.deletePeers(account, []string{peerID}, userID)
err = am.deletePeers(account, []string{peerID}, userID)
if err != nil {
return err
}

err = am.Store.SaveAccount(account)
if err != nil {
return err
}

return am.updateAccountPeers(account)
}

// GetPeerByIP returns peer by its IP
Expand Down Expand Up @@ -943,10 +932,7 @@ func (am *DefaultAccountManager) updateAccountPeers(account *Account) error {
}

update := toSyncResponse(nil, peer, nil, remotePeerNetworkMap, am.GetDNSDomain())
err = am.peersUpdateManager.SendUpdate(peer.ID, &UpdateMessage{Update: update})
if err != nil {
return err
}
am.peersUpdateManager.SendUpdate(peer.ID, &UpdateMessage{Update: update})
}

return nil
Expand Down
6 changes: 1 addition & 5 deletions management/server/turncredentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,7 @@ func (m *TimeBasedAuthSecretsManager) SetupRefresh(peerID string) {
},
}
log.Debugf("sending new TURN credentials to peer %s", peerID)
err := m.updateManager.SendUpdate(peerID, &UpdateMessage{Update: update})
if err != nil {
log.Errorf("error while sending TURN update to peer %s %v", peerID, err)
// todo maybe continue trying?
}
m.updateManager.SendUpdate(peerID, &UpdateMessage{Update: update})
}
}
}()
Expand Down
7 changes: 3 additions & 4 deletions management/server/updatechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewPeersUpdateManager() *PeersUpdateManager {
}

// SendUpdate sends update message to the peer's channel
func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) error {
func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) {
p.channelsMux.Lock()
defer p.channelsMux.Unlock()
if channel, ok := p.peerChannels[peerID]; ok {
Expand All @@ -39,10 +39,9 @@ func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) er
default:
log.Warnf("channel for peer %s is %d full", peerID, len(channel))
}
return nil
} else {
log.Debugf("peer %s has no channel", peerID)
}
log.Debugf("peer %s has no channel", peerID)
return nil
}

// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
Expand Down
15 changes: 3 additions & 12 deletions management/server/updatechannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,15 @@ func TestSendUpdate(t *testing.T) {
if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel")
}
err := peersUpdater.SendUpdate(peer, update1)
if err != nil {
t.Error("Error sending update: ", err)
}
peersUpdater.SendUpdate(peer, update1)
select {
case <-peersUpdater.peerChannels[peer]:
default:
t.Error("Update wasn't send")
}

for range [channelBufferSize]int{} {
err = peersUpdater.SendUpdate(peer, update1)
if err != nil {
t.Errorf("got an early error sending update: %v ", err)
}
peersUpdater.SendUpdate(peer, update1)
}

update2 := &UpdateMessage{Update: &proto.SyncResponse{
Expand All @@ -54,10 +48,7 @@ func TestSendUpdate(t *testing.T) {
},
}}

err = peersUpdater.SendUpdate(peer, update2)
if err != nil {
t.Error("update shouldn't return an error when channel buffer is full")
}
peersUpdater.SendUpdate(peer, update2)
timeout := time.After(5 * time.Second)
for range [channelBufferSize]int{} {
select {
Expand Down
72 changes: 41 additions & 31 deletions management/server/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func (am *DefaultAccountManager) GetUser(claims jwtclaims.AuthorizationClaims) (
return user, nil
}

func (am *DefaultAccountManager) deleteServiceUser(account *Account, initiatorUserID string, targetUser *User) {
meta := map[string]any{"name": targetUser.ServiceUserName}
am.storeEvent(initiatorUserID, targetUser.Id, account.Id, activity.ServiceUserDeleted, meta)
delete(account.Users, targetUser.Id)
}

// DeleteUser deletes a user from the given account.
func (am *DefaultAccountManager) DeleteUser(accountID, initiatorUserID string, targetUserID string) error {
if initiatorUserID == targetUserID {
Expand All @@ -320,11 +326,6 @@ func (am *DefaultAccountManager) DeleteUser(accountID, initiatorUserID string, t
return err
}

targetUser := account.Users[targetUserID]
if targetUser == nil {
return status.Errorf(status.NotFound, "user not found")
}

executingUser := account.Users[initiatorUserID]
if executingUser == nil {
return status.Errorf(status.NotFound, "user not found")
Expand All @@ -333,55 +334,64 @@ func (am *DefaultAccountManager) DeleteUser(accountID, initiatorUserID string, t
return status.Errorf(status.PermissionDenied, "only admins can delete users")
}

peers, err := account.FindUserPeers(targetUserID)
if err != nil {
return status.Errorf(status.Internal, "failed to find user peers")
targetUser := account.Users[targetUserID]
if targetUser == nil {
return status.Errorf(status.NotFound, "target user not found")
}

peerIDs := make([]string, 0, len(peers))
for _, peer := range peers {
peerIDs = append(peerIDs, peer.ID)
// handle service user first and exit, no need to fetch extra data from IDP, etc
if targetUser.IsServiceUser {
am.deleteServiceUser(account, initiatorUserID, targetUser)
return am.Store.SaveAccount(account)
}

err = am.deletePeers(account, peerIDs, initiatorUserID)
if err != nil {
return err
}
return am.deleteRegularUser(account, initiatorUserID, targetUserID)
}

func (am *DefaultAccountManager) deleteRegularUser(account *Account, initiatorUserID, targetUserID string) error {
tuEmail, tuName, err := am.getEmailAndNameOfTargetUser(account.Id, initiatorUserID, targetUserID)
if err != nil {
log.Errorf("failed to resolve email address: %s", err)
return err
}

var meta map[string]any
var eventAction activity.Activity
if targetUser.IsServiceUser {
meta = map[string]any{"name": targetUser.ServiceUserName}
eventAction = activity.ServiceUserDeleted
} else {
meta = map[string]any{"name": tuName, "email": tuEmail}
eventAction = activity.UserDeleted
}
am.storeEvent(initiatorUserID, targetUserID, accountID, eventAction, meta)

if !targetUser.IsServiceUser && !isNil(am.idpManager) {
err := am.deleteUserFromIDP(targetUserID, accountID)
if !isNil(am.idpManager) {
err = am.deleteUserFromIDP(targetUserID, account.Id)
if err != nil {
log.Debugf("failed to delete user from IDP: %s", targetUserID)
return err
}
}

delete(account.Users, targetUserID)
err = am.deleteUserPeers(initiatorUserID, targetUserID, account)
if err != nil {
return err
}

// todo should be unnecessary because we save account in the am.deletePeers
delete(account.Users, targetUserID)
err = am.Store.SaveAccount(account)
if err != nil {
return err
}

return nil
meta := map[string]any{"name": tuName, "email": tuEmail}
am.storeEvent(initiatorUserID, targetUserID, account.Id, activity.UserDeleted, meta)

return am.updateAccountPeers(account)
}

func (am *DefaultAccountManager) deleteUserPeers(initiatorUserID string, targetUserID string, account *Account) error {
peers, err := account.FindUserPeers(targetUserID)
if err != nil {
return status.Errorf(status.Internal, "failed to find user peers")
}

peerIDs := make([]string, 0, len(peers))
for _, peer := range peers {
peerIDs = append(peerIDs, peer.ID)
}

return am.deletePeers(account, peerIDs, initiatorUserID)
}

// InviteUser resend invitations to users who haven't activated their accounts prior to the expiration period.
Expand Down

0 comments on commit 35bc493

Please sign in to comment.