From 27615893bcbac2cf8dc6bc7107e632c00ad5e0f0 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 18 Nov 2024 21:17:35 +0800 Subject: [PATCH] fix: Move init kafka pool into once Introduced by #37744 Signed-off-by: Congqi Xia --- pkg/mq/msgstream/mqwrapper/kafka/pool.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/pool.go b/pkg/mq/msgstream/mqwrapper/kafka/pool.go index 6a235c800b6ec..ad82b0b74f76b 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/pool.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/pool.go @@ -34,15 +34,17 @@ var ( ) func initPool() { - pool := conc.NewPool[any]( - hardware.GetCPUNum(), - conc.WithPreAlloc(false), - conc.WithDisablePurge(false), - conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal - ) - - kafkaCPool.Store(pool) - log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + initOnce.Do(func() { + pool := conc.NewPool[any]( + hardware.GetCPUNum(), + conc.WithPreAlloc(false), + conc.WithDisablePurge(false), + conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + ) + + kafkaCPool.Store(pool) + log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + }) } // GetSQPool returns the singleton pool instance for search/query operations.