From 14c90caaeb9f7baa7b84286edca85182d70ad886 Mon Sep 17 00:00:00 2001 From: kongfei Date: Sat, 2 Dec 2023 16:46:24 +0800 Subject: [PATCH 1/5] feat:metrics duplication allowd for prometheus plugin --- conf/input.prometheus/prometheus.toml | 3 +++ inputs/prometheus/prometheus.go | 5 ++++- parser/prometheus/parser.go | 29 +++++++++++++++++++++++++-- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/conf/input.prometheus/prometheus.toml b/conf/input.prometheus/prometheus.toml index b9c727dc..d1563a06 100644 --- a/conf/input.prometheus/prometheus.toml +++ b/conf/input.prometheus/prometheus.toml @@ -9,6 +9,9 @@ urls = [ url_label_key = "instance" url_label_value = "{{.Host}}" +## metrics duplication allowed, default false +# duplication_allowed=true + ## Scrape Services available in Consul Catalog # [instances.consul] # enabled = false diff --git a/inputs/prometheus/prometheus.go b/inputs/prometheus/prometheus.go index fe7b06fd..aa8890bb 100644 --- a/inputs/prometheus/prometheus.go +++ b/inputs/prometheus/prometheus.go @@ -36,6 +36,8 @@ type Instance struct { IgnoreLabelKeys []string `toml:"ignore_label_keys"` Headers []string `toml:"headers"` + DuplicationAllowed bool `toml:"duplication_allowed"` + config.UrlLabel ignoreMetricsFilter filter.Filter @@ -228,7 +230,8 @@ func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, u slist.PushFront(types.NewSample("", "up", 1, labels)) - parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter) + parser := prometheus.NewParser(ins.NamePrefix, labels, res.Header, ins.DuplicationAllowed, + ins.ignoreMetricsFilter, ins.ignoreLabelKeysFilter) if err = parser.Parse(body, slist); err != nil { log.Println("E! failed to parse response body, url:", u.String(), "error:", err) } diff --git a/parser/prometheus/parser.go b/parser/prometheus/parser.go index 88b93c37..469a1ecd 100644 --- a/parser/prometheus/parser.go +++ b/parser/prometheus/parser.go @@ -1,8 +1,11 @@ package prometheus import ( + "log" "math" + "mime" "net/http" + "strings" dto "github.com/prometheus/client_model/go" @@ -17,15 +20,18 @@ type Parser struct { Header http.Header IgnoreMetricsFilter filter.Filter IgnoreLabelKeysFilter filter.Filter + DuplicationAllowed bool } -func NewParser(namePrefix string, defaultTags map[string]string, header http.Header, ignoreMetricsFilter, ignoreLabelKeysFilter filter.Filter) *Parser { +func NewParser(namePrefix string, defaultTags map[string]string, header http.Header, + duplicationAllowed bool, ignoreMetricsFilter, ignoreLabelKeysFilter filter.Filter) *Parser { return &Parser{ NamePrefix: namePrefix, DefaultTags: defaultTags, Header: header, IgnoreMetricsFilter: ignoreMetricsFilter, IgnoreLabelKeysFilter: ignoreLabelKeysFilter, + DuplicationAllowed: duplicationAllowed, } } @@ -33,7 +39,7 @@ func EmptyParser() *Parser { return &Parser{} } -func (p *Parser) Parse(buf []byte, slist *types.SampleList) error { +func (p *Parser) parse(buf []byte, slist *types.SampleList) error { metricFamilies, err := util.Parse(buf, p.Header) if err != nil { return err @@ -59,6 +65,25 @@ func (p *Parser) Parse(buf []byte, slist *types.SampleList) error { return nil } +func (p *Parser) Parse(buf []byte, slist *types.SampleList) error { + mediatype, _, _ := mime.ParseMediaType(p.Header.Get("Content-Type")) + if mediatype == "application/vnd.google.protobuf" || !p.DuplicationAllowed { + return p.parse(buf, slist) + } + + metrics := strings.Split(string(buf), "# HELP ") + for i := range metrics { + if i != 0 { + metrics[i] = "# HELP " + metrics[i] + } + err := p.parse([]byte(metrics[i]), slist) + if err != nil { + log.Println("E! parse metrics failed, error:", err, "metrics:", metrics[i]) + } + } + + return nil +} // Get labels from metric func (p *Parser) makeLabels(m *dto.Metric) map[string]string { From 723dd16bfa0a6ced0d04a604885d72b79e92393e Mon Sep 17 00:00:00 2001 From: kongfei Date: Sun, 3 Dec 2023 08:42:15 +0800 Subject: [PATCH 2/5] ignore ident for deployment --- k8s/deployment-etcd-http.yaml | 6 +++--- k8s/deployment.yaml | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/k8s/deployment-etcd-http.yaml b/k8s/deployment-etcd-http.yaml index 5250aad4..8b2ef9bb 100644 --- a/k8s/deployment-etcd-http.yaml +++ b/k8s/deployment-etcd-http.yaml @@ -89,7 +89,7 @@ data: chan_size = 10000 [[writers]] - url = "http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write" + url = "http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write?ignore_ident=true" # Basic auth username basic_auth_user = "" @@ -212,7 +212,7 @@ data: regex: kube-system;kube-dns;metrics remote_write: - - url: 'http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write' + - url: 'http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write?ignore_ident=true' --- apiVersion: apps/v1 kind: Deployment @@ -282,4 +282,4 @@ spec: name: scrape-config name: scrape-config - emptyDir: {} - name: prometheus-wal \ No newline at end of file + name: prometheus-wal diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml index 0d1ecaf8..e09f939b 100644 --- a/k8s/deployment.yaml +++ b/k8s/deployment.yaml @@ -101,7 +101,7 @@ data: chan_size = 10000 [[writers]] - url = "http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write" + url = "http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write?ignore_ident=true" # Basic auth username basic_auth_user = "" @@ -229,7 +229,7 @@ data: regex: kube-system;kube-dns;metrics remote_write: - - url: 'http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write' + - url: 'http://${NSERVER_SERVICE_WITH_PORT}/prometheus/v1/write?ignore_ident=true' --- apiVersion: apps/v1 kind: Deployment From 3a44db26d9deff7878cfb470b47174f919363bfd Mon Sep 17 00:00:00 2001 From: kongfei Date: Mon, 4 Dec 2023 10:46:00 +0800 Subject: [PATCH 3/5] support kafka hash and sasl configuration --- conf/logs.toml | 18 ++++++++++++ config/logs.go | 16 ++++++++++- go.mod | 19 +++++++------ go.sum | 32 ++++++++++++++------- logs/client/kafka/destination.go | 49 ++++++++++++++++++++++++++++++-- logs/client/kafka/kafka.go | 2 +- logs/client/kafka/topic_json.go | 3 +- logs/message/origin.go | 13 +++++++++ logs/processor/json.go | 6 ++++ 9 files changed, 134 insertions(+), 24 deletions(-) diff --git a/conf/logs.toml b/conf/logs.toml index 293dd5c2..17338520 100644 --- a/conf/logs.toml +++ b/conf/logs.toml @@ -22,7 +22,25 @@ open_files_limit = 100 scan_period = 10 ## read buffer of udp frame_size = 9000 + +# 是否开启sasl模式 +sasl_enable = false +sasl_user = "admin" +sasl_password = "admin" +# PLAIN +sasl_mechanism= "PLAIN" +# v1 +sasl_version=1 +# set true +sasl_handshake = true +# optional +# sasl_auth_identity="" +# ## +# v0.3.39以上版本新增,是否开启pod日志采集 +enable_collect_container=false + +# 是否采集所有pod的stdout stderr collect_container_all = true ## glog processing rules # [[logs.Processing_rules]] diff --git a/config/logs.go b/config/logs.go index 310755f0..0a63d9d9 100644 --- a/config/logs.go +++ b/config/logs.go @@ -3,9 +3,10 @@ package config import ( - "github.com/Shopify/sarama" + "github.com/IBM/sarama" logsconfig "flashcat.cloud/categraf/config/logs" + "flashcat.cloud/categraf/pkg/tls" ) const ( @@ -40,6 +41,19 @@ type ( Topic string `json:"topic" toml:"topic"` Brokers []string `json:"brokers" toml:"brokers"` *sarama.Config + + KafkaVersion string `toml:"kafka_version"` + SaslEnable bool `toml:"sasl_enable"` + SaslMechanism string `toml:"sasl_mechanism"` + SaslVersion int16 `toml:"sasl_version"` + SaslHandshake bool `toml:"sasl_handshake"` + SaslUser string `toml:"sasl_user"` + SaslPassword string `toml:"sasl_password"` + SaslAuthIdentity string `toml:"sasl_auth_identity"` + + CertificateAuth []string `toml:"certificate_authorities"` + tls.ClientConfig + PartitionStrategy string `toml:"partition_strategy"` } KubeConfig struct { KubeletHTTPPort int `json:"kubernetes_http_kubelet_port" toml:"kubernetes_http_kubelet_port"` diff --git a/go.mod b/go.mod index e6b9066c..c6825c62 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/prometheus/prometheus v0.37.0 github.com/shirou/gopsutil/v3 v3.22.5 github.com/sirupsen/logrus v1.8.1 // indirect - github.com/stretchr/testify v1.8.3 + github.com/stretchr/testify v1.8.4 github.com/toolkits/pkg v1.3.5 github.com/ulricqin/gosnmp v0.0.1 github.com/xdg/scram v1.0.5 @@ -117,6 +117,7 @@ require ( require ( cloud.google.com/go/monitoring v1.13.0 + github.com/IBM/sarama v1.42.1 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/alibabacloud-go/cms-20190101/v8 v8.0.0 github.com/alibabacloud-go/cms-export-20211101/v2 v2.0.0 @@ -182,8 +183,8 @@ require ( github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.4.0 // indirect - github.com/eapache/go-resiliency v1.3.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/emicklei/go-restful v2.16.0+incompatible // indirect @@ -257,13 +258,13 @@ require ( github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/knadh/koanf v1.4.2 // indirect github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b // indirect github.com/leodido/go-urn v1.2.4 // indirect @@ -293,7 +294,7 @@ require ( github.com/pelletier/go-toml v1.9.4 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/alertmanager v0.24.0 // indirect @@ -327,8 +328,8 @@ require ( github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect github.com/vultr/govultr/v2 v2.17.2 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.1.1 // indirect - github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xdg/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect @@ -347,7 +348,7 @@ require ( golang.org/x/crypto v0.14.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect - golang.org/x/sync v0.3.0 // indirect + golang.org/x/sync v0.4.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect golang.org/x/tools v0.13.0 // indirect diff --git a/go.sum b/go.sum index e455df17..89e57b14 100644 --- a/go.sum +++ b/go.sum @@ -88,6 +88,8 @@ github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 h1:KeNholpO2xKjgaa github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962/go.mod h1:kC29dT1vFpj7py2OvG1khBdQpo3kInWP+6QipLbdngo= github.com/HdrHistogram/hdrhistogram-go v1.1.0 h1:6dpdDPTRoo78HxAJ6T1HfMiKSnqhgRRqzCuPshRkQ7I= github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= @@ -311,10 +313,12 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= -github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= @@ -813,8 +817,9 @@ github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nD github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= -github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= @@ -856,8 +861,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -1130,8 +1136,9 @@ github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v3 v3.3.4/go.mod h1:280XNCGS8jAcG++AHdd6SeWnzyJ1w9oow2vbORyey8Q= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1280,8 +1287,9 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tg123/go-htpasswd v1.2.0 h1:UKp34m9H467/xklxUxU15wKRru7fwXoTojtxg25ITF0= @@ -1334,11 +1342,13 @@ github.com/vultr/govultr/v2 v2.17.2/go.mod h1:ZFOKGWmgjytfyjeyAdhQlSWwTjh2ig+X49 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= -github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= -github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= @@ -1464,6 +1474,7 @@ golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= @@ -1576,6 +1587,7 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= @@ -1610,8 +1622,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/logs/client/kafka/destination.go b/logs/client/kafka/destination.go index dacf59d8..96c3119e 100644 --- a/logs/client/kafka/destination.go +++ b/logs/client/kafka/destination.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" json "github.com/mailru/easyjson" coreconfig "flashcat.cloud/categraf/config" @@ -78,9 +78,50 @@ func newDestination(endpoint logsconfig.Endpoint, contentType string, destinatio if coreconfig.Config.Logs.Config == nil { coreconfig.Config.Logs.Config = sarama.NewConfig() coreconfig.Config.Logs.Producer.Partitioner = sarama.NewRandomPartitioner + if coreconfig.Config.Logs.PartitionStrategy == "round_robin" { + coreconfig.Config.Logs.Producer.Partitioner = sarama.NewRoundRobinPartitioner + } + if coreconfig.Config.Logs.PartitionStrategy == "random" { + coreconfig.Config.Logs.Producer.Partitioner = sarama.NewRandomPartitioner + } + if coreconfig.Config.Logs.PartitionStrategy == "hash" { + coreconfig.Config.Logs.Producer.Partitioner = sarama.NewHashPartitioner + } + coreconfig.Config.Logs.Producer.Return.Successes = true } + if coreconfig.Config.Logs.SendWithTLS && coreconfig.Config.Logs.SendType == "kafka" { + coreconfig.Config.Logs.Config.Net.TLS.Enable = true + coreconfig.Config.Logs.UseTLS = true + var err error + coreconfig.Config.Logs.Net.TLS.Config, err = coreconfig.Config.Logs.KafkaConfig.ClientConfig.TLSConfig() + if err != nil { + panic(err) + } + } + if coreconfig.Config.Logs.SaslEnable { + coreconfig.Config.Logs.Config.Net.SASL.Enable = true + coreconfig.Config.Logs.Config.Net.SASL.User = coreconfig.Config.Logs.SaslUser + coreconfig.Config.Logs.Config.Net.SASL.Password = coreconfig.Config.Logs.SaslPassword + coreconfig.Config.Logs.Config.Net.SASL.Mechanism = sarama.SASLMechanism(coreconfig.Config.Logs.SaslMechanism) + coreconfig.Config.Logs.Config.Net.SASL.Version = coreconfig.Config.Logs.SaslVersion + coreconfig.Config.Logs.Config.Net.SASL.Handshake = coreconfig.Config.Logs.SaslHandshake + coreconfig.Config.Logs.Config.Net.SASL.AuthIdentity = coreconfig.Config.Logs.SaslAuthIdentity + } + + if len(coreconfig.Config.Logs.KafkaVersion) != 0 { + for _, v := range sarama.SupportedVersions { + if v.String() == coreconfig.Config.Logs.KafkaVersion { + coreconfig.Config.Logs.Config.Version = v + break + } + } + } + if coreconfig.Config.DebugMode { + log.Printf("D! saram config: %+v", coreconfig.Config.Logs.Config) + } + brokers := strings.Split(endpoint.Addr, ",") c, err := sarama.NewSyncProducer(brokers, coreconfig.Config.Logs.Config) if err != nil { @@ -148,7 +189,11 @@ func (d *Destination) unconditionalSend(payload []byte) (err error) { if data.Topic != "" { topic = data.Topic } - err = NewBuilder().WithMessage(d.apiKey, encodedPayload).WithTopic(topic).Send(d.client) + msgKey := d.apiKey + if data.MsgKey != "" { + msgKey = data.MsgKey + } + err = NewBuilder().WithMessage(msgKey, encodedPayload).WithTopic(topic).Send(d.client) if err != nil { log.Printf("W! send message to kafka error %s, topic:%s", err, topic) if ctx.Err() == context.Canceled { diff --git a/logs/client/kafka/kafka.go b/logs/client/kafka/kafka.go index b4ae00de..f9a362d7 100644 --- a/logs/client/kafka/kafka.go +++ b/logs/client/kafka/kafka.go @@ -5,7 +5,7 @@ package kafka import ( "fmt" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) type MessageBuilder struct { diff --git a/logs/client/kafka/topic_json.go b/logs/client/kafka/topic_json.go index 93259ddd..b01143dd 100644 --- a/logs/client/kafka/topic_json.go +++ b/logs/client/kafka/topic_json.go @@ -9,6 +9,7 @@ import ( // easyjson:json type ( Data struct { - Topic string `json:"topic"` + Topic string `json:"topic"` + MsgKey string `json:"msg_key"` } ) diff --git a/logs/message/origin.go b/logs/message/origin.go index 0bd6f84a..836e5d69 100644 --- a/logs/message/origin.go +++ b/logs/message/origin.go @@ -9,6 +9,7 @@ package message import ( "encoding/json" + "fmt" "log" "strings" @@ -142,3 +143,15 @@ func (o *Origin) Service() string { } return o.service } + +func (o *Origin) GetIdentifier() string { + switch o.LogSource.GetSourceType() { + case logsconfig.DockerType, logsconfig.KubernetesSourceType: + return o.LogSource.Config.Identifier + case logsconfig.FileType: + return o.LogSource.Config.Path + case logsconfig.TCPType, logsconfig.UDPType: + return fmt.Sprintf("%d", o.LogSource.Config.Port) + } + return "" +} diff --git a/logs/processor/json.go b/logs/processor/json.go index 8ee5639a..2ae690f5 100644 --- a/logs/processor/json.go +++ b/logs/processor/json.go @@ -33,6 +33,7 @@ type jsonPayload struct { Source string `json:"fcsource"` Tags string `json:"fctags"` Topic string `json:"topic"` + MsgKey string `json:"msg_key"` } // Encode encodes a message into a JSON byte array. @@ -59,6 +60,10 @@ func (j *jsonEncoder) Encode(msg *message.Message, redactedMsg []byte) ([]byte, if msg.Origin.LogSource.Config.Topic != "" { topic = msg.Origin.LogSource.Config.Topic } + msgKey := config.Config.Logs.APIKey + if config.Config.Logs.SendType == "kafka" { + msgKey = msg.GetHostname() + "/" + msg.Origin.GetIdentifier() + } return json.Marshal(jsonPayload{ Message: toValidUtf8(redactedMsg), @@ -69,5 +74,6 @@ func (j *jsonEncoder) Encode(msg *message.Message, redactedMsg []byte) ([]byte, Source: msg.Origin.Source(), Tags: msg.Origin.TagsToJsonString(), Topic: topic, + MsgKey: msgKey, }) } From dc8edde695072b0376f4cebb34529f7dbfd00151 Mon Sep 17 00:00:00 2001 From: kongfei Date: Mon, 4 Dec 2023 10:52:55 +0800 Subject: [PATCH 4/5] support containerd logs collecting --- logs/input/kubernetes/scanner.go | 83 +++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/logs/input/kubernetes/scanner.go b/logs/input/kubernetes/scanner.go index 779fb80e..db2f3369 100644 --- a/logs/input/kubernetes/scanner.go +++ b/logs/input/kubernetes/scanner.go @@ -4,20 +4,23 @@ package kubernetes import ( "context" - logService "flashcat.cloud/categraf/logs/service" "log" "strings" "sync" "time" + logService "flashcat.cloud/categraf/logs/service" + "flashcat.cloud/categraf/logs/util/containers" "flashcat.cloud/categraf/logs/util/kubernetes/kubelet" + "flashcat.cloud/categraf/pkg/checksum" + "flashcat.cloud/categraf/pkg/set" ) type ( Scanner struct { kubelet kubelet.KubeUtilInterface services *logService.Services - actives map[string]struct{} + actives map[string]checksum.Checksum mux sync.Mutex } ) @@ -25,6 +28,7 @@ type ( func NewScanner(services *logService.Services) *Scanner { return &Scanner{ services: services, + actives: make(map[string]checksum.Checksum), } } @@ -51,38 +55,81 @@ func (s *Scanner) Scan() { log.Printf("get local pod list error %s", err) return } - fetched := make(map[string]struct{}) + fetched := make(map[string]checksum.Checksum) for _, pod := range pods { for _, container := range pod.Status.GetAllContainers() { - fetched[container.ID] = struct{}{} + fetched[container.ID] = checksum.New(pod.Metadata) } } - for id := range fetched { - if !s.Contains(id) { - svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After) - s.services.AddService(svc) - } + new := set.NewWithLoad[string, checksum.Checksum](fetched) + old := set.NewWithLoad[string, checksum.Checksum](s.GetActives()) + add, checkTwice, del := new.Diff(old) + for id := range del { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) + s.services.RemoveService(svc) + s.DelActives(id) } - old := s.actives - s.SetActives(fetched) - for id := range old { - if !s.Contains(id) { - svc := logService.NewService("docker", strings.TrimPrefix(id, "docker://"), logService.After) + for id := range checkTwice { + sum := fetched[id] + if !s.Contains(id, sum) { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) s.services.RemoveService(svc) + svc = logService.NewService(rtype, rid, logService.After) + s.services.AddService(svc) + s.AddActives(id, sum) } } + + for id := range add { + rtype, rid := parseEntity(id) + svc := logService.NewService(rtype, rid, logService.After) + s.services.AddService(svc) + s.AddActives(id, fetched[id]) + } + } } } -func (s *Scanner) SetActives(ids map[string]struct{}) { +func parseEntity(containerID string) (string, string) { + components := strings.Split(containerID, containers.EntitySeparator) + if len(components) != 2 { + return "docker", strings.TrimPrefix(containerID, "docker"+containers.EntitySeparator) + } + return components[0], components[1] +} + +func (s *Scanner) SetActives(ids map[string]checksum.Checksum) { s.mux.Lock() defer s.mux.Unlock() s.actives = ids } -func (s *Scanner) Contains(id string) bool { - _, ok := s.actives[id] - return ok +func (s *Scanner) GetActives() map[string]checksum.Checksum { + ret := make(map[string]checksum.Checksum) + s.mux.Lock() + defer s.mux.Unlock() + for k, v := range s.actives { + ret[k] = v + } + return ret +} + +func (s *Scanner) AddActives(id string, sum checksum.Checksum) { + s.mux.Lock() + defer s.mux.Unlock() + s.actives[id] = sum +} +func (s *Scanner) DelActives(id string) { + s.mux.Lock() + defer s.mux.Unlock() + delete(s.actives, id) +} + +func (s *Scanner) Contains(id string, sum checksum.Checksum) bool { + val, ok := s.actives[id] + return ok && val == sum } From bcb64947b35b5277f1a870a9a69b90ce1dc00533 Mon Sep 17 00:00:00 2001 From: kongfei Date: Mon, 4 Dec 2023 10:55:48 +0800 Subject: [PATCH 5/5] support debug mode for logs collecting --- logs/processor/processor.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/logs/processor/processor.go b/logs/processor/processor.go index ec29851a..e1dc0356 100644 --- a/logs/processor/processor.go +++ b/logs/processor/processor.go @@ -9,9 +9,12 @@ package processor import ( "context" + "fmt" "log" + "os" "sync" + coreconfig "flashcat.cloud/categraf/config" logsconfig "flashcat.cloud/categraf/config/logs" "flashcat.cloud/categraf/logs/diagnostic" "flashcat.cloud/categraf/logs/message" @@ -94,6 +97,9 @@ func (p *Processor) processMessage(msg *message.Message) { log.Println("unable to encode msg ", err) return } + if coreconfig.Config.DebugMode { + fmt.Fprintf(os.Stdout, "D! log item: %s", string(content)) + } msg.Content = content p.outputChan <- msg }