Skip to content

Commit

Permalink
如果找不到ecVolume,尝试扫描locations删除
Browse files Browse the repository at this point in the history
  • Loading branch information
xuwenfeng committed Oct 9, 2024
1 parent 16d4409 commit f25bc2b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 23 deletions.
1 change: 1 addition & 0 deletions weed/pb/volume_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ message VolumeDeleteResponse {

message EcVolumeDeleteRequest {
uint32 volume_id = 1;
string collection = 2;
}
message EcVolumeDeleteResponse {
}
Expand Down
16 changes: 13 additions & 3 deletions weed/pb/volume_server_pb/volume_server.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions weed/server/volume_grpc_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"path/filepath"
"time"

Expand Down Expand Up @@ -115,12 +116,22 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
func (vs *VolumeServer) EcVolumeDelete(ctx context.Context, req *volume_server_pb.EcVolumeDeleteRequest) (*volume_server_pb.EcVolumeDeleteResponse, error) {
resp := &volume_server_pb.EcVolumeDeleteResponse{}
vid := needle.VolumeId(req.VolumeId)
collection := req.Collection

locations := vs.store.DestroyEcVolume(vid)
if len(locations) == 0 {
return resp, fmt.Errorf("no ec shards found, volumeId:%d", req.VolumeId)
deletedShards := vs.store.DestroyEcVolume(vid)
if len(deletedShards) > 0 {
glog.V(0).Infof("deleteEcVolume %s_%d deleted,shards: %v", collection, vid, deletedShards)
return resp, nil
}
glog.V(0).Infof("deleteEcVolume %s_%d location not found, try to delete by scan all location dir", collection, vid)
bName := erasure_coding.EcShardBaseFileName(collection, int(vid))
for _, location := range vs.store.Locations {
err := deleteAllEcShardIdsForEachLocation(bName, location)
if err != nil {
glog.Errorf("deleteEcShards %s_%d from %s %s: %v", collection, vid, location.Directory, bName, err)
return nil, err
}
}
glog.V(0).Infof("ec volume [%d] deleted,shards: %v", vid, locations)
return resp, nil
}

Expand Down
35 changes: 26 additions & 9 deletions weed/server/volume_grpc_erasure_coding.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)

for _, location := range vs.store.Locations {
if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
if _, err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
return nil, err
}
Expand All @@ -210,9 +210,26 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}

func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
func deleteAllEcShardIdsForEachLocation(bName string, location *storage.DiskLocation) error {
var shardIds []uint32
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardIds = append(shardIds, uint32(i))
}
deleted, err := deleteEcShardIdsForEachLocation(bName, location, shardIds)
if err != nil {
return err
}
if len(deleted) == 0 {
glog.V(0).Infof("ec volume %s no shards found in directory:%s", bName, location.Directory)
} else {
glog.V(0).Infof("ec volume %s shard deleted %v, directory:%s", bName, deleted, location.Directory)
}
return nil
}

func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) ([]uint32, error) {

found := false
deletedShards := make([]uint32, 0)

indexBaseFilename := path.Join(location.IdxDirectory, bName)
dataBaseFilename := path.Join(location.Directory, bName)
Expand All @@ -221,24 +238,24 @@ func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocatio
for _, shardId := range shardIds {
shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
if util.FileExists(shardFileName) {
found = true
os.Remove(shardFileName)
deletedShards = append(deletedShards, shardId)
}
}
//}

if !found {
return nil
if len(deletedShards) == 0 {
return nil, nil
}

hasEcxFile, hasIdxFile, existingShardCount, err := checkEcVolumeStatus(bName, location)
if err != nil {
return err
return deletedShards, err
}

if hasEcxFile && existingShardCount == 0 {
if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
return err
return deletedShards, err
}
os.Remove(indexBaseFilename + ".ecj")

Expand All @@ -248,7 +265,7 @@ func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocatio
}
}

return nil
return deletedShards, nil
}

func checkEcVolumeStatus(bName string, location *storage.DiskLocation) (hasEcxFile bool, hasIdxFile bool, existingShardCount int, err error) {
Expand Down
7 changes: 4 additions & 3 deletions weed/shell/command_ecVolume_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *commandEcVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer
}

for _, sourceVolumeServer := range sourceVolumeServerList {
err = deleteEcVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(volumeId), sourceVolumeServer)
err = deleteEcVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(volumeId), collection, sourceVolumeServer)
if err != nil {
//delete error, interrupt
return err
Expand Down Expand Up @@ -115,10 +115,11 @@ func findEcVolumeLocations(commandEnv *CommandEnv, volumeId int, collection stri
return sourceVolumeServerList, err
}

func deleteEcVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
func deleteEcVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.EcVolumeDelete(context.Background(), &volume_server_pb.EcVolumeDeleteRequest{
VolumeId: uint32(volumeId),
VolumeId: uint32(volumeId),
Collection: collection,
})
return deleteErr
})
Expand Down
6 changes: 2 additions & 4 deletions weed/storage/store_ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,11 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) []uint8 {
deletedShards = append(deletedShards, uint8(ecShard.ShardId))
}
} else {
glog.V(0).Infof("DestroyEcVolume %d: %v", vid, err)
glog.Errorf("DestroyEcVolume %d: %v", vid, err)
}
} else {

}
}
glog.V(0).Infof("delete ecVolume:%d", vid)
glog.V(0).Infof("delete ecVolume:%d, deletedShards:%v", vid, deletedShards)
return deletedShards
}

Expand Down

0 comments on commit f25bc2b

Please sign in to comment.