From 73391cbe72eb74498f1ac1791a3dfa5a4f6e61fc Mon Sep 17 00:00:00 2001 From: "jizhong.jiangjz" Date: Sat, 7 Oct 2023 17:44:11 +0800 Subject: [PATCH] concurrently write events to sink --- pkg/exporter/cmd/eventserver.go | 67 +++++++++++++++---- .../probe/tracebiolatency/tracebiolatency.go | 1 - pkg/exporter/sink/file.go | 4 ++ pkg/exporter/sink/loki.go | 4 ++ pkg/exporter/sink/stderr.go | 6 +- 5 files changed, 67 insertions(+), 15 deletions(-) diff --git a/pkg/exporter/cmd/eventserver.go b/pkg/exporter/cmd/eventserver.go index 1e5fb689..14349f7a 100644 --- a/pkg/exporter/cmd/eventserver.go +++ b/pkg/exporter/cmd/eventserver.go @@ -13,10 +13,22 @@ type EventServer struct { } func NewEventServer(sinks []sink.Sink) (*EventServer, error) { + var sinkWrappers []*sinkWrapper + + done := make(chan struct{}) + + for _, s := range sinks { + sinkWrappers = append(sinkWrappers, &sinkWrapper{ + ch: make(chan *probe.Event, 1024), + s: s, + done: done, + }) + } + probeManager := &EventProbeManager{ - sinks: sinks, + sinks: sinkWrappers, sinkChan: make(chan *probe.Event), - done: make(chan struct{}), + done: done, } return &EventServer{ @@ -25,7 +37,7 @@ func NewEventServer(sinks []sink.Sink) (*EventServer, error) { } func (s *EventServer) Start(ctx context.Context, probeConfig []ProbeConfig) error { - go s.probeManager.(*EventProbeManager).start() + s.probeManager.(*EventProbeManager).start() return s.DynamicProbeServer.Start(ctx, probeConfig) } @@ -39,31 +51,60 @@ func (s *EventServer) Stop(ctx context.Context) error { type EventProbeManager struct { sinkChan chan *probe.Event - sinks []sink.Sink + sinks []*sinkWrapper done chan struct{} } +type sinkWrapper struct { + ch chan *probe.Event + s sink.Sink + done chan struct{} +} + func (m *EventProbeManager) stop() { log.Infof("probe manager stopped") close(m.done) } -func (m *EventProbeManager) start() { +func consume(sw *sinkWrapper) { +loop: for { select { - case evt := <-m.sinkChan: - for _, sink := range m.sinks { - //TODO be concurrency - if err := sink.Write(evt); err != nil { - log.Errorf("error sink evt %s", err) - } + case evt := <-sw.ch: + if err := sw.s.Write(evt); err != nil { + log.Errorf("error sink evt %s", err) } - case <-m.done: - break + case <-sw.done: + break loop } } } +func (m *EventProbeManager) start() { + for _, s := range m.sinks { + go consume(s) + } + + go func() { + loop: + for { + select { + case evt := <-m.sinkChan: + for _, sw := range m.sinks { + select { + case sw.ch <- evt: + break + default: + log.Errorf("%s is blocked, discard event.", sw.s) + } + } + case <-m.done: + break loop + } + } + }() +} + func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, error) { return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args) } diff --git a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go index d6a8140d..6200e6cc 100644 --- a/pkg/exporter/probe/tracebiolatency/tracebiolatency.go +++ b/pkg/exporter/probe/tracebiolatency/tracebiolatency.go @@ -98,7 +98,6 @@ func (p *BiolatencyProbe) perf() { Message: fmt.Sprintf("%s %d latency %s", bpfutil.GetCommString(event.Target), event.Pid, bpfutil.GetHumanTimes(event.Latency)), } - log.Errorf("sink event to channel") p.sink <- evt } } diff --git a/pkg/exporter/sink/file.go b/pkg/exporter/sink/file.go index d8ff80a4..8647881e 100644 --- a/pkg/exporter/sink/file.go +++ b/pkg/exporter/sink/file.go @@ -23,6 +23,10 @@ type FileSink struct { file *os.File } +func (f *FileSink) String() string { + return "file" +} + func (f *FileSink) Write(event *probe.Event) error { data, err := json.Marshal(event) if err != nil { diff --git a/pkg/exporter/sink/loki.go b/pkg/exporter/sink/loki.go index 97eb5c07..07a05f49 100644 --- a/pkg/exporter/sink/loki.go +++ b/pkg/exporter/sink/loki.go @@ -62,6 +62,10 @@ type LokiSink struct { client promtail.Client } +func (l *LokiSink) String() string { + return "loki" +} + func (l *LokiSink) Write(event *probe.Event) error { data, err := json.Marshal(event) if err != nil { diff --git a/pkg/exporter/sink/stderr.go b/pkg/exporter/sink/stderr.go index f1afd22c..eb9f729d 100644 --- a/pkg/exporter/sink/stderr.go +++ b/pkg/exporter/sink/stderr.go @@ -15,7 +15,11 @@ func NewStderrSink() *StderrSink { return &StderrSink{} } -func (s StderrSink) Write(event *probe.Event) error { +func (s *StderrSink) String() string { + return "stderr" +} + +func (s *StderrSink) Write(event *probe.Event) error { data, err := json.Marshal(event) if err != nil { return fmt.Errorf("failed marshal event, err: %w", err)