Skip to content

Commit

Permalink
mv discovery to app, util to customresource
Browse files Browse the repository at this point in the history
use interface to decouple customresourcestate and CRDiscoverer
  • Loading branch information
CatherineF-dev committed Sep 26, 2023
1 parent b91dd31 commit fff8c1a
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 168 deletions.
79 changes: 3 additions & 76 deletions internal/discovery/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,93 +15,20 @@ package discovery

import (
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type groupVersionKindPlural struct {
type GroupVersionKindPlural struct {
schema.GroupVersionKind
Plural string
}

func (g groupVersionKindPlural) String() string {
func (g GroupVersionKindPlural) String() string {
return fmt.Sprintf("%s/%s, Kind=%s, Plural=%s", g.Group, g.Version, g.Kind, g.Plural)
}

type kindPlural struct {
type KindPlural struct {
Kind string
Plural string
}

// CRDiscoverer provides a cache of the collected GVKs, along with helper utilities.
type CRDiscoverer struct {
// m is a mutex to protect the cache.
m sync.RWMutex
// Map is a cache of the collected GVKs.
Map map[string]map[string][]kindPlural
// ShouldUpdate is a flag that indicates whether the cache was updated.
WasUpdated bool
// CRDsAddEventsCounter tracks the number of times that the CRD informer triggered the "add" event.
CRDsAddEventsCounter prometheus.Counter
// CRDsDeleteEventsCounter tracks the number of times that the CRD informer triggered the "remove" event.
CRDsDeleteEventsCounter prometheus.Counter
// CRDsCacheCountGauge tracks the net amount of CRDs affecting the cache at this point.
CRDsCacheCountGauge prometheus.Gauge
}

// SafeRead executes the given function while holding a read lock.
func (r *CRDiscoverer) SafeRead(f func()) {
r.m.RLock()
defer r.m.RUnlock()
f()
}

// SafeWrite executes the given function while holding a write lock.
func (r *CRDiscoverer) SafeWrite(f func()) {
r.m.Lock()
defer r.m.Unlock()
f()
}

// AppendToMap appends the given GVKs to the cache.
func (r *CRDiscoverer) AppendToMap(gvkps ...groupVersionKindPlural) {
if r.Map == nil {
r.Map = map[string]map[string][]kindPlural{}
}
for _, gvkp := range gvkps {
if _, ok := r.Map[gvkp.Group]; !ok {
r.Map[gvkp.Group] = map[string][]kindPlural{}
}
if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok {
r.Map[gvkp.Group][gvkp.Version] = []kindPlural{}
}
r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], kindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural})
}
}

// RemoveFromMap removes the given GVKs from the cache.
func (r *CRDiscoverer) RemoveFromMap(gvkps ...groupVersionKindPlural) {
for _, gvkp := range gvkps {
if _, ok := r.Map[gvkp.Group]; !ok {
continue
}
if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok {
continue
}
for i, el := range r.Map[gvkp.Group][gvkp.Version] {
if el.Kind == gvkp.Kind {
if len(r.Map[gvkp.Group][gvkp.Version]) == 1 {
delete(r.Map[gvkp.Group], gvkp.Version)
if len(r.Map[gvkp.Group]) == 0 {
delete(r.Map, gvkp.Group)
}
break
}
r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version][:i], r.Map[gvkp.Group][gvkp.Version][i+1:]...)
break
}
}
}
}
5 changes: 2 additions & 3 deletions internal/store/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"
"k8s.io/kube-state-metrics/v2/pkg/options"
"k8s.io/kube-state-metrics/v2/pkg/sharding"
"k8s.io/kube-state-metrics/v2/pkg/util"
"k8s.io/kube-state-metrics/v2/pkg/watch"
)

