shovel: bugfix. reorg may halt progress until restart
The process for handling a reorg is as follows (sketched in code after the list):

1. Shovel task gets the latest remote block
2. Shovel task gets the latest local block
3. Shovel decides which blocks to process
4. Shovel uses a (cached) jrpc2 client to download blocks
5. Shovel checks the downloaded chain against the latest local hash for a reorg
6. If there is a reorg, delete the latest local block (and all indexed data for that block)
7. GOTO 1
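
A minimal sketch of that loop in Go-flavored pseudocode (every identifier below is an illustrative stand-in, not Shovel's actual API):

    // Hypothetical convergence loop; the helpers are stand-ins.
    for {
        remote := latestRemote()            // 1. latest remote block
        local := latestLocal()              // 2. latest local block
        start, limit := plan(local, remote) // 3. decide which blocks to process
        blocks := download(start, limit)    // 4. cached jrpc2 client (the bug lives here)
        if blocks[0].Parent != local.Hash { // 5. check against latest local hash
            deleteLocal(local)              // 6. delete block and its indexed data
            continue                        // 7. GOTO 1
        }
        index(blocks)
    }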

The problem is that step 4 may keep a block that has been removed
from the global chain around in memory, so that subsequent requests
are served the removed block and no new blocks can be connected to
the local chain.

This commit introduces a maxreads limit for the block/header caches:
once a cached segment has been read maxreads times, it is removed from
the cache. This preserves the reason the cache exists (we want
concurrent integrations to reuse data when they can) while ensuring
that bad data doesn't stick around too long.
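
The idea in isolation, as a runnable sketch (simplified stand-in types without the locking; the real change to the jrpc2 cache is in the diff below):

    package main

    import "fmt"

    type segment struct {
        nreads int
        data   string
    }

    type cache struct {
        maxreads int
        segments map[uint64]*segment
    }

    func (c *cache) get(k uint64, fetch func() string) string {
        // Evict any segment that has already been served maxreads
        // times, forcing the next request for it to re-download.
        for key, s := range c.segments {
            if s.nreads >= c.maxreads {
                delete(c.segments, key)
            }
        }
        s, ok := c.segments[k]
        if !ok {
            s = &segment{data: fetch()}
            c.segments[k] = s
        }
        s.nreads++
        return s.data
    }

    func main() {
        fetches := 0
        c := cache{maxreads: 2, segments: map[uint64]*segment{}}
        fetch := func() string { fetches++; return "block" }
        c.get(18000000, fetch) // miss: downloads
        c.get(18000000, fetch) // hit: served from cache
        c.get(18000000, fetch) // evicted after 2 reads: downloads again
        fmt.Println(fetches)   // prints 2
    }

This mirrors the call counts asserted in TestCache_MaxReads below (1, 1, then 2).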

If a bad block does land in the cache, it will only be served for a
few requests; after that, Shovel re-downloads the data and continues
making progress.
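
Callers can tune the limit with the WithMaxReads option added in this commit; New defaults all three caches (lcache, bcache, hcache) to 20 reads. A hypothetical usage, assuming the client is imported from github.com/indexsupply/x/jrpc2:

    c := jrpc2.New("https://rpc.example.com").WithMaxReads(5) // URL is illustrative

A lower limit trades extra re-downloads for quicker recovery when a reorged block is sitting in the cache.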
ryandotsmith committed May 1, 2024
1 parent 64c35e8 commit c20d328
Showing 2 changed files with 74 additions and 5 deletions.
24 changes: 20 additions & 4 deletions jrpc2/client.go
@@ -47,6 +47,8 @@ func New(url string) *Client {
         pollDuration: time.Second,
         url:          url,
         lcache:       NumHash{maxreads: 20},
+        bcache:       cache{maxreads: 20},
+        hcache:       cache{maxreads: 20},
     }
 }

@@ -66,6 +68,8 @@ type Client struct {
 
 func (c *Client) WithMaxReads(n int) *Client {
     c.lcache.maxreads = n
+    c.bcache.maxreads = n
+    c.hcache.maxreads = n
     return c
 }

@@ -422,18 +426,28 @@ type blockResp struct {
 
 type segment struct {
     sync.Mutex
-    done bool
-    d    []eth.Block
+    nreads int
+    done   bool
+    d      []eth.Block
 }
 
 type cache struct {
     sync.Mutex
+    maxreads int
     segments map[key]*segment
 }
 
 type getter func(ctx context.Context, start, limit uint64) ([]eth.Block, error)
 
-func (c *cache) prune() {
+func (c *cache) pruneMaxRead() {
+    for k, v := range c.segments {
+        if v.nreads >= c.maxreads {
+            delete(c.segments, k)
+        }
+    }
+}
+
+func (c *cache) pruneSegments() {
     const size = 5
     if len(c.segments) <= size {
         return
@@ -458,16 +472,18 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge
     if c.segments == nil {
         c.segments = make(map[key]*segment)
     }
+    c.pruneMaxRead()
     seg, ok := c.segments[key{start, limit}]
     if !ok {
         seg = &segment{}
         c.segments[key{start, limit}] = seg
     }
-    c.prune()
+    c.pruneSegments()
     c.Unlock()
 
     seg.Lock()
     defer seg.Unlock()
+    seg.nreads++
     if seg.done {
         return seg.d, nil
     }
55 changes: 54 additions & 1 deletion jrpc2/client_test.go
@@ -18,6 +18,7 @@ import (
 
     "github.com/indexsupply/x/eth"
     "github.com/indexsupply/x/shovel/glf"
+    "github.com/indexsupply/x/tc"
     "golang.org/x/sync/errgroup"
     "kr.dev/diff"
 )
@@ -44,7 +45,7 @@ func (tg *testGetter) get(ctx context.Context, start, limit uint64) ([]eth.Block
 func TestCache_Prune(t *testing.T) {
     ctx := context.Background()
     tg := testGetter{}
-    c := cache{}
+    c := cache{maxreads: 2}
     blocks, err := c.get(false, ctx, 1, 1, tg.get)
     diff.Test(t, t.Fatalf, nil, err)
     diff.Test(t, t.Errorf, 1, len(blocks))
@@ -76,6 +77,25 @@ func TestCache_Prune(t *testing.T) {
     })
 }
 
+func TestCache_MaxReads(t *testing.T) {
+    var (
+        ctx = context.Background()
+        tg  = testGetter{}
+        c   = cache{maxreads: 2}
+    )
+    _, err := c.get(false, ctx, 1, 1, tg.get)
+    tc.NoErr(t, err)
+    tc.WantGot(t, 1, tg.callCount)
+
+    _, err = c.get(false, ctx, 1, 1, tg.get)
+    tc.NoErr(t, err)
+    tc.WantGot(t, 1, tg.callCount)
+
+    _, err = c.get(false, ctx, 1, 1, tg.get)
+    tc.NoErr(t, err)
+    tc.WantGot(t, 2, tg.callCount)
+}
+
 var (
     //go:embed testdata/block-18000000.json
     block18000000JSON string
@@ -356,6 +376,39 @@ func TestGet_Cached(t *testing.T) {
     eg.Wait()
 }
 
+// Test that a block cache removes its segments after
+// they've been read N times. Once N is reached, subsequent
+// calls to Get should make new requests.
+func TestGet_Cached_Pruned(t *testing.T) {
+    var n int32
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        body, err := io.ReadAll(r.Body)
+        diff.Test(t, t.Fatalf, nil, err)
+        switch {
+        case strings.Contains(string(body), "eth_getBlockByNumber"):
+            atomic.AddInt32(&n, 1)
+            _, err := w.Write([]byte(block18000000JSON))
+            diff.Test(t, t.Fatalf, nil, err)
+        }
+    }))
+    defer ts.Close()
+    var (
+        ctx = context.Background()
+        c   = New(ts.URL).WithMaxReads(2)
+    )
+    _, err := c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+    diff.Test(t, t.Errorf, nil, err)
+    diff.Test(t, t.Errorf, n, int32(1))
+    _, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+    diff.Test(t, t.Errorf, nil, err)
+    diff.Test(t, t.Errorf, n, int32(1))
+
+    //maxreads should have been reached with last 2 calls
+    _, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+    diff.Test(t, t.Errorf, nil, err)
+    diff.Test(t, t.Errorf, n, int32(2))
+}
+
 func TestNoLogs(t *testing.T) {
     ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         body, err := io.ReadAll(r.Body)
