diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index b63d88de4d2..28e150994c4 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -203,6 +203,8 @@ func (l *DiskLocation) closeEcVolumeById(vid needle.VolumeId) { } func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { + l.ecVolumesLock.Lock() + defer l.ecVolumesLock.Unlock() ecVolume, ok := l.ecVolumes[vid] if !ok { return diff --git a/weed/storage/store.go b/weed/storage/store.go index b48177befa0..f7f2e5ebe57 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -388,24 +388,35 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) { for _, location := range s.Locations { - for _, ev := range location.ecVolumes { - messages := ev.ToVolumeEcShardInformationMessage() - if ev.IsTimeToDestroy() { - err := location.deleteEcVolumeById(ev.VolumeId) - if err != nil { - ecShards = append(ecShards, messages...) - glog.Errorf("delete EcVolume err %d: %v", ev.VolumeId, err) - continue - } - deleted = append(deleted, messages...) - } else { - ecShards = append(ecShards, messages...) + toDeleteEcVolume := s.prepareDeleteExpiredEcVolumesInLocation(location, ecShards) + for _, ev := range toDeleteEcVolume { + err := location.deleteEcVolumeById(ev.VolumeId) + if err != nil { + ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...) + glog.Errorf("delete EcVolume err %d: %v", ev.VolumeId, err) + continue } + deleted = append(deleted, ev.ToVolumeEcShardInformationMessage()...) } } return } +// prepareDeleteExpiredEcVolumesInLocation 计算ec volume是否达到删除时间,然后返回需要删除的ec volume列表,不需要删除的添加到ecShards中 +func (s *Store) prepareDeleteExpiredEcVolumesInLocation(location *DiskLocation, ecShards []*master_pb.VolumeEcShardInformationMessage) []*erasure_coding.EcVolume { + var toDeleteEcVolume []*erasure_coding.EcVolume + location.ecVolumesLock.RLock() + defer location.ecVolumesLock.RUnlock() + for _, ev := range location.ecVolumes { + if ev.IsTimeToDestroy() { + toDeleteEcVolume = append(toDeleteEcVolume, ev) + } else { + ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...) + } + } + return toDeleteEcVolume +} + func (s *Store) SetStopping() { s.isStopping = true for _, location := range s.Locations {