Skip to content

Commit

Permalink
support containerd logs collecting
Browse files Browse the repository at this point in the history
  • Loading branch information
kongfei605 committed Dec 4, 2023
1 parent 3a44db2 commit dc8edde
Showing 1 changed file with 65 additions and 18 deletions.
83 changes: 65 additions & 18 deletions logs/input/kubernetes/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@ package kubernetes

import (
"context"
logService "flashcat.cloud/categraf/logs/service"
"log"
"strings"
"sync"
"time"

logService "flashcat.cloud/categraf/logs/service"
"flashcat.cloud/categraf/logs/util/containers"
"flashcat.cloud/categraf/logs/util/kubernetes/kubelet"
"flashcat.cloud/categraf/pkg/checksum"
"flashcat.cloud/categraf/pkg/set"
)

type (
Scanner struct {
kubelet kubelet.KubeUtilInterface
services *logService.Services
actives map[string]struct{}
actives map[string]checksum.Checksum
mux sync.Mutex
}
)

func NewScanner(services *logService.Services) *Scanner {
return &Scanner{
services: services,
actives: make(map[string]checksum.Checksum),
}
}

Expand All @@ -51,38 +55,81 @@ func (s *Scanner) Scan() {
log.Printf("get local pod list error %s", err)
return
}
fetched := make(map[string]struct{})
fetched := make(map[string]checksum.Checksum)
for _, pod := range pods {
for _, container := range pod.Status.GetAllContainers() {
fetched[container.ID] = struct{}{}
fetched[container.ID] = checksum.New(pod.Metadata)
}
}
for id := range fetched {
if !s.Contains(id) {
svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After)
s.services.AddService(svc)
}
new := set.NewWithLoad[string, checksum.Checksum](fetched)
old := set.NewWithLoad[string, checksum.Checksum](s.GetActives())
add, checkTwice, del := new.Diff(old)
for id := range del {
rtype, rid := parseEntity(id)
svc := logService.NewService(rtype, rid, logService.After)
s.services.RemoveService(svc)
s.DelActives(id)
}

old := s.actives
s.SetActives(fetched)
for id := range old {
if !s.Contains(id) {
svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After)
for id := range checkTwice {
sum := fetched[id]
if !s.Contains(id, sum) {
rtype, rid := parseEntity(id)
svc := logService.NewService(rtype, rid, logService.After)
s.services.RemoveService(svc)
svc = logService.NewService(rtype, rid, logService.After)
s.services.AddService(svc)
s.AddActives(id, sum)
}
}

for id := range add {
rtype, rid := parseEntity(id)
svc := logService.NewService(rtype, rid, logService.After)
s.services.AddService(svc)
s.AddActives(id, fetched[id])
}

}
}
}

func (s *Scanner) SetActives(ids map[string]struct{}) {
func parseEntity(containerID string) (string, string) {
components := strings.Split(containerID, containers.EntitySeparator)
if len(components) != 2 {
return "docker", strings.TrimPrefix(containerID, "docker"+containers.EntitySeparator)
}
return components[0], components[1]
}

func (s *Scanner) SetActives(ids map[string]checksum.Checksum) {
s.mux.Lock()
defer s.mux.Unlock()
s.actives = ids
}

func (s *Scanner) Contains(id string) bool {
_, ok := s.actives[id]
return ok
func (s *Scanner) GetActives() map[string]checksum.Checksum {
ret := make(map[string]checksum.Checksum)
s.mux.Lock()
defer s.mux.Unlock()
for k, v := range s.actives {
ret[k] = v
}
return ret
}

func (s *Scanner) AddActives(id string, sum checksum.Checksum) {
s.mux.Lock()
defer s.mux.Unlock()
s.actives[id] = sum
}
func (s *Scanner) DelActives(id string) {
s.mux.Lock()
defer s.mux.Unlock()
delete(s.actives, id)
}

func (s *Scanner) Contains(id string, sum checksum.Checksum) bool {
val, ok := s.actives[id]
return ok && val == sum
}

0 comments on commit dc8edde

Please sign in to comment.