Skip to content

Commit

Permalink
Merge pull request #250 from jzwlqx/bugfix/event
Browse files Browse the repository at this point in the history
enable metrics for packetloss and tcpretrans even if event not enabled
  • Loading branch information
BSWANG authored Apr 12, 2024
2 parents 7cccdb1 + 93dce19 commit ca01d07
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 89 deletions.
4 changes: 0 additions & 4 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,6 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {
}

tuple := toProbeTuple(&key)
if !p.enablePort {
tuple.Dport = 0
tuple.Sport = 0
}

labels := probe.BuildTupleMetricsLabels(tuple)

Expand Down
48 changes: 15 additions & 33 deletions pkg/exporter/probe/tracepacketloss/packetloss.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,20 @@ func (p *packetLossProbe) stop(probeType probe.Type) error {

p.probeConfig[probeType] = nil

if probeType == probe.ProbeTypeEvent {
p.closePerfReader()
}

if p.probeCount() == 0 {
p.cleanup()
}

return nil
}

func (p *packetLossProbe) closePerfReader() {
func (p *packetLossProbe) cleanup() {

if p.perfReader != nil {
p.perfReader.Close()
p.perfReader = nil
}
}

func (p *packetLossProbe) cleanup() {
for _, link := range p.links {
link.Close()
}
Expand All @@ -257,42 +252,29 @@ func (p *packetLossProbe) start(probeType probe.Type, cfg *probeConfig) error {
return fmt.Errorf("%s failed install ebpf: %w", probeName, err)
}

var err error

if probeType == probe.ProbeTypeEvent {
p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{})))
if err != nil {
log.Errorf("%s error create perf reader, err: %v", probeName, err)
return err
}

go p.perfLoop()
}

return nil
}

func (p *packetLossProbe) reinstallBPFLocked() error {
p.closePerfReader()
func (p *packetLossProbe) reinstallBPFLocked() (err error) {
p.cleanup()

if err := p.loadAndAttachBPF(); err != nil {
log.Errorf("%s failed load and attach bpf, err: %v", probeName, err)
p.cleanup()
return err
}

if p.probeConfig[probe.ProbeTypeEvent] != nil {
var err error
p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{})))
defer func() {
if err != nil {
log.Errorf("%s error create perf reader, err: %v", probeName, err)
return err
p.cleanup()
}
}()

go p.perfLoop()
if err = p.loadAndAttachBPF(); err != nil {
return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err)
}

p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{})))
if err != nil {
return fmt.Errorf("%s error create perf reader, err: %w", probeName, err)
}

go p.perfLoop()

return nil
}

Expand Down
116 changes: 64 additions & 52 deletions pkg/exporter/probe/tracetcpretrans/tcpretrans.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func metricsProbeCreator() (probe.MetricsProbe, error) {
VariableLabels: probe.TupleMetricsLabels,
SingleMetricsOpts: []probe.SingleMetricsOpts{
{Name: retransTotal, ValueType: prometheus.CounterValue},
{Name: retransFast, ValueType: prometheus.CounterValue},
//{Name: retransFast, ValueType: prometheus.CounterValue},
},
}
batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce)
Expand All @@ -77,15 +77,18 @@ func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (prob
return probe.NewEventProbe(probeName, p), nil
}

type probeConfig struct {
}

type metricsProbe struct {
}

