Skip to content

Commit

Permalink
Merge pull request #114 from jzwlqx/concurrency_sink
Browse files Browse the repository at this point in the history
concurrently write events to sink
  • Loading branch information
Lyt99 authored Oct 7, 2023
2 parents 14e1bf3 + 73391cb commit d5bc117
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 15 deletions.
67 changes: 54 additions & 13 deletions pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,22 @@ type EventServer struct {
}

func NewEventServer(sinks []sink.Sink) (*EventServer, error) {
var sinkWrappers []*sinkWrapper

done := make(chan struct{})

for _, s := range sinks {
sinkWrappers = append(sinkWrappers, &sinkWrapper{
ch: make(chan *probe.Event, 1024),
s: s,
done: done,
})
}

probeManager := &EventProbeManager{
sinks: sinks,
sinks: sinkWrappers,
sinkChan: make(chan *probe.Event),
done: make(chan struct{}),
done: done,
}

return &EventServer{
Expand All @@ -25,7 +37,7 @@ func NewEventServer(sinks []sink.Sink) (*EventServer, error) {
}

func (s *EventServer) Start(ctx context.Context, probeConfig []ProbeConfig) error {
go s.probeManager.(*EventProbeManager).start()
s.probeManager.(*EventProbeManager).start()
return s.DynamicProbeServer.Start(ctx, probeConfig)
}

Expand All @@ -39,31 +51,60 @@ func (s *EventServer) Stop(ctx context.Context) error {

type EventProbeManager struct {
sinkChan chan *probe.Event
sinks []sink.Sink
sinks []*sinkWrapper
done chan struct{}
}

type sinkWrapper struct {
ch chan *probe.Event
s sink.Sink
done chan struct{}
}

func (m *EventProbeManager) stop() {
log.Infof("probe manager stopped")
close(m.done)
}

func (m *EventProbeManager) start() {
func consume(sw *sinkWrapper) {
loop:
for {
select {
case evt := <-m.sinkChan:
for _, sink := range m.sinks {
//TODO be concurrency
if err := sink.Write(evt); err != nil {
log.Errorf("error sink evt %s", err)
}
case evt := <-sw.ch:
if err := sw.s.Write(evt); err != nil {
log.Errorf("error sink evt %s", err)
}
case <-m.done:
break
case <-sw.done:
break loop
}
}
}

func (m *EventProbeManager) start() {
for _, s := range m.sinks {
go consume(s)
}

go func() {
loop:
for {
select {
case evt := <-m.sinkChan:
for _, sw := range m.sinks {
select {
case sw.ch <- evt:
break
default:
log.Errorf("%s is blocked, discard event.", sw.s)
}
}
case <-m.done:
break loop
}
}
}()
}

func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, error) {
return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/exporter/probe/tracebiolatency/tracebiolatency.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (p *BiolatencyProbe) perf() {
Message: fmt.Sprintf("%s %d latency %s", bpfutil.GetCommString(event.Target), event.Pid, bpfutil.GetHumanTimes(event.Latency)),
}

log.Errorf("sink event to channel")
p.sink <- evt
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/exporter/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type FileSink struct {
file *os.File
}

func (f *FileSink) String() string {
return "file"
}

func (f *FileSink) Write(event *probe.Event) error {
data, err := json.Marshal(event)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/exporter/sink/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type LokiSink struct {
client promtail.Client
}

func (l *LokiSink) String() string {
return "loki"
}

func (l *LokiSink) Write(event *probe.Event) error {
data, err := json.Marshal(event)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/exporter/sink/stderr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ func NewStderrSink() *StderrSink {
return &StderrSink{}
}

func (s StderrSink) Write(event *probe.Event) error {
func (s *StderrSink) String() string {
return "stderr"
}

func (s *StderrSink) Write(event *probe.Event) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed marshal event, err: %w", err)
Expand Down

0 comments on commit d5bc117

Please sign in to comment.