Skip to content

Commit

Permalink
Use ts as msgID for request
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Nov 5, 2024
1 parent 0fc6c63 commit 1b11ecb
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 147 deletions.
2 changes: 0 additions & 2 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ type Cache interface {
RemoveDatabase(ctx context.Context, database string)
HasDatabase(ctx context.Context, database string) bool
GetDatabaseInfo(ctx context.Context, database string) (*databaseInfo, error)
// AllocID is only using on requests that need to skip timestamp allocation, don't overuse it.
AllocID(ctx context.Context) (int64, error)
}
type collectionBasicInfo struct {
collID typeutil.UniqueID
Expand Down
65 changes: 0 additions & 65 deletions internal/proxy/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,71 +887,6 @@ func TestGetDatabaseInfo(t *testing.T) {
})
}

func TestMetaCache_AllocID(t *testing.T) {
ctx := context.Background()
queryCoord := &mocks.MockQueryCoordClient{}
shardMgr := newShardClientMgr()

t.Run("success", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(nil),
ID: 11198,
Count: 10,
}, nil)
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)

err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)

id, err := globalMetaCache.AllocID(ctx)
assert.NoError(t, err)
assert.Equal(t, id, int64(11198))
})

t.Run("error", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(nil),
}, fmt.Errorf("mock error"))
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)

err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)

id, err := globalMetaCache.AllocID(ctx)
assert.Error(t, err)
assert.Equal(t, id, int64(0))
})

t.Run("failed", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(fmt.Errorf("mock failed")),
}, nil)
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)

err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)

id, err := globalMetaCache.AllocID(ctx)
assert.Error(t, err)
assert.Equal(t, id, int64(0))
})
}

func TestMetaCache_InvalidateShardLeaderCache(t *testing.T) {
paramtable.Init()
paramtable.Get().Save(Params.ProxyCfg.ShardLeaderCacheInterval.Key, "1")
Expand Down
56 changes: 0 additions & 56 deletions internal/proxy/mock_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions internal/proxy/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,14 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
var id UniqueID
if t.CanSkipAllocTimestamp() {
ts = tsoutil.ComposeTS(time.Now().UnixMilli(), 0)
id, err = globalMetaCache.AllocID(t.TraceCtx())
if err != nil {
return err
}
} else {
ts, err = queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil {
return err
}
// we always use same msg id and ts for now.
id = UniqueID(ts)
}
// we always use same msg id and ts for now.
id = UniqueID(ts)
t.SetTs(ts)
t.SetID(id)

Expand Down
18 changes: 0 additions & 18 deletions internal/proxy/task_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,6 @@ func TestTaskScheduler_SkipAllocTimestamp(t *testing.T) {
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
mockMetaCache.EXPECT().AllocID(mock.Anything).Return(1, nil).Twice()

t.Run("query", func(t *testing.T) {
qt := &queryTask{
Expand Down Expand Up @@ -658,21 +657,4 @@ func TestTaskScheduler_SkipAllocTimestamp(t *testing.T) {
err := queue.Enqueue(st)
assert.NoError(t, err)
})

mockMetaCache.EXPECT().AllocID(mock.Anything).Return(0, fmt.Errorf("mock error")).Once()
t.Run("failed", func(t *testing.T) {
st := &searchTask{
SearchRequest: &internalpb.SearchRequest{
Base: &commonpb.MsgBase{},
},
request: &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}

err := queue.Enqueue(st)
assert.Error(t, err)
})
}

0 comments on commit 1b11ecb

Please sign in to comment.