Commit
Merge pull request #48 from kongfei605/feature_log_kafka
send logs to kafka
kongfei605 authored Jun 24, 2022
2 parents d286f5b + bc52cc6 commit 126767d
Showing 15 changed files with 549 additions and 122 deletions.
7 changes: 1 addition & 6 deletions agent/logs_agent.go
@@ -17,7 +17,6 @@ import (
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/auditor"
"flashcat.cloud/categraf/logs/client"
"flashcat.cloud/categraf/logs/client/http"
"flashcat.cloud/categraf/logs/diagnostic"
"flashcat.cloud/categraf/logs/input/file"
"flashcat.cloud/categraf/logs/input/journald"
@@ -154,11 +153,7 @@ func (a *Agent) startLogAgent() {
return
}

-	httpConnectivity := logsconfig.HTTPConnectivityFailure
-	if endpoints, err := BuildHTTPEndpoints(intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin); err == nil {
-		httpConnectivity = http.CheckConnectivity(endpoints.Main)
-	}
-	endpoints, err := BuildEndpoints(httpConnectivity, intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin)
+	endpoints, err := BuildEndpoints(intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin)
processingRules, err := GlobalProcessingRules()
if err != nil {
message := fmt.Sprintf("Invalid processing rules: %v", err)
70 changes: 60 additions & 10 deletions agent/logs_endpoints.go
@@ -23,20 +23,70 @@ const (
)

// BuildEndpoints returns the endpoints to send logs.
-func BuildEndpoints(httpConnectivity logsconfig.HTTPConnectivity, intakeTrackType logsconfig.IntakeTrackType, intakeProtocol logsconfig.IntakeProtocol, intakeOrigin logsconfig.IntakeOrigin) (*logsconfig.Endpoints, error) {
-	return BuildEndpointsWithConfig(httpEndpointPrefix, httpConnectivity, intakeTrackType, intakeProtocol, intakeOrigin)
+func BuildEndpoints(intakeTrackType logsconfig.IntakeTrackType, intakeProtocol logsconfig.IntakeProtocol, intakeOrigin logsconfig.IntakeOrigin) (*logsconfig.Endpoints, error) {
+	return BuildEndpointsWithConfig(httpEndpointPrefix, intakeTrackType, intakeProtocol, intakeOrigin)
}

// BuildEndpointsWithConfig returns the endpoints to send logs.
-func BuildEndpointsWithConfig(endpointPrefix string, httpConnectivity logsconfig.HTTPConnectivity, intakeTrackType logsconfig.IntakeTrackType, intakeProtocol logsconfig.IntakeProtocol, intakeOrigin logsconfig.IntakeOrigin) (*logsconfig.Endpoints, error) {
+func BuildEndpointsWithConfig(endpointPrefix string, intakeTrackType logsconfig.IntakeTrackType, intakeProtocol logsconfig.IntakeProtocol, intakeOrigin logsconfig.IntakeOrigin) (*logsconfig.Endpoints, error) {
logsConfig := coreconfig.Config.Logs

-	if logsConfig.SendType == "http" || (bool(httpConnectivity) && !(logsConfig.SendType == "tcp")) {
+	switch logsConfig.SendType {
+	case "http":
return BuildHTTPEndpointsWithConfig(endpointPrefix, intakeTrackType, intakeProtocol, intakeOrigin)
case "tcp":
return buildTCPEndpoints(logsConfig)
case "kafka":
return buildKafkaEndpoints(logsConfig)

}
return buildTCPEndpoints(logsConfig)
}

+func buildKafkaEndpoints(logsConfig coreconfig.Logs) (*logsconfig.Endpoints, error) {
+	// Provide default values for legacy settings when the configuration key does not exist
+	defaultTLS := coreconfig.Config.Logs.SendWithTLS
+
+	main := logsconfig.Endpoint{
+		APIKey:                  strings.TrimSpace(logsConfig.APIKey),
+		UseCompression:          logsConfig.UseCompression,
+		CompressionLevel:        logsConfig.CompressionLevel,
+		ConnectionResetInterval: 0,
+		BackoffBase:             1.0,
+		BackoffMax:              120.0,
+		BackoffFactor:           2.0,
+		RecoveryInterval:        2,
+		RecoveryReset:           false,
+		Addr:                    logsConfig.SendTo,
+		Topic:                   logsConfig.Topic,
+	}
+
+	if intakeTrackType != "" {
+		main.Version = logsconfig.EPIntakeVersion2
+		main.TrackType = intakeTrackType
+	} else {
+		main.Version = logsconfig.EPIntakeVersion1
+	}
+
+	if len(logsConfig.SendTo) != 0 {
+		brokers := strings.Split(logsConfig.SendTo, ",")
+		if len(brokers) == 0 {
+			return nil, fmt.Errorf("wrong send_to content %s", logsConfig.SendTo)
+		}
+		host, port, err := parseAddress(brokers[0])
+		if err != nil {
+			return nil, fmt.Errorf("could not parse %s: %v", logsConfig.SendTo, err)
+		}
+		main.Host = host
+		main.Port = port
+		main.UseSSL = defaultTLS
+	} else {
+		return nil, fmt.Errorf("empty send_to is not allowed when send_type is kafka")
+	}
+	return NewEndpoints(main, false, "kafka"), nil
+}

func buildTCPEndpoints(logsConfig coreconfig.Logs) (*logsconfig.Endpoints, error) {
main := logsconfig.Endpoint{
APIKey: logsConfig.APIKey,
@@ -58,7 +108,7 @@ func buildTCPEndpoints(logsConfig coreconfig.Logs) (*logsconfig.Endpoints, error
main.UseSSL = logsConfig.SendWithTLS
}

-	return NewEndpoints(main, false, false), nil
return NewEndpoints(main, false, "tcp"), nil
}

// BuildHTTPEndpoints returns the HTTP endpoints to send logs to.
@@ -112,7 +162,7 @@ func BuildHTTPEndpointsWithConfig(endpointPrefix string, intakeTrackType logscon
batchMaxSize := 100
batchMaxContentSize := 1000000

-	return NewEndpointsWithBatchSettings(main, false, true, batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize), nil
return NewEndpointsWithBatchSettings(main, false, "http", batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize), nil
}

// parseAddress returns the host and the port of the address.
@@ -129,13 +179,13 @@ func parseAddress(address string) (string, int, error) {
}

// NewEndpoints returns a new endpoints composite with default batching settings
-func NewEndpoints(main logsconfig.Endpoint, useProto bool, useHTTP bool) *logsconfig.Endpoints {
+func NewEndpoints(main logsconfig.Endpoint, useProto bool, typ string) *logsconfig.Endpoints {
logsConfig := coreconfig.Config.Logs
return &logsconfig.Endpoints{
Main: main,
Additionals: nil,
UseProto: useProto,
-	UseHTTP: useHTTP,
+	Type: typ,
BatchWait: time.Duration(logsConfig.BatchWait) * time.Second,
// TODO support custom param
BatchMaxConcurrentSend: 0,
@@ -145,12 +195,12 @@ func NewEndpointsWithBatchSettings(main logsconfig.Endpoint, useProto bool, useHTTP bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int) *logsco
}

// NewEndpointsWithBatchSettings returns a new endpoints composite with non-default batching settings specified
-func NewEndpointsWithBatchSettings(main logsconfig.Endpoint, useProto bool, useHTTP bool, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int) *logsconfig.Endpoints {
+func NewEndpointsWithBatchSettings(main logsconfig.Endpoint, useProto bool, typ string, batchWait time.Duration, batchMaxConcurrentSend int, batchMaxSize int, batchMaxContentSize int) *logsconfig.Endpoints {
return &logsconfig.Endpoints{
Main: main,
Additionals: nil,
UseProto: useProto,
-	UseHTTP: useHTTP,
+	Type: typ,
BatchWait: batchWait,
BatchMaxConcurrentSend: batchMaxConcurrentSend,
BatchMaxSize: batchMaxSize,
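For context on the new code path: buildKafkaEndpoints parses only the first broker of the comma-separated send_to value into Host and Port, while the full broker list is kept verbatim in Addr. The following self-contained sketch reproduces just that parsing behavior. kafkaEndpoint, splitAddress, and buildKafkaEndpointSketch are illustrative names rather than identifiers from this commit, and splitAddress only approximates what parseAddress does.

package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

type kafkaEndpoint struct {
	Addr  string // full comma-separated broker list
	Topic string
	Host  string // host of the first broker only
	Port  int    // port of the first broker only
}

// splitAddress approximates the parseAddress helper referenced in the diff.
func splitAddress(address string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(address)
	if err != nil {
		return "", 0, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, err
	}
	return host, port, nil
}

func buildKafkaEndpointSketch(sendTo, topic string) (*kafkaEndpoint, error) {
	if len(sendTo) == 0 {
		return nil, fmt.Errorf("empty send_to is not allowed when send_type is kafka")
	}
	brokers := strings.Split(sendTo, ",")
	// Only the first broker is parsed into Host/Port; the full list stays in Addr.
	host, port, err := splitAddress(brokers[0])
	if err != nil {
		return nil, fmt.Errorf("could not parse %s: %v", sendTo, err)
	}
	return &kafkaEndpoint{Addr: sendTo, Topic: topic, Host: host, Port: port}, nil
}

func main() {
	ep, err := buildKafkaEndpointSketch("broker1:9092,broker2:9092", "flashcatcloud")
	if err != nil {
		panic(err)
	}
	// Prints: &{Addr:broker1:9092,broker2:9092 Topic:flashcatcloud Host:broker1 Port:9092}
	fmt.Printf("%+v\n", ep)
}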
7 changes: 4 additions & 3 deletions conf/logs.toml
@@ -3,10 +3,11 @@
api_key = "ef4ahfbwzwwtlwfpbertgq1i6mq0ab1q"
## whether to enable log collection
enable = false
-## server address that receives the logs
+## server address that receives the logs: http/tcp/kafka; only kafka supports multiple comma-separated addresses (brokers)
send_to = "127.0.0.1:17878"
-## protocol used to send logs: http/tcp
+## protocol used to send logs: http/tcp/kafka
send_type = "http"
+topic = "flashcatcloud"
## whether to compress the payload before sending
use_compress = false
## whether to use ssl
@@ -32,4 +33,4 @@ collect_container_all = true
## when type=file, path is required; when type=journald/tcp/udp, port is required
path = "/opt/tomcat/logs/*.txt"
source = "tomcat"
service = "my_service"
service = "my_service"
8 changes: 8 additions & 0 deletions config/logs.go
@@ -1,6 +1,8 @@
package config

import (
"github.com/Shopify/sarama"

logsconfig "flashcat.cloud/categraf/config/logs"
)

@@ -26,6 +28,12 @@ type (
CollectContainerAll bool `json:"collect_container_all" toml:"collect_container_all"`
GlobalProcessingRules []*logsconfig.ProcessingRule `json:"processing_rules" toml:"processing_rules"`
Items []*logsconfig.LogsConfig `json:"items" toml:"items"`
+		KafkaConfig
}
+	KafkaConfig struct {
+		Topic   string   `json:"topic" toml:"topic"`
+		Brokers []string `json:"brokers" toml:"brokers"`
+		*sarama.Config
+	}
)

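KafkaConfig embeds *sarama.Config, so sarama producer options can travel with the topic and broker list. The sender that consumes this struct is not part of this diff; the sketch below shows one plausible way to turn such a config into a producer, using only standard sarama calls. kafkaSettings and newKafkaWriter are hypothetical names.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// kafkaSettings mirrors the shape of KafkaConfig above.
type kafkaSettings struct {
	Topic   string
	Brokers []string
	*sarama.Config
}

// newKafkaWriter builds a synchronous producer from the embedded sarama
// config, falling back to sarama defaults when none is provided.
func newKafkaWriter(cfg kafkaSettings) (sarama.SyncProducer, error) {
	sc := cfg.Config
	if sc == nil {
		sc = sarama.NewConfig()
	}
	// SyncProducer requires successes to be returned on the channel.
	sc.Producer.Return.Successes = true
	return sarama.NewSyncProducer(cfg.Brokers, sc)
}

func main() {
	producer, err := newKafkaWriter(kafkaSettings{
		Topic:   "flashcatcloud",
		Brokers: []string{"broker1:9092"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "flashcatcloud",
		Value: sarama.StringEncoder(`{"message":"hello"}`),
	})
	if err != nil {
		log.Fatal(err)
	}
}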
6 changes: 4 additions & 2 deletions config/logs/endpoints.go
@@ -29,9 +29,11 @@
EPIntakeVersion2
)

-// Endpoint holds all the organization and network parameters to send logs to Datadog.
+// Endpoint holds all the organization and network parameters to send logs
type Endpoint struct {
APIKey string `mapstructure:"api_key" json:"api_key"`
+	Addr  string
+	Topic string
Host string
Port int
UseSSL bool
@@ -57,7 +59,7 @@ type Endpoints struct {
Main Endpoint
Additionals []Endpoint
UseProto bool
-	UseHTTP bool
+	Type string
BatchWait time.Duration
BatchMaxConcurrentSend int
BatchMaxSize int
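Replacing UseHTTP bool with Type string lets downstream code dispatch over three transports instead of a boolean pair. The commit's actual sender is not shown in this excerpt; below is a minimal, self-contained sketch of such a dispatch, with local stand-in types in place of logsconfig.Endpoints.

package main

import "fmt"

// Stand-ins for the real types; Endpoints now carries Type instead of UseHTTP.
type endpoint struct {
	Addr, Topic, Host string
	Port              int
}

type endpoints struct {
	Main endpoint
	Type string // "http", "tcp", or "kafka"
}

// describeDestination shows how a sender factory might switch on Type.
func describeDestination(e endpoints) (string, error) {
	switch e.Type {
	case "http":
		return fmt.Sprintf("http sender -> %s:%d", e.Main.Host, e.Main.Port), nil
	case "tcp":
		return fmt.Sprintf("tcp sender -> %s:%d", e.Main.Host, e.Main.Port), nil
	case "kafka":
		return fmt.Sprintf("kafka producer -> %s topic=%s", e.Main.Addr, e.Main.Topic), nil
	default:
		return "", fmt.Errorf("unknown endpoint type %q", e.Type)
	}
}

func main() {
	out, _ := describeDestination(endpoints{
		Main: endpoint{Addr: "broker1:9092,broker2:9092", Topic: "flashcatcloud"},
		Type: "kafka",
	})
	fmt.Println(out)
}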
80 changes: 41 additions & 39 deletions config/logs/integration_config.go
@@ -29,45 +29,47 @@ const (

// LogsConfig represents a log source config, which can be for instance
// a file to tail or a port to listen to.
-type LogsConfig struct {
-	Type string
-
-	Port        int    // Network
-	IdleTimeout string `mapstructure:"idle_timeout" json:"idle_timeout"` // Network
-	Path        string // File, Journald
-
-	Encoding     string   `mapstructure:"encoding" json:"encoding"`             // File
-	ExcludePaths []string `mapstructure:"exclude_paths" json:"exclude_paths"`   // File
-	TailingMode  string   `mapstructure:"start_position" json:"start_position"` // File
-
-	IncludeUnits  []string `mapstructure:"include_units" json:"include_units"`   // Journald
-	ExcludeUnits  []string `mapstructure:"exclude_units" json:"exclude_units"`   // Journald
-	ContainerMode bool     `mapstructure:"container_mode" json:"container_mode"` // Journald
-
-	Image string // Docker
-	Label string // Docker
-	// Name contains the container name
-	Name string // Docker
-	// Identifier contains the container ID
-	Identifier string // Docker
-
-	ChannelPath string `mapstructure:"channel_path" json:"channel_path"` // Windows Event
-	Query       string // Windows Event
-
-	// used as input only by the Channel tailer.
-	// could have been unidirectional but the tailer could not close it in this case.
-	Channel chan *ChannelMessage `json:"-"`
-
-	Service         string
-	Source          string
-	SourceCategory  string
-	Tags            []string
-	ProcessingRules []*ProcessingRule `mapstructure:"log_processing_rules" json:"log_processing_rules"`
-
-	AutoMultiLine               bool    `mapstructure:"auto_multi_line_detection" json:"auto_multi_line_detection"`
-	AutoMultiLineSampleSize     int     `mapstructure:"auto_multi_line_sample_size" json:"auto_multi_line_sample_size"`
-	AutoMultiLineMatchThreshold float64 `mapstructure:"auto_multi_line_match_threshold" json:"auto_multi_line_match_threshold"`
-}
+type (
+	LogsConfig struct {
+		Type string
+
+		Port        int    // Network
+		IdleTimeout string `mapstructure:"idle_timeout" json:"idle_timeout"` // Network
+		Path        string // File, Journald
+
+		Encoding     string   `mapstructure:"encoding" json:"encoding"`             // File
+		ExcludePaths []string `mapstructure:"exclude_paths" json:"exclude_paths"`   // File
+		TailingMode  string   `mapstructure:"start_position" json:"start_position"` // File
+
+		IncludeUnits  []string `mapstructure:"include_units" json:"include_units"`   // Journald
+		ExcludeUnits  []string `mapstructure:"exclude_units" json:"exclude_units"`   // Journald
+		ContainerMode bool     `mapstructure:"container_mode" json:"container_mode"` // Journald
+
+		Image string // Docker
+		Label string // Docker
+		// Name contains the container name
+		Name string // Docker
+		// Identifier contains the container ID
+		Identifier string // Docker
+
+		ChannelPath string `mapstructure:"channel_path" json:"channel_path"` // Windows Event
+		Query       string // Windows Event
+
+		// used as input only by the Channel tailer.
+		// could have been unidirectional but the tailer could not close it in this case.
+		Channel chan *ChannelMessage `json:"-"`
+
+		Service         string
+		Source          string
+		SourceCategory  string
+		Tags            []string
+		ProcessingRules []*ProcessingRule `mapstructure:"log_processing_rules" json:"log_processing_rules"`
+
+		AutoMultiLine               bool    `mapstructure:"auto_multi_line_detection" json:"auto_multi_line_detection"`
+		AutoMultiLineSampleSize     int     `mapstructure:"auto_multi_line_sample_size" json:"auto_multi_line_sample_size"`
+		AutoMultiLineMatchThreshold float64 `mapstructure:"auto_multi_line_match_threshold" json:"auto_multi_line_match_threshold"`
+	}
+)

// TailingMode type
type TailingMode uint8
19 changes: 17 additions & 2 deletions go.mod
@@ -4,6 +4,7 @@ go 1.18

require (
github.com/ClickHouse/clickhouse-go/v2 v2.0.15
+	github.com/Shopify/sarama v1.34.0
github.com/chai2010/winsvc v0.0.0-20200705094454-db7ec320025c
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/docker/docker v20.10.16+incompatible
@@ -31,7 +32,7 @@
github.com/stretchr/testify v1.7.2
github.com/toolkits/pkg v1.3.0
github.com/ulricqin/gosnmp v0.0.1
-	golang.org/x/net v0.0.0-20210525063256-abc453219eb5
+	golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32
golang.org/x/text v0.3.7
)
@@ -47,6 +48,9 @@
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
+	github.com/eapache/go-resiliency v1.2.0 // indirect
+	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
+	github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
@@ -57,12 +61,21 @@
github.com/godror/knownpb v0.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
+	github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
+	github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
+	github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
+	github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+	github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+	github.com/jcmturner/gofork v1.0.0 // indirect
+	github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
+	github.com/jcmturner/rpc/v2 v2.0.3 // indirect
+	github.com/klauspost/compress v1.15.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
@@ -77,15 +90,17 @@
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
+	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
-	github.com/sirupsen/logrus v1.7.0 // indirect
+	github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
+	golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
google.golang.org/grpc v1.33.1 // indirect