Expand Down Expand Up @@ -197,7 +196,7 @@ func (b *Builder) DefaultGenerateCustomResourceStoresFunc() ksmtypes.BuildCustom
func (b *Builder) WithCustomResourceStoreFactories(fs ...customresource.RegistryFactory) {
for i := range fs {
f := fs[i]
gvr := util.GVRFromType(f.Name(), f.ExpectedType())
gvr := customresource.GVRFromType(f.Name(), f.ExpectedType())
var gvrString string
if gvr != nil {
gvrString = gvr.String()
Expand Down Expand Up @@ -542,7 +541,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string,

familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies)

gvr := util.GVRFromType(resourceName, expectedType)
gvr := customresource.GVRFromType(resourceName, expectedType)
var gvrString string
if gvr != nil {
gvrString = gvr.String()
Expand Down
95 changes: 84 additions & 11 deletions internal/discovery/discovery.go → pkg/app/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ limitations under the License.
*/

// Package discovery provides a discovery and resolution logic for GVKs.
package discovery
package app

import (
"context"
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand All @@ -27,16 +29,87 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/internal/discovery"
"k8s.io/kube-state-metrics/v2/internal/store"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/metricshandler"
"k8s.io/kube-state-metrics/v2/pkg/options"
"k8s.io/kube-state-metrics/v2/pkg/util"
)

// Interval is the time interval between two cache sync checks.
const Interval = 3 * time.Second

// CRDiscoverer provides a cache of the collected GVKs, along with helper utilities.
type CRDiscoverer struct {
// m is a mutex to protect the cache.
m sync.RWMutex
// Map is a cache of the collected GVKs.
Map map[string]map[string][]discovery.KindPlural
// ShouldUpdate is a flag that indicates whether the cache was updated.
WasUpdated bool
// CRDsAddEventsCounter tracks the number of times that the CRD informer triggered the "add" event.
CRDsAddEventsCounter prometheus.Counter
// CRDsDeleteEventsCounter tracks the number of times that the CRD informer triggered the "remove" event.
CRDsDeleteEventsCounter prometheus.Counter
// CRDsCacheCountGauge tracks the net amount of CRDs affecting the cache at this point.
CRDsCacheCountGauge prometheus.Gauge
}

// SafeRead executes the given function while holding a read lock.
func (r *CRDiscoverer) SafeRead(f func()) {
r.m.RLock()
defer r.m.RUnlock()
f()
}

// SafeWrite executes the given function while holding a write lock.
func (r *CRDiscoverer) SafeWrite(f func()) {
r.m.Lock()
defer r.m.Unlock()
f()
}

// AppendToMap appends the given GVKs to the cache.
func (r *CRDiscoverer) AppendToMap(gvkps ...discovery.GroupVersionKindPlural) {
if r.Map == nil {
r.Map = map[string]map[string][]discovery.KindPlural{}
}
for _, gvkp := range gvkps {
if _, ok := r.Map[gvkp.Group]; !ok {
r.Map[gvkp.Group] = map[string][]discovery.KindPlural{}
}
if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok {
r.Map[gvkp.Group][gvkp.Version] = []discovery.KindPlural{}
}
r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], discovery.KindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural})
}
}

// RemoveFromMap removes the given GVKs from the cache.
func (r *CRDiscoverer) RemoveFromMap(gvkps ...discovery.GroupVersionKindPlural) {
for _, gvkp := range gvkps {
if _, ok := r.Map[gvkp.Group]; !ok {
continue
}
if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok {
continue
}
for i, el := range r.Map[gvkp.Group][gvkp.Version] {
if el.Kind == gvkp.Kind {
if len(r.Map[gvkp.Group][gvkp.Version]) == 1 {
delete(r.Map[gvkp.Group], gvkp.Version)
if len(r.Map[gvkp.Group]) == 0 {
delete(r.Map, gvkp.Group)
}
break
}
r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version][:i], r.Map[gvkp.Group][gvkp.Version][i+1:]...)
break
}
}
}
}

// StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds.
// resolveGVK needs to be called after StartDiscovery to generate factories.
func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error {
Expand All @@ -56,7 +129,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
v := version.(map[string]interface{})["name"].(string)
k := objSpec["names"].(map[string]interface{})["kind"].(string)
p := objSpec["names"].(map[string]interface{})["plural"].(string)
gotGVKP := groupVersionKindPlural{
gotGVKP := discovery.GroupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Expand All @@ -81,7 +154,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
v := version.(map[string]interface{})["name"].(string)
k := objSpec["names"].(map[string]interface{})["kind"].(string)
p := objSpec["names"].(map[string]interface{})["plural"].(string)
gotGVKP := groupVersionKindPlural{
gotGVKP := discovery.GroupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Expand Down Expand Up @@ -116,7 +189,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
}

// ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache.
func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive
func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []discovery.GroupVersionKindPlural, err error) { // nolint:revive
g := gvk.Group
v := gvk.Version
k := gvk.Kind
Expand All @@ -134,7 +207,7 @@ func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedG
break
}
}
return []groupVersionKindPlural{
return []discovery.GroupVersionKindPlural{
{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Expand All @@ -148,7 +221,7 @@ func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedG
if hasVersion && !hasKind {
kinds := r.Map[g][v]
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
resolvedGVKPs = append(resolvedGVKPs, discovery.GroupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Expand All @@ -163,7 +236,7 @@ func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedG
for version, kinds := range versions {
for _, el := range kinds {
if el.Kind == k {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
resolvedGVKPs = append(resolvedGVKPs, discovery.GroupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Expand All @@ -179,7 +252,7 @@ func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedG
versions := r.Map[g]
for version, kinds := range versions {
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
resolvedGVKPs = append(resolvedGVKPs, discovery.GroupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Expand Down Expand Up @@ -216,11 +289,11 @@ func (r *CRDiscoverer) PollForCacheUpdates(
// Update the list of enabled custom resources.
var enabledCustomResources []string
for _, factory := range customFactories {
gvrString := util.GVRFromType(factory.Name(), factory.ExpectedType()).String()
gvrString := customresource.GVRFromType(factory.Name(), factory.ExpectedType()).String()
enabledCustomResources = append(enabledCustomResources, gvrString)
}
// Create clients for discovered factories.
discoveredCustomResourceClients, err := util.CreateCustomResourceClients(opts.Apiserver, opts.Kubeconfig, customFactories...)
discoveredCustomResourceClients, err := customresource.CreateCustomResourceClients(opts.Apiserver, opts.Kubeconfig, customFactories...)
if err != nil {
klog.ErrorS(err, "failed to update custom resource stores")
}
Expand Down
Loading

0 comments on commit fff8c1a

Please sign in to comment.