diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index 87a226df1e6f7..bfc31f8e5ad87 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -213,7 +213,8 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), }, }, true}, - {"very recent partition stats and enough new data", + { + "very recent partition stats and enough new data", []*datapb.PartitionStatsInfo{ { CollectionID: collectionID, @@ -228,8 +229,11 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { { size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), }, - }, false}, - {"very old partition stats and not enough new data", + }, + false, + }, + { + "very old partition stats and not enough new data", []*datapb.PartitionStatsInfo{ { CollectionID: collectionID, @@ -244,8 +248,11 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { { size: *atomic.NewInt64(1024), }, - }, true}, - {"partition stats and enough new data", + }, + true, + }, + { + "partition stats and enough new data", []*datapb.PartitionStatsInfo{ { CollectionID: collectionID, @@ -262,8 +269,11 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { SegmentInfo: &datapb.SegmentInfo{ID: 9999}, size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), }, - }, true}, - {"partition stats and not enough new data", + }, + true, + }, + { + "partition stats and not enough new data", []*datapb.PartitionStatsInfo{ { CollectionID: collectionID, @@ -280,7 +290,9 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { SegmentInfo: &datapb.SegmentInfo{ID: 9999}, size: *atomic.NewInt64(1024), }, - }, false}, + }, + false, + }, } for _, test := range tests { diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 2fd2ecb736d88..017e743930715 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -5,21 +5,18 @@ import ( "strconv" "testing" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - - "github.com/milvus-io/milvus/pkg/util/paramtable" - - "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/pkg/common" - "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestCompactionTriggerManagerSuite(t *testing.T) { diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index bb02776423676..e74940c4d7e9e 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -1074,6 +1074,7 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) { if le != nil { log.Error("failed to load policy after RefreshPolicyInfo", zap.Error(le)) } + CleanPrivilegeCache() } }() if op.OpType != typeutil.CacheRefresh { diff --git a/internal/proxy/privilege_cache.go b/internal/proxy/privilege_cache.go new file mode 100644 index 0000000000000..f319b705dc27f --- /dev/null +++ b/internal/proxy/privilege_cache.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "fmt" + "sync" + + "go.uber.org/atomic" + + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var ( + priCacheInitOnce sync.Once + priCacheMut sync.RWMutex + priCache *PrivilegeCache + ver atomic.Int64 +) + +func getPriCache() *PrivilegeCache { + priCacheMut.RLock() + c := priCache + priCacheMut.RUnlock() + + if c == nil { + priCacheInitOnce.Do(func() { + priCacheMut.Lock() + defer priCacheMut.Unlock() + c = &PrivilegeCache{ + version: ver.Inc(), + values: typeutil.ConcurrentMap[string, bool]{}, + } + priCache = c + }) + } + + return c +} + +func CleanPrivilegeCache() { + priCacheMut.Lock() + defer priCacheMut.Unlock() + priCache = &PrivilegeCache{ + version: ver.Inc(), + values: typeutil.ConcurrentMap[string, bool]{}, + } +} + +func GetPrivilegeCache(roleName, object, objectPrivilege string) (isPermit, cached bool, version int64) { + key := fmt.Sprintf("%s_%s_%s", roleName, object, objectPrivilege) + c := getPriCache() + isPermit, cached = c.values.Get(key) + return isPermit, cached, c.version +} + +func SetPrivilegeCache(roleName, object, objectPrivilege string, isPermit bool, version int64) { + key := fmt.Sprintf("%s_%s_%s", roleName, object, objectPrivilege) + c := getPriCache() + if c.version == version { + c.values.Insert(key, isPermit) + } +} + +// PrivilegeCache is a cache for privilege enforce result +// version provides version control when any policy updates +type PrivilegeCache struct { + values typeutil.ConcurrentMap[string, bool] + version int64 +} diff --git a/internal/proxy/privilege_cache_test.go b/internal/proxy/privilege_cache_test.go new file mode 100644 index 0000000000000..cf80b8c3e8633 --- /dev/null +++ b/internal/proxy/privilege_cache_test.go @@ -0,0 +1,76 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type PrivilegeCacheSuite struct { + suite.Suite +} + +func (s *PrivilegeCacheSuite) TearDownTest() { + CleanPrivilegeCache() +} + +func (s *PrivilegeCacheSuite) TestGetPrivilege() { + // get current version + _, _, version := GetPrivilegeCache("", "", "") + SetPrivilegeCache("test-role", "test-object", "read", true, version) + SetPrivilegeCache("test-role", "test-object", "delete", false, version) + + type testCase struct { + tag string + input [3]string + expectIsPermit bool + expectExists bool + } + + testCases := []testCase{ + {tag: "exist_true", input: [3]string{"test-role", "test-object", "read"}, expectIsPermit: true, expectExists: true}, + {tag: "exist_false", input: [3]string{"test-role", "test-object", "delete"}, expectIsPermit: false, expectExists: true}, + {tag: "not_exist", input: [3]string{"guest", "test-object", "delete"}, expectIsPermit: false, expectExists: false}, + } + + for _, tc := range testCases { + s.Run(tc.tag, func() { + isPermit, exists, _ := GetPrivilegeCache(tc.input[0], tc.input[1], tc.input[2]) + s.Equal(tc.expectIsPermit, isPermit) + s.Equal(tc.expectExists, exists) + }) + } +} + +func (s *PrivilegeCacheSuite) TestSetPrivilegeVersion() { + // get current version + _, _, version := GetPrivilegeCache("", "", "") + CleanPrivilegeCache() + + SetPrivilegeCache("test-role", "test-object", "read", true, version) + + isPermit, exists, nextVersion := GetPrivilegeCache("test-role", "test-object", "read") + s.False(isPermit) + s.False(exists) + s.NotEqual(version, nextVersion) +} + +func TestPrivilegeSuite(t *testing.T) { + suite.Run(t, new(PrivilegeCacheSuite)) +} diff --git a/internal/proxy/privilege_interceptor.go b/internal/proxy/privilege_interceptor.go index ad0496fd8adc3..01f7ae302965d 100644 --- a/internal/proxy/privilege_interceptor.go +++ b/internal/proxy/privilege_interceptor.go @@ -135,10 +135,15 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context for _, roleName := range roleNames { permitFunc := func(resName string) (bool, error) { object := funcutil.PolicyForResource(dbName, objectType, resName) + isPermit, cached, version := GetPrivilegeCache(roleName, object, objectPrivilege) + if cached { + return isPermit, nil + } isPermit, err := e.Enforce(roleName, object, objectPrivilege) if err != nil { return false, err } + SetPrivilegeCache(roleName, object, objectPrivilege, isPermit, version) return isPermit, nil }