Skip to content

Commit

Permalink
shovel: bugfix. filter ref was not filtering
Browse files Browse the repository at this point in the history
This was a regression from: 42721b2

When the accept method was re-written, little care was taken to not
break filter references. The fix was simple, only exit early if there
are no args AND there are not filter references.
  • Loading branch information
ryandotsmith committed May 31, 2024
1 parent 97622cf commit 0f7417e
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 67 deletions.
2 changes: 1 addition & 1 deletion dig/dig.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ type Filter struct {
}

func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d any) (bool, error) {
if len(f.Arg) == 0 {
if len(f.Arg) == 0 && len(f.Ref.Integration) == 0 {
return true, nil
}
switch v := d.(type) {
Expand Down
106 changes: 51 additions & 55 deletions shovel/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,26 @@ import (

"github.com/indexsupply/x/jrpc2"
"github.com/indexsupply/x/shovel/config"
"github.com/indexsupply/x/tc"
"github.com/indexsupply/x/wpg"
"github.com/jackc/pgx/v5/pgxpool"
"kr.dev/diff"
)

func check(t testing.TB, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
}
}

// Process will download the header,bodies, and receipts data
// if it doesn't exist in: integrations/testdata
// In the case that it needs to fetch the data, an RPC
// client will be used. The RPC endpoint needs to support
// the debug_dbAncient and debug_dbGet methods.
func process(tb testing.TB, pg *pgxpool.Pool, conf config.Root, n uint64) *Task {
check(tb, config.ValidateFix(&conf))
check(tb, config.Migrate(context.Background(), pg, conf))

task, err := NewTask(
WithPG(pg),
WithSource(jrpc2.New("https://ethereum.publicnode.com")),
WithIntegration(conf.Integrations[0]),
WithRange(n, n+1),
)
check(tb, err)
check(tb, task.Converge())
return task
}

func TestIntegrations(t *testing.T) {
cases := []struct {
blockNum uint64
config string
queries []string
deleteQuery string
blockNum uint64
config string
check []string
delete string
}{
{
17943843,
"filter-ref.json",
[]string{
`select count(*) = 1 from tx`,
`select count(*) = 1 from tx_w_block`,
},
"select count(*) = 0 from tx",
},
{
19583743,
"receipt.json",
Expand Down Expand Up @@ -124,30 +105,45 @@ func TestIntegrations(t *testing.T) {
"select count(*) = 0 from seaport_test",
},
}
for _, tc := range cases {
pg := wpg.TestPG(t, Schema)
conf := config.Root{Integrations: []config.Integration{{}}}
decode(t, read(t, tc.config), &conf.Integrations[0])
task := process(t, pg, conf, tc.blockNum)
for i, q := range tc.queries {
var found bool
err := pg.QueryRow(context.Background(), q).Scan(&found)
diff.Test(t, t.Errorf, nil, err)
if err != nil {
t.Logf("failing test query: %d", i)
}
if !found {
t.Errorf("test %s failed", tc.config)
}
for _, c := range cases {
var (
ctx = context.Background()
pg = wpg.TestPG(t, Schema)
conf = config.Root{Integrations: []config.Integration{{}}}
)
decode(t, read(t, c.config), &conf.Integrations)
tc.NoErr(t, config.ValidateFix(&conf))
tc.NoErr(t, config.Migrate(ctx, pg, conf))
for _, ig := range conf.Integrations {
task, err := NewTask(
WithPG(pg),
WithSource(jrpc2.New("https://ethereum.publicnode.com")),
WithIntegration(ig),
WithRange(c.blockNum, c.blockNum+1),
)
tc.NoErr(t, err)
tc.NoErr(t, task.Converge())
}
check(t, c.config, pg, c.check...)
for _, ig := range conf.Integrations {
task, err := NewTask(
WithPG(pg),
WithSource(jrpc2.New("https://ethereum.publicnode.com")),
WithIntegration(ig),
)
tc.NoErr(t, err)
tc.NoErr(t, task.Delete(pg, c.blockNum))
check(t, c.config, pg, c.delete)
}
}
}

check(t, task.Delete(pg, tc.blockNum))

var deleted bool
err := pg.QueryRow(context.Background(), tc.deleteQuery).Scan(&deleted)
diff.Test(t, t.Errorf, nil, err)
if !deleted {
t.Errorf("%s was not cleaned up after ig.Delete", tc.config)
func check(t *testing.T, name string, pg wpg.Conn, queries ...string) {
for i, q := range queries {
var res bool
tc.NoErr(t, pg.QueryRow(context.Background(), q).Scan(&res))
if !res {
t.Errorf("query %s/%d failed: %q", name, i, q)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions shovel/testdata/erc721.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
[{
"name": "erc721",
"enabled": true,
"table": {
Expand Down Expand Up @@ -44,4 +44,4 @@
}
]
}
}
}]
41 changes: 41 additions & 0 deletions shovel/testdata/filter-ref.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[
{
"name": "tx",
"enabled": true,
"table": {
"name": "tx",
"columns": [{"name": "tx_hash", "type": "bytea"}]
},
"block": [
{
"name": "tx_hash",
"column": "tx_hash",
"filter_op": "contains",
"filter_arg": ["0x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]
}
]
},
{
"name": "tx_w_block",
"enabled": true,
"table": {
"name": "tx_w_block",
"columns": [
{"name": "block_num", "type": "numeric"},
{"name": "tx_hash", "type": "bytea"}
]
},
"block": [
{"name": "block_num", "column": "block_num"},
{
"name": "tx_hash",
"column": "tx_hash",
"filter_op": "contains",
"filter_ref": {
"integration": "tx",
"column": "tx_hash"
}
}
]
}
]
4 changes: 2 additions & 2 deletions shovel/testdata/receipt.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
[{
"name": "receipt",
"enabled": true,
"table": {
Expand All @@ -20,4 +20,4 @@
{"name": "tx_input", "column": "tx_input"},
{"name": "tx_gas_used", "column": "tx_gas_used"}
]
}
}]
4 changes: 2 additions & 2 deletions shovel/testdata/seaport.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
[{
"name": "seaport_test",
"enabled": true,
"table": {
Expand Down Expand Up @@ -124,4 +124,4 @@
}
]
}
}
}]
4 changes: 2 additions & 2 deletions shovel/testdata/txinput.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
[{
"name": "txinput",
"enabled": true,
"table": {
Expand All @@ -18,4 +18,4 @@
{"name": "tx_hash", "column": "tx_hash", "filter_op": "contains", "filter_arg": ["713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]},
{"name": "tx_input", "column": "tx_input"}
]
}
}]
6 changes: 3 additions & 3 deletions tc/testcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/kr/pretty"
)

func NoErr(t *testing.T, err error) {
t.Helper()
func NoErr(tb testing.TB, err error) {
tb.Helper()
if err != nil {
t.Fatalf("expected no error. got: %s", err)
tb.Fatalf("expected no error. got: %s", err)
}
}

Expand Down

0 comments on commit 0f7417e

Please sign in to comment.