From ea9bfe50ecd82530b59922c8720d7e9b6214617c Mon Sep 17 00:00:00 2001 From: chengjoey <30427474+chengjoey@users.noreply.github.com> Date: Thu, 15 Aug 2024 14:25:25 +0800 Subject: [PATCH] chore use lister instead of client full list (#8) --- pkg/plugins/kprobe/controller/controller.go | 8 ++- pkg/plugins/kprobe/kprobe.go | 8 +-- pkg/plugins/kprobe/kprobesysctl/criruntime.go | 28 +++----- .../kprobe/kprobesysctl/ebpf_kprobe_sysctl.go | 69 ++++++++++--------- pkg/plugins/memory/controller/controller.go | 2 +- pkg/plugins/netfilter/netfilter.go | 4 +- pkg/plugins/protocols/http/meta/provider.go | 4 +- 7 files changed, 63 insertions(+), 60 deletions(-) diff --git a/pkg/plugins/kprobe/controller/controller.go b/pkg/plugins/kprobe/controller/controller.go index 04bf170..48eb606 100644 --- a/pkg/plugins/kprobe/controller/controller.go +++ b/pkg/plugins/kprobe/controller/controller.go @@ -42,7 +42,9 @@ func NewController() Controller { } func (c *Controller) Start(ch chan *metric.Metric) { - go c.watchKprobe(ch) + if err := c.watchKprobe(ch); err != nil { + log.Panic(err) + } } func (c *Controller) watchKprobe(ch chan *metric.Metric) error { @@ -62,10 +64,10 @@ func (c *Controller) GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error) return c.sysctlController.GetSysctlStatByPID(pid) } -func (c *Controller) GetPodByUID(podUID string) (corev1.Pod, error) { +func (c *Controller) GetPodByUID(podUID string) (*corev1.Pod, error) { return c.sysctlController.GetPodByUID(podUID) } -func (c *Controller) GetService(ip string) (corev1.Service, error) { +func (c *Controller) GetService(ip string) (*corev1.Service, error) { return c.sysctlController.GetService(ip) } diff --git a/pkg/plugins/kprobe/kprobe.go b/pkg/plugins/kprobe/kprobe.go index 783cba7..cbe14c7 100644 --- a/pkg/plugins/kprobe/kprobe.go +++ b/pkg/plugins/kprobe/kprobe.go @@ -14,8 +14,8 @@ import ( type Interface interface { GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error) - GetPodByUID(podUID string) (corev1.Pod, error) - GetService(ip string) (corev1.Service, error) + GetPodByUID(podUID string) (*corev1.Pod, error) + GetService(ip string) (*corev1.Service, error) RegisterNetLinkListener() <-chan NeighLinkEvent GetVethes() ([]NeighLink, error) } @@ -85,11 +85,11 @@ func (p *provider) GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error) { return p.kprobeController.GetSysctlStat(pid) } -func (p *provider) GetPodByUID(podUID string) (corev1.Pod, error) { +func (p *provider) GetPodByUID(podUID string) (*corev1.Pod, error) { return p.kprobeController.GetPodByUID(podUID) } -func (p *provider) GetService(ip string) (corev1.Service, error) { +func (p *provider) GetService(ip string) (*corev1.Service, error) { return p.kprobeController.GetService(ip) } diff --git a/pkg/plugins/kprobe/kprobesysctl/criruntime.go b/pkg/plugins/kprobe/kprobesysctl/criruntime.go index 25cbc06..53ccf8c 100644 --- a/pkg/plugins/kprobe/kprobesysctl/criruntime.go +++ b/pkg/plugins/kprobe/kprobesysctl/criruntime.go @@ -2,7 +2,6 @@ package kprobesysctl import ( "bufio" - "context" "fmt" "os" "path" @@ -13,7 +12,7 @@ import ( "github.com/prometheus/procfs" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog" ) @@ -156,51 +155,46 @@ func (k *KprobeSysctlController) refreshProcCgroupInfo() error { } func (k *KprobeSysctlController) refreshPodInfo() error { - pods, err := k.clientSet.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{}) + pods, err := k.podLister.List(labels.Everything()) if err != nil { return err } - for i := range pods.Items { + for i := range pods { // ignore evicted pods - if pods.Items[i].Status.Reason == "Evicted" { + if pods[i].Status.Reason == "Evicted" { continue } - k.podCache.Set(string(pods.Items[i].UID), pods.Items[i], 30*time.Minute) - k.podCache.Set(pods.Items[i].Status.PodIP, pods.Items[i], 30*time.Minute) + k.podCache.Set(string(pods[i].UID), pods[i], 30*time.Minute) + k.podCache.Set(pods[i].Status.PodIP, pods[i], 30*time.Minute) } return nil } func (k *KprobeSysctlController) refreshServiceInfo(s *corev1.Service) error { - refreshFunc := func(svc corev1.Service) { + refreshFunc := func(svc *corev1.Service) { // ignore not ClusterIP and headless services if (svc.Spec.Type != corev1.ServiceTypeClusterIP) || (svc.Spec.ClusterIP == corev1.ClusterIPNone) { return } - if svc.Spec.ClusterIP == "10.17.48.182" { - fmt.Println(fmt.Sprintf("0000000000xxxxxxxxxxxxx, ip: %s, svc: %s/%s", svc.Spec.ClusterIP, - svc.Namespace, svc.Name)) - } k.serviceCache.Set(svc.Spec.ClusterIP, svc, time.Hour) } // load all namespace. if s == nil { - services, err := k.clientSet.CoreV1().Services(metav1.NamespaceAll). - List(context.Background(), metav1.ListOptions{}) + services, err := k.serviceLister.List(labels.Everything()) if err != nil { return err } - for _, i := range services.Items { - refreshFunc(i) + for i := range services { + refreshFunc(services[i]) } return nil } // load specific service. - refreshFunc(*s) + refreshFunc(s) return nil } diff --git a/pkg/plugins/kprobe/kprobesysctl/ebpf_kprobe_sysctl.go b/pkg/plugins/kprobe/kprobesysctl/ebpf_kprobe_sysctl.go index 2f750f8..d2f0e0d 100644 --- a/pkg/plugins/kprobe/kprobesysctl/ebpf_kprobe_sysctl.go +++ b/pkg/plugins/kprobe/kprobesysctl/ebpf_kprobe_sysctl.go @@ -13,16 +13,18 @@ import ( "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/rlimit" - "github.com/erda-project/ebpf-agent/metric" - "github.com/erda-project/ebpf-agent/pkg/btfs" - "github.com/erda-project/ebpf-agent/pkg/envconf" - "github.com/erda-project/ebpf-agent/pkg/exporter/collector" "github.com/patrickmn/go-cache" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/listers/core/v1" clientgoCache "k8s.io/client-go/tools/cache" "k8s.io/klog" + + "github.com/erda-project/ebpf-agent/metric" + "github.com/erda-project/ebpf-agent/pkg/btfs" + "github.com/erda-project/ebpf-agent/pkg/envconf" + "github.com/erda-project/ebpf-agent/pkg/exporter/collector" ) var ( @@ -32,13 +34,15 @@ var ( ) type KprobeSysctlController struct { - hostIP string - sysCtlCache *cache.Cache - podCache *cache.Cache - serviceCache *cache.Cache - clientSet *kubernetes.Clientset - reportClient *collector.ReportClient - objs bpfObjects + hostIP string + sysCtlCache *cache.Cache + podCache *cache.Cache + serviceCache *cache.Cache + clientSet *kubernetes.Clientset + podLister v1.PodLister + serviceLister v1.ServiceLister + reportClient *collector.ReportClient + objs bpfObjects } func New(clientSet *kubernetes.Clientset) *KprobeSysctlController { @@ -73,34 +77,28 @@ func (k *KprobeSysctlController) GetSysctlStatByPID(pid uint32) (SysctlStat, err return SysctlStat{}, fmt.Errorf("failed to find sysctl stat for pid: %d", pid) } -func (k *KprobeSysctlController) GetPodByUID(uid string) (corev1.Pod, error) { +func (k *KprobeSysctlController) GetPodByUID(uid string) (*corev1.Pod, error) { if pod, ok := k.podCache.Get(uid); ok { - return pod.(corev1.Pod), nil + return pod.(*corev1.Pod), nil } - return corev1.Pod{}, fmt.Errorf("failed to find pod for uid: %s", uid) + return &corev1.Pod{}, fmt.Errorf("failed to find pod for uid: %s", uid) } -func (k *KprobeSysctlController) GetService(ip string) (corev1.Service, error) { +func (k *KprobeSysctlController) GetService(ip string) (*corev1.Service, error) { if svc, ok := k.serviceCache.Get(ip); ok { - return svc.(corev1.Service), nil + return svc.(*corev1.Service), nil } - return corev1.Service{}, fmt.Errorf("failed to get service from cache, ip: %s", ip) + return &corev1.Service{}, fmt.Errorf("failed to get service from cache, ip: %s", ip) } func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error { if err := k.refreshProcCgroupInfo(); err != nil { return err } - if err := k.refreshPodInfo(); err != nil { - return err - } - if err := k.refreshServiceInfo(nil); err != nil { - return err - } factory := informers.NewSharedInformerFactory(k.clientSet, 0) // pod informer - podInformerStopper := make(chan struct{}) + stopper := make(chan struct{}) podInformer := factory.Core().V1().Pods().Informer() podInformer.AddEventHandler(clientgoCache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -108,8 +106,8 @@ func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error { if newPod.Status.Reason == "Evicted" { return } - k.podCache.Set(string(newPod.UID), *newPod, 30*time.Minute) - k.podCache.Set(newPod.Status.PodIP, *newPod, 30*time.Minute) + k.podCache.Set(string(newPod.UID), newPod, 30*time.Minute) + k.podCache.Set(newPod.Status.PodIP, newPod, 30*time.Minute) }, DeleteFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) @@ -119,10 +117,8 @@ func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error { // UpdateFunc: func(oldObj interface{}, newObj interface{}) { // }, }) - go podInformer.Run(podInformerStopper) // service informer - serviceInformerStopper := make(chan struct{}) serviceInformer := factory.Core().V1().Services().Informer() serviceInformer.AddEventHandler(clientgoCache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -149,7 +145,18 @@ func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error { }, }) - go serviceInformer.Run(serviceInformerStopper) + factory.Start(stopper) + factory.WaitForCacheSync(stopper) + + k.podLister = factory.Core().V1().Pods().Lister() + k.serviceLister = factory.Core().V1().Services().Lister() + + if err := k.refreshPodInfo(); err != nil { + return err + } + if err := k.refreshServiceInfo(nil); err != nil { + return err + } // todo: add recover and context control go func() { @@ -270,7 +277,7 @@ func EncodeStat(stat *SysctlStat) []byte { return w.Bytes() } -func makeServiceNodeMetric(pod corev1.Pod) *metric.Metric { +func makeServiceNodeMetric(pod *corev1.Pod) *metric.Metric { now := time.Now().Unix() return &metric.Metric{ Measurement: "application_service_node", @@ -314,7 +321,7 @@ func (k *KprobeSysctlController) updateServiceNode() error { podItems := k.podCache.Items() serviceNodes := make([]*metric.Metric, 0) for _, item := range podItems { - pod, ok := item.Object.(corev1.Pod) + pod, ok := item.Object.(*corev1.Pod) if !ok { continue } diff --git a/pkg/plugins/memory/controller/controller.go b/pkg/plugins/memory/controller/controller.go index 39ca367..eccb57f 100644 --- a/pkg/plugins/memory/controller/controller.go +++ b/pkg/plugins/memory/controller/controller.go @@ -100,7 +100,7 @@ func (c *Controller) watchForOoms(ch chan *metric.Metric) error { return nil } -func (c *Controller) convertOomEvent2Metric(event *oomprocesser2.OOMEvent, pod v1.Pod, stat kprobesysctl.SysctlStat) metric.Metric { +func (c *Controller) convertOomEvent2Metric(event *oomprocesser2.OOMEvent, pod *v1.Pod, stat kprobesysctl.SysctlStat) metric.Metric { var metric metric.Metric metric.Measurement = "docker_container_summary" metric.Name = "docker_container_summary" diff --git a/pkg/plugins/netfilter/netfilter.go b/pkg/plugins/netfilter/netfilter.go index 193727d..075e88c 100644 --- a/pkg/plugins/netfilter/netfilter.go +++ b/pkg/plugins/netfilter/netfilter.go @@ -73,8 +73,8 @@ func (p *provider) Gather(c chan *metric.Metric) { continue } srcIP, dstIP := net.IP(event.OriSrc[:4]), net.IP(event.OriDst[:4]) - replySrcIP, replyDstIP := net.IP(event.Dst[:4]), net.IP(event.Src[:4]) - klog.Infof("srcIP: %s, srcPort: %d, dstIP: %s, dstPort: %d, reply srcIP :%s, reply dstIP: %s", srcIP, event.OriSport, dstIP, event.OriDport, replySrcIP, replyDstIP) + _, replyDstIP := net.IP(event.Dst[:4]), net.IP(event.Src[:4]) + //klog.Infof("srcIP: %s, srcPort: %d, dstIP: %s, dstPort: %d, reply srcIP :%s, reply dstIP: %s", srcIP, event.OriSport, dstIP, event.OriDport, replySrcIP, replyDstIP) natInfo := NatInfo{ OriDstIP: dstIP.String(), OriDstPort: event.OriDport, diff --git a/pkg/plugins/protocols/http/meta/provider.go b/pkg/plugins/protocols/http/meta/provider.go index 45035d1..df555e2 100644 --- a/pkg/plugins/protocols/http/meta/provider.go +++ b/pkg/plugins/protocols/http/meta/provider.go @@ -124,7 +124,7 @@ func (p *provider) Convert(m *ebpf.Metric) *metric.Metric { // in cluster switch t := target.(type) { - case corev1.Pod: + case *corev1.Pod: p.l.Infof("source(pod): %s/%d, target(pod): %s/%s", m.SourceIP, m.SourcePort, t.Namespace, t.Name) output.Tags["cluster_name"] = t.Labels["DICE_CLUSTER_NAME"] output.Tags["db_host"] = fmt.Sprintf("%s:%d", m.DestIP, m.DestPort) @@ -151,7 +151,7 @@ func (p *provider) Convert(m *ebpf.Metric) *metric.Metric { output.Tags["target_service_name"] = t.Annotations["msp.erda.cloud/service_name"] output.Tags["target_terminus_key"] = t.Annotations["msp.erda.cloud/terminus_key"] output.Tags["target_workspace"] = t.Annotations["msp.erda.cloud/workspace"] - case corev1.Service: + case *corev1.Service: // TODO: service resource p.l.Infof("source(pod): %s/%d, target(service): %s/%s", m.SourceIP, m.SourcePort, t.Namespace, t.Name) default: