From dc8edde695072b0376f4cebb34529f7dbfd00151 Mon Sep 17 00:00:00 2001 From: kongfei Date: Mon, 4 Dec 2023 10:52:55 +0800 Subject: [PATCH] support containerd logs collecting --- logs/input/kubernetes/scanner.go | 83 +++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/logs/input/kubernetes/scanner.go b/logs/input/kubernetes/scanner.go index 779fb80e..db2f3369 100644 --- a/logs/input/kubernetes/scanner.go +++ b/logs/input/kubernetes/scanner.go @@ -4,20 +4,23 @@ package kubernetes import ( "context" - logService "flashcat.cloud/categraf/logs/service" "log" "strings" "sync" "time" + logService "flashcat.cloud/categraf/logs/service" + "flashcat.cloud/categraf/logs/util/containers" "flashcat.cloud/categraf/logs/util/kubernetes/kubelet" + "flashcat.cloud/categraf/pkg/checksum" + "flashcat.cloud/categraf/pkg/set" ) type ( Scanner struct { kubelet kubelet.KubeUtilInterface services *logService.Services - actives map[string]struct{} + actives map[string]checksum.Checksum mux sync.Mutex } ) @@ -25,6 +28,7 @@ type ( func NewScanner(services *logService.Services) *Scanner { return &Scanner{ services: services, + actives: make(map[string]checksum.Checksum), } } @@ -51,38 +55,81 @@ func (s *Scanner) Scan() { log.Printf("get local pod list error %s", err) return } - fetched := make(map[string]struct{}) + fetched := make(map[string]checksum.Checksum) for _, pod := range pods { for _, container := range pod.Status.GetAllContainers() { - fetched[container.ID] = struct{}{} + fetched[container.ID] = checksum.New(pod.Metadata) } } - for id := range fetched { - if !s.Contains(id) { - svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After) - s.services.AddService(svc) - } + new := set.NewWithLoad[string, checksum.Checksum](fetched) + old := set.NewWithLoad[string, checksum.Checksum](s.GetActives()) + add, checkTwice, del := new.Diff(old) + for id := range del { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) + s.services.RemoveService(svc) + s.DelActives(id) } - old := s.actives - s.SetActives(fetched) - for id := range old { - if !s.Contains(id) { - svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After) + for id := range checkTwice { + sum := fetched[id] + if !s.Contains(id, sum) { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) s.services.RemoveService(svc) + svc = logService.NewService(rtype, rid, logService.After) + s.services.AddService(svc) + s.AddActives(id, sum) } } + + for id := range add { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) + s.services.AddService(svc) + s.AddActives(id, fetched[id]) + } + } } } -func (s *Scanner) SetActives(ids map[string]struct{}) { +func parseEntity(containerID string) (string, string) { + components := strings.Split(containerID, containers.EntitySeparator) + if len(components) != 2 { + return "docker", strings.TrimPrefix(containerID, "docker"+containers.EntitySeparator) + } + return components[0], components[1] +} + +func (s *Scanner) SetActives(ids map[string]checksum.Checksum) { s.mux.Lock() defer s.mux.Unlock() s.actives = ids } -func (s *Scanner) Contains(id string) bool { - _, ok := s.actives[id] - return ok +func (s *Scanner) GetActives() map[string]checksum.Checksum { + ret := make(map[string]checksum.Checksum) + s.mux.Lock() + defer s.mux.Unlock() + for k, v := range s.actives { + ret[k] = v + } + return ret +} + +func (s *Scanner) AddActives(id string, sum checksum.Checksum) { + s.mux.Lock() + defer s.mux.Unlock() + s.actives[id] = sum +} +func (s *Scanner) DelActives(id string) { + s.mux.Lock() + defer s.mux.Unlock() + delete(s.actives, id) +} + +func (s *Scanner) Contains(id string, sum checksum.Checksum) bool { + val, ok := s.actives[id] + return ok && val == sum }