Skip to content

Commit

Permalink
feat: optimize AnnouncePeer log and update api verison (#2921)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 6, 2023
1 parent f4e326e commit cfcfbc3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.57
d7y.io/api/v2 v2.0.58
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.57 h1:oAm1BHiMgmgLFacm1x7TEtu0XH68f0gqZEdw4XLl+MA=
d7y.io/api/v2 v2.0.57/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.58 h1:tezD5celqb0t3lnCXJI2cORkA+BVpfazYeJ2TT7hEZk=
d7y.io/api/v2 v2.0.58/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
44 changes: 26 additions & 18 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,80 +109,88 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
logger := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
switch announcePeerRequest := req.GetRequest().(type) {
case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %s", announcePeerRequest.RegisterPeerRequest.Download.Url)
if err := v.handleRegisterPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), announcePeerRequest.RegisterPeerRequest); err != nil {
registerPeerRequest := announcePeerRequest.RegisterPeerRequest
logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %s", registerPeerRequest.Download.Url)
if err := v.handleRegisterPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), registerPeerRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %s", announcePeerRequest.RegisterSeedPeerRequest.Download.Url)
if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), announcePeerRequest.RegisterSeedPeerRequest); err != nil {
registerSeedPeerRequest := announcePeerRequest.RegisterSeedPeerRequest
logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %s", registerSeedPeerRequest.Download.Url)
if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), registerSeedPeerRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
logger.Info("receive AnnouncePeerRequest_DownloadPeerStartedRequest")
if err := v.handleDownloadPeerStartedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
logger.Info("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest")
if err := v.handleDownloadPeerBackToSourceStartedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:
logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %#v", announcePeerRequest.RescheduleRequest)
logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %s", announcePeerRequest.RescheduleRequest.GetDescription())
if err := v.handleRescheduleRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %d %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPeerBackToSourceFinishedRequest); err != nil {
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %d %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFailedRequest: %#v", announcePeerRequest.DownloadPeerFailedRequest)
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFailedRequest: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
if err := v.handleDownloadPeerFailedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFailedRequest)
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest)
piece := announcePeerRequest.DownloadPieceFinishedRequest.Piece
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %d %d %s %s %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
if err := v.handleDownloadPieceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFinishedRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
piece := announcePeerRequest.DownloadPieceBackToSourceFinishedRequest.Piece
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %d %d %s %s %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
if err := v.handleDownloadPieceBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFinishedRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest)
if err := v.handleDownloadPieceFailedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFailedRequest); err != nil {
downloadPieceFailedRequest := announcePeerRequest.DownloadPieceFailedRequest
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %d %t %s", downloadPieceFailedRequest.GetPieceNumber(), downloadPieceFailedRequest.GetTemporary(), downloadPieceFailedRequest.GetParentId())
if err := v.handleDownloadPieceFailedRequest(ctx, req.GetPeerId(), downloadPieceFailedRequest); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFailedRequest); err != nil {
downloadPieceBackToSourceFailedRequest := announcePeerRequest.DownloadPieceBackToSourceFailedRequest
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %d", downloadPieceBackToSourceFailedRequest.GetPieceNumber())
if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.GetPeerId(), downloadPieceBackToSourceFailedRequest); err != nil {
logger.Error(err)
return err
}
Expand Down
4 changes: 3 additions & 1 deletion scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3065,6 +3065,8 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
}

func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
mockPieceNumber := uint32(mockPiece.Number)

tests := []struct {
name string
req *schedulerv2.DownloadPieceBackToSourceFailedRequest
Expand All @@ -3088,7 +3090,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
{
name: "peer can be loaded",
req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{
PieceNumber: uint32(mockPiece.Number),
PieceNumber: &mockPieceNumber,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder) {
Expand Down

0 comments on commit cfcfbc3

Please sign in to comment.