From 0f14d1820119a390570824bef119c9e54cb6922c Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Sun, 1 Oct 2023 10:33:30 +0800 Subject: [PATCH] Optimize the codec code of session (#27360) Signed-off-by: longjiquan --- .../index_engine_version_manager_test.go | 24 ++-- internal/datacoord/index_service_test.go | 2 +- internal/datacoord/server_test.go | 38 +++--- internal/datanode/data_node_test.go | 4 +- internal/datanode/mock_test.go | 2 +- internal/datanode/services_test.go | 4 +- .../distributed/connection_manager_test.go | 6 +- internal/proxy/impl_test.go | 20 +-- internal/proxy/metrics_info_test.go | 2 +- internal/rootcoord/mock_test.go | 2 +- .../rootcoord/proxy_client_manager_test.go | 12 +- internal/rootcoord/proxy_manager_test.go | 6 +- internal/rootcoord/root_coord_test.go | 6 +- internal/rootcoord/timeticksync_test.go | 4 +- internal/util/sessionutil/session_util.go | 116 ++++++------------ .../util/sessionutil/session_util_test.go | 14 ++- 16 files changed, 119 insertions(+), 143 deletions(-) diff --git a/internal/datacoord/index_engine_version_manager_test.go b/internal/datacoord/index_engine_version_manager_test.go index 781fe6e7deda9..d544b4e2c754f 100644 --- a/internal/datacoord/index_engine_version_manager_test.go +++ b/internal/datacoord/index_engine_version_manager_test.go @@ -17,8 +17,10 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) { // startup m.Startup(map[string]*sessionutil.Session{ "1": { - ServerID: 1, - IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0}, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 1, + IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0}, + }, }, }) assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion()) @@ -26,24 +28,30 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) { // add node m.AddNode(&sessionutil.Session{ - ServerID: 2, - IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5}, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 2, + IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5}, + }, }) assert.Equal(t, int32(10), m.GetCurrentIndexEngineVersion()) assert.Equal(t, int32(5), m.GetMinimalIndexEngineVersion()) // update m.Update(&sessionutil.Session{ - ServerID: 2, - IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2}, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 2, + IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2}, + }, }) assert.Equal(t, int32(5), m.GetCurrentIndexEngineVersion()) assert.Equal(t, int32(2), m.GetMinimalIndexEngineVersion()) // remove m.RemoveNode(&sessionutil.Session{ - ServerID: 2, - IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3}, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 2, + IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3}, + }, }) assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion()) assert.Equal(t, int32(0), m.GetMinimalIndexEngineVersion()) diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index cc93da75fd2f6..87a5f3c443b31 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -41,7 +41,7 @@ import ( ) func TestServerId(t *testing.T) { - s := &Server{session: &sessionutil.Session{ServerID: 0}} + s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}}} assert.Equal(t, int64(0), s.serverID()) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index fb848cb4bc435..103cd2dc8f372 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -935,7 +935,7 @@ func TestService_WatchServices(t *testing.T) { factory := dependency.NewDefaultFactory(true) svr := CreateServer(context.TODO(), factory) svr.session = &sessionutil.Session{ - TriggerKill: true, + SessionRaw: sessionutil.SessionRaw{TriggerKill: true}, } svr.serverLoopWg.Add(1) @@ -3277,10 +3277,12 @@ func TestHandleSessionEvent(t *testing.T) { evt := &sessionutil.SessionEvent{ EventType: sessionutil.SessionNoneEvent, Session: &sessionutil.Session{ - ServerID: 0, - ServerName: "", - Address: "", - Exclusive: false, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 0, + ServerName: "", + Address: "", + Exclusive: false, + }, }, } err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt) @@ -3289,10 +3291,12 @@ func TestHandleSessionEvent(t *testing.T) { evt = &sessionutil.SessionEvent{ EventType: sessionutil.SessionAddEvent, Session: &sessionutil.Session{ - ServerID: 101, - ServerName: "DN101", - Address: "DN127.0.0.101", - Exclusive: false, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 101, + ServerName: "DN101", + Address: "DN127.0.0.101", + Exclusive: false, + }, }, } err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt) @@ -3304,10 +3308,12 @@ func TestHandleSessionEvent(t *testing.T) { evt = &sessionutil.SessionEvent{ EventType: sessionutil.SessionDelEvent, Session: &sessionutil.Session{ - ServerID: 101, - ServerName: "DN101", - Address: "DN127.0.0.101", - Exclusive: false, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 101, + ServerName: "DN101", + Address: "DN127.0.0.101", + Exclusive: false, + }, }, } err = svr.handleSessionEvent(context.Background(), typeutil.DataNodeRole, evt) @@ -4320,7 +4326,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { func Test_CheckHealth(t *testing.T) { t.Run("not healthy", func(t *testing.T) { ctx := context.Background() - s := &Server{session: &sessionutil.Session{ServerID: 1}} + s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} s.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := s.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -4329,7 +4335,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("data node health check is ok", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{ServerID: 1}} + svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} svr.stateCode.Store(commonpb.StateCode_Healthy) healthClient := &mockDataNodeClient{ id: 1, @@ -4355,7 +4361,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("data node health check is fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{ServerID: 1}} + svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} svr.stateCode.Store(commonpb.StateCode_Healthy) unhealthClient := &mockDataNodeClient{ id: 1, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 2526c3731dcaa..0786f26afc373 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -156,7 +156,7 @@ func TestDataNode(t *testing.T) { t.Run("Test getSystemInfoMetrics", func(t *testing.T) { emptyNode := &DataNode{} - emptyNode.SetSession(&sessionutil.Session{ServerID: 1}) + emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) emptyNode.flowgraphManager = newFlowgraphManager() req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) @@ -171,7 +171,7 @@ func TestDataNode(t *testing.T) { t.Run("Test getSystemInfoMetrics with quotaMetric error", func(t *testing.T) { emptyNode := &DataNode{} - emptyNode.SetSession(&sessionutil.Session{ServerID: 1}) + emptyNode.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) emptyNode.flowgraphManager = newFlowgraphManager() req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index bb7208926c68d..fcad1b7363d36 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -79,7 +79,7 @@ var emptyFlushAndDropFunc flushAndDropFunc = func(_ []*segmentFlushPack) {} func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode { factory := dependency.NewDefaultFactory(true) node := NewDataNode(ctx, factory) - node.SetSession(&sessionutil.Session{ServerID: 1}) + node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID()) rc := &RootCoordFactory{ diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 57ab03dfcd23c..8c59fcead878a 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -321,7 +321,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() { // test closed server node := &DataNode{} - node.SetSession(&sessionutil.Session{ServerID: 1}) + node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) node.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := node.ShowConfigurations(s.ctx, req) @@ -338,7 +338,7 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() { func (s *DataNodeServicesSuite) TestGetMetrics() { node := &DataNode{} - node.SetSession(&sessionutil.Session{ServerID: 1}) + node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) node.flowgraphManager = newFlowgraphManager() // server is closed node.stateCode.Store(commonpb.StateCode_Abnormal) diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index 1175f4eb2e4ec..ce3eff2273b09 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -222,8 +222,10 @@ func TestConnectionManager_processEvent(t *testing.T) { cm := &ConnectionManager{ closeCh: make(chan struct{}), session: &sessionutil.Session{ - ServerID: 1, - TriggerKill: true, + SessionRaw: sessionutil.SessionRaw{ + ServerID: 1, + TriggerKill: true, + }, }, } diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 6b7938a54ec33..7961252ad889a 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -63,7 +63,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) { func TestProxy_CheckHealth(t *testing.T) { t.Run("not healthy", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.multiRateLimiter = NewMultiRateLimiter() node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() @@ -80,7 +80,7 @@ func TestProxy_CheckHealth(t *testing.T) { rootCoord: NewRootCoordMock(), queryCoord: qc, dataCoord: NewDataCoordMock(), - session: &sessionutil.Session{ServerID: 1}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, } node.multiRateLimiter = NewMultiRateLimiter() node.stateCode.Store(commonpb.StateCode_Healthy) @@ -108,7 +108,7 @@ func TestProxy_CheckHealth(t *testing.T) { qc := &mocks.MockQueryCoordClient{} qc.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(nil, errors.New("test")) node := &Proxy{ - session: &sessionutil.Session{ServerID: 1}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, rootCoord: NewRootCoordMock(func(mock *RootCoordMock) { mock.checkHealthFunc = checkHealthFunc1 }), @@ -159,7 +159,7 @@ func TestProxy_CheckHealth(t *testing.T) { func TestProxyRenameCollection(t *testing.T) { t.Run("not healthy", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{}) @@ -168,7 +168,7 @@ func TestProxyRenameCollection(t *testing.T) { }) t.Run("rename with illegal new collection name", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.stateCode.Store(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"}) @@ -181,7 +181,7 @@ func TestProxyRenameCollection(t *testing.T) { rc.On("RenameCollection", mock.Anything, mock.Anything). Return(nil, errors.New("fail")) node := &Proxy{ - session: &sessionutil.Session{ServerID: 1}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, rootCoord: rc, } node.stateCode.Store(commonpb.StateCode_Healthy) @@ -197,7 +197,7 @@ func TestProxyRenameCollection(t *testing.T) { rc.On("RenameCollection", mock.Anything, mock.Anything). Return(merr.Status(nil), nil) node := &Proxy{ - session: &sessionutil.Session{ServerID: 1}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, rootCoord: rc, } node.stateCode.Store(commonpb.StateCode_Healthy) @@ -884,7 +884,7 @@ func TestProxyCreateDatabase(t *testing.T) { paramtable.Init() t.Run("not healthy", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{}) @@ -938,7 +938,7 @@ func TestProxyDropDatabase(t *testing.T) { paramtable.Init() t.Run("not healthy", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{}) @@ -992,7 +992,7 @@ func TestProxyListDatabase(t *testing.T) { paramtable.Init() t.Run("not healthy", func(t *testing.T) { - node := &Proxy{session: &sessionutil.Session{ServerID: 1}} + node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.stateCode.Store(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{}) diff --git a/internal/proxy/metrics_info_test.go b/internal/proxy/metrics_info_test.go index a0712c973098a..165e21ad1daa5 100644 --- a/internal/proxy/metrics_info_test.go +++ b/internal/proxy/metrics_info_test.go @@ -48,7 +48,7 @@ func TestProxy_metrics(t *testing.T) { rootCoord: rc, queryCoord: qc, dataCoord: dc, - session: &sessionutil.Session{Address: funcutil.GenRandomStr()}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{Address: funcutil.GenRandomStr()}}, } rc.getMetricsFunc = func(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index e0839869d8d04..1aa3553943c3d 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -390,7 +390,7 @@ func newMockProxy() *mockProxy { func newTestCore(opts ...Opt) *Core { c := &Core{ - session: &sessionutil.Session{ServerID: TestRootCoordID}, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}, } executor := newMockStepExecutor() executor.AddStepsFunc = func(s *stepStack) { diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 41afd8bd3925c..5b4f4c0752bdb 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -118,8 +118,10 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) { pcm := newProxyClientManager(core.proxyCreator) session := &sessionutil.Session{ - ServerID: 100, - Address: "localhost", + SessionRaw: sessionutil.SessionRaw{ + ServerID: 100, + Address: "localhost", + }, } sessions := []*sessionutil.Session{session} @@ -150,8 +152,10 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) { pcm := newProxyClientManager(core.proxyCreator) session := &sessionutil.Session{ - ServerID: 100, - Address: "localhost", + SessionRaw: sessionutil.SessionRaw{ + ServerID: 100, + Address: "localhost", + }, } pcm.AddProxyClient(session) diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index 6660824964692..c60310d414cac 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -53,7 +53,7 @@ func TestProxyManager(t *testing.T) { etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) defer etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) s1 := sessionutil.Session{ - ServerID: 100, + SessionRaw: sessionutil.SessionRaw{ServerID: 100}, } b1, err := json.Marshal(&s1) assert.NoError(t, err) @@ -62,7 +62,7 @@ func TestProxyManager(t *testing.T) { assert.NoError(t, err) s0 := sessionutil.Session{ - ServerID: 99, + SessionRaw: sessionutil.SessionRaw{ServerID: 99}, } b0, err := json.Marshal(&s0) assert.NoError(t, err) @@ -94,7 +94,7 @@ func TestProxyManager(t *testing.T) { t.Log("======== start watch proxy ==========") s2 := sessionutil.Session{ - ServerID: 101, + SessionRaw: sessionutil.SessionRaw{ServerID: 101}, } b2, err := json.Marshal(&s2) assert.NoError(t, err) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 3748d6c0a0638..1a65a17634321 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -801,7 +801,7 @@ func TestRootCoord_UpdateChannelTimeTick(t *testing.T) { defaultTs := Timestamp(101) ticker := newRocksMqTtSynchronizer() - ticker.addSession(&sessionutil.Session{ServerID: source}) + ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: source}}) ctx := context.Background() c := newTestCore(withHealthyCode(), @@ -1650,7 +1650,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) { c.stateCode.Store(commonpb.StateCode_Healthy) c.session.ServerID = TestRootCoordID c.sendMinDdlTsAsTt() // no session. - ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID}) + ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}) c.sendMinDdlTsAsTt() sched.GetMinDdlTsFunc = func() Timestamp { return typeutil.ZeroTimestamp @@ -1667,7 +1667,7 @@ func TestCore_sendMinDdlTsAsTt(t *testing.T) { func TestCore_startTimeTickLoop(t *testing.T) { ticker := newRocksMqTtSynchronizer() - ticker.addSession(&sessionutil.Session{ServerID: TestRootCoordID}) + ticker.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}) ddlManager := newMockDdlTsLockManager() ddlManager.GetMinDdlTsFunc = func() Timestamp { return 100 diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index ec9572f513078..40b6a986db821 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -129,10 +129,10 @@ func TestMultiTimetickSync(t *testing.T) { defer wg.Done() // suppose this is rooit - ttSync.addSession(&sessionutil.Session{ServerID: 1}) + ttSync.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) // suppose this is proxy1 - ttSync.addSession(&sessionutil.Session{ServerID: 2}) + ttSync.addSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 2}}) msg := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 9d83aaa1a954a..545b7122efab7 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -83,6 +83,19 @@ type IndexEngineVersion struct { CurrentIndexVersion int32 `json:"CurrentIndexVersion,omitempty"` } +// SessionRaw the persistent part of Session. +type SessionRaw struct { + ServerID int64 `json:"ServerID,omitempty"` + ServerName string `json:"ServerName,omitempty"` + Address string `json:"Address,omitempty"` + Exclusive bool `json:"Exclusive,omitempty"` + Stopping bool `json:"Stopping,omitempty"` + TriggerKill bool + Version string `json:"Version"` + IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"` + LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"` +} + // Session is a struct to store service's session, including ServerID, ServerName, // Address. // Exclusive indicates that this server can only start one. @@ -94,20 +107,14 @@ type Session struct { keepAliveCancel context.CancelFunc keepAliveCtx context.Context - ServerID int64 `json:"ServerID,omitempty"` - ServerName string `json:"ServerName,omitempty"` - Address string `json:"Address,omitempty"` - Exclusive bool `json:"Exclusive,omitempty"` - Stopping bool `json:"Stopping,omitempty"` - TriggerKill bool - Version semver.Version `json:"Version,omitempty"` - IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"` + SessionRaw + + Version semver.Version `json:"Version,omitempty"` liveChOnce sync.Once liveCh chan struct{} etcdCli *clientv3.Client - leaseID *clientv3.LeaseID watchSessionKeyCh clientv3.WatchChan watchCancel atomic.Pointer[context.CancelFunc] wg sync.WaitGroup @@ -156,78 +163,25 @@ func (s *Session) apply(opts ...SessionOption) { // UnmarshalJSON unmarshal bytes to Session. func (s *Session) UnmarshalJSON(data []byte) error { - var raw struct { - ServerID int64 `json:"ServerID,omitempty"` - ServerName string `json:"ServerName,omitempty"` - Address string `json:"Address,omitempty"` - Exclusive bool `json:"Exclusive,omitempty"` - Stopping bool `json:"Stopping,omitempty"` - TriggerKill bool - Version string `json:"Version"` - IndexEngineVersion string `json:"IndexEngineVersion,omitempty"` - LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"` - } - err := json.Unmarshal(data, &raw) + err := json.Unmarshal(data, &s.SessionRaw) if err != nil { return err } - if raw.Version != "" { - s.Version, err = semver.Parse(raw.Version) - if err != nil { - return err - } - } - - if raw.IndexEngineVersion != "" { - json.Unmarshal([]byte(raw.IndexEngineVersion), &s.IndexEngineVersion) + if s.SessionRaw.Version != "" { + s.Version, err = semver.Parse(s.SessionRaw.Version) if err != nil { return err } - } else { - // set zero when queryNode not register knowhere version - s.IndexEngineVersion.MinimalIndexVersion = 0 - s.IndexEngineVersion.CurrentIndexVersion = 0 } - s.ServerID = raw.ServerID - s.ServerName = raw.ServerName - s.Address = raw.Address - s.Exclusive = raw.Exclusive - s.Stopping = raw.Stopping - s.TriggerKill = raw.TriggerKill - s.leaseID = raw.LeaseID return nil } // MarshalJSON marshals session to bytes. func (s *Session) MarshalJSON() ([]byte, error) { - verStr := s.Version.String() - indexVerStr, err := json.Marshal(s.IndexEngineVersion) - if err != nil { - return nil, err - } - return json.Marshal(&struct { - ServerID int64 `json:"ServerID,omitempty"` - ServerName string `json:"ServerName,omitempty"` - Address string `json:"Address,omitempty"` - Exclusive bool `json:"Exclusive,omitempty"` - Stopping bool `json:"Stopping,omitempty"` - TriggerKill bool - Version string `json:"Version"` - IndexEngineVersion string `json:"IndexEngineVersion,omitempty"` - LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"` - }{ - ServerID: s.ServerID, - ServerName: s.ServerName, - Address: s.Address, - Exclusive: s.Exclusive, - Stopping: s.Stopping, - TriggerKill: s.TriggerKill, - Version: verStr, - IndexEngineVersion: string(indexVerStr), - LeaseID: s.leaseID, - }) + s.SessionRaw.Version = s.Version.String() + return json.Marshal(s.SessionRaw) } // NewSession is a helper to build Session object. @@ -443,7 +397,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er log.Error("register service", zap.Error(err)) return err } - s.leaseID = &resp.ID + s.LeaseID = &resp.ID sessionJSON, err := json.Marshal(s) if err != nil { @@ -516,21 +470,21 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes err := retry.Do(s.ctx, func() error { ctx, cancel := context.WithTimeout(s.keepAliveCtx, time.Second*10) defer cancel() - resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.leaseID) + resp, err := s.etcdCli.KeepAliveOnce(ctx, *s.LeaseID) keepAliveOnceResp = resp return err }, retry.Attempts(3)) if err != nil { - log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Error(err)) + log.Warn("fail to retry keepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("LeaseID", int64(*s.LeaseID)), zap.Error(err)) s.safeCloseLiveCh() return } - log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("leaseID", int64(*s.leaseID)), zap.Any("resp", keepAliveOnceResp)) + log.Info("succeed to KeepAliveOnce", zap.String("serverName", s.ServerName), zap.Int64("LeaseID", int64(*s.LeaseID)), zap.Any("resp", keepAliveOnceResp)) var chNew <-chan *clientv3.LeaseKeepAliveResponse keepAliveFunc := func() error { var err1 error - chNew, err1 = s.etcdCli.KeepAlive(s.keepAliveCtx, *s.leaseID) + chNew, err1 = s.etcdCli.KeepAlive(s.keepAliveCtx, *s.LeaseID) return err1 } err = fnWithTimeout(keepAliveFunc, time.Second*10) @@ -627,7 +581,7 @@ func (s *Session) GetSessionsWithVersionRange(prefix string, r semver.Range) (ma } func (s *Session) GoingStop() error { - if s == nil || s.etcdCli == nil || s.leaseID == nil { + if s == nil || s.etcdCli == nil || s.LeaseID == nil { return errors.New("the session hasn't been init") } @@ -650,7 +604,7 @@ func (s *Session) GoingStop() error { log.Error("fail to marshal the session", zap.String("key", completeKey)) return err } - _, err = s.etcdCli.Put(s.ctx, completeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID)) + _, err = s.etcdCli.Put(s.ctx, completeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID)) if err != nil { log.Error("fail to update the session to stopping state", zap.String("key", completeKey)) return err @@ -906,12 +860,12 @@ func (s *Session) Stop() { s.wg.Wait() } -// Revoke revokes the internal leaseID for the session key +// Revoke revokes the internal LeaseID for the session key func (s *Session) Revoke(timeout time.Duration) { if s == nil { return } - if s.etcdCli == nil || s.leaseID == nil { + if s.etcdCli == nil || s.LeaseID == nil { return } if s.Disconnected() { @@ -921,7 +875,7 @@ func (s *Session) Revoke(timeout time.Duration) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // ignores resp & error, just do best effort to revoke - _, _ = s.etcdCli.Revoke(ctx, *s.leaseID) + _, _ = s.etcdCli.Revoke(ctx, *s.LeaseID) } // UpdateRegistered update the state of registered. @@ -998,7 +952,7 @@ func (s *Session) ProcessActiveStandBy(activateFunc func() error) error { clientv3.Version(s.activeKey), "=", 0)). - Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID))).Commit() + Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit() if err != nil { log.Error("register active key to etcd failed", zap.Error(err)) return false, -1, err @@ -1085,7 +1039,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { if len(sessions) != 0 { activeSess := sessions[s.ServerName] - if activeSess == nil || activeSess.leaseID == nil { + if activeSess == nil || activeSess.LeaseID == nil { // force delete all old sessions s.etcdCli.Delete(s.ctx, s.activeKey) for _, sess := range sessions { @@ -1097,7 +1051,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { } } else { // force release old active session - _, _ = s.etcdCli.Revoke(s.ctx, *activeSess.leaseID) + _, _ = s.etcdCli.Revoke(s.ctx, *activeSess.LeaseID) } } @@ -1107,7 +1061,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { clientv3.Version(s.activeKey), "=", 0)). - Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.leaseID))).Commit() + Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit() if !resp.Succeeded { msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName) diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index c5da64a5e2b54..5f856c4230e51 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -98,7 +98,7 @@ func TestInit(t *testing.T) { s := NewSession(ctx, metaRoot, etcdCli) s.Init("inittest", "testAddr", false, false) - assert.NotEqual(t, int64(0), s.leaseID) + assert.NotEqual(t, int64(0), s.LeaseID) assert.NotEqual(t, int64(0), s.ServerID) s.Register() sessions, _, err := s.GetSessions("inittest") @@ -400,10 +400,12 @@ func TestSession_String(t *testing.T) { func TestSesssionMarshal(t *testing.T) { s := &Session{ - ServerID: 1, - ServerName: "test", - Address: "localhost", - Version: common.Version, + SessionRaw: SessionRaw{ + ServerID: 1, + ServerName: "test", + Address: "localhost", + }, + Version: common.Version, } bs, err := json.Marshal(s) @@ -663,7 +665,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, _ = s1.etcdCli.Revoke(ctx, *s1.leaseID) + _, _ = s1.etcdCli.Revoke(ctx, *s1.LeaseID) } select { case <-signal: