Skip to content

Commit

Permalink
Cache deleted pods (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivaka authored Jan 26, 2024
1 parent 71a3541 commit 422906c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func run(log logrus.FieldLogger) error {
podsInformer := informersFactory.Core().V1().Pods().Informer()
nodesInformer := informersFactory.Core().V1().Nodes().Informer()
podsByNodeCache := kube.NewPodsByNodeCache(podsInformer)
podByIPCache := kube.NewPodByIPCache(podsInformer)
podByIPCache := kube.NewPodByIPCache(ctx, podsInformer, log)
nodeByNameCache := kube.NewNodeByNameCache(nodesInformer)
nodeByIPCache := kube.NewNodeByIPCache(nodesInformer)
informersFactory.Start(wait.NeverStop)
Expand Down
146 changes: 146 additions & 0 deletions kube/podbyipcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package kube

import (
"context"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

var (
// there are cases when connections might be kept in conntrack for 120 seconds
// https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
podTimeout = 2 * time.Minute
)

type PodByIPCache struct {
indexer cache.Indexer
// sorted by DeletionTimestamp in ascending order
deletedPods []*corev1.Pod
mu sync.Mutex
log logrus.FieldLogger
}

func (c *PodByIPCache) Get(key string) (*corev1.Pod, error) {
items, err := c.indexer.ByIndex(podIPIdx, key)
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, ErrNotFound
}

var earliestPod *corev1.Pod
for _, item := range items {
pod, ok := item.(*corev1.Pod)
if !ok {
continue
}

if earliestPod == nil || pod.CreationTimestamp.Before(&earliestPod.CreationTimestamp) {
earliestPod = pod
}
}

if earliestPod != nil {
return earliestPod, nil
}

return nil, ErrNotFound
}

func (c *PodByIPCache) onDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
c.mu.Lock()
defer c.mu.Unlock()

c.deletedPods = append(c.deletedPods, pod)
}

func (c *PodByIPCache) onAddOrUpdate(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}

err := c.indexer.Add(pod)
if err != nil {
c.log.Errorf("cannot add pod to cache %s", err)
}
}

func (c *PodByIPCache) cleanupExpired(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mu.Lock()

now := time.Now()
var i int
for i = 0; i < len(c.deletedPods); i++ {
pod := c.deletedPods[i]
// deletedPods should be sorted by DeletionTimeout in ascending order
// as soon as we observe a pod that is not yet timed out it is safe to break
// we shouldn't have pods with no deletion timestamp in this slice
if pod.DeletionTimestamp != nil && now.Sub(pod.DeletionTimestamp.Time) <= podTimeout {
break
}
err := c.indexer.Delete(pod)
if err != nil {
c.log.Errorf("cannot remove pod from cache %s", err)
}
}
c.deletedPods = c.deletedPods[i:]

c.mu.Unlock()
case <-ctx.Done():
return
}
}
}

func NewPodByIPCache(ctx context.Context, informer cache.SharedIndexInformer, log logrus.FieldLogger) *PodByIPCache {
if err := informer.SetTransform(defaultTransformFunc); err != nil {
panic(err)
}

indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
podIPIdx: func(obj interface{}) ([]string, error) {
switch t := obj.(type) {
case *corev1.Pod:
return []string{t.Status.PodIP}, nil
}
return nil, fmt.Errorf("expected pod, got unknown type %T", obj)
},
})

c := &PodByIPCache{indexer: indexer, mu: sync.Mutex{}, log: log}

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
c.onDelete(obj)
},
AddFunc: func(obj interface{}) {
c.onAddOrUpdate(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.onAddOrUpdate(newObj)
},
})
if err != nil {
panic(err)
}

go c.cleanupExpired(ctx)

return c
}
69 changes: 0 additions & 69 deletions kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ package kube
import (
"errors"
"fmt"
"sort"
"time"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -21,8 +18,6 @@ const (
var (
ErrNotFound = errors.New("object not found")
ErrToManyObjects = errors.New("too many objects")

exitedPodTimeout = 1 * time.Minute
)

func NewPodsByNodeCache(informer cache.SharedIndexInformer) *PodsByNodeCache {
Expand Down Expand Up @@ -62,70 +57,6 @@ func (p *PodsByNodeCache) Get(nodeName string) ([]*corev1.Pod, error) {
return res, nil
}

func NewPodByIPCache(informer cache.SharedIndexInformer) *PodByIPCache {
if err := informer.SetTransform(defaultTransformFunc); err != nil {
panic(err)
}
if err := informer.AddIndexers(map[string]cache.IndexFunc{
podIPIdx: func(obj interface{}) ([]string, error) {
switch t := obj.(type) {
case *corev1.Pod:
if t.Status.Phase == corev1.PodRunning {
return []string{t.Status.PodIP}, nil
}
if t.Status.Phase == corev1.PodSucceeded || t.Status.Phase == corev1.PodFailed {
if podExitedBeforeTimeout(t) {
return []string{t.Status.PodIP}, nil
}
}
return []string{}, nil
}
return nil, fmt.Errorf("expected pod, got unknown type %T", obj)
},
}); err != nil {
panic(err)
}
return &PodByIPCache{informer: informer}
}

func podExitedBeforeTimeout(t *corev1.Pod) bool {
podReady, found := lo.Find(t.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodReady
})
deadLine := time.Now().UTC().Add(-exitedPodTimeout)
return found && podReady.LastTransitionTime.UTC().After(deadLine)
}

type PodByIPCache struct {
informer cache.SharedIndexInformer
}

func (p *PodByIPCache) Get(ip string) (*corev1.Pod, error) {
items, err := p.informer.GetIndexer().ByIndex(podIPIdx, ip)
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, ErrNotFound
}
pods := lo.FilterMap(items, func(item interface{}, i int) (*corev1.Pod, bool) {
pod, ok := item.(*corev1.Pod)
if !ok {
return nil, false
}
return pod, true
})
sort.SliceStable(pods, func(i, j int) bool {
return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
})
for i := range pods {
if pod := pods[i]; pod != nil {
return pod, nil
}
}
return nil, ErrNotFound
}

func NewNodeByNameCache(informer cache.SharedIndexInformer) *NodeByNameCache {
if err := informer.SetTransform(defaultTransformFunc); err != nil {
panic(err)
Expand Down
4 changes: 3 additions & 1 deletion kube/watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package kube

import (
"context"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -115,7 +117,7 @@ func TestWatcher(t *testing.T) {
podsInformer := informersFactory.Core().V1().Pods().Informer()
nodesInformer := informersFactory.Core().V1().Nodes().Informer()
podsByNodeCache := NewPodsByNodeCache(podsInformer)
podByIPCache := NewPodByIPCache(podsInformer)
podByIPCache := NewPodByIPCache(context.Background(), podsInformer, logrus.New())
nodeByNameCache := NewNodeByNameCache(nodesInformer)
nodeByIPCache := NewNodeByIPCache(nodesInformer)
informersFactory.Start(wait.NeverStop)
Expand Down

0 comments on commit 422906c

Please sign in to comment.