Skip to content

Commit

Permalink
feat(share/store/cache): Split accessor cache into recent and blockst…
Browse files Browse the repository at this point in the history
…ore (celestiaorg#2656)

Adds MultiCache. Replaces eds store cache with multicache containing 2
separate caches:

- recently added edses (10 accessors, added on put only)
- edses requested by ipld (128 accessors, added on request to blockstore
from bitswap server)
  • Loading branch information
walldiss authored Sep 18, 2023
1 parent fd90764 commit 288e992
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 31 deletions.
31 changes: 13 additions & 18 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

var _ bstore.Blockstore = (*blockstore)(nil)
Expand All @@ -35,6 +34,13 @@ type blockstore struct {
ds datastore.Batching
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
Expand Down Expand Up @@ -146,28 +152,17 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// check if cache contains any of accessors
// check if either cache contains an accessor
shardKey := keys[0]
if accessor, err := bs.store.cache.Get(shardKey); err == nil {
accessor, err := bs.store.cache.Get(shardKey)
if err == nil {
return blockstoreCloser(accessor)
}

// load accessor to the cache and use it as blockstoreCloser
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
// load accessor to the blockstore cache and use it as blockstoreCloser
accessor, err = bs.store.cache.Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
return blockstoreCloser(accessor)
}

// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor
func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) {
bs, err := ac.Blockstore()
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err)
}
return &BlockstoreCloser{
ReadBlockstore: bs,
Closer: ac,
}, nil
}
51 changes: 51 additions & 0 deletions share/eds/cache/doublecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cache

import (
"errors"

"github.com/filecoin-project/dagstore/shard"
)

// DoubleCache represents a Cache that looks into multiple caches one by one.
type DoubleCache struct {
first, second Cache
}

// NewDoubleCache creates a new DoubleCache with the provided caches.
func NewDoubleCache(first, second Cache) *DoubleCache {
return &DoubleCache{
first: first,
second: second,
}
}

// Get looks for an item in all the caches one by one and returns the Cache found item.
func (mc *DoubleCache) Get(key shard.Key) (Accessor, error) {
ac, err := mc.first.Get(key)
if err == nil {
return ac, nil
}
return mc.second.Get(key)
}

// Remove removes an item from all underlying caches
func (mc *DoubleCache) Remove(key shard.Key) error {
err1 := mc.first.Remove(key)
err2 := mc.second.Remove(key)
return errors.Join(err1, err2)
}

func (mc *DoubleCache) First() Cache {
return mc.first
}

func (mc *DoubleCache) Second() Cache {
return mc.second
}

func (mc *DoubleCache) EnableMetrics() error {
if err := mc.first.EnableMetrics(); err != nil {
return err
}
return mc.second.EnableMetrics()
}
16 changes: 11 additions & 5 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ const (
// We don't use transient files right now, so GC is turned off by default.
defaultGCInterval = 0

defaultCacheSize = 128
defaultRecentBlocksCacheSize = 10
defaultBlockstoreCacheSize = 128
)

var ErrNotFound = errors.New("eds not found in store")
Expand All @@ -56,7 +57,7 @@ type Store struct {
mounts *mount.Registry

bs *blockstore
cache cache.Cache
cache *cache.DoubleCache

carIdx index.FullIndexRepo
invertedIdx *simpleInvertedIndex
Expand Down Expand Up @@ -114,11 +115,16 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
return nil, fmt.Errorf("failed to create DAGStore: %w", err)
}

accessorCache, err := cache.NewAccessorCache("cache", defaultCacheSize)
recentBlocksCache, err := cache.NewAccessorCache("recent", defaultRecentBlocksCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create recent blocks cache: %w", err)
}

blockstoreCache, err := cache.NewAccessorCache("blockstore", defaultBlockstoreCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create blockstore blocks cache: %w", err)
}

store := &Store{
basepath: basepath,
dgstr: dagStore,
Expand All @@ -127,7 +133,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
gcInterval: defaultGCInterval,
mounts: r,
shardFailures: failureChan,
cache: accessorCache,
cache: cache.NewDoubleCache(recentBlocksCache, blockstoreCache),
}
store.bs = newBlockstore(store, ds)
return store, nil
Expand Down Expand Up @@ -284,7 +290,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_, err := s.cache.GetOrLoad(ctx, result.Key, s.getAccessor)
_, err := s.cache.First().GetOrLoad(ctx, result.Key, s.getAccessor)
if err != nil {
log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err)
return
Expand Down
4 changes: 2 additions & 2 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func Test_BlockstoreCache(t *testing.T) {

// store eds to the store with noopCache to allow clean cache after put
swap := edsStore.cache
edsStore.cache = cache.NoopCache{}
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)
Expand Down Expand Up @@ -388,7 +388,7 @@ func Test_NotCachedAccessor(t *testing.T) {
err = edsStore.Start(ctx)
require.NoError(t, err)
// replace cache with noopCache to
edsStore.cache = cache.NoopCache{}
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})

eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
Expand Down
16 changes: 10 additions & 6 deletions share/eds/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package eds

import (
"fmt"
"io"

"github.com/filecoin-project/dagstore"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)
Expand All @@ -30,11 +29,16 @@ func newReadCloser(ac cache.Accessor) io.ReadCloser {
}
}

func newBlockstore(store *Store, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
ds: namespace.Wrap(ds, blockstoreCacheKey),
// blockstoreCloser constructs new BlockstoreCloser from cache.Accessor
func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) {
bs, err := ac.Blockstore()
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err)
}
return &BlockstoreCloser{
ReadBlockstore: bs,
Closer: ac,
}, nil
}

func closeAndLog(name string, closer io.Closer) {
Expand Down

0 comments on commit 288e992

Please sign in to comment.