Skip to content

Commit

Permalink
读锁内的处理存活volume逻辑放到外面处理
Browse files Browse the repository at this point in the history
  • Loading branch information
xuwenfeng committed Aug 30, 2024
1 parent 03c7001 commit 12581d0
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions weed/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {

func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) {
for _, location := range s.Locations {
toDeleteEcVolume := s.prepareDeleteExpiredEcVolumesInLocation(location, ecShards)
for _, ev := range toDeleteEcVolume {
toDeleteEcVolumes, aliveEcVolumes := s.prepareDiffExpiredEcVolumesInLocation(location)
for _, ev := range toDeleteEcVolumes {
err := location.deleteEcVolumeById(ev.VolumeId)
if err != nil {
ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...)
Expand All @@ -398,23 +398,25 @@ func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeE
}
deleted = append(deleted, ev.ToVolumeEcShardInformationMessage()...)
}
for _, v := range aliveEcVolumes {
ecShards = append(ecShards, v.ToVolumeEcShardInformationMessage()...)
}
}
return
}

// prepareDeleteExpiredEcVolumesInLocation 计算ec volume是否达到删除时间,然后返回需要删除的ec volume列表,不需要删除的添加到ecShards中
func (s *Store) prepareDeleteExpiredEcVolumesInLocation(location *DiskLocation, ecShards []*master_pb.VolumeEcShardInformationMessage) []*erasure_coding.EcVolume {
var toDeleteEcVolume []*erasure_coding.EcVolume
// prepareDiffExpiredEcVolumesInLocation 遍历location下所有的ec volumes,区分是否到了删除时间
func (s *Store) prepareDiffExpiredEcVolumesInLocation(location *DiskLocation) (toDeleteEcVolumes, aliveEcVolumes []*erasure_coding.EcVolume) {
location.ecVolumesLock.RLock()
defer location.ecVolumesLock.RUnlock()
for _, ev := range location.ecVolumes {
if ev.IsTimeToDestroy() {
toDeleteEcVolume = append(toDeleteEcVolume, ev)
toDeleteEcVolumes = append(toDeleteEcVolumes, ev)
} else {
ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...)
aliveEcVolumes = append(aliveEcVolumes, ev)
}
}
return toDeleteEcVolume
return
}

func (s *Store) SetStopping() {
Expand Down

0 comments on commit 12581d0

Please sign in to comment.