Skip to content

Commit

Permalink
fix: wire-up and make duplicate dupe-adder support cli
Browse files Browse the repository at this point in the history
Fixes: #445
  • Loading branch information
rvagg committed Oct 3, 2023
1 parent 4cf3374 commit 82a4443
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.
21 changes: 16 additions & 5 deletions cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,23 +338,34 @@ func defaultFetchRun(
lassie.RegisterSubscriber(pp.subscriber)
}

var carWriter *deferred.DeferredCarWriter
var carWriter storage.DeferredWriter
carOpts := []car.Option{
car.WriteAsCarV1(true),
car.StoreIdentityCIDs(false),
car.UseWholeCIDs(false),
car.AllowDuplicatePuts(duplicates),
}

tempStore := storage.NewDeferredStorageCar(tempDir, rootCid)

if outfile == stdoutFileString {
// we need the onlyWriter because stdout is presented as an os.File, and
// therefore pretend to support seeks, so feature-checking in go-car
// will make bad assumptions about capabilities unless we hide it
carWriter = deferred.NewDeferredCarWriterForStream(&onlyWriter{dataWriter}, []cid.Cid{rootCid}, carOpts...)
w := &onlyWriter{dataWriter}
if duplicates {
carWriter = storage.NewDuplicateAdderCarForStream(ctx, w, rootCid, path.String(), dagScope, entityBytes, tempStore)
} else {
carWriter = deferred.NewDeferredCarWriterForStream(w, []cid.Cid{rootCid}, carOpts...)
}
} else {
carWriter = deferred.NewDeferredCarWriterForPath(outfile, []cid.Cid{rootCid}, carOpts...)
if duplicates {
carWriter = storage.NewDuplicateAdderCarForPath(ctx, outfile, rootCid, path.String(), dagScope, entityBytes, tempStore)
} else {
carWriter = deferred.NewDeferredCarWriterForPath(outfile, []cid.Cid{rootCid}, carOpts...)
}
}
defer carWriter.Close()

tempStore := storage.NewDeferredStorageCar(tempDir, rootCid)
carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore)
defer carStore.Close()

Expand Down
49 changes: 43 additions & 6 deletions pkg/storage/duplicateaddercar.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,50 @@ func NewDuplicateAdderCarForStream(
store *DeferredStorageCar,
) *DuplicateAdderCar {

blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})}
blockStream.blockBuffer = list.New()
blockStream.cond = sync.NewCond(&blockStream.mu)

// create the car writer for the final stream
outgoing := deferred.NewDeferredCarWriterForStream(
outStream,
[]cid.Cid{root},
carv2.AllowDuplicatePuts(true),
carv2.StoreIdentityCIDs(false),
carv2.UseWholeCIDs(true),
)

return newDuplicateAdderCar(ctx, root, path, scope, bytes, store, outgoing)
}

func NewDuplicateAdderCarForPath(
ctx context.Context,
outPath string,
root cid.Cid,
path string,
scope trustlessutils.DagScope,
bytes *trustlessutils.ByteRange,
store *DeferredStorageCar,
) *DuplicateAdderCar {

outgoing := deferred.NewDeferredCarWriterForPath(
outPath,
[]cid.Cid{root},
carv2.AllowDuplicatePuts(true),
carv2.StoreIdentityCIDs(false),
carv2.UseWholeCIDs(true),
)

return newDuplicateAdderCar(ctx, root, path, scope, bytes, store, outgoing)
}

func newDuplicateAdderCar(
ctx context.Context,
root cid.Cid,
path string,
scope trustlessutils.DagScope,
bytes *trustlessutils.ByteRange,
store *DeferredStorageCar,
outgoing *deferred.DeferredCarWriter,
) *DuplicateAdderCar {
blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})}
blockStream.blockBuffer = list.New()
blockStream.cond = sync.NewCond(&blockStream.mu)
return &DuplicateAdderCar{
DeferredCarWriter: outgoing,
ctx: ctx,
Expand Down Expand Up @@ -147,7 +179,12 @@ func (da *DuplicateAdderCar) Close() error {
if streamCompletion == nil {
return nil
}
return <-streamCompletion
err := <-streamCompletion
err2 := da.DeferredCarWriter.Close()
if err == nil {
err = err2
}
return err
}

type blockStream struct {
Expand Down

0 comments on commit 82a4443

Please sign in to comment.