From 6c3f29d003ab7439896023b555ca8bafd4cc4d66 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Wed, 25 Oct 2023 17:28:10 +0800 Subject: [PATCH] Identify service providers based on addresses (#27907) Signed-off-by: zhenshan.cao --- internal/storage/aliyun/aliyun.go | 5 +- internal/storage/minio_chunk_manager.go | 69 +----------------------- internal/storage/minio_object_storage.go | 46 +++++++++++++++- 3 files changed, 49 insertions(+), 71 deletions(-) diff --git a/internal/storage/aliyun/aliyun.go b/internal/storage/aliyun/aliyun.go index c4c097941c015..09e3a083b88c5 100644 --- a/internal/storage/aliyun/aliyun.go +++ b/internal/storage/aliyun/aliyun.go @@ -9,7 +9,10 @@ import ( "github.com/milvus-io/milvus/pkg/log" ) -const OSSDefaultAddress = "oss.aliyuncs.com" +const ( + OSSAddressFeatureString = "aliyuncs.com" + OSSDefaultAddress = "oss.aliyuncs.com" +) // NewMinioClient returns a minio.Client which is compatible for aliyun OSS func NewMinioClient(address string, opts *minio.Options) (*minio.Client, error) { diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index d5c660d968f44..15db04273fdcc 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -27,17 +27,13 @@ import ( "github.com/cockroachdb/errors" minio "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" "golang.org/x/exp/mmap" "golang.org/x/sync/errgroup" - "github.com/milvus-io/milvus/internal/storage/aliyun" - "github.com/milvus-io/milvus/internal/storage/gcp" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -78,73 +74,10 @@ func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManag } func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) { - var creds *credentials.Credentials - newMinioFn := minio.New - bucketLookupType := minio.BucketLookupAuto - - if c.useVirtualHost { - bucketLookupType = minio.BucketLookupDNS - } - - switch c.cloudProvider { - case CloudProviderAliyun: - // auto doesn't work for aliyun, so we set to dns deliberately - bucketLookupType = minio.BucketLookupDNS - if c.useIAM { - newMinioFn = aliyun.NewMinioClient - } else { - creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "") - } - case CloudProviderGCP: - newMinioFn = gcp.NewMinioClient - if !c.useIAM { - creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "") - } - default: // aws, minio - if c.useIAM { - creds = credentials.NewIAM("") - } else { - creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "") - } - } - minioOpts := &minio.Options{ - BucketLookup: bucketLookupType, - Creds: creds, - Secure: c.useSSL, - Region: c.region, - } - minIOClient, err := newMinioFn(c.address, minioOpts) - // options nil or invalid formatted endpoint, don't need to retry + minIOClient, err := newMinioClient(ctx, c) if err != nil { return nil, err } - var bucketExists bool - // check valid in first query - checkBucketFn := func() error { - bucketExists, err = minIOClient.BucketExists(ctx, c.bucketName) - if err != nil { - log.Warn("failed to check blob bucket exist", zap.String("bucket", c.bucketName), zap.Error(err)) - return err - } - if !bucketExists { - if c.createBucket { - log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName)) - err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{}) - if err != nil { - log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err)) - return err - } - } else { - return fmt.Errorf("bucket %s not Existed", c.bucketName) - } - } - return nil - } - err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts)) - if err != nil { - return nil, err - } - mcm := &MinioChunkManager{ Client: minIOClient, bucketName: c.bucketName, diff --git a/internal/storage/minio_object_storage.go b/internal/storage/minio_object_storage.go index 76a14a492f325..0003c9a140487 100644 --- a/internal/storage/minio_object_storage.go +++ b/internal/storage/minio_object_storage.go @@ -20,9 +20,10 @@ import ( "context" "fmt" "io" + "strings" "time" - minio "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" @@ -36,11 +37,16 @@ type MinioObjectStorage struct { *minio.Client } -func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) { +func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) { var creds *credentials.Credentials newMinioFn := minio.New bucketLookupType := minio.BucketLookupAuto + if c.useVirtualHost { + bucketLookupType = minio.BucketLookupDNS + } + + matchedDefault := false switch c.cloudProvider { case CloudProviderAliyun: // auto doesn't work for aliyun, so we set to dns deliberately @@ -56,6 +62,34 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "") } default: // aws, minio + matchedDefault = true + } + + // Compatibility logic. If the cloud provider is not specified in the request, + // it shall be inferred based on the service address. + if matchedDefault { + matchedDefault = false + switch { + case strings.Contains(c.address, gcp.GcsDefaultAddress): + newMinioFn = gcp.NewMinioClient + if !c.useIAM { + creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "") + } + case strings.Contains(c.address, aliyun.OSSAddressFeatureString): + // auto doesn't work for aliyun, so we set to dns deliberately + bucketLookupType = minio.BucketLookupDNS + if c.useIAM { + newMinioFn = aliyun.NewMinioClient + } else { + creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "") + } + default: + matchedDefault = true + } + } + + if matchedDefault { + // aws, minio if c.useIAM { creds = credentials.NewIAM("") } else { @@ -66,6 +100,7 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje BucketLookup: bucketLookupType, Creds: creds, Secure: c.useSSL, + Region: c.region, } minIOClient, err := newMinioFn(c.address, minioOpts) // options nil or invalid formatted endpoint, don't need to retry @@ -98,7 +133,14 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje if err != nil { return nil, err } + return minIOClient, nil +} +func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) { + minIOClient, err := newMinioClient(ctx, c) + if err != nil { + return nil, err + } return &MinioObjectStorage{minIOClient}, nil }