diff --git a/cmd/exporter/main.go b/cmd/exporter/main.go index 3385b5a..8ccf132 100644 --- a/cmd/exporter/main.go +++ b/cmd/exporter/main.go @@ -101,7 +101,7 @@ func run(log logrus.FieldLogger) error { podsInformer := informersFactory.Core().V1().Pods().Informer() nodesInformer := informersFactory.Core().V1().Nodes().Informer() podsByNodeCache := kube.NewPodsByNodeCache(podsInformer) - podByIPCache := kube.NewPodByIPCache(podsInformer) + podByIPCache := kube.NewPodByIPCache(ctx, podsInformer, log) nodeByNameCache := kube.NewNodeByNameCache(nodesInformer) nodeByIPCache := kube.NewNodeByIPCache(nodesInformer) informersFactory.Start(wait.NeverStop) diff --git a/kube/podbyipcache.go b/kube/podbyipcache.go new file mode 100644 index 0000000..2532be1 --- /dev/null +++ b/kube/podbyipcache.go @@ -0,0 +1,146 @@ +package kube + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +var ( + // there are cases when connections might be kept in conntrack for 120 seconds + // https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt + podTimeout = 2 * time.Minute +) + +type PodByIPCache struct { + indexer cache.Indexer + // sorted by DeletionTimestamp in ascending order + deletedPods []*corev1.Pod + mu sync.Mutex + log logrus.FieldLogger +} + +func (c *PodByIPCache) Get(key string) (*corev1.Pod, error) { + items, err := c.indexer.ByIndex(podIPIdx, key) + if err != nil { + return nil, err + } + if len(items) == 0 { + return nil, ErrNotFound + } + + var earliestPod *corev1.Pod + for _, item := range items { + pod, ok := item.(*corev1.Pod) + if !ok { + continue + } + + if earliestPod == nil || pod.CreationTimestamp.Before(&earliestPod.CreationTimestamp) { + earliestPod = pod + } + } + + if earliestPod != nil { + return earliestPod, nil + } + + return nil, ErrNotFound +} + +func (c *PodByIPCache) onDelete(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + c.deletedPods = append(c.deletedPods, pod) +} + +func (c *PodByIPCache) onAddOrUpdate(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + + err := c.indexer.Add(pod) + if err != nil { + c.log.Errorf("cannot add pod to cache %s", err) + } +} + +func (c *PodByIPCache) cleanupExpired(ctx context.Context) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.mu.Lock() + + now := time.Now() + var i int + for i = 0; i < len(c.deletedPods); i++ { + pod := c.deletedPods[i] + // deletedPods should be sorted by DeletionTimeout in ascending order + // as soon as we observe a pod that is not yet timed out it is safe to break + // we shouldn't have pods with no deletion timestamp in this slice + if pod.DeletionTimestamp != nil && now.Sub(pod.DeletionTimestamp.Time) <= podTimeout { + break + } + err := c.indexer.Delete(pod) + if err != nil { + c.log.Errorf("cannot remove pod from cache %s", err) + } + } + c.deletedPods = c.deletedPods[i:] + + c.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +func NewPodByIPCache(ctx context.Context, informer cache.SharedIndexInformer, log logrus.FieldLogger) *PodByIPCache { + if err := informer.SetTransform(defaultTransformFunc); err != nil { + panic(err) + } + + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ + podIPIdx: func(obj interface{}) ([]string, error) { + switch t := obj.(type) { + case *corev1.Pod: + return []string{t.Status.PodIP}, nil + } + return nil, fmt.Errorf("expected pod, got unknown type %T", obj) + }, + }) + + c := &PodByIPCache{indexer: indexer, mu: sync.Mutex{}, log: log} + + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + c.onDelete(obj) + }, + AddFunc: func(obj interface{}) { + c.onAddOrUpdate(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + c.onAddOrUpdate(newObj) + }, + }) + if err != nil { + panic(err) + } + + go c.cleanupExpired(ctx) + + return c +} diff --git a/kube/watcher.go b/kube/watcher.go index 45ed776..b5e6b23 100644 --- a/kube/watcher.go +++ b/kube/watcher.go @@ -3,10 +3,7 @@ package kube import ( "errors" "fmt" - "sort" - "time" - "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) @@ -21,8 +18,6 @@ const ( var ( ErrNotFound = errors.New("object not found") ErrToManyObjects = errors.New("too many objects") - - exitedPodTimeout = 1 * time.Minute ) func NewPodsByNodeCache(informer cache.SharedIndexInformer) *PodsByNodeCache { @@ -62,70 +57,6 @@ func (p *PodsByNodeCache) Get(nodeName string) ([]*corev1.Pod, error) { return res, nil } -func NewPodByIPCache(informer cache.SharedIndexInformer) *PodByIPCache { - if err := informer.SetTransform(defaultTransformFunc); err != nil { - panic(err) - } - if err := informer.AddIndexers(map[string]cache.IndexFunc{ - podIPIdx: func(obj interface{}) ([]string, error) { - switch t := obj.(type) { - case *corev1.Pod: - if t.Status.Phase == corev1.PodRunning { - return []string{t.Status.PodIP}, nil - } - if t.Status.Phase == corev1.PodSucceeded || t.Status.Phase == corev1.PodFailed { - if podExitedBeforeTimeout(t) { - return []string{t.Status.PodIP}, nil - } - } - return []string{}, nil - } - return nil, fmt.Errorf("expected pod, got unknown type %T", obj) - }, - }); err != nil { - panic(err) - } - return &PodByIPCache{informer: informer} -} - -func podExitedBeforeTimeout(t *corev1.Pod) bool { - podReady, found := lo.Find(t.Status.Conditions, func(c corev1.PodCondition) bool { - return c.Type == corev1.PodReady - }) - deadLine := time.Now().UTC().Add(-exitedPodTimeout) - return found && podReady.LastTransitionTime.UTC().After(deadLine) -} - -type PodByIPCache struct { - informer cache.SharedIndexInformer -} - -func (p *PodByIPCache) Get(ip string) (*corev1.Pod, error) { - items, err := p.informer.GetIndexer().ByIndex(podIPIdx, ip) - if err != nil { - return nil, err - } - if len(items) == 0 { - return nil, ErrNotFound - } - pods := lo.FilterMap(items, func(item interface{}, i int) (*corev1.Pod, bool) { - pod, ok := item.(*corev1.Pod) - if !ok { - return nil, false - } - return pod, true - }) - sort.SliceStable(pods, func(i, j int) bool { - return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp) - }) - for i := range pods { - if pod := pods[i]; pod != nil { - return pod, nil - } - } - return nil, ErrNotFound -} - func NewNodeByNameCache(informer cache.SharedIndexInformer) *NodeByNameCache { if err := informer.SetTransform(defaultTransformFunc); err != nil { panic(err) diff --git a/kube/watcher_test.go b/kube/watcher_test.go index 4168123..3192fc1 100644 --- a/kube/watcher_test.go +++ b/kube/watcher_test.go @@ -1,9 +1,11 @@ package kube import ( + "context" "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -115,7 +117,7 @@ func TestWatcher(t *testing.T) { podsInformer := informersFactory.Core().V1().Pods().Informer() nodesInformer := informersFactory.Core().V1().Nodes().Informer() podsByNodeCache := NewPodsByNodeCache(podsInformer) - podByIPCache := NewPodByIPCache(podsInformer) + podByIPCache := NewPodByIPCache(context.Background(), podsInformer, logrus.New()) nodeByNameCache := NewNodeByNameCache(nodesInformer) nodeByIPCache := NewNodeByIPCache(nodesInformer) informersFactory.Start(wait.NeverStop)