Skip to content

Commit

Permalink
enhace: Speed up meta recovery
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Dec 12, 2024
1 parent b14a0c4 commit aaa647a
Show file tree
Hide file tree
Showing 39 changed files with 1,266 additions and 557 deletions.
4 changes: 2 additions & 2 deletions cmd/tools/migration/mmap/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoor
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
panic(err)
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
catalog = kvmetestore.NewCatalog(metaKV, ss)
case util.MetaStoreTypeTiKV:
log.Info("Using tikv as meta storage.")
var metaKV kv.MetaKv
Expand All @@ -148,7 +148,7 @@ func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoor
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
panic(err)
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
catalog = kvmetestore.NewCatalog(metaKV, ss)
default:
panic(fmt.Sprintf("MetaStoreType %s not supported", paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue()))
}
Expand Down
19 changes: 19 additions & 0 deletions internal/datacoord/broker/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
Expand All @@ -37,6 +38,7 @@ type Broker interface {
DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error)
ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error)
ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error)
ShowCollectionsInternal(ctx context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error)
ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error)
HasCollection(ctx context.Context, collectionID int64) (bool, error)
}
Expand Down Expand Up @@ -116,6 +118,23 @@ func (b *coordinatorBroker) ShowCollections(ctx context.Context, dbName string)
return resp, nil
}

func (b *coordinatorBroker) ShowCollectionsInternal(ctx context.Context) (*rootcoordpb.ShowCollectionsInternalResponse, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := b.rootCoord.ShowCollectionsInternal(ctx, &rootcoordpb.ShowCollectionsInternalRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
),
})

if err = merr.CheckRPCCall(resp, err); err != nil {
log.Warn("ShowCollectionsInternal failed", zap.Error(err))
return nil, err
}

return resp, nil
}

func (b *coordinatorBroker) ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
Expand Down
60 changes: 60 additions & 0 deletions internal/datacoord/broker/mock_coordinator_broker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/model"
Expand Down Expand Up @@ -61,7 +62,9 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
ctx := context.Background()
cm := storage.NewLocalChunkManager(storage.RootPath(""))
catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
meta, err := newMeta(ctx, catalog, cm)
broker := broker.NewMockBroker(s.T())
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
meta, err := newMeta(ctx, catalog, cm, broker)
s.NoError(err)
s.meta = meta

Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_garbageCollector_basic(t *testing.T) {
cli, _, _, _, _, err := initUtOSSEnv(bucketName, rootPath, 0)
require.NoError(t, err)

meta, err := newMemoryMeta()
meta, err := newMemoryMeta(t)
assert.NoError(t, err)

t.Run("normal gc", func(t *testing.T) {
Expand Down Expand Up @@ -118,7 +118,7 @@ func Test_garbageCollector_scan(t *testing.T) {
cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4)
require.NoError(t, err)

meta, err := newMemoryMeta()
meta, err := newMemoryMeta(t)
assert.NoError(t, err)

t.Run("key is reference", func(t *testing.T) {
Expand Down Expand Up @@ -1602,7 +1602,7 @@ func (s *GarbageCollectorSuite) SetupTest() {
s.cli, s.inserts, s.stats, s.delta, s.others, err = initUtOSSEnv(s.bucketName, s.rootPath, 4)
s.Require().NoError(err)

s.meta, err = newMemoryMeta()
s.meta, err = newMemoryMeta(s.T())
s.Require().NoError(err)
}

Expand Down
9 changes: 5 additions & 4 deletions internal/datacoord/import_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *ImportCheckerSuite) SetupTest() {
catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -68,10 +68,11 @@ func (s *ImportCheckerSuite) SetupTest() {
s.NoError(err)
s.imeta = imeta

meta, err := newMeta(context.TODO(), catalog, nil)
s.NoError(err)

broker := broker2.NewMockBroker(s.T())
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)

meta, err := newMeta(context.TODO(), catalog, nil, broker)
s.NoError(err)

sjm := NewMockStatsJobManager(s.T())

Expand Down
7 changes: 5 additions & 2 deletions internal/datacoord/import_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (s *ImportSchedulerSuite) SetupTest() {
s.catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
s.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -67,7 +68,9 @@ func (s *ImportSchedulerSuite) SetupTest() {

s.cluster = NewMockCluster(s.T())
s.alloc = allocator.NewMockAllocator(s.T())
s.meta, err = newMeta(context.TODO(), s.catalog, nil)
broker := broker.NewMockBroker(s.T())
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
s.meta, err = newMeta(context.TODO(), s.catalog, nil, broker)
s.NoError(err)
s.meta.AddCollection(&collectionInfo{
ID: s.collectionID,
Expand Down
25 changes: 17 additions & 8 deletions internal/datacoord/import_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/metastore/mocks"
mocks2 "github.com/milvus-io/milvus/internal/mocks"
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
alloc.EXPECT().AllocTimestamp(mock.Anything).Return(rand.Uint64(), nil)

catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -116,7 +117,9 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker.NewMockBroker(t)
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)

tasks, err := NewImportTasks(fileGroups, job, alloc, meta)
Expand Down Expand Up @@ -158,7 +161,7 @@ func TestImportUtil_AssembleRequest(t *testing.T) {
}

catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -175,7 +178,9 @@ func TestImportUtil_AssembleRequest(t *testing.T) {
})
alloc.EXPECT().AllocTimestamp(mock.Anything).Return(800, nil)

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker.NewMockBroker(t)
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 5, IsImporting: true},
Expand Down Expand Up @@ -244,7 +249,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
Expand All @@ -255,7 +260,9 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
imeta, err := NewImportMeta(context.TODO(), catalog)
assert.NoError(t, err)

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker.NewMockBroker(t)
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)

job := &importJob{
Expand Down Expand Up @@ -424,7 +431,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegments(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
Expand All @@ -441,7 +448,9 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
imeta, err := NewImportMeta(context.TODO(), catalog)
assert.NoError(t, err)

meta, err := newMeta(context.TODO(), catalog, nil)
broker := broker.NewMockBroker(t)
broker.EXPECT().ShowCollectionsInternal(mock.Anything).Return(nil, nil)
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)

file1 := &internalpb.ImportFile{
Expand Down
Loading

0 comments on commit aaa647a

Please sign in to comment.