Skip to content

Commit

Permalink
Merge pull request #409 from kongfei605/logs_filter
Browse files Browse the repository at this point in the history
add filter policy for pod log collecting
  • Loading branch information
kongfei605 authored Mar 15, 2023
2 parents d546f08 + 2f4a3f8 commit 3be930f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
16 changes: 16 additions & 0 deletions config/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type (
ScanPeriod int `json:"scan_period" toml:"scan_period"`
FrameSize int `json:"frame_size" toml:"frame_size"`
CollectContainerAll bool `json:"collect_container_all" toml:"collect_container_all"`
ContainerInclude []string `json:"container_include" toml:"container_include"`
ContainerExclude []string `json:"container_exclude" toml:"container_exclude"`
GlobalProcessingRules []*logsconfig.ProcessingRule `json:"processing_rules" toml:"processing_rules"`
Items []*logsconfig.LogsConfig `json:"items" toml:"items"`
KafkaConfig
Expand Down Expand Up @@ -88,3 +90,17 @@ func IsFeaturePresent(t string) bool {
func GetContainerCollectAll() bool {
return Config.Logs.CollectContainerAll
}

func GetContainerIncludeList() []string {
if Config.Logs.ContainerInclude == nil {
return []string{}
}
return Config.Logs.ContainerInclude
}

func GetContainerExcludeList() []string {
if Config.Logs.ContainerExclude == nil {
return []string{}
}
return Config.Logs.ContainerExclude
}
6 changes: 6 additions & 0 deletions logs/input/kubernetes/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
AnnotationRuleKey = "categraf/logs.stdout.processing_rules"
AnnotationTopicKey = "categraf/logs.stdout.topic"
AnnotationTagPrefixKey = "categraf/tags.prefix"
AnnotationCollectKey = "categraf/logs.stdout.collect"
)

var (
Expand Down Expand Up @@ -254,6 +255,11 @@ func (l *Launcher) getSource(pod *kubelet.Pod, container kubelet.ContainerStatus
if !l.collectAll {
return nil, errCollectAllDisabled
}
if !(pod.Metadata.Annotations[AnnotationCollectKey] == "" ||
pod.Metadata.Annotations[AnnotationCollectKey] == "true") {
log.Printf("pod %s disable stdout collecting", pod.Metadata.Name)
return nil, errCollectAllDisabled
}
// The logs source is the short image name
logsSource := ""
shortImageName, err := l.getShortImageName(pod, container.Name)
Expand Down
26 changes: 19 additions & 7 deletions logs/util/containers/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"log"
"regexp"
"strings"

coreconfig "flashcat.cloud/categraf/config"
)

const (
Expand Down Expand Up @@ -60,6 +62,7 @@ const (
pauseContainerUpstream = `image:upstream/pause.*`
// - cdk/pause-amd64
pauseContainerCDK = `image:cdk/pause.*`
categrafContainer = `image:flashcatcloud/categraf.*`

// filter prefixes for inclusion/exclusion
imageFilterPrefix = `image:`
Expand Down Expand Up @@ -200,8 +203,8 @@ func NewFilter(includeList, excludeList []string) (*Filter, error) {
func newMetricFilterFromConfig() (*Filter, error) {
// We merge `container_include` and `container_include_metrics` as this filter
// is used by all core and python checks (so components sending metrics).
includeList := []string{}
excludeList := []string{}
includeList := coreconfig.GetContainerIncludeList()
excludeList := coreconfig.GetContainerExcludeList()

excludeList = append(excludeList,
pauseContainerGCR,
Expand All @@ -219,6 +222,7 @@ func newMetricFilterFromConfig() (*Filter, error) {
pauseContainerECR,
pauseContainerUpstream,
pauseContainerCDK,
categrafContainer,
)
return NewFilter(includeList, excludeList)
}
Expand All @@ -228,15 +232,19 @@ func newMetricFilterFromConfig() (*Filter, error) {
// It allows to filter metrics and logs separately
// For use in autodiscovery.
func NewAutodiscoveryFilter(filter FilterType) (*Filter, error) {
includeList := []string{}
excludeList := []string{}
includeList := coreconfig.GetContainerIncludeList()
excludeList := coreconfig.GetContainerExcludeList()
switch filter {
case GlobalFilter:
includeList = []string{}
excludeList = []string{"image:*.categraf.*"}
if len(excludeList) == 0 {
excludeList = append(excludeList, categrafContainer)
}
case LogsFilter:
includeList = []string{}
excludeList = []string{"image:.*categraf.*"}
if len(excludeList) == 0 {
excludeList = append(excludeList, categrafContainer)
}
}
return NewFilter(includeList, excludeList)
}
Expand Down Expand Up @@ -267,7 +275,11 @@ func (cf Filter) IsExcluded(containerName, containerImage, podNamespace string)

// Check if excludeListed
for _, r := range cf.ImageExcludeList {
if r.MatchString(containerImage) {
match := r.MatchString(containerImage)
if coreconfig.Config.DebugMode {
log.Printf("D!, exclude item :%+v, container image:%s, %t\n", r, containerImage, match)
}
if match {
return true
}
}
Expand Down
21 changes: 20 additions & 1 deletion logs/util/kubernetes/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

coreconfig "flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/logs/errors"
"flashcat.cloud/categraf/logs/util/containers"
"flashcat.cloud/categraf/logs/util/containers/providers"
Expand Down Expand Up @@ -87,6 +88,10 @@ func NewKubeUtil() *KubeUtil {
podListCacheDuration: 5 * time.Second,
podUnmarshaller: newPodUnmarshaller(),
}
filter, err := containers.NewAutodiscoveryFilter(containers.LogsFilter)
if err == nil {
ku.filter = filter
}

waitOnMissingContainer := 0
if waitOnMissingContainer > 0 {
Expand Down Expand Up @@ -211,7 +216,9 @@ func (ku *KubeUtil) GetLocalPodList(ctx context.Context) ([]*Pod, error) {
allContainers = append(allContainers, pod.Status.InitContainers...)
allContainers = append(allContainers, pod.Status.Containers...)
pod.Status.AllContainers = allContainers
tmpSlice = append(tmpSlice, pod)
if !ku.filterPod(pod) {
tmpSlice = append(tmpSlice, pod)
}
}
}
pods.Items = tmpSlice
Expand All @@ -222,6 +229,18 @@ func (ku *KubeUtil) GetLocalPodList(ctx context.Context) ([]*Pod, error) {
return pods.Items, nil
}

func (ku *KubeUtil) filterPod(pod *Pod) bool {
for _, c := range pod.Status.GetAllContainers() {
if coreconfig.Config.DebugMode {
log.Printf("D! container name:%s image:%s, ns:%s", c.Name, c.Image, pod.Metadata.Namespace)
}
if ku.filter.IsExcluded(c.Name, c.Image, pod.Metadata.Namespace) {
return true
}
}
return false
}

// ForceGetLocalPodList reset podList cache and call GetLocalPodList
func (ku *KubeUtil) ForceGetLocalPodList(ctx context.Context) ([]*Pod, error) {
ResetCache()
Expand Down

0 comments on commit 3be930f

Please sign in to comment.