From 788d2720407179f6ecc54b7869c3f8964d3e7a5e Mon Sep 17 00:00:00 2001 From: ryan smith Date: Fri, 7 Jun 2024 14:44:11 -0700 Subject: [PATCH] shovel: multiple source urls --- cmd/shovel/demo.json | 4 +- cmd/shovel/main.go | 7 +++ jrpc2/client.go | 104 +++++++++++++++++++++++++++++-------------- jrpc2/client_test.go | 40 ++++++++--------- shovel/task.go | 31 ++++++++----- shovel/task_test.go | 11 +++-- shovel/web/web.go | 4 +- wctx/ctx.go | 11 +++++ 8 files changed, 140 insertions(+), 72 deletions(-) diff --git a/cmd/shovel/demo.json b/cmd/shovel/demo.json index 11db9da5..9ff78f52 100644 --- a/cmd/shovel/demo.json +++ b/cmd/shovel/demo.json @@ -4,7 +4,7 @@ { "name": "mainnet", "chain_id": 1, - "url": "https://ethereum-rpc.publicnode.com" + "url": "https://hidden-necessary-mound.quiknode.pro/70d6a30b07922a142e049ceea8be11d385650593/,https://ethereum-rpc.publicnode.com" }, { "name": "base", @@ -15,7 +15,7 @@ "integrations": [{ "name": "usdc-transfer", "enabled": true, - "sources": [{"name": "mainnet"}, {"name": "base"}], + "sources": [{"name": "mainnet"}], "table": { "name": "usdc", "columns": [ diff --git a/cmd/shovel/main.go b/cmd/shovel/main.go index d9b2901c..0d1bac0f 100644 --- a/cmd/shovel/main.go +++ b/cmd/shovel/main.go @@ -77,6 +77,13 @@ func main() { } return "ig", igName }) + lh.RegisterContext(func(ctx context.Context) (string, any) { + srcURL := wctx.SrcURL(ctx) + if srcURL == "" { + return "", nil + } + return "host", srcURL + }) lh.RegisterContext(func(ctx context.Context) (string, any) { srcName := wctx.SrcName(ctx) if srcName == "" { diff --git a/jrpc2/client.go b/jrpc2/client.go index f9b82985..4220b8d8 100644 --- a/jrpc2/client.go +++ b/jrpc2/client.go @@ -11,11 +11,13 @@ import ( "log/slog" "net" "net/http" + "net/url" "os" "sort" "strconv" "strings" "sync" + "sync/atomic" "time" "unicode" @@ -31,22 +33,50 @@ import ( "nhooyr.io/websocket/wsjson" ) +type URL struct { + parsed *url.URL + provided string + success, total uint64 + lastError time.Time +} + +func MustURL(provided string) *URL { + parsed, err := url.Parse(provided) + if err != nil { + fmt.Printf("unable to parse url: %s\n", provided) + os.Exit(1) + } + return &URL{parsed: parsed, provided: provided} +} + +func (u *URL) Hostname() string { + return u.parsed.Hostname() +} + +func (u *URL) String() string { + return u.parsed.String() +} + func randbytes() []byte { b := make([]byte, 10) rand.Read(b) return b } -func New(url string) *Client { +func New(providedURL string) *Client { + var urls []*URL + for _, provided := range strings.Split(providedURL, ",") { + urls = append(urls, MustURL(provided)) + } return &Client{ - d: strings.Contains(url, "debug"), - nocache: strings.Contains(url, "nocache"), + d: strings.Contains(providedURL, "debug"), + nocache: strings.Contains(providedURL, "nocache"), hc: &http.Client{ Timeout: 10 * time.Second, Transport: gzhttp.Transport(http.DefaultTransport), }, + urls: urls, pollDuration: time.Second, - url: url, lcache: NumHash{maxreads: 20}, bcache: cache{maxreads: 20}, hcache: cache{maxreads: 20}, @@ -57,9 +87,10 @@ type Client struct { nocache bool d bool hc *http.Client - url string + urls []*URL wsurl string + reqCounter uint64 pollDuration time.Duration lcache NumHash @@ -67,6 +98,12 @@ type Client struct { hcache cache } +func (c *Client) NextURL() *URL { + atomic.AddUint64(&c.reqCounter, 1) + next := c.reqCounter % uint64(len(c.urls)) + return c.urls[next] +} + func (c *Client) WithMaxReads(n int) *Client { c.lcache.maxreads = n c.bcache.maxreads = n @@ -98,7 +135,7 @@ type request struct { Params []any `json:"params"` } -func (c *Client) do(ctx context.Context, dest, req any) error { +func (c *Client) do(ctx context.Context, url string, dest, req any) error { var ( eg errgroup.Group r, w = io.Pipe() @@ -109,7 +146,7 @@ func (c *Client) do(ctx context.Context, dest, req any) error { return json.NewEncoder(w).Encode(req) }) eg.Go(func() error { - req, err := http.NewRequest("POST", c.url, c.debug(r)) + req, err := http.NewRequest("POST", url, c.debug(r)) if err != nil { return fmt.Errorf("unable to new request: %w", err) } @@ -266,14 +303,14 @@ func (c *Client) wsListen(ctx context.Context) { } } -func (c *Client) httpPoll(ctx context.Context) { +func (c *Client) httpPoll(ctx context.Context, url string) { var ( ticker = time.NewTicker(c.pollDuration) hresp = headerResp{} ) defer ticker.Stop() for range ticker.C { - err := c.do(ctx, &hresp, request{ + err := c.do(ctx, url, &hresp, request{ ID: "1", Version: "2.0", Method: "eth_getBlockByNumber", @@ -305,7 +342,7 @@ func (c *Client) httpPoll(ctx context.Context) { // When n is 0, Latest always fetches the latest block // rather than using the cached value, // bypassing the caching mechanism. -func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) { +func (c *Client) Latest(ctx context.Context, url string, n uint64) (uint64, []byte, error) { c.lcache.once.Do(func() { switch { case len(c.wsurl) > 0: @@ -313,7 +350,7 @@ func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) { go c.wsListen(context.Background()) default: slog.DebugContext(ctx, "jrpc2 http polling") - go c.httpPoll(context.Background()) + go c.httpPoll(context.Background(), url) } }) if n, h, ok := c.lcache.get(ctx, n); ok { @@ -321,7 +358,7 @@ func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) { } hresp := headerResp{} - err := c.do(ctx, &hresp, request{ + err := c.do(ctx, url, &hresp, request{ ID: fmt.Sprintf("latest-%d-%x", n, randbytes()), Version: "2.0", Method: "eth_getBlockByNumber", @@ -342,9 +379,9 @@ func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) { return uint64(hresp.Number), hresp.Hash, nil } -func (c *Client) Hash(ctx context.Context, n uint64) ([]byte, error) { +func (c *Client) Hash(ctx context.Context, url string, n uint64) ([]byte, error) { hresp := headerResp{} - err := c.do(ctx, &hresp, request{ + err := c.do(ctx, url, &hresp, request{ ID: fmt.Sprintf("hash-%d-%x", n, randbytes()), Version: "2.0", Method: "eth_getBlockByNumber", @@ -368,6 +405,7 @@ type blockmap map[uint64]*eth.Block func (c *Client) Get( ctx context.Context, + url string, filter *glf.Filter, start, limit uint64, ) ([]eth.Block, error) { @@ -385,12 +423,12 @@ func (c *Client) Get( ) switch { case filter.UseBlocks: - blocks, err = c.bcache.get(c.nocache, ctx, start, limit, c.blocks) + blocks, err = c.bcache.get(c.nocache, ctx, url, start, limit, c.blocks) if err != nil { return nil, fmt.Errorf("getting blocks: %w", err) } case filter.UseHeaders: - blocks, err = c.hcache.get(c.nocache, ctx, start, limit, c.headers) + blocks, err = c.hcache.get(c.nocache, ctx, url, start, limit, c.headers) if err != nil { return nil, fmt.Errorf("getting headers: %w", err) } @@ -411,15 +449,15 @@ func (c *Client) Get( switch { case filter.UseReceipts: - if err := c.receipts(ctx, bm, start, limit); err != nil { + if err := c.receipts(ctx, url, bm, start, limit); err != nil { return nil, fmt.Errorf("getting receipts: %w", err) } case filter.UseLogs: - if err := c.logs(ctx, filter, bm, start, limit); err != nil { + if err := c.logs(ctx, url, filter, bm, start, limit); err != nil { return nil, fmt.Errorf("getting logs: %w", err) } case filter.UseTraces: - if err := c.traces(ctx, bm, start, limit); err != nil { + if err := c.traces(ctx, url, bm, start, limit); err != nil { return nil, fmt.Errorf("getting traces: %w", err) } } @@ -444,7 +482,7 @@ type cache struct { segments map[key]*segment } -type getter func(ctx context.Context, start, limit uint64) ([]eth.Block, error) +type getter func(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) func (c *cache) pruneMaxRead() { for k, v := range c.segments { @@ -473,9 +511,9 @@ func (c *cache) pruneSegments() { } } -func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f getter) ([]eth.Block, error) { +func (c *cache) get(nocache bool, ctx context.Context, url string, start, limit uint64, f getter) ([]eth.Block, error) { if nocache { - return f(ctx, start, limit) + return f(ctx, url, start, limit) } c.Lock() if c.segments == nil { @@ -497,7 +535,7 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge return seg.d, nil } - blocks, err := f(ctx, start, limit) + blocks, err := f(ctx, url, start, limit) if err != nil { return nil, fmt.Errorf("cache get: %w", err) } @@ -507,7 +545,7 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge return seg.d, nil } -func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block, error) { +func (c *Client) blocks(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) { var ( t0 = time.Now() reqs = make([]request, limit) @@ -523,7 +561,7 @@ func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block, } resps[i].Block = &blocks[i] } - err := c.do(ctx, &resps, reqs) + err := c.do(ctx, url, &resps, reqs) if err != nil { return nil, fmt.Errorf("requesting blocks: %w", err) } @@ -571,7 +609,7 @@ type headerResp struct { *eth.Header `json:"result"` } -func (c *Client) headers(ctx context.Context, start, limit uint64) ([]eth.Block, error) { +func (c *Client) headers(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) { var ( t0 = time.Now() reqs = make([]request, limit) @@ -587,7 +625,7 @@ func (c *Client) headers(ctx context.Context, start, limit uint64) ([]eth.Block, } resps[i].Header = &blocks[i].Header } - err := c.do(ctx, &resps, reqs) + err := c.do(ctx, url, &resps, reqs) if err != nil { return nil, fmt.Errorf("requesting headers: %w", err) } @@ -620,7 +658,7 @@ type receiptResp struct { Result []receiptResult `json:"result"` } -func (c *Client) receipts(ctx context.Context, bm blockmap, start, limit uint64) error { +func (c *Client) receipts(ctx context.Context, url string, bm blockmap, start, limit uint64) error { var ( reqs = make([]request, limit) resps = make([]receiptResp, limit) @@ -633,7 +671,7 @@ func (c *Client) receipts(ctx context.Context, bm blockmap, start, limit uint64) Params: []any{eth.EncodeUint64(start + i)}, } } - err := c.do(ctx, &resps, reqs) + err := c.do(ctx, url, &resps, reqs) if err != nil { return fmt.Errorf("requesting receipts: %w", err) } @@ -688,7 +726,7 @@ type logResp struct { Result []logResult `json:"result"` } -func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, start, limit uint64) error { +func (c *Client) logs(ctx context.Context, url string, filter *glf.Filter, bm blockmap, start, limit uint64) error { var ( t0 = time.Now() lf = struct { @@ -707,7 +745,7 @@ func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, star &logResp{}, } ) - err := c.do(ctx, &resp, []request{ + err := c.do(ctx, url, &resp, []request{ request{ ID: fmt.Sprintf("blocks-%d-%d-%x", start, limit, randbytes()), Version: "2.0", @@ -788,7 +826,7 @@ type traceBlockResp struct { Result []traceBlockResult `json:"result"` } -func (c *Client) traces(ctx context.Context, bm blockmap, start, limit uint64) error { +func (c *Client) traces(ctx context.Context, url string, bm blockmap, start, limit uint64) error { t0 := time.Now() for i := uint64(0); i < limit; i++ { res := traceBlockResp{} @@ -798,7 +836,7 @@ func (c *Client) traces(ctx context.Context, bm blockmap, start, limit uint64) e Method: "trace_block", Params: []any{eth.EncodeUint64(start + i)}, } - err := c.do(ctx, &res, req) + err := c.do(ctx, url, &res, req) if err != nil { return fmt.Errorf("requesting traces: %w", err) } diff --git a/jrpc2/client_test.go b/jrpc2/client_test.go index e071d45a..720d5b7b 100644 --- a/jrpc2/client_test.go +++ b/jrpc2/client_test.go @@ -33,7 +33,7 @@ type testGetter struct { callCount int } -func (tg *testGetter) get(ctx context.Context, start, limit uint64) ([]eth.Block, error) { +func (tg *testGetter) get(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) { tg.callCount++ var res []eth.Block @@ -47,14 +47,14 @@ func TestCache_Prune(t *testing.T) { ctx := context.Background() tg := testGetter{} c := cache{maxreads: 2} - blocks, err := c.get(false, ctx, 1, 1, tg.get) + blocks, err := c.get(false, ctx, "", 1, 1, tg.get) diff.Test(t, t.Fatalf, nil, err) diff.Test(t, t.Errorf, 1, len(blocks)) diff.Test(t, t.Errorf, 1, tg.callCount) diff.Test(t, t.Errorf, 1, len(c.segments)) for i := uint64(0); i < 9; i++ { - blocks, err := c.get(false, ctx, 2+i, 1, tg.get) + blocks, err := c.get(false, ctx, "", 2+i, 1, tg.get) diff.Test(t, t.Fatalf, nil, err) diff.Test(t, t.Errorf, 1, len(blocks)) } @@ -84,15 +84,15 @@ func TestCache_MaxReads(t *testing.T) { tg = testGetter{} c = cache{maxreads: 2} ) - _, err := c.get(false, ctx, 1, 1, tg.get) + _, err := c.get(false, ctx, "", 1, 1, tg.get) tc.NoErr(t, err) tc.WantGot(t, 1, tg.callCount) - _, err = c.get(false, ctx, 1, 1, tg.get) + _, err = c.get(false, ctx, "", 1, 1, tg.get) tc.NoErr(t, err) tc.WantGot(t, 1, tg.callCount) - _, err = c.get(false, ctx, 1, 1, tg.get) + _, err = c.get(false, ctx, "", 1, 1, tg.get) tc.NoErr(t, err) tc.WantGot(t, 2, tg.callCount) } @@ -156,18 +156,18 @@ func TestLatest_Cached(t *testing.T) { ctx := context.Background() c := New(ts.URL).WithMaxReads(1) - n, h, err := c.Latest(ctx, 0) + n, h, err := c.Latest(ctx, c.NextURL().String(), 0) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, counter, 1) diff.Test(t, t.Errorf, n, uint64(18000000)) diff.Test(t, t.Errorf, eth.EncodeHex(h), "0x95b198e154acbfc64109dfd22d8224fe927fd8dfdedfae01587674482ba4baf3") - n, _, err = c.Latest(ctx, 18000000-1) + n, _, err = c.Latest(ctx, c.NextURL().String(), 18000000-1) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, counter, 1) diff.Test(t, t.Errorf, n, uint64(18000000)) - n, h, err = c.Latest(ctx, 18000000) + n, h, err = c.Latest(ctx, c.NextURL().String(), 18000000) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, counter, 2) diff.Test(t, t.Errorf, n, uint64(18000001)) @@ -258,7 +258,7 @@ func TestValidate_Blocks(t *testing.T) { var ( ctx = context.Background() c = New(ts.URL) - _, err = c.Get(ctx, &glf.Filter{UseBlocks: true}, 18000000, 2) + _, err = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseBlocks: true}, 18000000, 2) ) want := "getting blocks: cache get: blocks: rpc response contains invalid data. requested last: 18000001 got: 18000002" diff.Test(t, t.Fatalf, false, err == nil) @@ -303,7 +303,7 @@ func TestValidate_Logs(t *testing.T) { var ( ctx = context.Background() c = New(ts.URL) - _, err = c.Get(ctx, &glf.Filter{UseLogs: true}, 18000000, 2) + _, err = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseLogs: true}, 18000000, 2) ) tc.WantErr(t, err) want := "getting logs: eth_getLogs out of range block. num=18000002 start=18000000 lim=2" @@ -324,7 +324,7 @@ func TestValidate_Logs_NoBlocks(t *testing.T) { var ( ctx = context.Background() c = New(ts.URL) - _, err = c.Get(ctx, &glf.Filter{UseLogs: true}, 18000000, 2) + _, err = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseLogs: true}, 18000000, 2) ) tc.WantErr(t, err) const want = "getting logs: eth backend missing logs for block" @@ -353,7 +353,7 @@ func TestError(t *testing.T) { ctx = context.Background() c = New(ts.URL) want = "getting blocks: cache get: rpc=eth_getBlockByNumber code=-32012 msg=credits" - _, got = c.Get(ctx, &glf.Filter{UseBlocks: true}, 1000001, 1) + _, got = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseBlocks: true}, 1000001, 1) ) diff.Test(t, t.Errorf, want, got.Error()) } @@ -361,7 +361,7 @@ func TestError(t *testing.T) { func TestGet(t *testing.T) { ctx := context.Background() const start, limit = 10, 5 - blocks, err := New("").Get(ctx, &glf.Filter{}, start, limit) + blocks, err := New("").Get(ctx, "", &glf.Filter{}, start, limit) diff.Test(t, t.Fatalf, nil, err) diff.Test(t, t.Fatalf, len(blocks), limit) diff.Test(t, t.Fatalf, blocks[0].Num(), uint64(10)) @@ -412,7 +412,7 @@ func TestGet_Cached(t *testing.T) { return nil, fmt.Errorf("no tx at idx %d", idx) } getcall = func() error { - blocks, err := c.Get(ctx, &glf.Filter{UseHeaders: true, UseLogs: true}, 18000000, 1) + blocks, err := c.Get(ctx, c.NextURL().String(), &glf.Filter{UseHeaders: true, UseLogs: true}, 18000000, 1) diff.Test(t, t.Errorf, nil, err) blocks[0].Lock() @@ -450,15 +450,15 @@ func TestGet_Cached_Pruned(t *testing.T) { ctx = context.Background() c = New(ts.URL).WithMaxReads(2) ) - _, err := c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1) + _, err := c.Get(ctx, c.NextURL().String(), &glf.Filter{UseHeaders: true}, 18000000, 1) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, n, int32(1)) - _, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1) + _, err = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseHeaders: true}, 18000000, 1) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, n, int32(1)) //maxreads should have been reached with last 2 calls - _, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1) + _, err = c.Get(ctx, c.NextURL().String(), &glf.Filter{UseHeaders: true}, 18000000, 1) diff.Test(t, t.Errorf, nil, err) diff.Test(t, t.Errorf, n, int32(2)) } @@ -480,7 +480,7 @@ func TestNoLogs(t *testing.T) { ctx := context.Background() c := New(ts.URL) - blocks, err := c.Get(ctx, &glf.Filter{UseBlocks: true, UseLogs: true}, 1000001, 1) + blocks, err := c.Get(ctx, c.NextURL().String(), &glf.Filter{UseBlocks: true, UseLogs: true}, 1000001, 1) diff.Test(t, t.Errorf, nil, err) b := blocks[0] @@ -507,7 +507,7 @@ func TestLatest(t *testing.T) { ctx := context.Background() c := New(ts.URL) - blocks, err := c.Get(ctx, &glf.Filter{UseBlocks: true, UseLogs: true}, 18000000, 1) + blocks, err := c.Get(ctx, c.NextURL().String(), &glf.Filter{UseBlocks: true, UseLogs: true}, 18000000, 1) diff.Test(t, t.Errorf, nil, err) b := blocks[0] diff --git a/shovel/task.go b/shovel/task.go index 2f36417b..a9066bae 100644 --- a/shovel/task.go +++ b/shovel/task.go @@ -30,9 +30,10 @@ import ( var Schema string type Source interface { - Get(context.Context, *glf.Filter, uint64, uint64) ([]eth.Block, error) - Latest(context.Context, uint64) (uint64, []byte, error) - Hash(context.Context, uint64) ([]byte, error) + Get(context.Context, string, *glf.Filter, uint64, uint64) ([]eth.Block, error) + Latest(context.Context, string, uint64) (uint64, []byte, error) + Hash(context.Context, string, uint64) ([]byte, error) + NextURL() *jrpc2.URL } type Destination interface { @@ -303,18 +304,18 @@ func (t *Task) latest(ctx context.Context, pg wpg.Conn) (uint64, []byte, error) switch { case t.start > 0: n := t.start - 1 - h, err := t.src.Hash(ctx, n) + h, err := t.src.Hash(ctx, t.src.NextURL().String(), n) if err != nil { return 0, nil, fmt.Errorf("getting hash for %d: %w", n, err) } slog.InfoContext(t.ctx, "start at config", "num", t.start) return n, h, nil default: - n, _, err := t.src.Latest(ctx, 0) + n, _, err := t.src.Latest(ctx, t.src.NextURL().String(), 0) if err != nil { return 0, nil, err } - h, err := t.src.Hash(ctx, n-1) + h, err := t.src.Hash(ctx, t.src.NextURL().String(), n-1) if err != nil { return 0, nil, fmt.Errorf("getting hash for %d: %w", n-1, err) } @@ -341,10 +342,15 @@ var ( // in no side-effects. func (task *Task) Converge() error { var ( - t0 = time.Now() - nrpc = uint64(0) - ctx = wctx.WithCounter(task.ctx, &nrpc) + ctx = task.ctx + t0 = time.Now() + nextURL = task.src.NextURL() + url = nextURL.String() + nrpc = uint64(0) ) + ctx = wctx.WithSrcURL(ctx, nextURL.Hostname()) + ctx = wctx.WithCounter(ctx, &nrpc) + pgtx, err := task.pgp.Begin(ctx) if err != nil { return fmt.Errorf("starting converge tx: %w", err) @@ -365,7 +371,7 @@ func (task *Task) Converge() error { if task.stop > 0 && localNum >= task.stop { return ErrDone } - gethNum, gethHash, err := task.src.Latest(ctx, localNum) + gethNum, gethHash, err := task.src.Latest(ctx, url, localNum) if err != nil { return fmt.Errorf("getting latest from eth: %w", err) } @@ -411,7 +417,7 @@ func (task *Task) Converge() error { return ErrNothingNew } ctx = wctx.WithNumLimit(ctx, localNum+1, delta) - switch last, err := task.loadinsert(ctx, pgtx, localHash, localNum+1, delta); { + switch last, err := task.loadinsert(ctx, url, pgtx, localHash, localNum+1, delta); { case errors.Is(err, ErrReorg): slog.ErrorContext(ctx, "reorg", "n", localNum, @@ -454,6 +460,7 @@ type hashcheck struct { func (t *Task) loadinsert( ctx context.Context, + url string, pg wpg.Conn, localHash []byte, start, limit uint64, @@ -476,7 +483,7 @@ func (t *Task) loadinsert( } eg.Go(func() error { ctx = wctx.WithNumLimit(ctx, m, n) - blocks, err := t.src.Get(ctx, &t.filter, m, n) + blocks, err := t.src.Get(ctx, url, &t.filter, m, n) if err != nil { slog.ErrorContext(ctx, "loading blocks", "error", err) return fmt.Errorf("loading blocks: %w", err) diff --git a/shovel/task_test.go b/shovel/task_test.go index 30fdf0d6..2cd1f744 100644 --- a/shovel/task_test.go +++ b/shovel/task_test.go @@ -10,6 +10,7 @@ import ( "github.com/indexsupply/x/dig" "github.com/indexsupply/x/eth" + "github.com/indexsupply/x/jrpc2" "github.com/indexsupply/x/shovel/config" "github.com/indexsupply/x/shovel/glf" "github.com/indexsupply/x/tc" @@ -95,11 +96,15 @@ type testGeth struct { blocks []eth.Block } +func (tg *testGeth) NextURL() *jrpc2.URL { + return jrpc2.MustURL("") +} + func (tg *testGeth) factory(config.Source, glf.Filter) Source { return tg } -func (tg *testGeth) Hash(_ context.Context, n uint64) ([]byte, error) { +func (tg *testGeth) Hash(_ context.Context, _ string, n uint64) ([]byte, error) { for j := range tg.blocks { if uint64(tg.blocks[j].Header.Number) == n { return tg.blocks[j].Header.Hash, nil @@ -108,7 +113,7 @@ func (tg *testGeth) Hash(_ context.Context, n uint64) ([]byte, error) { return nil, fmt.Errorf("not found: %d", n) } -func (tg *testGeth) Latest(_ context.Context, _ uint64) (uint64, []byte, error) { +func (tg *testGeth) Latest(_ context.Context, _ string, _ uint64) (uint64, []byte, error) { if len(tg.blocks) == 0 { return 0, nil, nil } @@ -116,7 +121,7 @@ func (tg *testGeth) Latest(_ context.Context, _ uint64) (uint64, []byte, error) return b.Num(), b.Hash(), nil } -func (tg *testGeth) Get(_ context.Context, filter *glf.Filter, start, limit uint64) ([]eth.Block, error) { +func (tg *testGeth) Get(_ context.Context, _ string, filter *glf.Filter, start, limit uint64) ([]eth.Block, error) { if start+limit-1 > tg.blocks[len(tg.blocks)-1].Num() { const tag = "no blocks. start=%d limit=%d latest=%d" return nil, fmt.Errorf(tag, start, limit, tg.blocks[len(tg.blocks)-1].Num()) diff --git a/shovel/web/web.go b/shovel/web/web.go index 189e6287..9c96e757 100644 --- a/shovel/web/web.go +++ b/shovel/web/web.go @@ -218,7 +218,7 @@ func (h *Handler) Prom(w http.ResponseWriter, r *http.Request) { // Source start = time.Now() - srcLatest, _, err = src.Latest(r.Context(), 0) + srcLatest, _, err = src.Latest(r.Context(), src.NextURL().String(), 0) if err != nil { srcErr++ } @@ -288,7 +288,7 @@ func (h *Handler) Diag(w http.ResponseWriter, r *http.Request) { } checkSrc := func(src shovel.Source, dr *DiagResult) { start := time.Now() - n, _, err := src.Latest(ctx, 0) + n, _, err := src.Latest(ctx, src.NextURL().String(), 0) if err != nil { dr.Error = err.Error() } diff --git a/wctx/ctx.go b/wctx/ctx.go index 17f64a35..3d69c100 100644 --- a/wctx/ctx.go +++ b/wctx/ctx.go @@ -15,6 +15,7 @@ const ( versionKey key = 4 counterKey key = 5 numLimitKey key = 6 + srcURLKey key = 7 ) func WithChainID(ctx context.Context, id uint64) context.Context { @@ -52,6 +53,7 @@ func Version(ctx context.Context) string { v, _ := ctx.Value(versionKey).(string) return v } + func WithCounter(ctx context.Context, c *uint64) context.Context { return context.WithValue(ctx, counterKey, c) } @@ -82,3 +84,12 @@ func NumLimit(ctx context.Context) (uint64, uint64) { nl, _ := ctx.Value(numLimitKey).(numLimit) return nl.num, nl.limit } + +func WithSrcURL(ctx context.Context, v string) context.Context { + return context.WithValue(ctx, srcURLKey, v) +} + +func SrcURL(ctx context.Context) string { + v, _ := ctx.Value(srcURLKey).(string) + return v +}