From 82a4443aad0bfabfa09d98f31113453c17ca5ba4 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 3 Oct 2023 16:48:18 +1100 Subject: [PATCH] fix: wire-up and make duplicate dupe-adder support cli Fixes: https://github.com/filecoin-project/lassie/issues/445 --- cmd/lassie/fetch.go | 21 ++++++++++---- pkg/storage/duplicateaddercar.go | 49 ++++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index c0d5319f..992f7ddd 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -338,23 +338,34 @@ func defaultFetchRun( lassie.RegisterSubscriber(pp.subscriber) } - var carWriter *deferred.DeferredCarWriter + var carWriter storage.DeferredWriter carOpts := []car.Option{ car.WriteAsCarV1(true), car.StoreIdentityCIDs(false), car.UseWholeCIDs(false), - car.AllowDuplicatePuts(duplicates), } + + tempStore := storage.NewDeferredStorageCar(tempDir, rootCid) + if outfile == stdoutFileString { // we need the onlyWriter because stdout is presented as an os.File, and // therefore pretend to support seeks, so feature-checking in go-car // will make bad assumptions about capabilities unless we hide it - carWriter = deferred.NewDeferredCarWriterForStream(&onlyWriter{dataWriter}, []cid.Cid{rootCid}, carOpts...) + w := &onlyWriter{dataWriter} + if duplicates { + carWriter = storage.NewDuplicateAdderCarForStream(ctx, w, rootCid, path.String(), dagScope, entityBytes, tempStore) + } else { + carWriter = deferred.NewDeferredCarWriterForStream(w, []cid.Cid{rootCid}, carOpts...) + } } else { - carWriter = deferred.NewDeferredCarWriterForPath(outfile, []cid.Cid{rootCid}, carOpts...) + if duplicates { + carWriter = storage.NewDuplicateAdderCarForPath(ctx, outfile, rootCid, path.String(), dagScope, entityBytes, tempStore) + } else { + carWriter = deferred.NewDeferredCarWriterForPath(outfile, []cid.Cid{rootCid}, carOpts...) + } } + defer carWriter.Close() - tempStore := storage.NewDeferredStorageCar(tempDir, rootCid) carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer carStore.Close() diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index cd6bfac4..e67a121e 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -53,11 +53,6 @@ func NewDuplicateAdderCarForStream( store *DeferredStorageCar, ) *DuplicateAdderCar { - blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})} - blockStream.blockBuffer = list.New() - blockStream.cond = sync.NewCond(&blockStream.mu) - - // create the car writer for the final stream outgoing := deferred.NewDeferredCarWriterForStream( outStream, []cid.Cid{root}, @@ -65,6 +60,43 @@ func NewDuplicateAdderCarForStream( carv2.StoreIdentityCIDs(false), carv2.UseWholeCIDs(true), ) + + return newDuplicateAdderCar(ctx, root, path, scope, bytes, store, outgoing) +} + +func NewDuplicateAdderCarForPath( + ctx context.Context, + outPath string, + root cid.Cid, + path string, + scope trustlessutils.DagScope, + bytes *trustlessutils.ByteRange, + store *DeferredStorageCar, +) *DuplicateAdderCar { + + outgoing := deferred.NewDeferredCarWriterForPath( + outPath, + []cid.Cid{root}, + carv2.AllowDuplicatePuts(true), + carv2.StoreIdentityCIDs(false), + carv2.UseWholeCIDs(true), + ) + + return newDuplicateAdderCar(ctx, root, path, scope, bytes, store, outgoing) +} + +func newDuplicateAdderCar( + ctx context.Context, + root cid.Cid, + path string, + scope trustlessutils.DagScope, + bytes *trustlessutils.ByteRange, + store *DeferredStorageCar, + outgoing *deferred.DeferredCarWriter, +) *DuplicateAdderCar { + blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})} + blockStream.blockBuffer = list.New() + blockStream.cond = sync.NewCond(&blockStream.mu) return &DuplicateAdderCar{ DeferredCarWriter: outgoing, ctx: ctx, @@ -147,7 +179,12 @@ func (da *DuplicateAdderCar) Close() error { if streamCompletion == nil { return nil } - return <-streamCompletion + err := <-streamCompletion + err2 := da.DeferredCarWriter.Close() + if err == nil { + err = err2 + } + return err } type blockStream struct {