From d3030963a78fc5005ba91f4f74c3dfb62476b56a Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 12 Dec 2023 15:03:09 +0800 Subject: [PATCH] feat: remove concurrent_piece_count in scheduler Signed-off-by: Gaius --- api/manager/docs.go | 5 - api/manager/swagger.json | 5 - api/manager/swagger.yaml | 4 - client/daemon/peer/peertask_conductor.go | 13 +- client/daemon/peer/peertask_manager_test.go | 7 +- ...peertask_stream_backsource_partial_test.go | 7 +- client/daemon/peer/traffic_shaper_test.go | 7 +- go.mod | 2 +- go.sum | 4 +- manager/database/database.go | 3 +- manager/types/scheduler_cluster.go | 3 +- scheduler/config/constants.go | 3 - scheduler/scheduling/scheduling.go | 26 +-- scheduler/scheduling/scheduling_test.go | 197 +----------------- 14 files changed, 32 insertions(+), 254 deletions(-) diff --git a/api/manager/docs.go b/api/manager/docs.go index c32d51a0abf..9b8a5d6d9de 100644 --- a/api/manager/docs.go +++ b/api/manager/docs.go @@ -4839,11 +4839,6 @@ const docTemplate = `{ "d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": { "type": "object", "properties": { - "concurrent_piece_count": { - "type": "integer", - "maximum": 50, - "minimum": 1 - }, "load_limit": { "type": "integer", "maximum": 2000, diff --git a/api/manager/swagger.json b/api/manager/swagger.json index 02331bfa721..860b9770057 100644 --- a/api/manager/swagger.json +++ b/api/manager/swagger.json @@ -4833,11 +4833,6 @@ "d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": { "type": "object", "properties": { - "concurrent_piece_count": { - "type": "integer", - "maximum": 50, - "minimum": 1 - }, "load_limit": { "type": "integer", "maximum": 2000, diff --git a/api/manager/swagger.yaml b/api/manager/swagger.yaml index 99aa271a2e3..dbaadddc693 100644 --- a/api/manager/swagger.yaml +++ b/api/manager/swagger.yaml @@ -811,10 +811,6 @@ definitions: type: object d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig: properties: - concurrent_piece_count: - maximum: 50 - minimum: 1 - type: integer load_limit: maximum: 2000 minimum: 1 diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 0f32e13621f..c954b0f319c 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -747,13 +747,12 @@ loop: pt.Warnf("scheduler client send a peerPacket with empty peers") continue } - pt.Infof("receive new peer packet, main peer: %s, parallel count: %d", - peerPacket.MainPeer.PeerId, peerPacket.ParallelCount) + pt.Infof("receive new peer packet, main peer: %s", peerPacket.MainPeer.PeerId) pt.span.AddEvent("receive new peer packet", trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))) if !firstPacketReceived { - pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestQueue) + pt.initDownloadPieceWorkers(pieceRequestQueue) firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)) firstPeerSpan.End() } @@ -952,11 +951,9 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { } } -func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue PieceDispatcher) { - if count < 1 { - count = 4 - } - for i := int32(0); i < count; i++ { +func (pt *peerTaskConductor) initDownloadPieceWorkers(pieceRequestQueue PieceDispatcher) { + count := 4 + for i := int32(0); i < int32(count); i++ { go pt.downloadPieceWorker(i, pieceRequestQueue) } } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index facec4c5dd7..fc1091f849b 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -200,10 +200,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error") } return &schedulerv1.PeerPacket{ - Code: commonv1.Code_Success, - TaskId: opt.taskID, - SrcPid: "127.0.0.1", - ParallelCount: opt.pieceParallelCount, + Code: commonv1.Code_Success, + TaskId: opt.taskID, + SrcPid: "127.0.0.1", MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: "127.0.0.1", RpcPort: port, diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index eb81a18f31e..e0b119ce07c 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -187,10 +187,9 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, } schedPeerPacket = true return &schedulerv1.PeerPacket{ - Code: commonv1.Code_Success, - TaskId: opt.taskID, - SrcPid: "127.0.0.1", - ParallelCount: opt.pieceParallelCount, + Code: commonv1.Code_Success, + TaskId: opt.taskID, + SrcPid: "127.0.0.1", MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: "127.0.0.1", RpcPort: port, diff --git a/client/daemon/peer/traffic_shaper_test.go b/client/daemon/peer/traffic_shaper_test.go index a609b2db3ba..fb24867c1ea 100644 --- a/client/daemon/peer/traffic_shaper_test.go +++ b/client/daemon/peer/traffic_shaper_test.go @@ -189,10 +189,9 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error") } return &schedulerv1.PeerPacket{ - Code: commonv1.Code_Success, - TaskId: task.taskID, - SrcPid: "127.0.0.1", - ParallelCount: 4, + Code: commonv1.Code_Success, + TaskId: task.taskID, + SrcPid: "127.0.0.1", MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: "127.0.0.1", RpcPort: port, diff --git a/go.mod b/go.mod index 3fe162c8b28..3ea4d531824 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.60 + d7y.io/api/v2 v2.0.62 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 255bbbfc006..d29ce41eeea 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.60 h1:er07NeKpjnBOB8JzkddjtGWNRdRkhavO1Qn+0meajVw= -d7y.io/api/v2 v2.0.60/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io= +d7y.io/api/v2 v2.0.62 h1:q4/r24DxWT+4zsGGMe8HqbjC3cw+B/s2+gwI2oKC7Og= +d7y.io/api/v2 v2.0.62/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/manager/database/database.go b/manager/database/database.go index c409f6e5793..4cb136584b9 100644 --- a/manager/database/database.go +++ b/manager/database/database.go @@ -116,8 +116,7 @@ func seed(db *gorm.DB) error { "filter_parent_limit": schedulerconfig.DefaultSchedulerFilterParentLimit, }, ClientConfig: map[string]any{ - "load_limit": schedulerconfig.DefaultPeerConcurrentUploadLimit, - "concurrent_piece_count": schedulerconfig.DefaultPeerConcurrentPieceCount, + "load_limit": schedulerconfig.DefaultPeerConcurrentUploadLimit, }, Scopes: map[string]any{}, IsDefault: true, diff --git a/manager/types/scheduler_cluster.go b/manager/types/scheduler_cluster.go index 7ccef86792a..363cd3b46da 100644 --- a/manager/types/scheduler_cluster.go +++ b/manager/types/scheduler_cluster.go @@ -57,8 +57,7 @@ type SchedulerClusterConfig struct { } type SchedulerClusterClientConfig struct { - LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"` - ConcurrentPieceCount uint32 `yaml:"concurrentPieceCount" mapstructure:"concurrentPieceCount" json:"concurrent_piece_count" binding:"omitempty,gte=1,lte=50"` + LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"` } type SchedulerClusterScopes struct { diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index 126fd8e0800..5beb3c9bd5f 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -30,9 +30,6 @@ const ( // DefaultPeerConcurrentUploadLimit is default number for peer concurrent upload limit. DefaultPeerConcurrentUploadLimit = 50 - // DefaultPeerConcurrentPieceCount is default number for pieces to concurrent downloading. - DefaultPeerConcurrentPieceCount = 4 - // DefaultSchedulerCandidateParentLimit is default limit the number of candidate parent. DefaultSchedulerCandidateParentLimit = 4 diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 4d9390d139d..df6fa7549a2 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -191,7 +191,7 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc // Send NormalTaskResponse to peer. peer.Log.Info("send NormalTaskResponse") if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ - Response: ConstructSuccessNormalTaskResponse(s.dynconfig, candidateParents), + Response: ConstructSuccessNormalTaskResponse(candidateParents), }); err != nil { peer.Log.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) @@ -359,7 +359,7 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // Send PeerPacket to peer. peer.Log.Info("send PeerPacket to peer") - if err := stream.Send(ConstructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil { + if err := stream.Send(ConstructSuccessPeerPacket(peer, candidateParents[0], candidateParents[1:])); err != nil { n++ peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) @@ -537,12 +537,7 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S // ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task. // Used only in v2 version of the grpc. -func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { - concurrentPieceCount := config.DefaultPeerConcurrentPieceCount - if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 { - concurrentPieceCount = int(config.ConcurrentPieceCount) - } - +func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { var parents []*commonv2.Peer for _, candidateParent := range candidateParents { parent := &commonv2.Peer{ @@ -708,20 +703,14 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can return &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{ NormalTaskResponse: &schedulerv2.NormalTaskResponse{ - CandidateParents: parents, - ConcurrentPieceCount: uint32(concurrentPieceCount), + CandidateParents: parents, }, } } // ConstructSuccessPeerPacket constructs peer successful packet. // Used only in v1 version of the grpc. -func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket { - concurrentPieceCount := config.DefaultPeerConcurrentPieceCount - if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 { - concurrentPieceCount = int(config.ConcurrentPieceCount) - } - +func ConstructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket { var parents []*schedulerv1.PeerPacket_DestPeer for _, candidateParent := range candidateParents { parents = append(parents, &schedulerv1.PeerPacket_DestPeer{ @@ -732,9 +721,8 @@ func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resou } return &schedulerv1.PeerPacket{ - TaskId: peer.Task.ID, - SrcPid: peer.ID, - ParallelCount: int32(concurrentPieceCount), + TaskId: peer.Task.ID, + SrcPid: peer.ID, MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: parent.Host.IP, RpcPort: parent.Host.Port, diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index d54ac1853e1..3cf9a36406a 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -437,9 +437,6 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { peer.StoreAnnouncePeerStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ - ConcurrentPieceCount: 2, - }, nil).Times(1), ma.Send(gomock.Any()).Return(nil).Times(1), ) }, @@ -726,9 +723,6 @@ func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) { peer.StoreReportPieceResultStream(stream) gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ - ConcurrentPieceCount: 2, - }, nil).Times(1), mr.Send(gomock.Any()).Return(nil).Times(1), ) }, @@ -1348,16 +1342,10 @@ func TestScheduling_FindSuccessParent(t *testing.T) { func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { tests := []struct { name string - mock func(md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) }{ { - name: "get concurrentPieceCount from dynconfig", - mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) { - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ - ConcurrentPieceCount: 1, - }, nil).Times(1) - }, + name: "construct success normal task response", expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) { dgst := candidateParents[0].Task.Digest.String() @@ -1482,141 +1470,6 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { UpdatedAt: timestamppb.New(candidateParents[0].UpdatedAt.Load()), }, }, - ConcurrentPieceCount: 1, - }, - }) - }, - }, - { - name: "use default concurrentPieceCount", - mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) { - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1) - }, - expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_NormalTaskResponse, candidateParents []*resource.Peer) { - dgst := candidateParents[0].Task.Digest.String() - - assert := assert.New(t) - assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{ - NormalTaskResponse: &schedulerv2.NormalTaskResponse{ - CandidateParents: []*commonv2.Peer{ - { - Id: candidateParents[0].ID, - Range: &commonv2.Range{ - Start: uint64(candidateParents[0].Range.Start), - Length: uint64(candidateParents[0].Range.Length), - }, - Priority: candidateParents[0].Priority, - Pieces: []*commonv2.Piece{ - { - Number: uint32(mockPiece.Number), - ParentId: &mockPiece.ParentID, - Offset: mockPiece.Offset, - Length: mockPiece.Length, - Digest: mockPiece.Digest.String(), - TrafficType: &mockPiece.TrafficType, - Cost: durationpb.New(mockPiece.Cost), - CreatedAt: timestamppb.New(mockPiece.CreatedAt), - }, - }, - Cost: durationpb.New(candidateParents[0].Cost.Load()), - State: candidateParents[0].FSM.Current(), - Task: &commonv2.Task{ - Id: candidateParents[0].Task.ID, - Type: candidateParents[0].Task.Type, - Url: candidateParents[0].Task.URL, - Digest: &dgst, - Tag: &candidateParents[0].Task.Tag, - Application: &candidateParents[0].Task.Application, - Filters: candidateParents[0].Task.Filters, - Header: candidateParents[0].Task.Header, - PieceLength: uint32(candidateParents[0].Task.PieceLength), - ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), - PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), - SizeScope: candidateParents[0].Task.SizeScope(), - Pieces: []*commonv2.Piece{ - { - Number: uint32(mockPiece.Number), - ParentId: &mockPiece.ParentID, - Offset: mockPiece.Offset, - Length: mockPiece.Length, - Digest: mockPiece.Digest.String(), - TrafficType: &mockPiece.TrafficType, - Cost: durationpb.New(mockPiece.Cost), - CreatedAt: timestamppb.New(mockPiece.CreatedAt), - }, - }, - State: candidateParents[0].Task.FSM.Current(), - PeerCount: uint32(candidateParents[0].Task.PeerCount()), - CreatedAt: timestamppb.New(candidateParents[0].Task.CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParents[0].Task.UpdatedAt.Load()), - }, - Host: &commonv2.Host{ - Id: candidateParents[0].Host.ID, - Type: uint32(candidateParents[0].Host.Type), - Hostname: candidateParents[0].Host.Hostname, - Ip: candidateParents[0].Host.IP, - Port: candidateParents[0].Host.Port, - DownloadPort: candidateParents[0].Host.DownloadPort, - Os: candidateParents[0].Host.OS, - Platform: candidateParents[0].Host.Platform, - PlatformFamily: candidateParents[0].Host.PlatformFamily, - PlatformVersion: candidateParents[0].Host.PlatformVersion, - KernelVersion: candidateParents[0].Host.KernelVersion, - Cpu: &commonv2.CPU{ - LogicalCount: candidateParents[0].Host.CPU.LogicalCount, - PhysicalCount: candidateParents[0].Host.CPU.PhysicalCount, - Percent: candidateParents[0].Host.CPU.Percent, - ProcessPercent: candidateParents[0].Host.CPU.ProcessPercent, - Times: &commonv2.CPUTimes{ - User: candidateParents[0].Host.CPU.Times.User, - System: candidateParents[0].Host.CPU.Times.System, - Idle: candidateParents[0].Host.CPU.Times.Idle, - Nice: candidateParents[0].Host.CPU.Times.Nice, - Iowait: candidateParents[0].Host.CPU.Times.Iowait, - Irq: candidateParents[0].Host.CPU.Times.Irq, - Softirq: candidateParents[0].Host.CPU.Times.Softirq, - Steal: candidateParents[0].Host.CPU.Times.Steal, - Guest: candidateParents[0].Host.CPU.Times.Guest, - GuestNice: candidateParents[0].Host.CPU.Times.GuestNice, - }, - }, - Memory: &commonv2.Memory{ - Total: candidateParents[0].Host.Memory.Total, - Available: candidateParents[0].Host.Memory.Available, - Used: candidateParents[0].Host.Memory.Used, - UsedPercent: candidateParents[0].Host.Memory.UsedPercent, - ProcessUsedPercent: candidateParents[0].Host.Memory.ProcessUsedPercent, - Free: candidateParents[0].Host.Memory.Free, - }, - Network: &commonv2.Network{ - TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount, - UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount, - Location: &candidateParents[0].Host.Network.Location, - Idc: &candidateParents[0].Host.Network.IDC, - }, - Disk: &commonv2.Disk{ - Total: candidateParents[0].Host.Disk.Total, - Free: candidateParents[0].Host.Disk.Free, - Used: candidateParents[0].Host.Disk.Used, - UsedPercent: candidateParents[0].Host.Disk.UsedPercent, - InodesTotal: candidateParents[0].Host.Disk.InodesTotal, - InodesUsed: candidateParents[0].Host.Disk.InodesUsed, - InodesFree: candidateParents[0].Host.Disk.InodesFree, - InodesUsedPercent: candidateParents[0].Host.Disk.InodesUsedPercent, - }, - Build: &commonv2.Build{ - GitVersion: candidateParents[0].Host.Build.GitVersion, - GitCommit: &candidateParents[0].Host.Build.GitCommit, - GoVersion: &candidateParents[0].Host.Build.GoVersion, - Platform: &candidateParents[0].Host.Build.Platform, - }, - }, - NeedBackToSource: candidateParents[0].NeedBackToSource.Load(), - CreatedAt: timestamppb.New(candidateParents[0].CreatedAt.Load()), - UpdatedAt: timestamppb.New(candidateParents[0].UpdatedAt.Load()), - }, - }, - ConcurrentPieceCount: 4, }, }) }, @@ -1627,7 +1480,6 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - dynconfig := configmocks.NewMockDynconfigInterface(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -1639,8 +1491,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { candidateParents[0].StorePiece(&mockPiece) candidateParents[0].Task.StorePiece(&mockPiece) - tc.mock(dynconfig.EXPECT()) - tc.expect(t, ConstructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents) + tc.expect(t, ConstructSuccessNormalTaskResponse(candidateParents), candidateParents) }) } } @@ -1648,49 +1499,15 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) { func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) { tests := []struct { name string - mock func(md *configmocks.MockDynconfigInterfaceMockRecorder) expect func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer) }{ { - name: "get concurrentPieceCount from dynconfig", - mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) { - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{ - ConcurrentPieceCount: 1, - }, nil).Times(1) - }, + name: "construct success peer packet", expect: func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer) { assert := assert.New(t) assert.EqualValues(packet, &schedulerv1.PeerPacket{ - TaskId: mockTaskID, - SrcPid: mockPeerID, - ParallelCount: 1, - MainPeer: &schedulerv1.PeerPacket_DestPeer{ - Ip: parent.Host.IP, - RpcPort: parent.Host.Port, - PeerId: parent.ID, - }, - CandidatePeers: []*schedulerv1.PeerPacket_DestPeer{ - { - Ip: candidateParents[0].Host.IP, - RpcPort: candidateParents[0].Host.Port, - PeerId: candidateParents[0].ID, - }, - }, - Code: commonv1.Code_Success, - }) - }, - }, - { - name: "use default concurrentPieceCount", - mock: func(md *configmocks.MockDynconfigInterfaceMockRecorder) { - md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1) - }, - expect: func(t *testing.T, packet *schedulerv1.PeerPacket, parent *resource.Peer, candidateParents []*resource.Peer) { - assert := assert.New(t) - assert.EqualValues(packet, &schedulerv1.PeerPacket{ - TaskId: mockTaskID, - SrcPid: mockPeerID, - ParallelCount: 4, + TaskId: mockTaskID, + SrcPid: mockPeerID, MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: parent.Host.IP, RpcPort: parent.Host.Port, @@ -1713,7 +1530,6 @@ func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() - dynconfig := configmocks.NewMockDynconfigInterface(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -1723,8 +1539,7 @@ func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) { parent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost) candidateParents := []*resource.Peer{resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost)} - tc.mock(dynconfig.EXPECT()) - tc.expect(t, ConstructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents) + tc.expect(t, ConstructSuccessPeerPacket(peer, parent, candidateParents), parent, candidateParents) }) } }