diff --git a/abi2/abi2.go b/abi2/abi2.go index cd67c732..72501c5a 100644 --- a/abi2/abi2.go +++ b/abi2/abi2.go @@ -766,7 +766,25 @@ func (ig Integration) Name() string { return ig.name } func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} } -func (ig Integration) Delete(context.Context, wpg.Conn, uint64) error { return nil } +func (ig Integration) Delete(ctx context.Context, pg wpg.Conn, n uint64) error { + const q = ` + delete from %s + where src_name = $1 + and intg_name = $2 + and block_num >= $3 + ` + _, err := pg.Exec(ctx, + ig.tname(q), + wctx.SrcName(ctx), + ig.name, + n, + ) + return err +} + +func (ig Integration) tname(query string) string { + return fmt.Sprintf(query, ig.Table.Name) +} func (ig Integration) Insert(ctx context.Context, pg wpg.Conn, blocks []eth.Block) (int64, error) { var ( diff --git a/e2pg/integration_test.go b/e2pg/integration_test.go index d85ea298..8ddfbea8 100644 --- a/e2pg/integration_test.go +++ b/e2pg/integration_test.go @@ -8,6 +8,7 @@ import ( "blake.io/pqx/pqxtest" "github.com/indexsupply/x/geth/gethtest" + "github.com/indexsupply/x/wctx" "github.com/jackc/pgx/v5/pgxpool" "kr.dev/diff" ) @@ -66,12 +67,13 @@ func (th *Helper) Process(dest Destination, n uint64) { var ( geth = NewGeth(th.gt.FileCache, th.gt.Client) task = NewTask( - WithSource(0, "", geth), + WithSource(0, "testhelper", geth), WithPG(th.PG), WithDestinations(dest), WithRange(n, n+1), ) ) + th.ctx = wctx.WithSrcName(th.ctx, "testhelper") cur, err := geth.Hash(n) check(th.tb, err) th.gt.SetLatest(n, cur) @@ -84,9 +86,10 @@ func TestIntegrations(t *testing.T) { th := NewHelper(t) defer th.Done() cases := []struct { - blockNum uint64 - config string - queries []string + blockNum uint64 + config string + queries []string + deleteQuery string }{ { 17943843, @@ -95,10 +98,11 @@ func TestIntegrations(t *testing.T) { ` select count(*) = 1 from txinput_test where tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' - and block_number = 17943843 + and block_num = 17943843 and block_time = 1692387935 `, }, + "select count(*) = 0 from txinput_test", }, { 17943843, @@ -110,6 +114,7 @@ func TestIntegrations(t *testing.T) { and contract = '\x57f1887a8bf19b14fc0df6fd9b2acc9af147ea85' `, }, + "select count(*) = 0 from erc721_test", }, { 17943843, @@ -143,6 +148,7 @@ func TestIntegrations(t *testing.T) { and consideration_recipient = '\x0000a26b00c1f0df003000390027140000faa719' `, }, + "select count(*) = 0 from seaport_test", }, } for _, tc := range cases { @@ -163,6 +169,14 @@ func TestIntegrations(t *testing.T) { t.Errorf("test %s failed", tc.config) } } + + diff.Test(t, t.Errorf, nil, dest.Delete(th.Context(), th.PG, tc.blockNum)) + var deleted bool + err = th.PG.QueryRow(th.Context(), tc.deleteQuery).Scan(&deleted) + diff.Test(t, t.Errorf, nil, err) + if !deleted { + t.Errorf("%s was not cleaned up after ig.Delete", tc.config) + } } } diff --git a/e2pg/testdata/txinput.json b/e2pg/testdata/txinput.json index f63f6198..a29f46df 100644 --- a/e2pg/testdata/txinput.json +++ b/e2pg/testdata/txinput.json @@ -4,7 +4,7 @@ "name": "txinput_test", "columns": [ {"name": "chain_id", "type": "numeric"}, - {"name": "block_number", "type": "numeric"}, + {"name": "block_num", "type": "numeric"}, {"name": "block_time", "type": "numeric"}, {"name": "tx_hash", "type": "bytea"}, {"name": "tx_input", "type": "bytea"} @@ -12,7 +12,7 @@ }, "block": [ {"name": "chain_id", "column": "chain_id"}, - {"name": "block_num", "column": "block_number"}, + {"name": "block_num", "column": "block_num"}, {"name": "block_time", "column": "block_time"}, {"name": "tx_hash", "column": "tx_hash", "filter_op": "contains", "filter_arg": ["713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]}, {"name": "tx_input", "column": "tx_input"}