diff --git a/share/eds/blockstore.go b/share/eds/blockstore.go index 349d6f58ba..9cbb3f4e8a 100644 --- a/share/eds/blockstore.go +++ b/share/eds/blockstore.go @@ -10,9 +10,8 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" ipld "github.com/ipfs/go-ipld-format" - - "github.com/celestiaorg/celestia-node/share/eds/cache" ) var _ bstore.Blockstore = (*blockstore)(nil) @@ -35,6 +34,13 @@ type blockstore struct { ds datastore.Batching } +func newBlockstore(store *Store, ds datastore.Batching) *blockstore { + return &blockstore{ + store: store, + ds: namespace.Wrap(ds, blockstoreCacheKey), + } +} + func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash()) if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) { @@ -146,28 +152,17 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (* return nil, fmt.Errorf("failed to find shards containing multihash: %w", err) } - // check if cache contains any of accessors + // check if either cache contains an accessor shardKey := keys[0] - if accessor, err := bs.store.cache.Get(shardKey); err == nil { + accessor, err := bs.store.cache.Get(shardKey) + if err == nil { return blockstoreCloser(accessor) } - // load accessor to the cache and use it as blockstoreCloser - accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor) + // load accessor to the blockstore cache and use it as blockstoreCloser + accessor, err = bs.store.cache.Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor) if err != nil { return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err) } return blockstoreCloser(accessor) } - -// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor -func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) { - bs, err := ac.Blockstore() - if err != nil { - return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err) - } - return &BlockstoreCloser{ - ReadBlockstore: bs, - Closer: ac, - }, nil -} diff --git a/share/eds/cache/doublecache.go b/share/eds/cache/doublecache.go new file mode 100644 index 0000000000..a63eadee9e --- /dev/null +++ b/share/eds/cache/doublecache.go @@ -0,0 +1,51 @@ +package cache + +import ( + "errors" + + "github.com/filecoin-project/dagstore/shard" +) + +// DoubleCache represents a Cache that looks into multiple caches one by one. +type DoubleCache struct { + first, second Cache +} + +// NewDoubleCache creates a new DoubleCache with the provided caches. +func NewDoubleCache(first, second Cache) *DoubleCache { + return &DoubleCache{ + first: first, + second: second, + } +} + +// Get looks for an item in all the caches one by one and returns the Cache found item. +func (mc *DoubleCache) Get(key shard.Key) (Accessor, error) { + ac, err := mc.first.Get(key) + if err == nil { + return ac, nil + } + return mc.second.Get(key) +} + +// Remove removes an item from all underlying caches +func (mc *DoubleCache) Remove(key shard.Key) error { + err1 := mc.first.Remove(key) + err2 := mc.second.Remove(key) + return errors.Join(err1, err2) +} + +func (mc *DoubleCache) First() Cache { + return mc.first +} + +func (mc *DoubleCache) Second() Cache { + return mc.second +} + +func (mc *DoubleCache) EnableMetrics() error { + if err := mc.first.EnableMetrics(); err != nil { + return err + } + return mc.second.EnableMetrics() +} diff --git a/share/eds/store.go b/share/eds/store.go index e8caf4c35a..14df4a4bee 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -40,7 +40,8 @@ const ( // We don't use transient files right now, so GC is turned off by default. defaultGCInterval = 0 - defaultCacheSize = 128 + defaultRecentBlocksCacheSize = 10 + defaultBlockstoreCacheSize = 128 ) var ErrNotFound = errors.New("eds not found in store") @@ -56,7 +57,7 @@ type Store struct { mounts *mount.Registry bs *blockstore - cache cache.Cache + cache *cache.DoubleCache carIdx index.FullIndexRepo invertedIdx *simpleInvertedIndex @@ -114,11 +115,16 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { return nil, fmt.Errorf("failed to create DAGStore: %w", err) } - accessorCache, err := cache.NewAccessorCache("cache", defaultCacheSize) + recentBlocksCache, err := cache.NewAccessorCache("recent", defaultRecentBlocksCacheSize) if err != nil { return nil, fmt.Errorf("failed to create recent blocks cache: %w", err) } + blockstoreCache, err := cache.NewAccessorCache("blockstore", defaultBlockstoreCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create blockstore blocks cache: %w", err) + } + store := &Store{ basepath: basepath, dgstr: dagStore, @@ -127,7 +133,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { gcInterval: defaultGCInterval, mounts: r, shardFailures: failureChan, - cache: accessorCache, + cache: cache.NewDoubleCache(recentBlocksCache, blockstoreCache), } store.bs = newBlockstore(store, ds) return store, nil @@ -284,7 +290,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext go func() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - _, err := s.cache.GetOrLoad(ctx, result.Key, s.getAccessor) + _, err := s.cache.First().GetOrLoad(ctx, result.Key, s.getAccessor) if err != nil { log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err) return diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 0d5283e2f2..b38a25c827 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -303,7 +303,7 @@ func Test_BlockstoreCache(t *testing.T) { // store eds to the store with noopCache to allow clean cache after put swap := edsStore.cache - edsStore.cache = cache.NoopCache{} + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) eds, dah := randomEDS(t) err = edsStore.Put(ctx, dah.Hash(), eds) require.NoError(t, err) @@ -388,7 +388,7 @@ func Test_NotCachedAccessor(t *testing.T) { err = edsStore.Start(ctx) require.NoError(t, err) // replace cache with noopCache to - edsStore.cache = cache.NoopCache{} + edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}) eds, dah := randomEDS(t) err = edsStore.Put(ctx, dah.Hash(), eds) diff --git a/share/eds/utils.go b/share/eds/utils.go index e7b24a9aee..3417a2aa62 100644 --- a/share/eds/utils.go +++ b/share/eds/utils.go @@ -1,11 +1,10 @@ package eds import ( + "fmt" "io" "github.com/filecoin-project/dagstore" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" "github.com/celestiaorg/celestia-node/share/eds/cache" ) @@ -30,11 +29,16 @@ func newReadCloser(ac cache.Accessor) io.ReadCloser { } } -func newBlockstore(store *Store, ds datastore.Batching) *blockstore { - return &blockstore{ - store: store, - ds: namespace.Wrap(ds, blockstoreCacheKey), +// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor +func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) { + bs, err := ac.Blockstore() + if err != nil { + return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err) } + return &BlockstoreCloser{ + ReadBlockstore: bs, + Closer: ac, + }, nil } func closeAndLog(name string, closer io.Closer) {