Skip to content

Commit

Permalink
feat: remove concurrent_piece_count in scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Dec 12, 2023
1 parent 04f65de commit d303096
Show file tree
Hide file tree
Showing 14 changed files with 32 additions and 254 deletions.
5 changes: 0 additions & 5 deletions api/manager/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4839,11 +4839,6 @@ const docTemplate = `{
"d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": {
"type": "object",
"properties": {
"concurrent_piece_count": {
"type": "integer",
"maximum": 50,
"minimum": 1
},
"load_limit": {
"type": "integer",
"maximum": 2000,
Expand Down
5 changes: 0 additions & 5 deletions api/manager/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4833,11 +4833,6 @@
"d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig": {
"type": "object",
"properties": {
"concurrent_piece_count": {
"type": "integer",
"maximum": 50,
"minimum": 1
},
"load_limit": {
"type": "integer",
"maximum": 2000,
Expand Down
4 changes: 0 additions & 4 deletions api/manager/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -811,10 +811,6 @@ definitions:
type: object
d7y_io_dragonfly_v2_manager_types.SchedulerClusterClientConfig:
properties:
concurrent_piece_count:
maximum: 50
minimum: 1
type: integer
load_limit:
maximum: 2000
minimum: 1
Expand Down
13 changes: 5 additions & 8 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,12 @@ loop:
pt.Warnf("scheduler client send a peerPacket with empty peers")
continue
}
pt.Infof("receive new peer packet, main peer: %s, parallel count: %d",
peerPacket.MainPeer.PeerId, peerPacket.ParallelCount)
pt.Infof("receive new peer packet, main peer: %s", peerPacket.MainPeer.PeerId)
pt.span.AddEvent("receive new peer packet",
trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)))

if !firstPacketReceived {
pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestQueue)
pt.initDownloadPieceWorkers(pieceRequestQueue)
firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
firstPeerSpan.End()
}
Expand Down Expand Up @@ -952,11 +951,9 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
}
}

func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue PieceDispatcher) {
if count < 1 {
count = 4
}
for i := int32(0); i < count; i++ {
func (pt *peerTaskConductor) initDownloadPieceWorkers(pieceRequestQueue PieceDispatcher) {
count := 4
for i := int32(0); i < int32(count); i++ {
go pt.downloadPieceWorker(i, pieceRequestQueue)
}
}
Expand Down
7 changes: 3 additions & 4 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
}
return &schedulerv1.PeerPacket{
Code: commonv1.Code_Success,
TaskId: opt.taskID,
SrcPid: "127.0.0.1",
ParallelCount: opt.pieceParallelCount,
Code: commonv1.Code_Success,
TaskId: opt.taskID,
SrcPid: "127.0.0.1",
MainPeer: &schedulerv1.PeerPacket_DestPeer{
Ip: "127.0.0.1",
RpcPort: port,
Expand Down
7 changes: 3 additions & 4 deletions client/daemon/peer/peertask_stream_backsource_partial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,9 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
}
schedPeerPacket = true
return &schedulerv1.PeerPacket{
Code: commonv1.Code_Success,
TaskId: opt.taskID,
SrcPid: "127.0.0.1",
ParallelCount: opt.pieceParallelCount,
Code: commonv1.Code_Success,
TaskId: opt.taskID,
SrcPid: "127.0.0.1",
MainPeer: &schedulerv1.PeerPacket_DestPeer{
Ip: "127.0.0.1",
RpcPort: port,
Expand Down
7 changes: 3 additions & 4 deletions client/daemon/peer/traffic_shaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,9 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr
return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
}
return &schedulerv1.PeerPacket{
Code: commonv1.Code_Success,
TaskId: task.taskID,
SrcPid: "127.0.0.1",
ParallelCount: 4,
Code: commonv1.Code_Success,
TaskId: task.taskID,
SrcPid: "127.0.0.1",
MainPeer: &schedulerv1.PeerPacket_DestPeer{
Ip: "127.0.0.1",
RpcPort: port,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.60
d7y.io/api/v2 v2.0.62
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.60 h1:er07NeKpjnBOB8JzkddjtGWNRdRkhavO1Qn+0meajVw=
d7y.io/api/v2 v2.0.60/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.62 h1:q4/r24DxWT+4zsGGMe8HqbjC3cw+B/s2+gwI2oKC7Og=
d7y.io/api/v2 v2.0.62/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
3 changes: 1 addition & 2 deletions manager/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ func seed(db *gorm.DB) error {
"filter_parent_limit": schedulerconfig.DefaultSchedulerFilterParentLimit,
},
ClientConfig: map[string]any{
"load_limit": schedulerconfig.DefaultPeerConcurrentUploadLimit,
"concurrent_piece_count": schedulerconfig.DefaultPeerConcurrentPieceCount,
"load_limit": schedulerconfig.DefaultPeerConcurrentUploadLimit,
},
Scopes: map[string]any{},
IsDefault: true,
Expand Down
3 changes: 1 addition & 2 deletions manager/types/scheduler_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ type SchedulerClusterConfig struct {
}

type SchedulerClusterClientConfig struct {
LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"`
ConcurrentPieceCount uint32 `yaml:"concurrentPieceCount" mapstructure:"concurrentPieceCount" json:"concurrent_piece_count" binding:"omitempty,gte=1,lte=50"`
LoadLimit uint32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=2000"`
}

