Skip to content

Commit

Permalink
support enable etcd authn (#201)
Browse files Browse the repository at this point in the history
Signed-off-by: haorenfsa <[email protected]>
  • Loading branch information
haorenfsa authored Oct 25, 2024
1 parent 2aa883f commit 39df548
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 26 deletions.
24 changes: 24 additions & 0 deletions config/samples/milvus_etcd_auth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# This is a sample to enable etcd auth for milvus
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
name: my-release
labels:
app: milvus
spec:
dependencies:
etcd:
inCluster:
values:
replicaCount: 1
auth:
rbac:
enabled: true
rootPassword: myrootpass
components: {}
config:
etcd:
auth:
enabled: true
userName: root
password: myrootpass
42 changes: 32 additions & 10 deletions pkg/controllers/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,20 @@ var (
wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p, cfg) }
}
wrapEtcdConditionGetter = func(ctx context.Context, endpoints []string) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetEtcdCondition(ctx, endpoints) }
wrapEtcdConditionGetter = func(ctx context.Context, m *v1beta1.Milvus, endpoints []string) func() v1beta1.MilvusCondition {
sslEnabled, _ := util.GetBoolValue(m.Spec.Conf.Data, "etcd", "ssl", "enabled")
if sslEnabled {
return external.NewTCPDialConditionGetter(v1beta1.EtcdReady, endpoints).GetCondition
}
authEnabled, _ := util.GetBoolValue(m.Spec.Conf.Data, "etcd", "auth", "enabled")
userName, _ := util.GetStringValue(m.Spec.Conf.Data, "etcd", "auth", "userName")
password, _ := util.GetStringValue(m.Spec.Conf.Data, "etcd", "auth", "password")
authCfg := EtcdAuthConfig{
Enabled: authEnabled,
Username: userName,
Password: password,
}
return func() v1beta1.MilvusCondition { return GetEtcdCondition(ctx, authCfg, endpoints) }
}
wrapMinioConditionGetter = func(ctx context.Context, logger logr.Logger, cli client.Client, info StorageConditionInfo) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetMinioCondition(ctx, logger, cli, info) }
Expand Down Expand Up @@ -140,8 +152,8 @@ type EtcdConditionInfo struct {
Endpoints []string
}