func (p *metricsProbe) Start(ctx context.Context) error {
return _tcpRetransProbe.start(ctx, probe.ProbeTypeMetrics)
func (p *metricsProbe) Start(_ context.Context) error {
return _tcpRetransProbe.start(probe.ProbeTypeMetrics, &probeConfig{})
}

func (p *metricsProbe) Stop(ctx context.Context) error {
return _tcpRetransProbe.stop(ctx, probe.ProbeTypeMetrics)
func (p *metricsProbe) Stop(_ context.Context) error {
return _tcpRetransProbe.stop(probe.ProbeTypeMetrics)
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
Expand All @@ -98,7 +101,7 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {

labels := probe.BuildTupleMetricsLabels(&tuple)
emit(retransTotal, labels, float64(counter.Total))
emit(retransFast, labels, float64(counter.Fast))
//emit(retransFast, labels, float64(counter.Fast))
}
return nil
}
Expand All @@ -107,8 +110,8 @@ type eventProbe struct {
sink chan<- *probe.Event
}

func (e *eventProbe) Start(ctx context.Context) error {
err := _tcpRetransProbe.start(ctx, probe.ProbeTypeEvent)
func (e *eventProbe) Start(_ context.Context) error {
err := _tcpRetransProbe.start(probe.ProbeTypeEvent, &probeConfig{})
if err != nil {
return err
}
Expand All @@ -117,8 +120,8 @@ func (e *eventProbe) Start(ctx context.Context) error {
return nil
}

func (e *eventProbe) Stop(ctx context.Context) error {
return _tcpRetransProbe.stop(ctx, probe.ProbeTypeEvent)
func (e *eventProbe) Stop(_ context.Context) error {
return _tcpRetransProbe.stop(probe.ProbeTypeEvent)
}

type Counter struct {
Expand All @@ -127,84 +130,93 @@ type Counter struct {
}

type tcpRetransProbe struct {
objs bpfObjects
links []link.Link
sink chan<- *probe.Event
refcnt [probe.ProbeTypeCount]int
lock sync.Mutex
perfReader *perf.Reader
objs bpfObjects
links []link.Link
sink chan<- *probe.Event
probeConfig [probe.ProbeTypeCount]*probeConfig
lock sync.Mutex
perfReader *perf.Reader

cache *lru.Cache[probe.Tuple, *Counter]
}

func (p *tcpRetransProbe) stop(_ context.Context, probeType probe.Type) error {
func (p *tcpRetransProbe) probeCount() int {
var ret int
for _, cfg := range p.probeConfig {
if cfg != nil {
ret++
}
}
return ret
}
func (p *tcpRetransProbe) stop(probeType probe.Type) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.refcnt[probeType] == 0 {
if p.probeConfig[probeType] == nil {
return fmt.Errorf("probe %s never start", probeType)
}

p.refcnt[probeType]--
p.probeConfig[probeType] = nil

if p.refcnt[probe.ProbeTypeEvent] == 0 {
if p.perfReader != nil {
p.perfReader.Close()
}
if p.probeCount() == 0 {
p.cleanup()
}

if p.totalReferenceCountLocked() == 0 {
return p.cleanup()
}
return nil
}

func (p *tcpRetransProbe) cleanup() error {
func (p *tcpRetransProbe) cleanup() {
if p.perfReader != nil {
p.perfReader.Close()
p.perfReader = nil
}

for _, link := range p.links {
link.Close()
}

p.links = nil

p.objs.Close()

return nil
}

func (p *tcpRetransProbe) totalReferenceCountLocked() int {
var c int
for _, n := range p.refcnt {
c += n
}
return c
}

func (p *tcpRetransProbe) start(ctx context.Context, probeType probe.Type) (err error) {
func (p *tcpRetransProbe) start(probeType probe.Type, cfg *probeConfig) (err error) {
p.lock.Lock()
defer p.lock.Unlock()

if p.refcnt[probeType] != 0 {
if p.probeConfig[probeType] != nil {
return fmt.Errorf("%s(%s) has already started", probeName, probeType)
}

p.refcnt[probeType]++
if p.totalReferenceCountLocked() == 1 {
if err = p.loadAndAttachBPF(); err != nil {
log.Errorf("%s failed load and attach bpf, err: %v", probeName, err)
_ = p.cleanup()
return
}
p.probeConfig[probeType] = cfg

if err := p.reinstallBPFLocked(); err != nil {
return fmt.Errorf("%s failed install ebpf: %w", probeName, err)
}

if p.refcnt[probe.ProbeTypeEvent] == 1 {
p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpRetransEvent, int(unsafe.Sizeof(bpfInspTcpretransEventT{})))
return nil
}

func (p *tcpRetransProbe) reinstallBPFLocked() (err error) {
p.cleanup()

defer func() {
if err != nil {
log.Errorf("%s error create perf reader, err: %v", probeName, err)
_ = p.stop(ctx, probeType)
return
p.cleanup()
}
}()

if err = p.loadAndAttachBPF(); err != nil {
return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err)
}

go p.perfLoop()
p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpRetransEvent, int(unsafe.Sizeof(bpfInspTcpretransEventT{})))
if err != nil {
return fmt.Errorf("%s error create perf reader, err: %w", probeName, err)
}

go p.perfLoop()

return nil
}

Expand Down

0 comments on commit ca01d07

Please sign in to comment.