Skip to content

Commit

Permalink
feat: optimize piece download failed handler (#2883)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 14, 2023
1 parent 4d4ea8d commit 79b4aab
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.46
d7y.io/api/v2 v2.0.48
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.46 h1:oPPjp3eKUDAWX9VnCdKG3Mpdwdp57a4gRfnLAZHyMiw=
d7y.io/api/v2 v2.0.46/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
d7y.io/api/v2 v2.0.48 h1:vhEjO2OsL+eWo+z60wFeopEnyMbt5ti5fkiWwQflJY4=
d7y.io/api/v2 v2.0.48/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
12 changes: 6 additions & 6 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,16 +1203,16 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
}

// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

if req.Temporary {
// Handle peer with piece temporary failed request.
peer.UpdatedAt.Store(time.Now())
peer.BlockParents.Add(req.Piece.GetParentId())
if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded {
peer.BlockParents.Add(req.GetParentId())
if parent, loaded := v.resource.PeerManager().Load(req.GetParentId()); loaded {
parent.Host.UploadFailedCount.Inc()
}

Expand All @@ -1238,9 +1238,9 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p
peer.Task.UpdatedAt.Store(time.Now())

// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

return status.Error(codes.Internal, "download piece from source failed")
Expand Down
26 changes: 9 additions & 17 deletions scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2963,9 +2963,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
{
name: "peer can not be loaded",
req: &schedulerv2.DownloadPieceFailedRequest{
Piece: &commonv2.Piece{
ParentId: &mockSeedPeerID,
},
ParentId: mockSeedPeerID,
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
Expand All @@ -2982,9 +2980,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
{
name: "temporary is false",
req: &schedulerv2.DownloadPieceFailedRequest{
Piece: &commonv2.Piece{
ParentId: &mockSeedPeerID,
},
ParentId: mockSeedPeerID,
Temporary: false,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
Expand All @@ -3001,9 +2997,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
{
name: "parent can not be loaded",
req: &schedulerv2.DownloadPieceFailedRequest{
Piece: &commonv2.Piece{
ParentId: &mockSeedPeerID,
},
ParentId: mockSeedPeerID,
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
Expand All @@ -3012,22 +3006,20 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
mp.Load(gomock.Eq(req.GetParentId())).Return(nil, false).Times(1),
)

assert := assert.New(t)
assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
assert.NotEqual(peer.UpdatedAt.Load(), 0)
assert.True(peer.BlockParents.Contains(req.Piece.GetParentId()))
assert.True(peer.BlockParents.Contains(req.GetParentId()))
assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
},
},
{
name: "parent can be loaded",
req: &schedulerv2.DownloadPieceFailedRequest{
Piece: &commonv2.Piece{
ParentId: &mockSeedPeerID,
},
ParentId: mockSeedPeerID,
Temporary: true,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
Expand All @@ -3036,13 +3028,13 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
mp.Load(gomock.Eq(req.GetParentId())).Return(peer, true).Times(1),
)

assert := assert.New(t)
assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
assert.NotEqual(peer.UpdatedAt.Load(), 0)
assert.True(peer.BlockParents.Contains(req.Piece.GetParentId()))
assert.True(peer.BlockParents.Contains(req.GetParentId()))
assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
assert.Equal(peer.Host.UploadFailedCount.Load(), int64(1))
},
Expand Down Expand Up @@ -3096,7 +3088,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
{
name: "peer can be loaded",
req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{
Piece: &commonv2.Piece{},
PieceNumber: mockPiece.Number,
},
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
mp *resource.MockPeerManagerMockRecorder) {
Expand Down

0 comments on commit 79b4aab

Please sign in to comment.