Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watch-namespaces to spicedb-operator cmd #331

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Options struct {
OperatorConfigPath string

MetricNamespace string

WatchNamespaces []string
}

// RecommendedOptions builds a new options config with default values
Expand Down Expand Up @@ -77,8 +79,10 @@ func NewCmdRun(o *Options) *cobra.Command {
bootstrapFlags.StringVar(&o.BootstrapSpicedbsPath, "bootstrap-spicedbs", "", "set a path to a config file for spicedbs to load on start up.")
debugFlags := namedFlagSets.FlagSet("debug")
debugFlags.StringVar(&o.DebugAddress, "debug-address", o.DebugAddress, "address where debug information is served (/healthz, /metrics/, /debug/pprof, etc)")
o.ConfigFlags.AddFlags(namedFlagSets.FlagSet("kubernetes"))
o.DebugFlags.AddFlags(debugFlags)
kubernetesFlags := namedFlagSets.FlagSet("kubernetes")
kubernetesFlags.StringSliceVar(&o.WatchNamespaces, "watch-namespaces", []string{}, "set a comma-separated list of namespaces to watch for CRDs.")
o.ConfigFlags.AddFlags(kubernetesFlags)
globalFlags := namedFlagSets.FlagSet("global")
globalflag.AddGlobalFlags(globalFlags, cmd.Name())
globalFlags.StringVar(&o.OperatorConfigPath, "config", "", "set a path to the operator's config file (configure registries, image tags, etc)")
Expand Down Expand Up @@ -148,7 +152,7 @@ func (o *Options) Run(ctx context.Context, f cmdutil.Factory) error {
controllers = append(controllers, staticSpiceDBController)
}

ctrl, err := controller.NewController(ctx, registry, dclient, kclient, resources, o.OperatorConfigPath, broadcaster)
ctrl, err := controller.NewController(ctx, registry, dclient, kclient, resources, o.OperatorConfigPath, broadcaster, o.WatchNamespaces)
if err != nil {
return err
}
Expand Down
119 changes: 73 additions & 46 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
applyrbacv1 "k8s.io/client-go/applyconfigurations/rbac/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -87,7 +88,7 @@ type Controller struct {
lastConfigHash atomic.Uint64
}

func NewController(ctx context.Context, registry *typed.Registry, dclient dynamic.Interface, kclient kubernetes.Interface, resources openapi.Resources, configFilePath string, broadcaster record.EventBroadcaster) (*Controller, error) {
func NewController(ctx context.Context, registry *typed.Registry, dclient dynamic.Interface, kclient kubernetes.Interface, resources openapi.Resources, configFilePath string, broadcaster record.EventBroadcaster, namespaces []string) (*Controller, error) {
c := Controller{
client: dclient,
kclient: kclient,
Expand Down Expand Up @@ -121,60 +122,86 @@ func NewController(ctx context.Context, registry *typed.Registry, dclient dynami
logr.FromContextOrDiscard(ctx).V(3).Info("no operator configuration provided", "path", configFilePath)
}

ownedInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory(
OwnedFactoryKey,
dclient,
0,
metav1.NamespaceAll,
nil,
)
if _, err := ownedInformerFactory.ForResource(v1alpha1ClusterGVR).Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { c.enqueue(v1alpha1ClusterGVR, obj) },
UpdateFunc: func(_, obj any) { c.enqueue(v1alpha1ClusterGVR, obj) },
// Delete is not used right now, we rely on ownerrefs to clean up
}); err != nil {
return nil, err
// If no namespaces are provided, watch all namespaces
if len(namespaces) == 0 {
namespaces = []string{metav1.NamespaceAll}
}

externalInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory(
DependentFactoryKey,
dclient,
0,
metav1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = metadata.ManagedDependentSelector.String()
},
)

for _, gvr := range []schema.GroupVersionResource{
appsv1.SchemeGroupVersion.WithResource("deployments"),
corev1.SchemeGroupVersion.WithResource("secrets"),
corev1.SchemeGroupVersion.WithResource("serviceaccounts"),
corev1.SchemeGroupVersion.WithResource("services"),
corev1.SchemeGroupVersion.WithResource("pods"),
batchv1.SchemeGroupVersion.WithResource("jobs"),
rbacv1.SchemeGroupVersion.WithResource("roles"),
rbacv1.SchemeGroupVersion.WithResource("rolebindings"),
} {
inf := externalInformerFactory.ForResource(gvr).Informer()
if err := inf.AddIndexers(cache.Indexers{metadata.OwningClusterIndex: metadata.GetClusterKeyFromMeta}); err != nil {
return nil, err
}
if _, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { c.syncExternalResource(obj) },
UpdateFunc: func(_, obj any) { c.syncExternalResource(obj) },
DeleteFunc: func(obj any) { c.syncExternalResource(obj) },
ownedInformerFactories := make([]dynamicinformer.DynamicSharedInformerFactory, 0, len(namespaces))
for _, ns := range namespaces {
ownedInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory(
OwnedFactoryKey,
dclient,
0,
ns,
nil,
)
if _, err := ownedInformerFactory.ForResource(v1alpha1ClusterGVR).Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { c.enqueue(v1alpha1ClusterGVR, obj) },
UpdateFunc: func(_, obj any) { c.enqueue(v1alpha1ClusterGVR, obj) },
// Delete is not used right now, we rely on ownerrefs to clean up
}); err != nil {
return nil, err
}
ownedInformerFactories = append(ownedInformerFactories, ownedInformerFactory)
}

