Skip to content

Commit

Permalink
feat:less goroutines by conc
Browse files Browse the repository at this point in the history
Signed-off-by: FanOne <[email protected]>
  • Loading branch information
CocaineCong committed Jun 12, 2024
1 parent 415186c commit efd1413
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions pkg/util/distance/calc_distance.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distance

import (
Expand All @@ -6,7 +22,8 @@ import (
"sync"

"github.com/cockroachdb/errors"
"github.com/panjf2000/ants/v2"

"github.com/milvus-io/milvus/pkg/util/conc"
)

/**
Expand Down Expand Up @@ -132,26 +149,18 @@ func CalcFloatDistance(dim int64, left, right []float32, metric string) ([]float
distArray := make([]float32, leftNum*rightNum)

// Multi-threads to calculate distance.
var waitGroup sync.WaitGroup
calcWorker := func(index interface{}) {
i := index.(int64)
CalcFFBatch(dim, left, i, right, metricUpper, &distArray)
waitGroup.Done()
}
waitGroup := new(sync.WaitGroup)
// avoid too many goroutines by ants pool
poolSize := runtime.NumCPU()
pool, err := conc.NewPoolWithFunc(poolSize, calcWorker)
if err != nil {
return nil, err
}
pool := conc.NewDefaultPool[any]()
defer pool.Release()

for i := int64(0); i < leftNum; i++ {
waitGroup.Add(1)
err = pool.Invoke(i)
if err != nil {
return nil, err
}
index := i
pool.Submit(func() (r any, err error) {
CalcFFBatch(dim, left, index, right, metricUpper, &distArray)
waitGroup.Done()
return
})
}
waitGroup.Wait()

Expand Down

0 comments on commit efd1413

Please sign in to comment.