From efd141366d25028c55f5d876d4ac3f5c7dbb6918 Mon Sep 17 00:00:00 2001 From: FanOne <294350394@qq.com> Date: Thu, 13 Jun 2024 00:55:48 +0800 Subject: [PATCH] feat:less goroutines by conc Signed-off-by: FanOne <294350394@qq.com> --- pkg/util/distance/calc_distance.go | 43 ++++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/util/distance/calc_distance.go b/pkg/util/distance/calc_distance.go index 95900f86514d5..0ecd99e3f15ae 100644 --- a/pkg/util/distance/calc_distance.go +++ b/pkg/util/distance/calc_distance.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package distance import ( @@ -6,7 +22,8 @@ import ( "sync" "github.com/cockroachdb/errors" - "github.com/panjf2000/ants/v2" + + "github.com/milvus-io/milvus/pkg/util/conc" ) /** @@ -132,26 +149,18 @@ func CalcFloatDistance(dim int64, left, right []float32, metric string) ([]float distArray := make([]float32, leftNum*rightNum) // Multi-threads to calculate distance. - var waitGroup sync.WaitGroup - calcWorker := func(index interface{}) { - i := index.(int64) - CalcFFBatch(dim, left, i, right, metricUpper, &distArray) - waitGroup.Done() - } + waitGroup := new(sync.WaitGroup) // avoid too many goroutines by ants pool - poolSize := runtime.NumCPU() - pool, err := conc.NewPoolWithFunc(poolSize, calcWorker) - if err != nil { - return nil, err - } + pool := conc.NewDefaultPool[any]() defer pool.Release() - for i := int64(0); i < leftNum; i++ { waitGroup.Add(1) - err = pool.Invoke(i) - if err != nil { - return nil, err - } + index := i + pool.Submit(func() (r any, err error) { + CalcFFBatch(dim, left, index, right, metricUpper, &distArray) + waitGroup.Done() + return + }) } waitGroup.Wait()