Skip to content

Commit

Permalink
Optimize the codec code of session (#27360)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan authored Oct 1, 2023
1 parent 7d0dd00 commit 0f14d18
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 143 deletions.
24 changes: 16 additions & 8 deletions internal/datacoord/index_engine_version_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,41 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) {
// startup
m.Startup(map[string]*sessionutil.Session{
"1": {
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
},
},
})
assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalIndexEngineVersion())

// add node
m.AddNode(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
},
})
assert.Equal(t, int32(10), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(5), m.GetMinimalIndexEngineVersion())

// update
m.Update(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
},
})
assert.Equal(t, int32(5), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(2), m.GetMinimalIndexEngineVersion())

// remove
m.RemoveNode(&sessionutil.Session{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
},
})
assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalIndexEngineVersion())
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

func TestServerId(t *testing.T) {
s := &Server{session: &sessionutil.Session{ServerID: 0}}
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}}}
assert.Equal(t, int64(0), s.serverID())
}

Expand Down
38 changes: 22 additions & 16 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ func TestService_WatchServices(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
svr := CreateServer(context.TODO(), factory)
svr.session = &sessionutil.Session{
TriggerKill: true,
SessionRaw: sessionutil.SessionRaw{TriggerKill: true},
}
svr.serverLoopWg.Add(1)

Expand Down Expand Up @@ -3277,10 +3277,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt := &sessionutil.SessionEvent{
EventType: sessionutil.SessionNoneEvent,
Session: &sessionutil.Session{
ServerID: 0,
ServerName: "",
Address: "",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 0,
ServerName: "",
Address: "",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
Expand All @@ -3289,10 +3291,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionAddEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
Expand All @@ -3304,10 +3308,12 @@ func TestHandleSessionEvent(t *testing.T) {
evt = &sessionutil.SessionEvent{
EventType: sessionutil.SessionDelEvent,
Session: &sessionutil.Session{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
SessionRaw: sessionutil.SessionRaw{
ServerID: 101,
ServerName: "DN101",
Address: "DN127.0.0.101",
Exclusive: false,
},
},
}
err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt)
Expand Down Expand Up @@ -4320,7 +4326,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
func Test_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{ServerID: 1}}
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
s.stateCode.Store(commonpb.StateCode_Abnormal)
resp, err := s.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
Expand All @@ -4329,7 +4335,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("data node health check is ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{ServerID: 1}}
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
healthClient := &mockDataNodeClient{
id: 1,
Expand All @@ -4355,7 +4361,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{ServerID: 1}}
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
unhealthClient := &mockDataNodeClient{
id: 1,
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/data_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestDataNode(t *testing.T) {

t.Run("Test getSystemInfoMetrics", func(t *testing.T) {
emptyNode := &DataNode{}
emptyNode.SetSession(&sessionutil.Session{ServerID: 1})
emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
emptyNode.flowgraphManager = newFlowgraphManager()

req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
Expand All @@ -171,7 +171,7 @@ func TestDataNode(t *testing.T) {

t.Run("Test getSystemInfoMetrics with quotaMetric error", func(t *testing.T) {
emptyNode := &DataNode{}
emptyNode.SetSession(&sessionutil.Session{ServerID: 1})
emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
emptyNode.flowgraphManager = newFlowgraphManager()

req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var emptyFlushAndDropFunc flushAndDropFunc = func(_ []*segmentFlushPack) {}
func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode {
factory := dependency.NewDefaultFactory(true)
node := NewDataNode(ctx, factory)
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())

rc := &RootCoordFactory{
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() {

// test closed server
node := &DataNode{}
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.stateCode.Store(commonpb.StateCode_Abnormal)

resp, err := node.ShowConfigurations(s.ctx, req)
Expand All @@ -338,7 +338,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() {

func (s *DataNodeServicesSuite) TestGetMetrics() {
node := &DataNode{}
node.SetSession(&sessionutil.Session{ServerID: 1})
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.flowgraphManager = newFlowgraphManager()
// server is closed
node.stateCode.Store(commonpb.StateCode_Abnormal)
Expand Down
6 changes: 4 additions & 2 deletions internal/distributed/connection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ func TestConnectionManager_processEvent(t *testing.T) {
cm := &ConnectionManager{
closeCh: make(chan struct{}),
session: &sessionutil.Session{
ServerID: 1,
TriggerKill: true,
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
TriggerKill: true,
},
},
}

Expand Down
20 changes: 10 additions & 10 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {

func TestProxy_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
Expand All @@ -80,7 +80,7 @@ func TestProxy_CheckHealth(t *testing.T) {
rootCoord: NewRootCoordMock(),
queryCoord: qc,
dataCoord: NewDataCoordMock(),
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestProxy_CheckHealth(t *testing.T) {
qc := &mocks.MockQueryCoordClient{}
qc.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(nil, errors.New("test"))
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: NewRootCoordMock(func(mock *RootCoordMock) {
mock.checkHealthFunc = checkHealthFunc1
}),
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestProxy_CheckHealth(t *testing.T) {

func TestProxyRenameCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{})
Expand All @@ -168,7 +168,7 @@ func TestProxyRenameCollection(t *testing.T) {
})

t.Run("rename with illegal new collection name", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"})
Expand All @@ -181,7 +181,7 @@ func TestProxyRenameCollection(t *testing.T) {
rc.On("RenameCollection", mock.Anything, mock.Anything).
Return(nil, errors.New("fail"))
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
Expand All @@ -197,7 +197,7 @@ func TestProxyRenameCollection(t *testing.T) {
rc.On("RenameCollection", mock.Anything, mock.Anything).
Return(merr.Status(nil), nil)
node := &Proxy{
session: &sessionutil.Session{ServerID: 1},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
Expand Down Expand Up @@ -884,7 +884,7 @@ func TestProxyCreateDatabase(t *testing.T) {
paramtable.Init()

t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
Expand Down Expand Up @@ -938,7 +938,7 @@ func TestProxyDropDatabase(t *testing.T) {
paramtable.Init()

t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
Expand Down Expand Up @@ -992,7 +992,7 @@ func TestProxyListDatabase(t *testing.T) {
paramtable.Init()

t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/metrics_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestProxy_metrics(t *testing.T) {
rootCoord: rc,
queryCoord: qc,
dataCoord: dc,
session: &sessionutil.Session{Address: funcutil.GenRandomStr()},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{Address: funcutil.GenRandomStr()}},
}

rc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func newMockProxy() *mockProxy {

func newTestCore(opts ...Opt) *Core {
c := &Core{
session: &sessionutil.Session{ServerID: TestRootCoordID},
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}},
}
executor := newMockStepExecutor()
executor.AddStepsFunc = func(s *stepStack) {
Expand Down
12 changes: 8 additions & 4 deletions internal/rootcoord/proxy_client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) {
pcm := newProxyClientManager(core.proxyCreator)

session := &sessionutil.Session{
ServerID: 100,
Address: "localhost",
SessionRaw: sessionutil.SessionRaw{
ServerID: 100,
Address: "localhost",
},
}

sessions := []*sessionutil.Session{session}
Expand Down Expand Up @@ -150,8 +152,10 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) {
pcm := newProxyClientManager(core.proxyCreator)

session := &sessionutil.Session{
ServerID: 100,
Address: "localhost",
SessionRaw: sessionutil.SessionRaw{
ServerID: 100,
Address: "localhost",
},
}

pcm.AddProxyClient(session)
Expand Down
6 changes: 3 additions & 3 deletions internal/rootcoord/proxy_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestProxyManager(t *testing.T) {
etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
defer etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
s1 := sessionutil.Session{
ServerID: 100,
SessionRaw: sessionutil.SessionRaw{ServerID: 100},
}
b1, err := json.Marshal(&s1)
assert.NoError(t, err)
Expand All @@ -62,7 +62,7 @@ func TestProxyManager(t *testing.T) {
assert.NoError(t, err)

s0 := sessionutil.Session{
ServerID: 99,
SessionRaw: sessionutil.SessionRaw{ServerID: 99},
}
b0, err := json.Marshal(&s0)
assert.NoError(t, err)
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestProxyManager(t *testing.T) {
t.Log("======== start watch proxy ==========")

s2 := sessionutil.Session{
ServerID: 101,
SessionRaw: sessionutil.SessionRaw{ServerID: 101},
}
b2, err := json.Marshal(&s2)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/rootcoord/root_coord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func TestRootCoord_UpdateChannelTimeTick(t *testing.T) {
defaultTs := Timestamp(101)

ticker := newRocksMqTtSynchronizer()
ticker.addSession(&sessionutil.Session{ServerID: source})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: source}})

ctx := context.Background()
c := newTestCore(withHealthyCode(),
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) {
c.stateCode.Store(commonpb.StateCode_Healthy)
c.session.ServerID = TestRootCoordID
c.sendMinDdlTsAsTt() // no session.
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}})
c.sendMinDdlTsAsTt()
sched.GetMinDdlTsFunc = func() Timestamp {
return typeutil.ZeroTimestamp
Expand All @@ -1667,7 +1667,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) {

func TestCore_startTimeTickLoop(t *testing.T) {
ticker := newRocksMqTtSynchronizer()
ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID})
ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}})
ddlManager := newMockDdlTsLockManager()
ddlManager.GetMinDdlTsFunc = func() Timestamp {
return 100
Expand Down
Loading

0 comments on commit 0f14d18

Please sign in to comment.