Skip to content

Commit

Permalink
enhance: Mark cgo thread with tag name (milvus-io#38000)
Browse files Browse the repository at this point in the history
Related to milvus-io#37999

This PR add `SetThreadName` API for marking cgo thread and utilize it
when initializing cgo worker.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Nov 26, 2024
1 parent 797db3d commit c99a368
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
10 changes: 10 additions & 0 deletions internal/core/src/segcore/segcore_init_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "pthread.h"
#include "config/ConfigKnowhere.h"
#include "fmt/core.h"
#include "log/Log.h"
Expand Down Expand Up @@ -105,4 +106,13 @@ GetMinimalIndexVersion() {
return milvus::config::GetMinimalIndexVersion();
}

extern "C" void
SetThreadName(const char* name) {
#ifdef __linux__
pthread_setname_np(pthread_self(), name);
#elif __APPLE__
pthread_setname_np(name);
#endif
}

} // namespace milvus::segcore
3 changes: 3 additions & 0 deletions internal/core/src/segcore/segcore_init_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ GetCurrentIndexVersion();
int32_t
GetMinimalIndexVersion();

void
SetThreadName(const char*);

#ifdef __cplusplus
}
#endif
43 changes: 36 additions & 7 deletions internal/querynodev2/segments/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@

package segments

/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include <stdint.h>
#include "common/init_c.h"
#include "segcore/segcore_init_c.h"
*/
import "C"

import (
"context"
"math"
Expand Down Expand Up @@ -51,6 +61,12 @@ var (

bfPool atomic.Pointer[conc.Pool[any]]
bfApplyOnce sync.Once

// intentionally leaked CGO tag names
cgoTagSQ = C.CString("CGO_SQ")
cgoTagLoad = C.CString("CGO_LOAD")
cgoTagDynamic = C.CString("CGO_DYN")
cgoTagWarmup = C.CString("CGO_WARMUP")
)

// initSQPool initialize
Expand All @@ -63,7 +79,10 @@ func initSQPool() {
conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here
conc.WithDisablePurge(true),
)
conc.WarmupPool(pool, runtime.LockOSThread)
conc.WarmupPool(pool, func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagSQ)
})
sqp.Store(pool)

pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool))
Expand All @@ -74,15 +93,19 @@ func initSQPool() {

func initDynamicPool() {
dynOnce.Do(func() {
size := hardware.GetCPUNum()
pool := conc.NewPool[any](
hardware.GetCPUNum(),
size,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagDynamic)
}), // lock os thread for cgo thread disposal
)

dp.Store(pool)
log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
log.Info("init dynamicPool done", zap.Int("size", size))
})
}

Expand All @@ -94,7 +117,10 @@ func initLoadPool() {
poolSize,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagLoad)
}), // lock os thread for cgo thread disposal
)

loadPool.Store(pool)
Expand All @@ -112,8 +138,11 @@ func initWarmupPool() {
poolSize,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithNonBlocking(true), // make warming up non blocking
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagWarmup)
}), // lock os thread for cgo thread disposal
conc.WithNonBlocking(true), // make warming up non blocking
)

warmupPool.Store(pool)
Expand Down

0 comments on commit c99a368

Please sign in to comment.