externalInformerFactories := make([]dynamicinformer.DynamicSharedInformerFactory, 0, len(namespaces))
for _, ns := range namespaces {
externalInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory(
DependentFactoryKey,
dclient,
0,
ns,
func(options *metav1.ListOptions) {
options.LabelSelector = metadata.ManagedDependentSelector.String()
},
)

for _, gvr := range []schema.GroupVersionResource{
appsv1.SchemeGroupVersion.WithResource("deployments"),
corev1.SchemeGroupVersion.WithResource("secrets"),
corev1.SchemeGroupVersion.WithResource("serviceaccounts"),
corev1.SchemeGroupVersion.WithResource("services"),
corev1.SchemeGroupVersion.WithResource("pods"),
batchv1.SchemeGroupVersion.WithResource("jobs"),
rbacv1.SchemeGroupVersion.WithResource("roles"),
rbacv1.SchemeGroupVersion.WithResource("rolebindings"),
} {
inf := externalInformerFactory.ForResource(gvr).Informer()
if err := inf.AddIndexers(cache.Indexers{metadata.OwningClusterIndex: metadata.GetClusterKeyFromMeta}); err != nil {
return nil, err
}
if _, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { c.syncExternalResource(obj) },
UpdateFunc: func(_, obj any) { c.syncExternalResource(obj) },
DeleteFunc: func(obj any) { c.syncExternalResource(obj) },
}); err != nil {
return nil, err
}
}
externalInformerFactories = append(externalInformerFactories, externalInformerFactory)
}

// start informers
ownedInformerFactory.Start(ctx.Done())
externalInformerFactory.Start(ctx.Done())
for _, ownedInformerFactory := range ownedInformerFactories {
ownedInformerFactory.Start(ctx.Done())
}

for _, externalInformerFactory := range externalInformerFactories {
externalInformerFactory.Start(ctx.Done())
}

fileInformerFactory.Start(ctx.Done())
ownedInformerFactory.WaitForCacheSync(ctx.Done())
externalInformerFactory.WaitForCacheSync(ctx.Done())

// wait for caches to sync
for _, ownedInformerFactory := range ownedInformerFactories {
ownedInformerFactory.WaitForCacheSync(ctx.Done())
}

for _, externalInformerFactory := range externalInformerFactories {
externalInformerFactory.WaitForCacheSync(ctx.Done())
}
fileInformerFactory.WaitForCacheSync(ctx.Done())

// Build mainHandler handler
Expand Down
Loading