Skip to content

Commit

Permalink
fix: [2.4] Wrap init segcore tracing with golang timeout (#33494) (#3…
Browse files Browse the repository at this point in the history
…4191)

Cherry-pick from master
pr: #33494
See also #33483

Wrap `C.InitTrace` & `C.SetTrace` with timeout preventing otlp
initializtion hangs forever when endpoint is not set correctly

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jun 26, 2024
1 parent 18a0efe commit 07a05db
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 9 deletions.
4 changes: 2 additions & 2 deletions internal/core/src/common/Tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ initTelemetry(const TraceConfig& cfg) {
opts.transport_format = jaeger::TransportFormat::kThriftHttp;
opts.endpoint = cfg.jaegerURL;
exporter = jaeger::JaegerExporterFactory::Create(opts);
LOG_INFO("init jaeger exporter, endpoint:", opts.endpoint);
LOG_INFO("init jaeger exporter, endpoint: {}", opts.endpoint);
} else if (cfg.exporter == "otlp") {
auto opts = otlp::OtlpGrpcExporterOptions{};
opts.endpoint = cfg.otlpEndpoint;
opts.use_ssl_credentials = cfg.oltpSecure;
exporter = otlp::OtlpGrpcExporterFactory::Create(opts);
LOG_INFO("init otlp exporter, endpoint:", opts.endpoint);
LOG_INFO("init otlp exporter, endpoint: {}", opts.endpoint);
} else {
LOG_INFO("Empty Trace");
enable_trace = false;
Expand Down
35 changes: 33 additions & 2 deletions internal/util/initcore/init_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "C"
import (
"fmt"
"path"
"time"
"unsafe"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -62,7 +63,13 @@ func InitTraceConfig(params *paramtable.ComponentParam) {
otlpEndpoint: endpoint,
nodeID: nodeID,
}
C.InitTrace(&config)
// oltp grpc may hangs forever, add timeout logic at go side
timeout := params.TraceCfg.InitTimeoutSeconds.GetAsDuration(time.Second)
callWithTimeout(func() {
C.InitTrace(&config)
}, func() {
panic("init segcore tracing timeout, See issue #33483")
}, timeout)
}

func ResetTraceConfig(params *paramtable.ComponentParam) {
Expand All @@ -82,7 +89,31 @@ func ResetTraceConfig(params *paramtable.ComponentParam) {
otlpEndpoint: endpoint,
nodeID: nodeID,
}
C.SetTrace(&config)

// oltp grpc may hangs forever, add timeout logic at go side
timeout := params.TraceCfg.InitTimeoutSeconds.GetAsDuration(time.Second)
callWithTimeout(func() {
C.SetTrace(&config)
}, func() {
panic("set segcore tracing timeout, See issue #33483")
}, timeout)
}

func callWithTimeout(fn func(), timeoutHandler func(), timeout time.Duration) {
if timeout > 0 {
ch := make(chan struct{})
go func() {
defer close(ch)
fn()
}()
select {
case <-ch:
case <-time.After(timeout):
timeoutHandler()
}
} else {
fn()
}
}

func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
Expand Down
16 changes: 16 additions & 0 deletions internal/util/initcore/init_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package initcore
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand All @@ -29,3 +31,17 @@ func TestTracer(t *testing.T) {
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "stdout")
ResetTraceConfig(paramtable.Get())
}

func TestOtlpHang(t *testing.T) {
paramtable.Init()
InitTraceConfig(paramtable.Get())

paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "otlp")
paramtable.Get().Save(paramtable.Get().TraceCfg.InitTimeoutSeconds.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().TraceCfg.Exporter.Key)
defer paramtable.Get().Reset(paramtable.Get().TraceCfg.InitTimeoutSeconds.Key)

assert.Panics(t, func() {
ResetTraceConfig(paramtable.Get())
})
}
20 changes: 15 additions & 5 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,11 +777,12 @@ func (t *gpuConfig) init(base *BaseTable) {
}

type traceConfig struct {
Exporter ParamItem `refreshable:"false"`
SampleFraction ParamItem `refreshable:"false"`
JaegerURL ParamItem `refreshable:"false"`
OtlpEndpoint ParamItem `refreshable:"false"`
OtlpSecure ParamItem `refreshable:"false"`
Exporter ParamItem `refreshable:"false"`
SampleFraction ParamItem `refreshable:"false"`
JaegerURL ParamItem `refreshable:"false"`
OtlpEndpoint ParamItem `refreshable:"false"`
OtlpSecure ParamItem `refreshable:"false"`
InitTimeoutSeconds ParamItem `refreshable:"false"`
}

func (t *traceConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -829,6 +830,15 @@ Fractions >= 1 will always sample. Fractions < 0 are treated as zero.`,
Export: true,
}
t.OtlpSecure.Init(base.mgr)

t.InitTimeoutSeconds = ParamItem{
Key: "trace.initTimeoutSeconds",
Version: "2.4.4",
DefaultValue: "10",
Export: true,
Doc: "segcore initialization timeout in seconds, preventing otlp grpc hangs forever",
}
t.InitTimeoutSeconds.Init(base.mgr)
}

type logConfig struct {
Expand Down

0 comments on commit 07a05db

Please sign in to comment.