Skip to content

Commit

Permalink
Merge pull request #4 from ZTO-Express/volume-0830
Browse files Browse the repository at this point in the history
Volume 0830
  • Loading branch information
sunnysabor authored Aug 30, 2024
2 parents 04eabd5 + 03c7001 commit 95c181a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
2 changes: 2 additions & 0 deletions weed/storage/disk_location_ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func (l *DiskLocation) closeEcVolumeById(vid needle.VolumeId) {
}

func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock()
ecVolume, ok := l.ecVolumes[vid]
if !ok {
return
Expand Down
35 changes: 23 additions & 12 deletions weed/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,24 +388,35 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {

func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) {
for _, location := range s.Locations {
for _, ev := range location.ecVolumes {
messages := ev.ToVolumeEcShardInformationMessage()
if ev.IsTimeToDestroy() {
err := location.deleteEcVolumeById(ev.VolumeId)
if err != nil {
ecShards = append(ecShards, messages...)
glog.Errorf("delete EcVolume err %d: %v", ev.VolumeId, err)
continue
}
deleted = append(deleted, messages...)
} else {
ecShards = append(ecShards, messages...)
toDeleteEcVolume := s.prepareDeleteExpiredEcVolumesInLocation(location, ecShards)
for _, ev := range toDeleteEcVolume {
err := location.deleteEcVolumeById(ev.VolumeId)
if err != nil {
ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...)
glog.Errorf("delete EcVolume err %d: %v", ev.VolumeId, err)
continue
}
deleted = append(deleted, ev.ToVolumeEcShardInformationMessage()...)
}
}
return
}

// prepareDeleteExpiredEcVolumesInLocation 计算ec volume是否达到删除时间,然后返回需要删除的ec volume列表,不需要删除的添加到ecShards中
func (s *Store) prepareDeleteExpiredEcVolumesInLocation(location *DiskLocation, ecShards []*master_pb.VolumeEcShardInformationMessage) []*erasure_coding.EcVolume {
var toDeleteEcVolume []*erasure_coding.EcVolume
location.ecVolumesLock.RLock()
defer location.ecVolumesLock.RUnlock()
for _, ev := range location.ecVolumes {
if ev.IsTimeToDestroy() {
toDeleteEcVolume = append(toDeleteEcVolume, ev)
} else {
ecShards = append(ecShards, ev.ToVolumeEcShardInformationMessage()...)
}
}
return toDeleteEcVolume
}

func (s *Store) SetStopping() {
s.isStopping = true
for _, location := range s.Locations {
Expand Down

0 comments on commit 95c181a

Please sign in to comment.