Skip to content

Commit

Permalink
Merge pull request #306 from vkadi/mounted-file-issue
Browse files Browse the repository at this point in the history
Issue#289 Mounted file issue fix part-2
  • Loading branch information
javiercri authored Feb 11, 2022
2 parents 1c551d7 + c8d7210 commit bef7a7e
Showing 1 changed file with 37 additions and 19 deletions.
56 changes: 37 additions & 19 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sort"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -33,6 +34,7 @@ type kubeInformerConnection struct {
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand Down Expand Up @@ -204,6 +206,25 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
return nsList, nil
}

// handlePodChange decides whether to to a graceful reload on pod changes based on source type such as mounted-file
// it will call Run controller loop if pod changed is a mounted-file type as other types don't require the reload
// Note Namespace config may have mixed mounted-file and non-mounted file pods, In the first attempt,
// let's start simple and start by finding if pod changed is associated with a namespace that has mounted-file plugin in it's config
func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) {
mObj := obj.(metav1.Object)
logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace())
nsConfigStr := fmt.Sprintf("%#v", configdata)
if err == nil {
if strings.Contains(nsConfigStr, "mounted-file") {
select {
case d.updateChan <- time.Now():
default:
}
}
}
}

// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
// The returned Datasource uses Informers to efficiently track objects in the kubernetes
// API by watching for updates to a known state.
Expand Down Expand Up @@ -260,23 +281,6 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
UpdateFunc: func(old, new interface{}) {
},
DeleteFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
})

factory.Start(nil)
if !cache.WaitForCacheSync(nil,
factory.Core().V1().Namespaces().Informer().HasSynced,
Expand All @@ -287,7 +291,7 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
logrus.Infof("Synced local informer with upstream Kubernetes API")

return &kubeInformerConnection{
kubeInfoCx := &kubeInformerConnection{
client: client,
confHashes: make(map[string]string),
cfg: cfg,
Expand All @@ -296,5 +300,19 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
updateChan: updateChan,
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
UpdateFunc: func(old, obj interface{}) {
},
DeleteFunc: func(obj interface{}) {
kubeInfoCx.handlePodChange(ctx, obj)
},
})

return kubeInfoCx, nil
}

0 comments on commit bef7a7e

Please sign in to comment.