Skip to content

Commit

Permalink
enhance: [2.4] Add go-deadlock as unittest only dependency (milvus-io…
Browse files Browse the repository at this point in the history
…#33063) (milvus-io#34322)

Cherry-pick from master
pr: milvus-io#33063
See also milvus-io#33062

This PR:

- Add lock.RWMutex & lock.Mutex aliases to switch implementations based on
build flags
- When the build flags include test, use go-deadlock to detect possible
deadlocks
- Replace all sync.RWMutex & sync.Mutex usages in the datacoord pkg

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jul 2, 2024
1 parent 9958606 commit 6b348e4
Show file tree
Hide file tree
Showing 26 changed files with 100 additions and 87 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ require (
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/valyala/fastjson v1.6.4
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -235,6 +234,7 @@ require (
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/analyze_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ package datacoord
import (
"context"
"fmt"
"sync"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type analyzeMeta struct {
sync.RWMutex
lock.RWMutex

ctx context.Context
catalog metastore.DataCoordCatalog
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package datacoord
import (
"context"
"fmt"
"sync"
"time"

"github.com/samber/lo"
Expand All @@ -33,13 +32,14 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
)

// ChannelManagerImpl manages the allocation and the balance between channels and data nodes.
type ChannelManagerImpl struct {
ctx context.Context
mu sync.RWMutex
mu lock.RWMutex
h Handler
store RWChannelStore
factory ChannelPolicyFactory
Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/channel_manager_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ type SubCluster interface {

type ChannelManagerImplV2 struct {
cancel context.CancelFunc
mu sync.RWMutex
mu lock.RWMutex
wg sync.WaitGroup

h Handler
Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -182,7 +183,7 @@ func (c *ClusterImpl) DropImport(nodeID int64, in *datapb.DropImportRequest) err
func (c *ClusterImpl) QuerySlots() map[int64]int64 {
nodeIDs := c.sessionManager.GetSessionIDs()
nodeSlots := make(map[int64]int64)
mu := &sync.Mutex{}
mu := &lock.Mutex{}
wg := &sync.WaitGroup{}
for _, nodeID := range nodeIDs {
wg.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ package datacoord

import (
"context"
"sync"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type compactionTaskMeta struct {
sync.RWMutex
lock.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog
// currently only clustering compaction task is stored in persist meta
Expand All @@ -39,7 +39,7 @@ type compactionTaskMeta struct {

func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) {
csm := &compactionTaskMeta{
RWMutex: sync.RWMutex{},
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down Expand Up @@ -75,7 +76,7 @@ type compactionTrigger struct {
signals chan *compactionSignal
compactionHandler compactionPlanContext
globalTrigger *time.Ticker
forceMu sync.Mutex
forceMu lock.Mutex
quit chan struct{}
wg sync.WaitGroup

Expand Down
8 changes: 4 additions & 4 deletions internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"path"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -51,6 +50,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -361,7 +361,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta
indexID = UniqueID(400)
)
return &meta{
RWMutex: sync.RWMutex{},
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
Expand Down Expand Up @@ -476,7 +476,7 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
},
}
meta := &meta{
RWMutex: sync.RWMutex{},
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
Expand Down Expand Up @@ -641,7 +641,7 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
},
}
meta := &meta{
RWMutex: sync.RWMutex{},
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
Expand Down
5 changes: 2 additions & 3 deletions internal/datacoord/import_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package datacoord

import (
"sync"

"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/util/lock"
)

type ImportMeta interface {
Expand All @@ -37,7 +36,7 @@ type ImportMeta interface {
}

type importMeta struct {
mu sync.RWMutex // guards jobs and tasks
mu lock.RWMutex // guards jobs and tasks
jobs map[int64]ImportJob
tasks map[int64]ImportTask

Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/import_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
)

const (
Expand Down Expand Up @@ -147,7 +148,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
return s.info.NodeID
})
nodeSlots := make(map[int64]int64)
mu := &sync.Mutex{}
mu := &lock.Mutex{}
wg := &sync.WaitGroup{}
for _, nodeID := range nodeIDs {
wg.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_engine_version_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package datacoord

import (
"math"
"sync"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
)

type IndexEngineVersionManager interface {
Expand All @@ -21,7 +21,7 @@ type IndexEngineVersionManager interface {
}

type versionManagerImpl struct {
mu sync.Mutex
mu lock.Mutex
versions map[int64]sessionutil.IndexEngineVersion
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"fmt"
"strconv"
"sync"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -35,12 +34,13 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type indexMeta struct {
sync.RWMutex
lock.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package datacoord

import (
"context"
"sync"
"testing"

"github.com/cockroachdb/errors"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/lock"
)

func TestReloadFromKV(t *testing.T) {
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestMeta_HasSameReq(t *testing.T) {

func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
return &indexMeta{
RWMutex: sync.RWMutex{},
RWMutex: lock.RWMutex{},
ctx: context.Background(),
catalog: catalog,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
Expand Down
9 changes: 5 additions & 4 deletions internal/datacoord/indexnode_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand All @@ -45,7 +46,7 @@ type WorkerManager interface {
type IndexNodeManager struct {
nodeClients map[UniqueID]types.IndexNodeClient
stoppingNodes map[UniqueID]struct{}
lock sync.RWMutex
lock lock.RWMutex
ctx context.Context
indexNodeCreator indexNodeCreatorFunc
}
Expand All @@ -55,7 +56,7 @@ func NewNodeManager(ctx context.Context, indexNodeCreator indexNodeCreatorFunc)
return &IndexNodeManager{
nodeClients: make(map[UniqueID]types.IndexNodeClient),
stoppingNodes: make(map[UniqueID]struct{}),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
ctx: ctx,
indexNodeCreator: indexNodeCreator,
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (nm *IndexNodeManager) PickClient() (UniqueID, types.IndexNodeClient) {
ctx, cancel := context.WithCancel(nm.ctx)
var (
pickNodeID = UniqueID(0)
nodeMutex = sync.Mutex{}
nodeMutex = lock.Mutex{}
wg = sync.WaitGroup{}
)

Expand Down Expand Up @@ -170,7 +171,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool {
ctx, cancel := context.WithCancel(nm.ctx)
var (
enableDisk = false
nodeMutex = sync.Mutex{}
nodeMutex = lock.Mutex{}
wg = sync.WaitGroup{}
)

Expand Down
12 changes: 6 additions & 6 deletions internal/datacoord/indexnode_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package datacoord

import (
"context"
"sync"
"testing"

"github.com/cockroachdb/errors"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand Down Expand Up @@ -108,7 +108,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("support", func(t *testing.T) {
nm := &IndexNodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
nodeClients: map[UniqueID]types.IndexNodeClient{
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
Status: merr.Success(),
Expand All @@ -126,7 +126,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("not support", func(t *testing.T) {
nm := &IndexNodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
nodeClients: map[UniqueID]types.IndexNodeClient{
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
Status: merr.Success(),
Expand All @@ -144,7 +144,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("no indexnode", func(t *testing.T) {
nm := &IndexNodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
nodeClients: map[UniqueID]types.IndexNodeClient{},
}

Expand All @@ -155,7 +155,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("error", func(t *testing.T) {
nm := &IndexNodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
nodeClients: map[UniqueID]types.IndexNodeClient{
1: getMockedGetJobStatsClient(nil, err),
},
Expand All @@ -168,7 +168,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("fail reason", func(t *testing.T) {
nm := &IndexNodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
lock: lock.RWMutex{},
nodeClients: map[UniqueID]types.IndexNodeClient{
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
Status: merr.Status(err),
Expand Down
Loading

0 comments on commit 6b348e4

Please sign in to comment.