Skip to content

Commit

Permalink
Merge pull request #66 from ysyneu/main
Browse files Browse the repository at this point in the history
add kafka metrics collector
  • Loading branch information
UlricQin authored Jul 4, 2022
2 parents 56b42c7 + c63c941 commit 5357da0
Show file tree
Hide file tree
Showing 65 changed files with 2,313 additions and 347 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Click on the links to see the README of each plugin.
- [ ] mongodb
- [ ] rocketmq
- [ ] activemq
- [ ] kafka
- [x] [kafka](inputs/kafka)
- [x] [elasticsearch](inputs/elasticsearch)
- [x] windows
- [ ] mssql
Expand All @@ -117,12 +117,12 @@ Click on the links to see the README of each plugin.
- [ ] ipmi
- [ ] smartctl
- [ ] logging
- [ ] trace
- [x] [traces](traces)


## Thanks

Categraf is developed on the basis of Telegraf and Exporters. Thanks to the great open source community.
Categraf is developed on the basis of Telegraf, Exporters and the OpenTelemetry. Thanks to the great open source community.

## Community

Expand Down
4 changes: 3 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package agent
import (
"log"

"flashcat.cloud/categraf/traces"

// auto registry
_ "flashcat.cloud/categraf/inputs/conntrack"
_ "flashcat.cloud/categraf/inputs/cpu"
Expand All @@ -12,6 +14,7 @@ import (
_ "flashcat.cloud/categraf/inputs/elasticsearch"
_ "flashcat.cloud/categraf/inputs/exec"
_ "flashcat.cloud/categraf/inputs/http_response"
_ "flashcat.cloud/categraf/inputs/kafka"
_ "flashcat.cloud/categraf/inputs/kernel"
_ "flashcat.cloud/categraf/inputs/kernel_vmstat"
_ "flashcat.cloud/categraf/inputs/kubernetes"
Expand All @@ -35,7 +38,6 @@ import (
_ "flashcat.cloud/categraf/inputs/system"
_ "flashcat.cloud/categraf/inputs/tomcat"
_ "flashcat.cloud/categraf/inputs/zookeeper"
"flashcat.cloud/categraf/traces"
)

type Agent struct {
Expand Down
11 changes: 5 additions & 6 deletions agent/logs_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import (
"log"
"time"

coreconfig "flashcat.cloud/categraf/config"

logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/auditor"
"flashcat.cloud/categraf/logs/client"
"flashcat.cloud/categraf/logs/diagnostic"
Expand All @@ -23,9 +20,11 @@ import (
"flashcat.cloud/categraf/logs/input/listener"
"flashcat.cloud/categraf/logs/pipeline"
"flashcat.cloud/categraf/logs/restart"
"flashcat.cloud/categraf/logs/service"
logService "flashcat.cloud/categraf/logs/service"
"flashcat.cloud/categraf/logs/status"

coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
logService "flashcat.cloud/categraf/logs/service"
)

// LogAgent represents the data pipeline that collects, decodes,
Expand All @@ -44,7 +43,7 @@ type LogAgent struct {
}

// NewAgent returns a new Logs LogAgent
func NewLogAgent(sources *logsconfig.LogSources, services *service.Services, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints) *LogAgent {
func NewLogAgent(sources *logsconfig.LogSources, services *logService.Services, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints) *LogAgent {
// setup the auditor
// We pass the health handle to the auditor because it's the end of the pipeline and the most
// critical part. Arguably it could also be plugged to the destination.
Expand Down
8 changes: 4 additions & 4 deletions agent/metrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ func (a *Agent) startMetricsAgent() error {
}

// construct input instance
instance := creator()
inp := creator()
// set configurations for input instance
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), instance)
cfg.LoadConfigs(path.Join(config.Config.ConfigDir, inputFilePrefix+name), inp)

if err = instance.Init(); err != nil {
if err = inp.Init(); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}

reader := NewInputReader(instance)
reader := NewInputReader(inp)
reader.Start()
a.InputReaders[name] = reader

Expand Down
6 changes: 5 additions & 1 deletion agent/metrics_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/house"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/runtimex"
"flashcat.cloud/categraf/types"
"flashcat.cloud/categraf/writer"
"github.com/toolkits/pkg/container/list"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (r *InputReader) gatherOnce() {
if strings.Contains(fmt.Sprint(r), "closed channel") {
return
} else {
log.Println("E! gather metrics panic:", r)
log.Println("E! gather metrics panic:", r, string(runtimex.Stack(3)))
}
}
}()
Expand All @@ -92,6 +93,9 @@ func (r *InputReader) gatherOnce() {
}

s := samples[i].(*types.Sample)
if s == nil {
continue
}

if s.Timestamp.IsZero() {
s.Timestamp = now
Expand Down
85 changes: 85 additions & 0 deletions conf/input.kafka/kafka.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# # collect interval
# interval = 15

############################################################################
# !!! uncomment [[instances]] to enable this plugin
[[instances]]
# # interval = global.interval * interval_times
# interval_times = 1

# append some labels to metrics
# cluster is a preferred tag with the cluster name. If none is provided, the first of kafka_uris will be used
labels = { cluster="kafka-cluster-01" }

# log level only for kafka exporter
log_level = "error"

# Address (host:port) of Kafka server.
kafka_uris = ["127.0.0.1:9092","127.0.0.1:9092","127.0.0.1:9092"]

# Connect using SASL/PLAIN
# Default is false
# use_sasl = false

# Only set this to false if using a non-Kafka SASL proxy
# Default is true
# use_sasl_handshake = false

# SASL user name
# sasl_username = "username"

# SASL user password
# sasl_password = "password"

# The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism
# sasl_mechanism = ""

# Connect using TLS
# use_tls = false

# The optional certificate authority file for TLS client authentication
# ca_file = ""

# The optional certificate file for TLS client authentication
# cert_file = ""

# The optional key file for TLS client authentication
# key_file = ""

# If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
# insecure_skip_verify = true

# Kafka broker version
# Default is 2.0.0
# kafka_version = "2.0.0"

# if you need to use a group from zookeeper
# Default is false
# use_zookeeper_lag = false

# Address array (hosts) of zookeeper server.
# zookeeper_uris = []

# Metadata refresh interval
# Default is 1s
# metadata_refresh_interval = "1m"

# If true, all scrapes will trigger kafka operations otherwise, they will share results. WARN: This should be disabled on large clusters
# Default is false
# allow_concurrency = false

# Maximum number of offsets to store in the interpolation table for a partition
# Default is 1000
# max_offsets = 1000

# How frequently should the interpolation table be pruned, in seconds.
# Default is 30
# prune_interval_seconds = 30

# Regex filter for topics to be monitored
# Default is ".*"
# topics_filter_regex = ".*"

# Regex filter for consumer groups to be monitored
# Default is ".*"
# groups_filter_regex = ".*"
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/docker/docker v20.10.16+incompatible
github.com/gaochao1/sw v1.0.0
github.com/go-kit/log v0.2.0
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
Expand All @@ -22,6 +23,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.54.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter v0.54.0
Expand All @@ -46,6 +48,7 @@ require (
github.com/stretchr/testify v1.7.4
github.com/toolkits/pkg v1.3.0
github.com/ulricqin/gosnmp v0.0.1
github.com/xdg/scram v1.0.5
go.opentelemetry.io/collector v0.54.0
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/trace v1.7.0
Expand Down Expand Up @@ -91,7 +94,6 @@ require (
github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down Expand Up @@ -158,6 +160,7 @@ require (
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
Expand All @@ -176,6 +179,7 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/pdata v0.54.0 // indirect
Expand All @@ -199,4 +203,7 @@ require (
gotest.tools/v3 v3.2.0 // indirect
)

replace go.opentelemetry.io/collector => github.com/flashcatcloud/opentelemetry-collector v0.54.1-0.20220628041301-3b8dabd1bcd0
replace (
github.com/prometheus/client_golang => github.com/flashcatcloud/client_golang v1.12.2-0.20220704074148-3b31f0c90903
go.opentelemetry.io/collector => github.com/flashcatcloud/opentelemetry-collector v0.54.1-0.20220628041301-3b8dabd1bcd0
)
Loading

0 comments on commit 5357da0

Please sign in to comment.