Skip to content

Commit

Permalink
Support dynamic config for opentelemetry trace
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Apr 16, 2024
1 parent 78d32bd commit 686c97b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 18 deletions.
26 changes: 26 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/nmq"
Expand Down Expand Up @@ -392,6 +394,30 @@ func (mr *MilvusRoles) Run() {

mr.setupLogger()
tracer.Init()
paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) {
params := paramtable.Get()

exp, err := tracer.CreateTracerExporter(params)
if err != nil {
log.Warn("Init tracer faield", zap.Error(err))
return
}

// close old provider
err = tracer.CloseTracerProvider()
if err != nil {
log.Warn("Close old provider failed, stop reset", zap.Error(err))
return
}

tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))

if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole {
initcore.InitTraceConfig(params)
log.Info("Reset segcore tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
}
}))

paramtable.SetCreateTime(time.Now())
paramtable.SetUpdateTime(time.Now())
Expand Down
65 changes: 47 additions & 18 deletions pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -38,8 +39,52 @@ import (
func Init() {
params := paramtable.Get()

exp, err := CreateTracerExporter(params)
if err != nil {
log.Warn("Init tracer faield", zap.Error(err))
return
}

SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
log.Info("Init tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
}

func CloseTracerProvider() error {
provider, ok := otel.GetTracerProvider().(*sdk.TracerProvider)
if ok {
err := provider.Shutdown(context.Background())
if err != nil {
return err
}
}
return nil
}

func SetTracerProvider(exp sdk.SpanExporter, traceIDRatio float64) {
if exp == nil {
otel.SetTracerProvider(trace.NewNoopTracerProvider())
return
}

tp := sdk.NewTracerProvider(
sdk.WithBatcher(exp),
sdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(paramtable.GetRole()),
attribute.Int64("NodeID", paramtable.GetNodeID()),
)),
sdk.WithSampler(sdk.ParentBased(
sdk.TraceIDRatioBased(traceIDRatio),
)),
)
otel.SetTracerProvider(tp)
}

func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter, error) {
var exp sdk.SpanExporter
var err error

switch params.TraceCfg.Exporter.GetValue() {
case "jaeger":
exp, err = jaeger.New(jaeger.WithCollectorEndpoint(
Expand All @@ -58,22 +103,6 @@ func Init() {
default:
err = errors.New("Empty Trace")
}
if err != nil {
log.Warn("Init tracer faield", zap.Error(err))
return
}
tp := sdk.NewTracerProvider(
sdk.WithBatcher(exp),
sdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(paramtable.GetRole()),
attribute.Int64("NodeID", paramtable.GetNodeID()),
)),
sdk.WithSampler(sdk.ParentBased(
sdk.TraceIDRatioBased(params.TraceCfg.SampleFraction.GetAsFloat()),
)),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
log.Info("Init tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))

return exp, err
}

0 comments on commit 686c97b

Please sign in to comment.