Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: react to pod events to regen preprocess conf #293

Merged
merged 1 commit into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions config-reloader/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/datasource"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/generator"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"

"github.com/sirupsen/logrus"
)

type Controller struct {
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
NumTotalConfigNS int
Updater Updater
OutputDir string
Reloader *fluentd.Reloader
Datasource datasource.Datasource
Generator *generator.Generator
AllConfigsHash uint64
}

func (c *Controller) Run(ctx context.Context, stop <-chan struct{}) {
Expand Down Expand Up @@ -104,15 +105,20 @@ func (c *Controller) RunOnce(ctx context.Context) error {

if newHash != nsConfig.PreviousConfigHash {
needsReload = true
logrus.Debugf("Previous Config hash for ns %s is %v", nsConfig.Name, nsConfig.PreviousConfigHash)
logrus.Debugf("New Config hash for ns %s is %v", nsConfig.Name, newHash)
c.Datasource.WriteCurrentConfigHash(nsConfig.Name, newHash)
}
}

// lastly, if number of configs has changed, then need to reload configurations obviously!
// lastly, if number of all configs has changed, then need to reload configurations obviously!
// this means a crd was deleted or reapplied, and GetNamespaces does not return it anymore
if c.NumTotalConfigNS != len(allConfigNamespaces) {
// metahashing, hashing the object of hashes :)
allConfigsHash, _ := util.MakeStructureHash(configHashes)
if c.AllConfigsHash != allConfigsHash {
needsReload = true
c.NumTotalConfigNS = len(allConfigNamespaces)
c.AllConfigsHash = allConfigsHash
logrus.Debugf("All Configs hash for all KFO is %v", c.AllConfigsHash)
}

if needsReload {
Expand Down
6 changes: 3 additions & 3 deletions config-reloader/datasource/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var template = `
`

type fakeDatasource struct {
hashes map[string]string
confHashes map[string]string
}

func makeFakeConfig(namespace string) string {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (d *fakeDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig,
}

func (d *fakeDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -69,6 +69,6 @@ func (d *fakeDatasource) UpdateStatus(ctx context.Context, namespace string, sta
// NewFakeDatasource returns a predefined set of namespaces + configs
func NewFakeDatasource(ctx context.Context) Datasource {
return &fakeDatasource{
hashes: make(map[string]string),
confHashes: make(map[string]string),
}
}
8 changes: 4 additions & 4 deletions config-reloader/datasource/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

type fsDatasource struct {
hashes map[string]string
confHashes map[string]string
rootDir string
statusOutputDir string
}
Expand All @@ -41,7 +41,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
cfg := &NamespaceConfig{
Name: ns,
FluentdConfig: string(contents),
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
}

logrus.Infof("Loading namespace %s from file %s", ns, f)
Expand All @@ -52,7 +52,7 @@ func (d *fsDatasource) GetNamespaces(ctx context.Context) ([]*NamespaceConfig, e
}

func (d *fsDatasource) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand All @@ -67,7 +67,7 @@ func (d *fsDatasource) UpdateStatus(ctx context.Context, namespace string, statu
// NewFileSystemDatasource turns all files matching *.conf patter in the given dir into namespace configs
func NewFileSystemDatasource(ctx context.Context, rootDir string, statusOutputDir string) Datasource {
return &fsDatasource{
hashes: make(map[string]string),
confHashes: make(map[string]string),
rootDir: rootDir,
statusOutputDir: statusOutputDir,
}
Expand Down
60 changes: 42 additions & 18 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
hashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
client kubernetes.Interface
confHashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand Down Expand Up @@ -77,7 +77,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
nsconfigs = append(nsconfigs, &NamespaceConfig{
Name: ns,
FluentdConfig: configdata,
PreviousConfigHash: d.hashes[ns],
PreviousConfigHash: d.confHashes[ns],
Labels: nsobj.Labels,
MiniContainers: minis,
})
Expand All @@ -88,7 +88,7 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac

// WriteCurrentConfigHash is a setter for the hashtable maintained by this Datasource
func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash string) {
d.hashes[namespace] = hash
d.confHashes[namespace] = hash
}

// UpdateStatus updates a namespace's status annotation with the latest result
Expand Down Expand Up @@ -168,6 +168,13 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
for _, cfmap := range confMapsList {
if cfmap.ObjectMeta.Name == d.cfg.DefaultConfigmapName {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
} else {
// We need to find configmaps that honor the global annotation for configmaps:
configMapNamespace, _ := d.nslist.Get(cfmap.ObjectMeta.Namespace)
configMapName := configMapNamespace.Annotations[d.cfg.AnnotConfigmapName]
if configMapName != "" {
namespaces = append(namespaces, cfmap.ObjectMeta.Namespace)
}
}
}
} else {
Expand Down Expand Up @@ -253,6 +260,23 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
}
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
UpdateFunc: func(old, new interface{}) {
},
DeleteFunc: func(new interface{}) {
select {
case updateChan <- time.Now():
default:
}
},
})

factory.Start(nil)
if !cache.WaitForCacheSync(nil,
factory.Core().V1().Namespaces().Informer().HasSynced,
Expand All @@ -264,13 +288,13 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
logrus.Infof("Synced local informer with upstream Kubernetes API")

return &kubeInformerConnection{
client: client,
hashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
client: client,
confHashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
}, nil
}
1 change: 1 addition & 0 deletions config-reloader/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
github.com/mitchellh/hashstructure/v2 v2.0.2
k8s.io/api v0.21.4
k8s.io/apiextensions-apiserver v0.21.4
k8s.io/apimachinery v0.21.4
Expand Down
2 changes: 2 additions & 0 deletions config-reloader/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down
21 changes: 21 additions & 0 deletions config-reloader/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/mitchellh/hashstructure/v2"
"io/ioutil"
"os/exec"
"sort"
Expand Down Expand Up @@ -123,3 +124,23 @@ func TrimTrailingComment(line string) string {

return line
}

func MakeStructureHash(v interface{}) (uint64, error) {
hashV, err := hashstructure.Hash(v, hashstructure.FormatV2, nil)
if err != nil {
return hashV, err
}

return hashV, nil
}

func AreStructureHashEqual(v interface{}, f interface{}) bool {
hashV, _ := hashstructure.Hash(v, hashstructure.FormatV2, nil)
hashF, _ := hashstructure.Hash(f, hashstructure.FormatV2, nil)

if hashV != 0 && hashF != 0 {
return hashV == hashF
}

return false
}
72 changes: 72 additions & 0 deletions config-reloader/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ import (
"github.com/stretchr/testify/assert"
)

type Mount struct {
Path string
VolumeName string
SubPath string
}

// MiniContainer container subset with the parent pod's metadata
type MiniContainer struct {
// the pod id
PodID string
PodName string

Image string
ContainerID string

// pod labels
Labels map[string]string

// container name
Name string
// only the emptyDir mounts, never empty, sorted by len(Path), descending
HostMounts []*Mount

NodeName string
}

func TestMakeFluentdSafeName(t *testing.T) {
assert.Equal(t, "a", MakeFluentdSafeName("a"))
assert.Equal(t, "123", MakeFluentdSafeName("123"))
Expand Down Expand Up @@ -41,3 +67,49 @@ func TestTrimTrailingComment(t *testing.T) {
assert.Equal(t, "a", TrimTrailingComment("a"))
assert.Equal(t, "a", TrimTrailingComment("a#########"))
}

func TestMakeStructureHash(t *testing.T) {
mini1 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}

hashMini1, err := MakeStructureHash(mini1)
assert.Nil(t, err)
assert.Equal(t, uint64(0xa92a93a3863f8fd6), hashMini1)
}

func TestAreStructureHashEqual(t *testing.T) {
mini1 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}
mini2 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-nwxtj",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}
mini3 := &MiniContainer{
PodID: "4b519aaf-67f1-4588-8164-f679b2298e25",
PodName: "kfo-log-router-next",
Name: "config-reloader",
NodeName: "vdp-dev-control-plane",
Image: "testing/kfo:delete-problems-3",
ContainerID: "containerd://37dce75ed2f01c5f858b4c4cc96b23ebacaba6569af93ed64b3904be9a676cb1",
}

assert.Equal(t, true, AreStructureHashEqual(mini1, mini2))
assert.NotEqual(t, true, AreStructureHashEqual(mini1, mini3))
assert.Equal(t, false, AreStructureHashEqual(mini1, mini3))
}