Skip to content

Commit

Permalink
fix: fix debug
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Aug 3, 2024
1 parent 4c1c0a8 commit bc8834f
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (s *FlusherSuite) SetupSuite() {
wbMgr.EXPECT().Start().Return()
wbMgr.EXPECT().Stop().Return()

resource.Init(
resource.InitForTest(
s.T(),
resource.OptSyncManager(syncMgr),
resource.OptBufferManager(wbMgr),
resource.OptRootCoordClient(rootcoord),
Expand Down
15 changes: 15 additions & 0 deletions internal/streamingnode/server/wal/adaptor/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/mock_metastore"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_flusher"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
Expand Down Expand Up @@ -56,10 +59,22 @@ func initResourceForTest(t *testing.T) {
catalog.EXPECT().ListSegmentAssignment(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().SaveSegmentAssignments(mock.Anything, mock.Anything, mock.Anything).Return(nil)

syncMgr := syncmgr.NewMockSyncManager(t)
wbMgr := writebuffer.NewMockBufferManager(t)

flusher := mock_flusher.NewMockFlusher(t)
flusher.EXPECT().RegisterPChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
flusher.EXPECT().UnregisterPChannel(mock.Anything).Return().Maybe()
flusher.EXPECT().RegisterVChannel(mock.Anything, mock.Anything).Return()
flusher.EXPECT().UnregisterVChannel(mock.Anything).Return()

resource.InitForTest(
t,
resource.OptSyncManager(syncMgr),
resource.OptBufferManager(wbMgr),
resource.OptRootCoordClient(rc),
resource.OptDataCoordClient(dc),
resource.OptFlusher(flusher),
resource.OptStreamingNodeCatalog(catalog),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (m *partitionSegmentManager) CollectShouldBeSealed() []*segmentAllocManager
return shouldBeSealedSegments
}

// CollectAllDirtySegments collects all segments in the manager.
func (m *partitionSegmentManager) CollectAllDirtySegments() []*segmentAllocManager {
// CollectDirtySegmentsAndClear collects all segments in the manager and clear the maanger.
func (m *partitionSegmentManager) CollectDirtySegmentsAndClear() []*segmentAllocManager {
m.mu.Lock()
defer m.mu.Unlock()
dirtySegments := make([]*segmentAllocManager, 0, len(m.segments))
Expand All @@ -106,8 +106,8 @@ func (m *partitionSegmentManager) CollectAllDirtySegments() []*segmentAllocManag
return dirtySegments
}

// CollectAllCanBeSealed collects all segments that can be sealed.
func (m *partitionSegmentManager) CollectAllCanBeSealed() []*segmentAllocManager {
// CollectAllCanBeSealedAndClear collects all segments that can be sealed and clear the manager.
func (m *partitionSegmentManager) CollectAllCanBeSealedAndClear() []*segmentAllocManager {
m.mu.Lock()
defer m.mu.Unlock()
canBeSealed := make([]*segmentAllocManager, 0, len(m.segments))
Expand Down Expand Up @@ -159,7 +159,7 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (*
SegmentId: pendingSegment.GetSegmentID(),
Vchannel: pendingSegment.GetVChannel(),
})
if merr.CheckRPCCall(resp, err); err != nil {
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, errors.Wrap(err, "failed to alloc growing segment at datacoord")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ func buildNewPartitionManagers(
) (*partitionSegmentManagers, []*segmentAllocManager) {
// create a map to check if the partition exists.
partitionExist := make(map[int64]struct{}, len(collectionInfos))
// collectionMap is a map from collectionID to collectionInfo.
collectionInfoMap := make(map[int64]*rootcoordpb.CollectionInfoOnPChannel, len(collectionInfos))
for _, collectionInfo := range collectionInfos {
for _, partition := range collectionInfo.GetPartitions() {
partitionExist[partition.GetPartitionId()] = struct{}{}
}
collectionInfoMap[collectionInfo.GetCollectionId()] = collectionInfo
}

// recover the segment infos from the streaming node segment assignment meta storage
Expand All @@ -44,12 +47,6 @@ func buildNewPartitionManagers(
metaMaps[rawMeta.GetPartitionId()] = append(metaMaps[rawMeta.GetPartitionId()], m)
}

// collectionMap is a map from collectionID to collectionInfo.
collectionInfoMap := make(map[int64]*rootcoordpb.CollectionInfoOnPChannel, len(collectionInfos))
for _, collectionInfo := range collectionInfos {
collectionInfoMap[collectionInfo.GetCollectionId()] = collectionInfo
}

// create managers list.
managers := typeutil.NewConcurrentMap[int64, *partitionSegmentManager]()
for collectionID, collectionInfo := range collectionInfoMap {
Expand Down Expand Up @@ -176,7 +173,7 @@ func (m *partitionSegmentManagers) RemoveCollection(collectionID int64) []*segme
for _, partition := range collectionInfo.Partitions {
pm, ok := m.managers.Get(partition.PartitionId)
if ok {
needSealed = append(needSealed, pm.CollectAllCanBeSealed()...)
needSealed = append(needSealed, pm.CollectAllCanBeSealedAndClear()...)
}
m.managers.Remove(partition.PartitionId)
}
Expand Down Expand Up @@ -208,7 +205,7 @@ func (m *partitionSegmentManagers) RemovePartition(collectionID int64, partition
zap.Int64("partitionID", partitionID))
return nil
}
return pm.CollectAllCanBeSealed()
return pm.CollectAllCanBeSealedAndClear()
}

// Range ranges the partition managers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func RecoverPChannelSegmentAllocManager(
resp, err := resource.Resource().RootCoordClient().GetPChannelInfo(ctx, &rootcoordpb.GetPChannelInfoRequest{
Pchannel: pchannel.Name,
})
if merr.CheckRPCCall(resp, err); err != nil {
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, errors.Wrap(err, "failed to get pchannel info from rootcoord")
}
managers, waitForSealed := buildNewPartitionManagers(pchannel, rawMetas, resp.GetCollections())
Expand Down Expand Up @@ -207,7 +207,7 @@ func (m *PChannelSegmentAllocManager) Close(ctx context.Context) {

segments := make([]*segmentAllocManager, 0)
m.managers.Range(func(pm *partitionSegmentManager) {
segments = append(segments, pm.CollectAllDirtySegments()...)
segments = append(segments, pm.CollectDirtySegmentsAndClear()...)
})

// commitAllSegmentsOnSamePChannel commits all segments on the same pchannel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestManager(t *testing.T) {
flusher := mock_flusher.NewMockFlusher(t)
flusher.EXPECT().RegisterPChannel(mock.Anything, mock.Anything).Return(nil)

resource.Init(
resource.InitForTest(
t,
resource.OptFlusher(flusher),
resource.OptRootCoordClient(rootcoord),
resource.OptDataCoordClient(datacoord),
Expand Down

0 comments on commit bc8834f

Please sign in to comment.