jrpc2: bugfix. duplicate txs
jrpc2.Client structs are shared between Shovel tasks and therefore run
concurrently. There was a race condition after downloading a header
(possibly cached via bcache) and before downloading the logs. This could
result in duplicate transactions being added to the block. The
duplicated transactions would contain only the requested logs (as
opposed to the transaction's entire set of logs).
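
To make the race concrete, here is a minimal sketch of the
unsynchronized check-then-append pattern (names are simplified and the
scenario contrived; the real client code is in the diff below):

package main

import (
	"fmt"
	"sync"

	"github.com/indexsupply/x/eth"
)

func main() {
	b := &eth.Block{} // shared (possibly bcache-cached) block
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Unsynchronized check-then-append: both goroutines can
			// miss the existing tx and both can append, duplicating
			// the tx at idx 0.
			for j := range b.Txs {
				if b.Txs[j].Idx == 0 {
					return
				}
			}
			b.Txs = append(b.Txs, eth.Tx{Idx: 0})
		}()
	}
	wg.Wait()
	fmt.Println("txs:", len(b.Txs)) // can print 2: a duplicated tx
}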

In some cases this duplicated data is not a problem because the
integration process removes logs that don't match the integration's
configured event signature. However, if multiple integrations share an
event signature, the integration process will process the log more than
once. In that case, an integration will encounter an error when copying
the data to the PG table because the table contains a unique
constraint. This causes n-1 of the integrations sharing the event
signature to fail. Eventually those n-1 integrations may succeed,
because the block/header is not cached forever. If other integrations
are making progress, or if the Shovel process is restarted, the problem
may go away.

The end result of this bug is that an integration may be unable to make
progress, but there should be no duplicated or missing data in the
database, because of the unique constraint.

Users hitting this bug will see errors in the logs reporting a unique
constraint violation on one of the n integrations that share the event
signature.
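
For illustration, a hedged sketch of detecting that symptom, assuming a
pgx-based copy to PG (Shovel's actual error handling may differ; the
isUniqueViolation helper and the constraint name are made up):

package main

import (
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5/pgconn"
)

// isUniqueViolation reports whether err is Postgres's unique_violation
// (SQLSTATE 23505), the error the n-1 losing integrations would log.
func isUniqueViolation(err error) bool {
	var pgErr *pgconn.PgError
	return errors.As(err, &pgErr) && pgErr.Code == "23505"
}

func main() {
	// Fabricated error for demonstration; real ones come from the copy.
	err := error(&pgconn.PgError{Code: "23505", ConstraintName: "u_idx"})
	fmt.Println(isUniqueViolation(err)) // true
}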

The fix for this bug was to introduce concurrency control around adding
txs to an in-memory (or cached) block. Once the jrpc2 client downloads
the block (or header), the multiple concurrent requests to download
logs (or receipts) synchronize their access when adding (or reading) a
transaction on the block. The client asks the block for a tx identified
by its tx idx; the block either returns an existing tx or creates a new
one and appends it to its txs slice. A similar process ensures we don't
add duplicate logs.
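
Here is a minimal usage sketch of the synchronized path, using the
eth.Block.Tx and eth.Logs.Add methods from this commit (assuming
Logs.Add appends the copied log, as the truncated diff suggests):

package main

import (
	"fmt"
	"sync"

	"github.com/indexsupply/x/eth"
)

func main() {
	b := &eth.Block{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Tx locks the block, so one goroutine creates the tx
			// and the other gets the existing tx back.
			b.Tx(0)
		}()
	}
	wg.Wait()
	tx := b.Tx(0)
	tx.Logs.Add(&eth.Log{Idx: 7}) // adds the log
	tx.Logs.Add(&eth.Log{Idx: 7}) // no-op: idx 7 already present
	fmt.Println(len(b.Txs), len(tx.Logs)) // 1 1
}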

Given that we now have synchronized access to a block's transactions and
a transaction's logs, I think we have all the footguns accounted for.

It was nice that the PG unique constraint prevented data corruption.
ryandotsmith committed Mar 2, 2024
1 parent 749e218 commit f2a31a4
Showing 3 changed files with 71 additions and 52 deletions.
20 changes: 20 additions & 0 deletions eth/types.go
@@ -116,6 +116,8 @@ func (hb *Bytes) Write(p []byte) (int, error) {
}

type Block struct {
	sync.Mutex

	Header
	Txs Txs `json:"transactions"`
}
@@ -132,6 +134,18 @@ func (b Block) String() string {
	)
}

// Tx returns the tx at idx, creating and appending it to b.Txs
// if it doesn't exist yet. Safe for concurrent use.
func (b *Block) Tx(idx uint64) *Tx {
	b.Lock()
	defer b.Unlock()
	for i := range b.Txs {
		if uint64(b.Txs[i].Idx) == idx {
			return &b.Txs[i]
		}
	}
	b.Txs = append(b.Txs, Tx{Idx: Uint64(idx)})
	return &b.Txs[len(b.Txs)-1]
}

type Log struct {
	Idx     Uint64 `json:"logIndex"`
	Address Bytes  `json:"address"`
@@ -158,6 +172,12 @@ func (l *Log) UnmarshalRLP(b []byte) {
type Logs []Log

// Add copies other onto the end of ls unless a log
// with the same Idx has already been added.
func (ls *Logs) Add(other *Log) {
	for i := range *ls {
		if (*ls)[i].Idx == other.Idx {
			return
		}
	}

	l := Log{}
	l.Idx = other.Idx
	l.Address.Write(other.Address)
50 changes: 8 additions & 42 deletions jrpc2/client.go
@@ -346,7 +346,6 @@ type key struct {

type (
	blockmap map[uint64]*eth.Block
	txmap    map[key]*eth.Tx
)

func (c *Client) Get(
@@ -379,23 +378,18 @@ func (c *Client) Get(
		}
	}

	bm, tm := make(blockmap), make(txmap)
	bm := make(blockmap)
	for i := range blocks {
		bm[blocks[i].Num()] = &blocks[i]
		for j := range blocks[i].Txs {
			t := &blocks[i].Txs[j]
			k := key{blocks[i].Num(), uint64(t.Idx)}
			tm[k] = t
		}
	}

	switch {
	case filter.UseReceipts:
		if err := c.receipts(ctx, bm, tm, start, limit); err != nil {
		if err := c.receipts(ctx, bm, start, limit); err != nil {
			return nil, fmt.Errorf("getting receipts: %w", err)
		}
	case filter.UseLogs:
		if err := c.logs(ctx, filter, bm, tm, start, limit); err != nil {
		if err := c.logs(ctx, filter, bm, start, limit); err != nil {
			return nil, fmt.Errorf("getting logs: %w", err)
		}
	}
@@ -591,7 +585,7 @@ type receiptResp struct {
	Result []receiptResult `json:"result"`
}

func (c *Client) receipts(ctx context.Context, bm blockmap, tm txmap, start, limit uint64) error {
func (c *Client) receipts(ctx context.Context, bm blockmap, start, limit uint64) error {
	var (
		reqs  = make([]request, limit)
		resps = make([]receiptResp, limit)
@@ -624,17 +618,7 @@ func (c *Client) receipts(ctx context.Context, bm blockmap, tm txmap, start, lim
		}
		b.Header.Hash.Write(resps[i].Result[0].BlockHash)
		for j := range resps[i].Result {
			k := key{b.Num(), uint64(resps[i].Result[j].TxIdx)}
			if tx, ok := tm[k]; ok {
				tx.Status.Write(byte(resps[i].Result[j].Status))
				tx.GasUsed = resps[i].Result[j].GasUsed
				tx.Logs = make([]eth.Log, len(resps[i].Result[j].Logs))
				copy(tx.Logs, resps[i].Result[j].Logs)
				continue
			}

			tx := eth.Tx{}
			tx.Idx = resps[i].Result[j].TxIdx
			tx := b.Tx(uint64(resps[i].Result[j].TxIdx))
			tx.PrecompHash.Write(resps[i].Result[j].TxHash)
			tx.Type.Write(byte(resps[i].Result[j].TxType))
			tx.From.Write(resps[i].Result[j].TxFrom)
@@ -643,7 +627,6 @@ func (c *Client) receipts(ctx context.Context, bm blockmap, tm txmap, start, lim
			tx.GasUsed = resps[i].Result[j].GasUsed
			tx.Logs = make([]eth.Log, len(resps[i].Result[j].Logs))
			copy(tx.Logs, resps[i].Result[j].Logs)
			b.Txs = append(b.Txs, tx)
		}
	}
	return nil
@@ -663,7 +646,7 @@ type logResp struct {
	Result []logResult `json:"result"`
}

func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, tm txmap, start, limit uint64) error {
func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, start, limit uint64) error {
	var (
		t0 = time.Now()
		lf = struct {
@@ -700,35 +683,18 @@ func (c *Client) logs(ctx context.Context, filter *glf.Filter, bm blockmap, tm t
		}
		logsByTx[k] = []logResult{lresp.Result[i]}
	}

	for k, logs := range logsByTx {
		b, ok := bm[k.a]
		if !ok {
			return fmt.Errorf("block not found")
		}
		b.Header.Hash.Write(logs[0].BlockHash)

		if tx, ok := tm[k]; ok {
			for i := range logs {
				var found bool
				for j := range tx.Logs {
					if tx.Logs[j].Idx == logs[i].Log.Idx {
						found = true
					}
				}
				if !found {
					tx.Logs = append(tx.Logs, *logs[i].Log)
				}
			}
			continue
		}
		tx := eth.Tx{}
		tx.Idx = eth.Uint64(k.b)
		tx := b.Tx(k.b)
		tx.PrecompHash.Write(logs[0].TxHash)
		tx.Logs = make([]eth.Log, 0, len(logs))
		for i := range logs {
			tx.Logs.Add(logs[i].Log)
		}
		b.Txs = append(b.Txs, tx)
	}
	slog.Debug("http get logs",
		"start", start,
53 changes: 43 additions & 10 deletions jrpc2/client_test.go
@@ -12,10 +12,13 @@ import (
	"os"
	"sort"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/indexsupply/x/eth"
	"github.com/indexsupply/x/shovel/glf"
	"golang.org/x/sync/errgroup"
	"kr.dev/diff"
)

@@ -298,29 +301,59 @@ func TestGet(t *testing.T) {
}

func TestGet_Cached(t *testing.T) {
	// reqCount is for testing concurrency.
	// We want to recreate a scenario where
	// two goroutines attempt to call Get with
	// identical request parameters. One
	// goroutine will call eth_getBlockByNumber to
	// download the header and the other goroutine will
	// use the cached header. Once the header has been
	// downloaded, both goroutines will download logs.
	// Our test ensures that concurrent goroutines
	// don't add duplicate tx/log data to the shared
	// block header.
	var reqCount uint64
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		diff.Test(t, t.Fatalf, nil, err)
		switch {
		case strings.Contains(string(body), "eth_getBlockByNumber"):
			atomic.AddUint64(&reqCount, 1)
			_, err := w.Write([]byte(block18000000JSON))
			diff.Test(t, t.Fatalf, nil, err)
		case strings.Contains(string(body), "eth_getLogs"):
			for atomic.LoadUint64(&reqCount) == 0 {
				time.Sleep(time.Second)
			}
			_, err := w.Write([]byte(logs18000000JSON))
			diff.Test(t, t.Fatalf, nil, err)
		}
	}))
	defer ts.Close()

	ctx := context.Background()
	c := New(ts.URL)
	blocks, err := c.Get(ctx, &glf.Filter{UseBlocks: true, UseLogs: true}, 18000000, 1)
	diff.Test(t, t.Errorf, nil, err)
	diff.Test(t, t.Errorf, len(blocks[0].Txs[0].Logs), 1)

	blocks, err = c.Get(ctx, &glf.Filter{UseBlocks: true, UseLogs: true}, 18000000, 1)
	diff.Test(t, t.Errorf, nil, err)
	diff.Test(t, t.Errorf, len(blocks[0].Txs[0].Logs), 1)
	var (
		ctx    = context.Background()
		c      = New(ts.URL)
		findTx = func(b eth.Block, idx uint64) (eth.Tx, error) {
			for i := range b.Txs {
				if b.Txs[i].Idx == eth.Uint64(idx) {
					return b.Txs[i], nil
				}
			}
			return eth.Tx{}, fmt.Errorf("no tx at idx %d", idx)
		}
		getcall = func() error {
			blocks, err := c.Get(ctx, &glf.Filter{UseHeaders: true, UseLogs: true}, 18000000, 1)
			diff.Test(t, t.Errorf, nil, err)
			diff.Test(t, t.Errorf, len(blocks[0].Txs), 65)
			tx, err := findTx(blocks[0], 0)
			diff.Test(t, t.Errorf, nil, err)
			diff.Test(t, t.Errorf, len(tx.Logs), 1)
			return nil
		}
	)
	eg := errgroup.Group{}
	eg.Go(getcall)
	eg.Go(getcall)
	diff.Test(t, t.Errorf, nil, eg.Wait())
}

func TestNoLogs(t *testing.T) {