Skip to content

Commit

Permalink
Merge pull request #134 from xiaoziv/input-code-refactor
Browse files Browse the repository at this point in the history
input code refactor
  • Loading branch information
UlricQin authored Aug 1, 2022
2 parents 89248e4 + 13cc06e commit e964b15
Show file tree
Hide file tree
Showing 43 changed files with 50 additions and 200 deletions.
13 changes: 4 additions & 9 deletions agent/metrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,28 @@ func (a *Agent) startMetricsAgent() error {
continue
}

if err = input.Init(); err != nil {
if err = inputs.MayInit(input); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}

if input.GetInstances() != nil {
instances := input.GetInstances()
if len(instances) == 0 {
continue
}

instances := inputs.MayGetInstances(input)
if instances != nil {
empty := true
for i := 0; i < len(instances); i++ {
if err := instances[i].InitInternalConfig(); err != nil {
log.Println("E! failed to init input:", name, "error:", err)
continue
}

if err := instances[i].Init(); err != nil {
if err := inputs.MayInit(instances[i]); err != nil {
if !errors.Is(err, types.ErrInstancesEmpty) {
log.Println("E! failed to init input:", name, "error:", err)
}
continue
}

empty = false
}

Expand Down
8 changes: 4 additions & 4 deletions agent/metrics_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewInputReader(inputName string, in inputs.Input) *InputReader {

func (r *InputReader) Stop() {
r.quitChan <- struct{}{}
r.input.Drop()
inputs.MayDrop(r.input)
}

func (r *InputReader) startInput() {
Expand Down Expand Up @@ -82,10 +82,10 @@ func (r *InputReader) gatherOnce() {

// plugin level, for system plugins
slist := types.NewSampleList()
r.input.Gather(slist)
inputs.MayGather(r.input, slist)
r.forward(r.input.Process(slist))

instances := r.input.GetInstances()
instances := inputs.MayGetInstances(r.input)
if len(instances) == 0 {
return
}
Expand All @@ -106,7 +106,7 @@ func (r *InputReader) gatherOnce() {
}

insList := types.NewSampleList()
ins.Gather(insList)
inputs.MayGather(ins, insList)
r.forward(ins.Process(insList))
}(instances[i])
}
Expand Down
7 changes: 0 additions & 7 deletions inputs/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ func init() {
})
}

// just placeholder
func (c *Conntrack) GetInstances() []inputs.Instance {
return nil
}

func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
Expand All @@ -60,8 +55,6 @@ func (c *Conntrack) Init() error {
return nil
}

func (c *Conntrack) Drop() {}

func (c *Conntrack) Gather(slist *types.SampleList) {
var metricKey string
fields := make(map[string]interface{})
Expand Down
4 changes: 0 additions & 4 deletions inputs/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ func init() {
})
}

func (c *CPUStats) Init() error { return nil }
func (c *CPUStats) Drop() {}
func (c *CPUStats) GetInstances() []inputs.Instance { return nil }

func (c *CPUStats) Gather(slist *types.SampleList) {
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)
if err != nil {
Expand Down
12 changes: 0 additions & 12 deletions inputs/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ func init() {
})
}

// just placeholder
func (s *DiskStats) GetInstances() []inputs.Instance {
return nil
}

func (s *DiskStats) Init() error {
return nil
}

func (s *DiskStats) Drop() {
}

func (s *DiskStats) Gather(slist *types.SampleList) {
disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions inputs/diskio/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ func init() {
})
}

// just placeholder
func (d *DiskIO) GetInstances() []inputs.Instance {
return nil
}

func (d *DiskIO) Drop() {}

func (d *DiskIO) Init() error {
for _, device := range d.Devices {
if filter.HasMeta(device) {
Expand Down
4 changes: 0 additions & 4 deletions inputs/dns_query/dns_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ func init() {
})
}

func (dq *DnsQuery) Init() error { return nil }
func (dq *DnsQuery) Drop() {}
func (dq *DnsQuery) Gather(slist *types.SampleList) {}

func (dq *DnsQuery) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(dq.Instances))
for i := 0; i < len(dq.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func init() {
})
}

func (d *Docker) Init() error { return nil }
func (d *Docker) Drop() {}
func (d *Docker) Gather(slist *itypes.SampleList) {}

func (d *Docker) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(d.Instances))
for i := 0; i < len(d.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func init() {
})
}

func (r *Elasticsearch) Init() error { return nil }
func (r *Elasticsearch) Drop() {}
func (r *Elasticsearch) Gather(slist *types.SampleList) {}

func (r *Elasticsearch) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func init() {
})
}

func (e *Exec) Init() error { return nil }
func (e *Exec) Drop() {}
func (e *Exec) Gather(slist *types.SampleList) {}

func (e *Exec) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(e.Instances))
for i := 0; i < len(e.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/http_response/http_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ func init() {
})
}

func (h *HTTPResponse) Init() error { return nil }
func (h *HTTPResponse) Drop() {}
func (h *HTTPResponse) Gather(slist *types.SampleList) {}

