Skip to content

Commit

Permalink
HasResource should return whether resource is registered not cached
Browse files Browse the repository at this point in the history
Signed-off-by: huangyanfeng <[email protected]>
  • Loading branch information
yanfeng1992 authored and seanlaii committed Oct 23, 2024
1 parent e8d81b6 commit 4e3fe40
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 49 deletions.
6 changes: 3 additions & 3 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
if err != nil {
return err
}

registeredResources := make(map[schema.GroupVersionResource]struct{})
resourcesByClusters := make(map[string]map[schema.GroupVersionResource]*store.MultiNamespace)
for _, registry := range registries {
matchedResources := make(map[schema.GroupVersionResource]*store.MultiNamespace, len(registry.Spec.ResourceSelectors))
Expand All @@ -203,8 +203,8 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
matchedResources[gvr] = nsSelector
}
nsSelector.Add(selector.Namespace)
registeredResources[gvr] = struct{}{}
}

if len(matchedResources) == 0 {
continue
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
}
}

return ctl.store.UpdateCache(resourcesByClusters)
return ctl.store.UpdateCache(resourcesByClusters, registeredResources)
}

type errorHTTPHandler struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/search/proxy/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestController_reconcile(t *testing.T) {
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: &proxytest.MockStore{
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error {
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace, _ map[schema.GroupVersionResource]struct{}) error {
for clusterName, resources := range m {
resourceNames := make([]string, 0, len(resources))
for resource := range resources {
Expand Down
43 changes: 13 additions & 30 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (

// Store is the cache for resources from multiple member clusters
type Store interface {
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error
HasResource(resource schema.GroupVersionResource) bool
GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
Stop()
Expand All @@ -52,10 +52,10 @@ type Store interface {

// MultiClusterCache caches resource from multi member clusters
type MultiClusterCache struct {
lock sync.RWMutex
cache map[string]*clusterCache
cachedResources map[schema.GroupVersionResource]struct{}
restMapper meta.RESTMapper
lock sync.RWMutex
cache map[string]*clusterCache
registeredResources map[schema.GroupVersionResource]struct{}
restMapper meta.RESTMapper
// newClientFunc returns a dynamic client for member cluster apiserver
newClientFunc func(string) (dynamic.Interface, error)
}
Expand All @@ -65,15 +65,15 @@ var _ Store = &MultiClusterCache{}
// NewMultiClusterCache return a cache for resources from member clusters
func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache {
return &MultiClusterCache{
restMapper: restMapper,
newClientFunc: newClientFunc,
cache: map[string]*clusterCache{},
cachedResources: map[schema.GroupVersionResource]struct{}{},
restMapper: restMapper,
newClientFunc: newClientFunc,
cache: map[string]*clusterCache{},
registeredResources: map[schema.GroupVersionResource]struct{}{},
}
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error {
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if klog.V(3).Enabled() {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -106,24 +106,7 @@ func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema
return err
}
}

// update cachedResource
newCachedResources := make(map[schema.GroupVersionResource]struct{}, len(c.cachedResources))
for _, resources := range resourcesByCluster {
for resource := range resources {
newCachedResources[resource] = struct{}{}
}
}
for resource := range c.cachedResources {
if _, exist := newCachedResources[resource]; !exist {
delete(c.cachedResources, resource)
}
}
for resource := range newCachedResources {
if _, exist := c.cachedResources[resource]; !exist {
c.cachedResources[resource] = struct{}{}
}
}
c.registeredResources = registeredResources
return nil
}

Expand All @@ -137,11 +120,11 @@ func (c *MultiClusterCache) Stop() {
}
}

// HasResource return whether resource is cached.
// HasResource return whether resource is registered.
func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool {
c.lock.RLock()
defer c.lock.RUnlock()
_, ok := c.cachedResources[resource]
_, ok := c.registeredResources[resource]
return ok
}

Expand Down
58 changes: 46 additions & 12 deletions pkg/search/proxy/store/multi_cluster_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) {
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
}

err := cache.UpdateCache(resources)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
}
Expand All @@ -93,7 +96,7 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) {
// Then test removing cluster2 and remove node cache for cluster1
err = cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
}
Expand All @@ -115,7 +118,11 @@ func TestMultiClusterCache_HasResource(t *testing.T) {
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
}
err := cache.UpdateCache(resources)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -160,6 +167,9 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) {
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cluster1Client := fakedynamic.NewSimpleDynamicClient(scheme,
newUnstructuredObject(podGVK, "pod11", withDefaultNamespace()),
newUnstructuredObject(podGVK, "pod_conflict", withDefaultNamespace()),
Expand All @@ -180,7 +190,7 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) {
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(resources)
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -300,11 +310,15 @@ func TestMultiClusterCache_Get(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -440,6 +454,9 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
Expand All @@ -449,7 +466,7 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) {
cluster2.Name: {
podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2")},
},
})
}, registeredResources)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -569,6 +586,9 @@ func TestMultiClusterCache_List(t *testing.T) {
newUnstructuredObject(podGVK, "pod24", withDefaultNamespace(), withResourceVersion("2004")),
newUnstructuredObject(podGVK, "pod25", withDefaultNamespace(), withResourceVersion("2005")),
)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}

newClientFunc := func(cluster string) (dynamic.Interface, error) {
switch cluster {
Expand Down Expand Up @@ -657,7 +677,7 @@ func TestMultiClusterCache_List(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(tt.resources)
err := cache.UpdateCache(tt.resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -723,12 +743,15 @@ func TestMultiClusterCache_List_CacheSourceAnnotation(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -777,11 +800,14 @@ func TestMultiClusterCache_List_Namespaced(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}},
cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}},
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -917,11 +943,14 @@ func TestMultiClusterCache_Watch(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -1038,12 +1067,15 @@ func TestMultiClusterCache_Watch_Namespaced(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}},
cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}},
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -1396,6 +1428,8 @@ func TestMultiClusterCache_fillMissingClusterResourceVersion(t *testing.T) {
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
}, map[schema.GroupVersionResource]struct{}{
podGVR: {},
})
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/search/proxy/testing/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// MockStore is a mock for store.Store interface
type MockStore struct {
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error
HasResourceFunc func(resource schema.GroupVersionResource) bool
GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
StopFunc func()
Expand All @@ -42,11 +42,11 @@ type MockStore struct {
var _ store.Store = &MockStore{}

// UpdateCache implements store.Store interface
func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error {
func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if c.UpdateCacheFunc == nil {
panic("implement me")
}
return c.UpdateCacheFunc(resourcesByCluster)
return c.UpdateCacheFunc(resourcesByCluster, registeredResources)
}

// HasResource implements store.Store interface
Expand Down

0 comments on commit 4e3fe40

Please sign in to comment.