diff --git a/data/downloader/downloader.go b/data/downloader/downloader.go index e90ec1b50..8b817fe4a 100644 --- a/data/downloader/downloader.go +++ b/data/downloader/downloader.go @@ -2,6 +2,7 @@ package downloader import ( "context" + "errors" "fmt" "maps" "os" @@ -17,7 +18,7 @@ import ( const ( // ImportQueueRoutines is the number of parallel routines processing the // remote file download queue. - ImportQueueRoutines = 10 + ImportQueueRoutines = 32 // ImportRetrieveTimeout the maximum duration the import queue will wait // for retrieving a remote file. ImportRetrieveTimeout = 5 * time.Minute @@ -117,15 +118,15 @@ func (d *Downloader) TotalItemsAdded() int32 { // handleImport fetches and imports a remote file. If the download fails, the file // is added to a secondary queue for retrying. func (d *Downloader) handleImport(item *DownloadItem) { - log.Debugw("pining remote file", "uri", item.URI) + log.Debugw("fetch queued remote file", "uri", item.URI) d.queueAddDelta(1) defer d.queueAddDelta(-1) ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout) data, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize) cancel() if err != nil { - if os.IsTimeout(err) { - log.Warnw("timeout importing file adding it to failed queue for retry", "uri", item.URI) + if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) { + log.Warnw("timeout importing file, adding it to failed queue for retry", "uri", item.URI) d.failedQueueLock.Lock() d.failedQueue[item.URI] = item d.failedQueueLock.Unlock()