Skip to content

Commit

Permalink
Merge pull request moby#4756 from jsternberg/trace-recorder-refactor
Browse files Browse the repository at this point in the history
tracing: refactor the trace recorder
  • Loading branch information
tonistiigi authored Mar 29, 2024
2 parents 5e0fe27 + 0e1cf1c commit 69ad98e
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 71 deletions.
13 changes: 3 additions & 10 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -158,14 +156,9 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge {
}

func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, j *solver.Job, usage *resources.SysSampler) (func(*Result, []exporter.DescriptorReference, error) error, error) {
var stopTrace func() []tracetest.SpanStub

if s := trace.SpanFromContext(ctx); s.SpanContext().IsValid() {
if exp, _, err := detect.Exporter(); err == nil {
if rec, ok := exp.(*detect.TraceRecorder); ok {
stopTrace = rec.Record(s.SpanContext().TraceID())
}
}
stopTrace, err := detect.Recorder.Record(ctx)
if err != nil {
return nil, err
}

st := time.Now()
Expand Down
38 changes: 12 additions & 26 deletions util/tracing/detect/detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type detector struct {
}

var ServiceName string
var Recorder *TraceRecorder

var detectors map[string]detector
var once sync.Once
Expand Down Expand Up @@ -116,45 +115,32 @@ func detectExporter[T any](envVar string, fn func(d ExporterDetector) (T, bool,
return exp, nil
}

// getExporters returns the trace and metric exporters reported by
// detectExporters. When the package-level Recorder is set, the detected trace
// exporter is stored in Recorder.SpanExporter and Recorder itself is returned
// as the trace exporter in its place.
func getExporters() (sdktrace.SpanExporter, sdkmetric.Exporter, error) {
texp, mexp, err := detectExporters()
if err != nil {
return nil, nil, err
}

// Put the recorder in front of the detected exporter; the detected exporter
// is kept on the recorder's SpanExporter field.
if Recorder != nil {
Recorder.SpanExporter = texp
texp = Recorder
}

return texp, mexp, nil
}

func detect() error {
tp = noop.NewTracerProvider()
mp = sdkmetric.NewMeterProvider()

texp, mexp, err := getExporters()
texp, mexp, err := detectExporters()
if err != nil || (texp == nil && mexp == nil) {
return err
}

res := Resource()

// enable log with traceID when valid exporter
if texp != nil {
if texp != nil || Recorder != nil {
// enable log with traceID when a valid exporter is used
bklog.EnableLogWithTraceID(true)

sp := sdktrace.NewBatchSpanProcessor(texp)

sdktpopts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}
if texp != nil {
sdktpopts = append(sdktpopts, sdktrace.WithBatcher(texp))
}
if Recorder != nil {
Recorder.flush = sp.ForceFlush
sp := sdktrace.NewSimpleSpanProcessor(Recorder)
sdktpopts = append(sdktpopts, sdktrace.WithSpanProcessor(sp))
}

sdktp := sdktrace.NewTracerProvider(
sdktrace.WithSpanProcessor(sp),
sdktrace.WithResource(res),
)
sdktp := sdktrace.NewTracerProvider(sdktpopts...)
closers = append(closers, sdktp.Shutdown)

exporter.SpanExporter = texp
Expand Down
119 changes: 84 additions & 35 deletions util/tracing/detect/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,31 @@ import (
"sync"
"time"

"github.com/pkg/errors"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
)

var Recorder *TraceRecorder

type TraceRecorder struct {
sdktrace.SpanExporter
// sem is a binary semaphore for this struct.
// This is used instead of sync.Mutex because it allows
// for context cancellation to work properly.
sem *semaphore.Weighted

// shutdownGC cancels the context that the gc loop runs under.
shutdownGC func(err error)

// done is closed once the background gc goroutine
// has exited.
done chan struct{}

mu sync.Mutex
// track traces and listeners for traces.
m map[trace.TraceID]*stubs
listeners map[trace.TraceID]int
flush func(context.Context) error
}

type stubs struct {
Expand All @@ -26,37 +39,52 @@ type stubs struct {

func NewTraceRecorder() *TraceRecorder {
tr := &TraceRecorder{
sem: semaphore.NewWeighted(1),
done: make(chan struct{}),
m: map[trace.TraceID]*stubs{},
listeners: map[trace.TraceID]int{},
}

go func() {
t := time.NewTimer(60 * time.Second)
for {
<-t.C
tr.gc()
t.Reset(50 * time.Second)
}
}()
ctx, cancel := context.WithCancelCause(context.Background())
go tr.gcLoop(ctx)
tr.shutdownGC = cancel

return tr
}

func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStub {
r.mu.Lock()
defer r.mu.Unlock()
// Record signals to the TraceRecorder that it should track spans associated with the current
// trace and returns a function that will return these spans.
//
// If the TraceRecorder is nil or there is no valid active span, the returned function
// will be nil to signal that the trace cannot be recorded.
func (r *TraceRecorder) Record(ctx context.Context) (func() []tracetest.SpanStub, error) {
if r == nil {
return nil, nil
}

spanCtx := trace.SpanContextFromContext(ctx)
if !spanCtx.IsValid() {
return nil, nil
}

if err := r.sem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer r.sem.Release(1)

traceID := spanCtx.TraceID()
r.listeners[traceID]++
var once sync.Once
var spans []tracetest.SpanStub

var (
once sync.Once
spans []tracetest.SpanStub
)
return func() []tracetest.SpanStub {
once.Do(func() {
if r.flush != nil {
r.flush(context.TODO())
if err := r.sem.Acquire(context.Background(), 1); err != nil {
return
}

r.mu.Lock()
defer r.mu.Unlock()
defer r.sem.Release(1)

if v, ok := r.m[traceID]; ok {
spans = v.spans
Expand All @@ -67,26 +95,46 @@ func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStu
}
})
return spans
}, nil
}

// gcLoop calls r.gc once per minute, passing the tick time, until ctx is done.
// It closes r.done when it returns.
func (r *TraceRecorder) gcLoop(ctx context.Context) {
// Closing done on exit is what Shutdown waits on.
defer close(r.done)

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
// The context was canceled; stop collecting.
return
case now := <-ticker.C:
// The tick time is handed to gc as the reference "now".
r.gc(ctx, now)
}
}
}

func (r *TraceRecorder) gc() {
r.mu.Lock()
defer r.mu.Unlock()
func (r *TraceRecorder) gc(ctx context.Context, now time.Time) {
if err := r.sem.Acquire(ctx, 1); err != nil {
return
}
defer r.sem.Release(1)

now := time.Now()
for k, s := range r.m {
if _, ok := r.listeners[k]; ok {
continue
}
if now.Sub(s.last) > 60*time.Second {
if now.Sub(s.last) > time.Minute {
delete(r.m, k)
}
}
}

func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
r.mu.Lock()
if err := r.sem.Acquire(ctx, 1); err != nil {
return err
}
defer r.sem.Release(1)

now := time.Now()
for _, s := range spans {
Expand All @@ -99,17 +147,18 @@ func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOn
v.last = now
v.spans = append(v.spans, ss)
}
r.mu.Unlock()

if r.SpanExporter == nil {
return nil
}
return r.SpanExporter.ExportSpans(ctx, spans)
return nil
}

func (r *TraceRecorder) Shutdown(ctx context.Context) error {
if r.SpanExporter == nil {
// Initiate the shutdown of the gc loop.
r.shutdownGC(errors.WithStack(context.Canceled))

// Wait for the gc loop to finish or for the context to be canceled.
select {
case <-r.done:
return nil
case <-ctx.Done():
return context.Cause(ctx)
}
return r.SpanExporter.Shutdown(ctx)
}

0 comments on commit 69ad98e

Please sign in to comment.