func (h *HTTPResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(h.Instances))
for i := 0; i < len(h.Instances); i++ {
Expand Down
50 changes: 42 additions & 8 deletions inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,53 @@ import (
"flashcat.cloud/categraf/types"
)

type Initializer interface {
Init() error
}

type SampleGatherer interface {
Gather(*types.SampleList)
}

type Dropper interface {
Drop()
}

type InstancesGetter interface {
GetInstances() []Instance
}

func MayInit(t interface{}) error {
if initializer, ok := t.(Initializer); ok {
return initializer.Init()
}
return nil
}

func MayGather(t interface{}, slist *types.SampleList) {
if gather, ok := t.(SampleGatherer); ok {
gather.Gather(slist)
}
}

func MayDrop(t interface{}) {
if dropper, ok := t.(Dropper); ok {
dropper.Drop()
}
}

func MayGetInstances(t interface{}) []Instance {
if instancesGetter, ok := t.(InstancesGetter); ok {
return instancesGetter.GetInstances()
}
return nil
}

type Input interface {
GetLabels() map[string]string
GetInterval() config.Duration
InitInternalConfig() error
Process(*types.SampleList) *types.SampleList

Init() error
Drop()
Gather(*types.SampleList)
GetInstances() []Instance
}

type Creator func() Input
Expand All @@ -30,7 +67,4 @@ type Instance interface {
GetIntervalTimes() int64
InitInternalConfig() error
Process(*types.SampleList) *types.SampleList

Init() error
Gather(*types.SampleList)
}
4 changes: 0 additions & 4 deletions inputs/jolokia_agent/jolokia_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ func init() {
})
}

func (r *JolokiaAgent) Init() error { return nil }
func (r *JolokiaAgent) Drop() {}
func (r *JolokiaAgent) Gather(slist *types.SampleList) {}

func (r *JolokiaAgent) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/jolokia_proxy/jolokia_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ func init() {
})
}

func (r *JolokiaProxy) Init() error { return nil }
func (r *JolokiaProxy) Drop() {}
func (r *JolokiaProxy) Gather(slist *types.SampleList) {}

func (r *JolokiaProxy) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
3 changes: 0 additions & 3 deletions inputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ func init() {
})
}

func (r *Kafka) Init() error { return nil }
func (r *Kafka) Gather(slist *types.SampleList) {}

func (r *Kafka) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func init() {
})
}

func (s *KernelStats) Init() error { return nil }
func (s *KernelStats) Drop() {}
func (s *KernelStats) GetInstances() []inputs.Instance { return nil }

func (s *KernelStats) Gather(slist *types.SampleList) {
data, err := s.getProcStat()
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions inputs/kernel_vmstat/kernel_vmstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func init() {
})
}

func (s *KernelVmstat) Init() error { return nil }
func (s *KernelVmstat) Drop() {}
func (s *KernelVmstat) GetInstances() []inputs.Instance { return nil }

func (s *KernelVmstat) Gather(slist *types.SampleList) {
data, err := s.getProcVmstat()
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions inputs/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ func init() {
})
}

func (k *Kubernetes) Init() error { return nil }
func (k *Kubernetes) Drop() {}
func (k *Kubernetes) Gather(slist *types.SampleList) {}

func (k *Kubernetes) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(k.Instances))
for i := 0; i < len(k.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/linux_sysctl_fs/linux_sysctl_fs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ func init() {
})
}

func (s *SysctlFS) Init() error { return nil }
func (s *SysctlFS) Drop() {}
func (s *SysctlFS) GetInstances() []inputs.Instance { return nil }

func (s *SysctlFS) Gather(slist *types.SampleList) {
fields := map[string]interface{}{}

Expand Down
4 changes: 0 additions & 4 deletions inputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ func init() {
})
}

func (l *Logstash) Init() error { return nil }
func (l *Logstash) Drop() {}
func (l *Logstash) Gather(slist *types.SampleList) {}

func (l *Logstash) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(l.Instances))
for i := 0; i < len(l.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ func init() {
})
}

func (s *MemStats) Init() error { return nil }
func (s *MemStats) Drop() {}
func (s *MemStats) GetInstances() []inputs.Instance { return nil }

func (s *MemStats) Gather(slist *types.SampleList) {
vm, err := s.ps.VMStat()
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions inputs/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ func init() {
})
}

func (r *MongoDB) Init() error { return nil }
func (r *MongoDB) Gather(slist *types.SampleList) {}

func (r *MongoDB) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
4 changes: 0 additions & 4 deletions inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ func init() {
})
}

func (m *MySQL) Init() error { return nil }
func (m *MySQL) Drop() {}
func (m *MySQL) Gather(slist *types.SampleList) {}

func (m *MySQL) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(m.Instances))
for i := 0; i < len(m.Instances); i++ {
Expand Down
3 changes: 0 additions & 3 deletions inputs/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ func init() {
})
}

func (s *NetIOStats) Drop() {}
func (s *NetIOStats) GetInstances() []inputs.Instance { return nil }

func (s *NetIOStats) Init() error {
var err error

Expand Down
4 changes: 0 additions & 4 deletions inputs/net_response/net_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ func init() {
})
}

func (n *NetResponse) Init() error { return nil }
func (n *NetResponse) Drop() {}
func (n *NetResponse) Gather(slist *types.SampleList) {}

func (n *NetResponse) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(n.Instances))
for i := 0; i < len(n.Instances); i++ {
Expand Down
Loading

0 comments on commit e964b15

Please sign in to comment.