type SchedulerClusterScopes struct {
Expand Down
3 changes: 0 additions & 3 deletions scheduler/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ const (
// DefaultPeerConcurrentUploadLimit is default number for peer concurrent upload limit.
DefaultPeerConcurrentUploadLimit = 50

// DefaultPeerConcurrentPieceCount is default number for pieces to concurrent downloading.
DefaultPeerConcurrentPieceCount = 4

// DefaultSchedulerCandidateParentLimit is default limit the number of candidate parent.
DefaultSchedulerCandidateParentLimit = 4

Expand Down
26 changes: 7 additions & 19 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
// Send NormalTaskResponse to peer.
peer.Log.Info("send NormalTaskResponse")
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: ConstructSuccessNormalTaskResponse(s.dynconfig, candidateParents),
Response: ConstructSuccessNormalTaskResponse(candidateParents),
}); err != nil {
peer.Log.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
Expand Down Expand Up @@ -359,7 +359,7 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer

// Send PeerPacket to peer.
peer.Log.Info("send PeerPacket to peer")
if err := stream.Send(ConstructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
if err := stream.Send(ConstructSuccessPeerPacket(peer, candidateParents[0], candidateParents[1:])); err != nil {
n++
peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())

Expand Down Expand Up @@ -537,12 +537,7 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S

// ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
// Used only in v2 version of the grpc.
func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
concurrentPieceCount = int(config.ConcurrentPieceCount)
}

func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
var parents []*commonv2.Peer
for _, candidateParent := range candidateParents {
parent := &commonv2.Peer{
Expand Down Expand Up @@ -708,20 +703,14 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can

return &schedulerv2.AnnouncePeerResponse_NormalTaskResponse{
NormalTaskResponse: &schedulerv2.NormalTaskResponse{
CandidateParents: parents,
ConcurrentPieceCount: uint32(concurrentPieceCount),
CandidateParents: parents,
},
}
}

// ConstructSuccessPeerPacket constructs peer successful packet.
// Used only in v1 version of the grpc.
func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
concurrentPieceCount = int(config.ConcurrentPieceCount)
}

func ConstructSuccessPeerPacket(peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
var parents []*schedulerv1.PeerPacket_DestPeer
for _, candidateParent := range candidateParents {
parents = append(parents, &schedulerv1.PeerPacket_DestPeer{
Expand All @@ -732,9 +721,8 @@ func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resou
}

return &schedulerv1.PeerPacket{
TaskId: peer.Task.ID,
SrcPid: peer.ID,
ParallelCount: int32(concurrentPieceCount),
TaskId: peer.Task.ID,
SrcPid: peer.ID,
MainPeer: &schedulerv1.PeerPacket_DestPeer{
Ip: parent.Host.IP,
RpcPort: parent.Host.Port,
Expand Down
Loading

0 comments on commit d303096

Please sign in to comment.