func GetEtcdCondition(ctx context.Context, endpoints []string) v1beta1.MilvusCondition {
health := GetEndpointsHealth(endpoints)
func GetEtcdCondition(ctx context.Context, authCfg EtcdAuthConfig, endpoints []string) v1beta1.MilvusCondition {
health := GetEndpointsHealth(authCfg, endpoints)
etcdReady := false
var msg string
for _, ep := range endpoints {
Expand Down Expand Up @@ -174,19 +186,29 @@ var etcdNewClient NewEtcdClientFunc = func(cfg clientv3.Config) (EtcdClient, err

const etcdHealthKey = "health"

func GetEndpointsHealth(endpoints []string) map[string]EtcdEndPointHealth {
type EtcdAuthConfig struct {
Enabled bool
Username string
Password string
}

func GetEndpointsHealth(authConfig EtcdAuthConfig, endpoints []string) map[string]EtcdEndPointHealth {
hch := make(chan EtcdEndPointHealth, len(endpoints))
var wg sync.WaitGroup
for _, ep := range endpoints {
wg.Add(1)
go func(ep string) {
defer wg.Done()

cliCfg := clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 5 * time.Second,
}
if authConfig.Enabled {
cliCfg.Username = authConfig.Username
cliCfg.Password = authConfig.Password
}
var checkEtcd = func() error {
cli, err := etcdNewClient(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 5 * time.Second,
})
cli, err := etcdNewClient(cliCfg)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWrapGetters(t *testing.T) {
fn()
})
t.Run("etcd", func(t *testing.T) {
fn := wrapEtcdConditionGetter(ctx, []string{})
fn := wrapEtcdConditionGetter(ctx, &v1beta1.Milvus{}, []string{})
fn()
})
t.Run("minio", func(t *testing.T) {
Expand Down Expand Up @@ -202,15 +202,15 @@ func TestGetEtcdCondition(t *testing.T) {
errTest := errors.New("test")

// no endpoint
ret := GetEtcdCondition(ctx, []string{})
ret := GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason)

// new client failed
t.Run("new client failed", func(t *testing.T) {
stubs := gostub.Stub(&etcdNewClient, getMockNewEtcdClient(nil, errTest))
defer stubs.Reset()
ret = GetEtcdCondition(ctx, []string{"etcd:2379"})
ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason)
})
Expand All @@ -221,15 +221,15 @@ func TestGetEtcdCondition(t *testing.T) {
defer stubs.Reset()
mockEtcdCli.EXPECT().Get(gomock.Any(), etcdHealthKey, gomock.Any()).Return(nil, errTest).AnyTimes()
mockEtcdCli.EXPECT().Close().AnyTimes()
ret = GetEtcdCondition(ctx, []string{"etcd:2379"})
ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason)

// etcd get, err permession denied, alarm failed
mockEtcdCli.EXPECT().Get(gomock.Any(), etcdHealthKey, gomock.Any()).Return(nil, rpctypes.ErrPermissionDenied).AnyTimes()
mockEtcdCli.EXPECT().AlarmList(gomock.Any()).Return(nil, errTest).AnyTimes()
mockEtcdCli.EXPECT().Close().AnyTimes()
ret = GetEtcdCondition(ctx, []string{"etcd:2379"})
ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason)

Expand All @@ -241,7 +241,7 @@ func TestGetEtcdCondition(t *testing.T) {
},
}, nil).AnyTimes()
mockEtcdCli.EXPECT().Close().AnyTimes()
ret = GetEtcdCondition(ctx, []string{"etcd:2379"})
ret = GetEtcdCondition(ctx, EtcdAuthConfig{}, []string{"etcd:2379"})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
assert.Equal(t, v1beta1.ReasonEtcdNotReady, ret.Reason)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/status_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func (r *MilvusStatusSyncer) GetMinioCondition(
}

func (r *MilvusStatusSyncer) GetEtcdCondition(ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error) {
getter := wrapEtcdConditionGetter(ctx, mc.Spec.Dep.Etcd.Endpoints)
getter := wrapEtcdConditionGetter(ctx, &mc, mc.Spec.Dep.Etcd.Endpoints)
return GetCondition(getter, mc.Spec.Dep.Etcd.Endpoints), nil
}

Expand Down
9 changes: 0 additions & 9 deletions pkg/controllers/status_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"testing"

"github.com/go-logr/logr"
"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util"
"github.com/prashantv/gostub"
Expand Down Expand Up @@ -438,14 +437,6 @@ func mockConditionGetter() v1beta1.MilvusCondition {
return v1beta1.MilvusCondition{Reason: "update"}
}

func TestWrapGetter(t *testing.T) {
var getter func() v1beta1.MilvusCondition
getter = wrapEtcdConditionGetter(nil, []string{})
assert.NotNil(t, getter)
getter = wrapMinioConditionGetter(nil, logr.Logger{}, nil, StorageConditionInfo{})
assert.NotNil(t, getter)
}

func Test_updateMetrics(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
9 changes: 9 additions & 0 deletions test/min-milvus-feature.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ spec:
pvcDeletion: true
values:
replicaCount: 1
auth:
rbac:
enabled: true
rootPassword: myrootpass
storage:
inCluster:
deletionPolicy: Delete
Expand Down Expand Up @@ -186,3 +190,8 @@ spec:
pulsar:
authPlugin: token
authParams: file:/milvus/pulsar/token
etcd:
auth:
enabled: true
userName: root
password: myrootpass

0 comments on commit 39df548

Please sign in to comment.