Skip to content

Commit

Permalink
feat: back to source with piece group(multiple pieces) (#2826)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Dec 27, 2023
1 parent 086cb62 commit e329bdb
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 11 deletions.
203 changes: 196 additions & 7 deletions client/daemon/peer/piece_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/go-http-utils/headers"
"go.uber.org/atomic"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -235,6 +236,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
return result, nil
}

// pieceOffset is the offset in the peer task, not the original range start from source
func (pm *pieceManager) processPieceFromSource(pt Task,
reader io.Reader, contentLength int64, pieceNum int32, pieceOffset uint64, pieceSize uint32,
isLastPiece func(n int64) (totalPieces int32, contentLength int64, ok bool)) (
Expand Down Expand Up @@ -796,34 +798,137 @@ func (pm *pieceManager) concurrentDownloadSource(ctx context.Context, pt Task, p
pt.SetContentLength(parsedRange.Length)
pt.SetTotalPieces(pieceCount)

pieceCountToDownload := pieceCount - continuePieceNum

con := pm.concurrentOption.GoroutineCount
// Fix int overflow
if int(pieceCount) > 0 && int(pieceCount) < con {
con = int(pieceCount)
if int(pieceCountToDownload) > 0 && int(pieceCountToDownload) < con {
con = int(pieceCountToDownload)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := pm.concurrentDownloadSourceByPiece(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, con, pieceSize, cancel)
if err != nil {
return err
return pm.concurrentDownloadSourceByPieceGroup(ctx, pt, peerTaskRequest, parsedRange, continuePieceNum, pieceCount, pieceCountToDownload, con, pieceSize, cancel)
}

// concurrentDownloadSourceByPieceGroup downloads pieceCountToDownload pieces from
// the source with con workers. The pieces, beginning at startPieceNum, are split
// into con contiguous groups (see newPieceGroup) and every worker fetches its
// whole group through downloadPieceGroupFromSource, retrying with the backoff
// settings from pm.concurrentOption.
//
// When a worker gives up after its retries, cancel is invoked so that the other
// workers stop as well. The function waits for all workers and returns the error
// of the first worker that failed, or nil when every group was downloaded.
func (pm *pieceManager) concurrentDownloadSourceByPieceGroup(
	ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest,
	parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32,
	con int, pieceSize uint32, cancel context.CancelFunc) error {
	log := pt.Log()
	log.Infof("start concurrentDownloadSourceByPieceGroup, startPieceNum: %d, pieceCount: %d, pieceCountToDownload: %d, con: %d, pieceSize: %d",
		startPieceNum, pieceCount, pieceCountToDownload, con, pieceSize)

	// Only the first failure is kept: once a worker cancels the context, the
	// remaining workers fail with context.Canceled, which must not replace the
	// error that actually caused the abort.
	var (
		errOnce  sync.Once
		firstErr error
	)
	downloadedPieces := mapset.NewSet[int32]()

	wg := sync.WaitGroup{}
	wg.Add(con)

	minPieceCountPerGroup := pieceCountToDownload / int32(con)
	reminderPieces := pieceCountToDownload % int32(con)

	// piece group eg:
	// con = 4, piece = 5:
	//   worker 0: 2
	//   worker 1: 1
	//   worker 2: 1
	//   worker 3: 1
	for i := int32(0); i < int32(con); i++ {
		go func(i int32) {
			defer wg.Done()

			pg := newPieceGroup(i, reminderPieces, startPieceNum, minPieceCountPerGroup, pieceSize, parsedRange)
			log.Infof("concurrent worker %d start to download piece %d-%d, byte %d-%d", i, pg.start, pg.end, pg.startByte, pg.endByte)
			_, _, retryErr := retry.Run(ctx,
				pm.concurrentOption.InitBackoff,
				pm.concurrentOption.MaxBackoff,
				pm.concurrentOption.MaxAttempts,
				func() (data any, cancel bool, err error) {
					err = pm.downloadPieceGroupFromSource(ctx, pt, log,
						peerTaskRequest, pg, pieceCount, pieceCountToDownload, downloadedPieces)
					// a canceled context is not worth retrying
					return nil, errors.Is(err, context.Canceled), err
				})
			if retryErr == nil {
				return
			}

			// download piece error after many retry: record the error first,
			// then cancel the task so the other workers stop
			errOnce.Do(func() { firstErr = retryErr })
			cancel()
			log.Infof("concurrent worker %d failed to download piece group after %d retries, last error: %s",
				i, pm.concurrentOption.MaxAttempts, retryErr.Error())
		}(i)
	}

	wg.Wait()

	// firstErr is written before the owning worker calls wg.Done, so it is
	// safe to read after wg.Wait
	return firstErr
}

// pieceGroup describes a run of consecutive pieces that is fetched from the
// source with one ranged request.
type pieceGroup struct {
	// start is the number of the first piece in the group.
	start int32
	// end is the number of the last piece in the group (inclusive).
	end int32
	// startByte is the first byte of the group, already shifted by the
	// start of the requested range.
	startByte int64
	// endByte is the last byte of the group (inclusive), already shifted by
	// the start of the requested range.
	endByte int64
	// store original task metadata
	pieceSize   uint32
	parsedRange *nethttp.Range
}

// newPieceGroup builds the piece group for worker i.
//
// Each worker gets minPieceCountPerGroup pieces; the first reminderPieces
// workers get one extra piece each. Piece numbers are shifted by startPieceNum.
// The byte span is derived from the piece numbers and pieceSize, capped at the
// last byte of parsedRange, and then shifted by parsedRange.Start.
func newPieceGroup(i int32, reminderPieces int32, startPieceNum int32, minPieceCountPerGroup int32, pieceSize uint32, parsedRange *nethttp.Range) *pieceGroup {
	// workers at or beyond reminderPieces start after all the enlarged groups
	first := i*minPieceCountPerGroup + reminderPieces
	count := minPieceCountPerGroup
	if i < reminderPieces {
		// this worker and every worker before it carry one extra piece
		first = i*minPieceCountPerGroup + i
		count++
	}

	// adjust by startPieceNum
	first += startPieceNum
	last := first + count - 1

	// byte span of the group, relative to the beginning of the range
	size := int64(pieceSize)
	firstByte := int64(first) * size
	lastByte := int64(last+1)*size - 1
	if limit := parsedRange.Length - 1; lastByte > limit {
		// the final piece may be shorter than pieceSize
		lastByte = limit
	}

	return &pieceGroup{
		start: first,
		end:   last,
		// adjust by range start
		startByte:   firstByte + parsedRange.Start,
		endByte:     lastByte + parsedRange.Start,
		pieceSize:   pieceSize,
		parsedRange: parsedRange,
	}
}

func (pm *pieceManager) concurrentDownloadSourceByPiece(
ctx context.Context, pt Task, peerTaskRequest *schedulerv1.PeerTaskRequest,
parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32,
parsedRange *nethttp.Range, startPieceNum int32, pieceCount int32, pieceCountToDownload int32,
con int, pieceSize uint32, cancel context.CancelFunc) error {

log := pt.Log()
var downloadError atomic.Value
var pieceCh = make(chan int32, con)

wg := sync.WaitGroup{}
wg.Add(int(pieceCount - startPieceNum))
wg.Add(int(pieceCountToDownload))

downloadedPieceCount := atomic.NewInt32(startPieceNum)

Expand Down Expand Up @@ -965,3 +1070,87 @@ func (pm *pieceManager) downloadPieceFromSource(ctx context.Context,
pt.PublishPieceInfo(pieceNum, uint32(result.Size))
return nil
}

// downloadPieceGroupFromSource fetches all pieces of pg from the source with a
// single ranged request (bytes pg.startByte-pg.endByte) and hands the response
// body to processPieceFromSource piece by piece, in ascending piece order.
//
// totalPieceCount and pg.parsedRange.Length are passed to the isLastPiece
// callback; the callback records the piece number in downloadedPieces and
// reports true once the set holds totalPieceCountToDownload pieces.
//
// Every processed piece is reported through pt.ReportPieceResult; successful
// pieces are also published with pt.PublishPieceInfo. The function returns on
// the first failure: a request/response error, an error from
// processPieceFromSource, or storage.ErrShortRead when a piece has an
// unexpected size.
func (pm *pieceManager) downloadPieceGroupFromSource(ctx context.Context,
	pt Task, log *logger.SugaredLoggerOnWith,
	peerTaskRequest *schedulerv1.PeerTaskRequest,
	pg *pieceGroup,
	totalPieceCount int32,
	totalPieceCountToDownload int32,
	downloadedPieces mapset.Set[int32]) error {

	backSourceRequest, err := source.NewRequestWithContext(ctx, peerTaskRequest.Url, peerTaskRequest.UrlMeta.Header)
	if err != nil {
		log.Errorf("build piece %d-%d back source request error: %s", pg.start, pg.end, err)
		return err
	}

	pieceGroupRange := fmt.Sprintf("%d-%d", pg.startByte, pg.endByte)
	// FIXME refactor source package, normal Range header is enough
	backSourceRequest.Header.Set(source.Range, pieceGroupRange)
	backSourceRequest.Header.Set(headers.Range, "bytes="+pieceGroupRange)

	log.Debugf("piece %d-%d back source header: %#v", pg.start, pg.end, backSourceRequest.Header)

	response, err := source.Download(backSourceRequest)
	if err != nil {
		log.Errorf("piece %d-%d back source response error: %s", pg.start, pg.end, err)
		return err
	}
	defer response.Body.Close()

	err = response.Validate()
	if err != nil {
		log.Errorf("piece %d-%d back source response validate error: %s", pg.start, pg.end, err)
		return err
	}

	log.Debugf("piece %d-%d back source response ok", pg.start, pg.end)

	for i := pg.start; i <= pg.end; i++ {
		pieceNum := i
		// offset is counted from the same origin as pg.startByte, i.e. it
		// still includes pg.parsedRange.Start
		offset := uint64(pg.startByte) + uint64(i-pg.start)*uint64(pg.pieceSize)
		size := pg.pieceSize
		// update last piece size
		if offset+uint64(size)-1 > uint64(pg.endByte) {
			size = uint32(uint64(pg.endByte) + 1 - offset)
		}

		// processPieceFromSource expects the offset without the range start
		result, md5, err := pm.processPieceFromSource(
			pt, response.Body, pg.parsedRange.Length, pieceNum, offset-uint64(pg.parsedRange.Start), size,
			func(int64) (int32, int64, bool) {
				downloadedPieces.Add(pieceNum)
				return totalPieceCount, pg.parsedRange.Length, downloadedPieces.Cardinality() == int(totalPieceCountToDownload)
			})
		request := &DownloadPieceRequest{
			TaskID: pt.GetTaskID(),
			PeerID: pt.GetPeerID(),
			piece: &commonv1.PieceInfo{
				PieceNum:    pieceNum,
				RangeStart:  offset,
				RangeSize:   uint32(result.Size),
				PieceMd5:    md5,
				PieceOffset: offset,
				PieceStyle:  0,
			},
		}

		if err != nil {
			log.Errorf("download piece %d error: %s", pieceNum, err)
			pt.ReportPieceResult(request, result, detectBackSourceError(err))
			return err
		}

		if result.Size != int64(size) {
			log.Errorf("download piece %d size not match, desired: %d, actual: %d", pieceNum, size, result.Size)
			// err is nil on this path, so the short read has to be passed
			// explicitly; otherwise the piece would be reported without error
			pt.ReportPieceResult(request, result, detectBackSourceError(storage.ErrShortRead))
			return storage.ErrShortRead
		}

		pt.ReportPieceResult(request, result, nil)
		pt.PublishPieceInfo(pieceNum, uint32(result.Size))

		log.Debugf("piece %d done", pieceNum)
	}
	return nil
}
Loading

0 comments on commit e329bdb

Please sign in to comment.