Skip to content

Commit

Permalink
Identify service providers based on addresses (#27907)
Browse files Browse the repository at this point in the history
Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 authored Oct 25, 2023
1 parent 8b5b33f commit 6c3f29d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 71 deletions.
5 changes: 4 additions & 1 deletion internal/storage/aliyun/aliyun.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)

const OSSDefaultAddress = "oss.aliyuncs.com"
const (
OSSAddressFeatureString = "aliyuncs.com"
OSSDefaultAddress = "oss.aliyuncs.com"
)

// NewMinioClient returns a minio.Client which is compatible for aliyun OSS
func NewMinioClient(address string, opts *minio.Options) (*minio.Client, error) {
Expand Down
69 changes: 1 addition & 68 deletions internal/storage/minio_chunk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@ import (

"github.com/cockroachdb/errors"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"

"github.com/milvus-io/milvus/internal/storage/aliyun"
"github.com/milvus-io/milvus/internal/storage/gcp"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

Expand Down Expand Up @@ -78,73 +74,10 @@ func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManag
}

func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) {
var creds *credentials.Credentials
newMinioFn := minio.New
bucketLookupType := minio.BucketLookupAuto

if c.useVirtualHost {
bucketLookupType = minio.BucketLookupDNS
}

switch c.cloudProvider {
case CloudProviderAliyun:
// auto doesn't work for aliyun, so we set to dns deliberately
bucketLookupType = minio.BucketLookupDNS
if c.useIAM {
newMinioFn = aliyun.NewMinioClient
} else {
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
}
case CloudProviderGCP:
newMinioFn = gcp.NewMinioClient
if !c.useIAM {
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
}
default: // aws, minio
if c.useIAM {
creds = credentials.NewIAM("")
} else {
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
}
}
minioOpts := &minio.Options{
BucketLookup: bucketLookupType,
Creds: creds,
Secure: c.useSSL,
Region: c.region,
}
minIOClient, err := newMinioFn(c.address, minioOpts)
// options nil or invalid formatted endpoint, don't need to retry
minIOClient, err := newMinioClient(ctx, c)
if err != nil {
return nil, err
}
var bucketExists bool
// check valid in first query
checkBucketFn := func() error {
bucketExists, err = minIOClient.BucketExists(ctx, c.bucketName)
if err != nil {
log.Warn("failed to check blob bucket exist", zap.String("bucket", c.bucketName), zap.Error(err))
return err
}
if !bucketExists {
if c.createBucket {
log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName))
err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{})
if err != nil {
log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err))
return err
}
} else {
return fmt.Errorf("bucket %s not Existed", c.bucketName)
}
}
return nil
}
err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts))
if err != nil {
return nil, err
}

mcm := &MinioChunkManager{
Client: minIOClient,
bucketName: c.bucketName,
Expand Down
46 changes: 44 additions & 2 deletions internal/storage/minio_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"context"
"fmt"
"io"
"strings"
"time"

minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"

Expand All @@ -36,11 +37,16 @@ type MinioObjectStorage struct {
*minio.Client
}

func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) {
func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) {
var creds *credentials.Credentials
newMinioFn := minio.New
bucketLookupType := minio.BucketLookupAuto

if c.useVirtualHost {
bucketLookupType = minio.BucketLookupDNS
}

matchedDefault := false
switch c.cloudProvider {
case CloudProviderAliyun:
// auto doesn't work for aliyun, so we set to dns deliberately
Expand All @@ -56,6 +62,34 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
}
default: // aws, minio
matchedDefault = true
}

// Compatibility logic. If the cloud provider is not specified in the request,
// it shall be inferred based on the service address.
if matchedDefault {
matchedDefault = false
switch {
case strings.Contains(c.address, gcp.GcsDefaultAddress):
newMinioFn = gcp.NewMinioClient
if !c.useIAM {
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
}
case strings.Contains(c.address, aliyun.OSSAddressFeatureString):
// auto doesn't work for aliyun, so we set to dns deliberately
bucketLookupType = minio.BucketLookupDNS
if c.useIAM {
newMinioFn = aliyun.NewMinioClient
} else {
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
}
default:
matchedDefault = true
}
}

if matchedDefault {
// aws, minio
if c.useIAM {
creds = credentials.NewIAM("")
} else {
Expand All @@ -66,6 +100,7 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
BucketLookup: bucketLookupType,
Creds: creds,
Secure: c.useSSL,
Region: c.region,
}
minIOClient, err := newMinioFn(c.address, minioOpts)
// options nil or invalid formatted endpoint, don't need to retry
Expand Down Expand Up @@ -98,7 +133,14 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
if err != nil {
return nil, err
}
return minIOClient, nil
}

func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) {
minIOClient, err := newMinioClient(ctx, c)
if err != nil {
return nil, err
}
return &MinioObjectStorage{minIOClient}, nil
}

Expand Down

0 comments on commit 6c3f29d

Please sign in to comment.