Skip to content

Commit

Permalink
offchaindownloader: fix on ipfs queue
Browse files Browse the repository at this point in the history
Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed Nov 8, 2023
1 parent b676010 commit a2ab04f
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions data/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package downloader

import (
"context"
"errors"
"fmt"
"maps"
"os"
Expand All @@ -17,7 +18,7 @@ import (
const (
// ImportQueueRoutines is the number of parallel routines processing the
// remote file download queue.
ImportQueueRoutines = 10
ImportQueueRoutines = 32
// ImportRetrieveTimeout the maximum duration the import queue will wait
// for retrieving a remote file.
ImportRetrieveTimeout = 5 * time.Minute
Expand Down Expand Up @@ -117,15 +118,15 @@ func (d *Downloader) TotalItemsAdded() int32 {
// handleImport fetches and imports a remote file. If the download fails, the file
// is added to a secondary queue for retrying.
func (d *Downloader) handleImport(item *DownloadItem) {
log.Debugw("pining remote file", "uri", item.URI)
log.Debugw("fetch queued remote file", "uri", item.URI)
d.queueAddDelta(1)
defer d.queueAddDelta(-1)
ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout)
data, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize)
cancel()
if err != nil {
if os.IsTimeout(err) {
log.Warnw("timeout importing file adding it to failed queue for retry", "uri", item.URI)
if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) {
log.Warnw("timeout importing file, adding it to failed queue for retry", "uri", item.URI)
d.failedQueueLock.Lock()
d.failedQueue[item.URI] = item
d.failedQueueLock.Unlock()
Expand Down

0 comments on commit a2ab04f

Please sign in to comment.