Skip to content

Commit

Permalink
shovel: multiple source urls
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandotsmith committed Jun 7, 2024
1 parent 0f7417e commit 788d272
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 72 deletions.
4 changes: 2 additions & 2 deletions cmd/shovel/demo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"name": "mainnet",
"chain_id": 1,
"url": "https://ethereum-rpc.publicnode.com"
"url": "https://hidden-necessary-mound.quiknode.pro/70d6a30b07922a142e049ceea8be11d385650593/,https://ethereum-rpc.publicnode.com"
},
{
"name": "base",
Expand All @@ -15,7 +15,7 @@
"integrations": [{
"name": "usdc-transfer",
"enabled": true,
"sources": [{"name": "mainnet"}, {"name": "base"}],
"sources": [{"name": "mainnet"}],
"table": {
"name": "usdc",
"columns": [
Expand Down
7 changes: 7 additions & 0 deletions cmd/shovel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func main() {
}
return "ig", igName
})
lh.RegisterContext(func(ctx context.Context) (string, any) {
srcURL := wctx.SrcURL(ctx)
if srcURL == "" {
return "", nil
}
return "host", srcURL
})
lh.RegisterContext(func(ctx context.Context) (string, any) {
srcName := wctx.SrcName(ctx)
if srcName == "" {
Expand Down
104 changes: 71 additions & 33 deletions jrpc2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"log/slog"
"net"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unicode"

Expand All @@ -31,22 +33,50 @@ import (
"nhooyr.io/websocket/wsjson"
)

type URL struct {
parsed *url.URL
provided string
success, total uint64
lastError time.Time
}

func MustURL(provided string) *URL {
parsed, err := url.Parse(provided)
if err != nil {
fmt.Printf("unable to parse url: %s\n", provided)
os.Exit(1)
}
return &URL{parsed: parsed, provided: provided}
}

func (u *URL) Hostname() string {
return u.parsed.Hostname()
}

func (u *URL) String() string {
return u.parsed.String()
}

func randbytes() []byte {
b := make([]byte, 10)
rand.Read(b)
return b
}

func New(url string) *Client {
func New(providedURL string) *Client {
var urls []*URL
for _, provided := range strings.Split(providedURL, ",") {
urls = append(urls, MustURL(provided))
}
return &Client{
d: strings.Contains(url, "debug"),
nocache: strings.Contains(url, "nocache"),
d: strings.Contains(providedURL, "debug"),
nocache: strings.Contains(providedURL, "nocache"),
hc: &http.Client{
Timeout: 10 * time.Second,
Transport: gzhttp.Transport(http.DefaultTransport),
},
urls: urls,
pollDuration: time.Second,
url: url,
lcache: NumHash{maxreads: 20},
bcache: cache{maxreads: 20},
hcache: cache{maxreads: 20},
Expand All @@ -57,16 +87,23 @@ type Client struct {
nocache bool
d bool
hc *http.Client
url string
urls []*URL
wsurl string

reqCounter uint64
pollDuration time.Duration

lcache NumHash
bcache cache
hcache cache
}

func (c *Client) NextURL() *URL {
atomic.AddUint64(&c.reqCounter, 1)
next := c.reqCounter % uint64(len(c.urls))
return c.urls[next]
}

func (c *Client) WithMaxReads(n int) *Client {
c.lcache.maxreads = n
c.bcache.maxreads = n
Expand Down Expand Up @@ -98,7 +135,7 @@ type request struct {
Params []any `json:"params"`
}

func (c *Client) do(ctx context.Context, dest, req any) error {
func (c *Client) do(ctx context.Context, url string, dest, req any) error {
var (
eg errgroup.Group
r, w = io.Pipe()
Expand All @@ -109,7 +146,7 @@ func (c *Client) do(ctx context.Context, dest, req any) error {
return json.NewEncoder(w).Encode(req)
})
eg.Go(func() error {
req, err := http.NewRequest("POST", c.url, c.debug(r))
req, err := http.NewRequest("POST", url, c.debug(r))
if err != nil {
return fmt.Errorf("unable to new request: %w", err)
}
Expand Down Expand Up @@ -266,14 +303,14 @@ func (c *Client) wsListen(ctx context.Context) {
}
}

func (c *Client) httpPoll(ctx context.Context) {
func (c *Client) httpPoll(ctx context.Context, url string) {
var (
ticker = time.NewTicker(c.pollDuration)
hresp = headerResp{}
)
defer ticker.Stop()
for range ticker.C {
err := c.do(ctx, &hresp, request{
err := c.do(ctx, url, &hresp, request{
ID: "1",
Version: "2.0",
Method: "eth_getBlockByNumber",
Expand Down Expand Up @@ -305,23 +342,23 @@ func (c *Client) httpPoll(ctx context.Context) {
// When n is 0, Latest always fetches the latest block
// rather than using the cached value,
// bypassing the caching mechanism.
func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) {
func (c *Client) Latest(ctx context.Context, url string, n uint64) (uint64, []byte, error) {
c.lcache.once.Do(func() {
switch {
case len(c.wsurl) > 0:
slog.DebugContext(ctx, "jrpc2 ws listening")
go c.wsListen(context.Background())
default:
slog.DebugContext(ctx, "jrpc2 http polling")
go c.httpPoll(context.Background())
go c.httpPoll(context.Background(), url)
}
})
if n, h, ok := c.lcache.get(ctx, n); ok {
return n, h, nil
}

hresp := headerResp{}
err := c.do(ctx, &hresp, request{
err := c.do(ctx, url, &hresp, request{
ID: fmt.Sprintf("latest-%d-%x", n, randbytes()),
Version: "2.0",
Method: "eth_getBlockByNumber",
Expand All @@ -342,9 +379,9 @@ func (c *Client) Latest(ctx context.Context, n uint64) (uint64, []byte, error) {
return uint64(hresp.Number), hresp.Hash, nil
}

func (c *Client) Hash(ctx context.Context, n uint64) ([]byte, error) {
func (c *Client) Hash(ctx context.Context, url string, n uint64) ([]byte, error) {
hresp := headerResp{}
err := c.do(ctx, &hresp, request{
err := c.do(ctx, url, &hresp, request{
ID: fmt.Sprintf("hash-%d-%x", n, randbytes()),
Version: "2.0",
Method: "eth_getBlockByNumber",
Expand All @@ -368,6 +405,7 @@ type blockmap map[uint64]*eth.Block

func (c *Client) Get(
ctx context.Context,
url string,
filter *glf.Filter,
start, limit uint64,
) ([]eth.Block, error) {
Expand All @@ -385,12 +423,12 @@ func (c *Client) Get(
)
switch {
case filter.UseBlocks:
blocks, err = c.bcache.get(c.nocache, ctx, start, limit, c.blocks)
blocks, err = c.bcache.get(c.nocache, ctx, url, start, limit, c.blocks)
if err != nil {
return nil, fmt.Errorf("getting blocks: %w", err)
}
case filter.UseHeaders:
blocks, err = c.hcache.get(c.nocache, ctx, start, limit, c.headers)
blocks, err = c.hcache.get(c.nocache, ctx, url, start, limit, c.headers)
if err != nil {
return nil, fmt.Errorf("getting headers: %w", err)
}
Expand All @@ -411,15 +449,15 @@ func (c *Client) Get(

switch {
case filter.UseReceipts:
if err := c.receipts(ctx, bm, start, limit); err != nil {
if err := c.receipts(ctx, url, bm, start, limit); err != nil {
return nil, fmt.Errorf("getting receipts: %w", err)
}
case filter.UseLogs:
if err := c.logs(ctx, filter, bm, start, limit); err != nil {
if err := c.logs(ctx, url, filter, bm, start, limit); err != nil {
return nil, fmt.Errorf("getting logs: %w", err)
}
case filter.UseTraces:
if err := c.traces(ctx, bm, start, limit); err != nil {
if err := c.traces(ctx, url, bm, start, limit); err != nil {
return nil, fmt.Errorf("getting traces: %w", err)
}
}
Expand All @@ -444,7 +482,7 @@ type cache struct {
segments map[key]*segment
}

type getter func(ctx context.Context, start, limit uint64) ([]eth.Block, error)
type getter func(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error)

func (c *cache) pruneMaxRead() {
for k, v := range c.segments {
Expand Down Expand Up @@ -473,9 +511,9 @@ func (c *cache) pruneSegments() {
}
}

func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f getter) ([]eth.Block, error) {
func (c *cache) get(nocache bool, ctx context.Context, url string, start, limit uint64, f getter) ([]eth.Block, error) {
if nocache {
return f(ctx, start, limit)
return f(ctx, url, start, limit)
}
c.Lock()
if c.segments == nil {
Expand All @@ -497,7 +535,7 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge
return seg.d, nil
}

blocks, err := f(ctx, start, limit)
blocks, err := f(ctx, url, start, limit)
if err != nil {
return nil, fmt.Errorf("cache get: %w", err)
}
Expand All @@ -507,7 +545,7 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge
return seg.d, nil
}

func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block, error) {
func (c *Client) blocks(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) {
var (
t0 = time.Now()
reqs = make([]request, limit)
Expand All @@ -523,7 +561,7 @@ func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block,
}
resps[i].Block = &blocks[i]
}
err := c.do(ctx, &resps, reqs)
err := c.do(ctx, url, &resps, reqs)
if err != nil {
return nil, fmt.Errorf("requesting blocks: %w", err)
}
Expand Down Expand Up @@ -571,7 +609,7 @@ type headerResp struct {
*eth.Header `json:"result"`
}

func (c *Client) headers(ctx context.Context, start, limit uint64) ([]eth.Block, error) {
func (c *Client) headers(ctx context.Context, url string, start, limit uint64) ([]eth.Block, error) {
var (
t0 = time.Now()
reqs = make([]request, limit)
Expand All @@ -587,7 +625,7 @@ func (c *Client) headers(ctx context.Context, start, limit uint64) ([]eth.Block,
}
resps[i].Header = &blocks[i].Header
}
err := c.do(ctx, &resps, reqs)
err := c.do(ctx, url, &resps, reqs)
if err != nil {
return nil, fmt.Errorf("requesting headers: %w", err)
}
Expand Down Expand Up @@ -620,7 +658,7 @@ type receiptResp struct {
Result []receiptResult `json:"result"`
}

func (c *Client) receipts(ctx context.Context, bm blockmap, start, limit uint64) error {
func (c *Client) receipts(ctx context.Context, url string, bm blockmap, start, limit uint64) error {
var (
reqs = make([]request, limit)
resps = make([]receiptResp, limit)
Expand All @@ -633,7 +671,7 @@ func (c *Client) receipts(ctx context.Context, bm blockmap, start, limit uint64)
Params: []any{eth.EncodeUint64(start + i)},
}
}
err := c.do(ctx, &resps, reqs)
err := c.do(ctx, url, &resps, reqs)
if err != nil {
return fmt.Errorf("requesting receipts: %w", err)
}
Expand Down Expand Up @@ -688,7 +726,7 @@ type logResp struct {
Result []logResult `json:"result"`
}

func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, start, limit uint64) error {
func (c *Client) logs(ctx context.Context, url string, filter *glf.Filter, bm blockmap, start, limit uint64) error {
var (
t0 = time.Now()
lf = struct {
Expand All @@ -707,7 +745,7 @@ func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, star
&logResp{},
}
)
err := c.do(ctx, &resp, []request{
err := c.do(ctx, url, &resp, []request{
request{
ID: fmt.Sprintf("blocks-%d-%d-%x", start, limit, randbytes()),
Version: "2.0",
Expand Down Expand Up @@ -788,7 +826,7 @@ type traceBlockResp struct {
Result []traceBlockResult `json:"result"`
}

func (c *Client) traces(ctx context.Context, bm blockmap, start, limit uint64) error {
func (c *Client) traces(ctx context.Context, url string, bm blockmap, start, limit uint64) error {
t0 := time.Now()
for i := uint64(0); i < limit; i++ {
res := traceBlockResp{}
Expand All @@ -798,7 +836,7 @@ func (c *Client) traces(ctx context.Context, bm blockmap, start, limit uint64) e
Method: "trace_block",
Params: []any{eth.EncodeUint64(start + i)},
}
err := c.do(ctx, &res, req)
err := c.do(ctx, url, &res, req)
if err != nil {
return fmt.Errorf("requesting traces: %w", err)
}
Expand Down
Loading

0 comments on commit 788d272

Please sign in to comment.