From 0edb55f2c5ee26ed43a6cdad85d27494522a08a8 Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Fri, 6 Oct 2023 15:48:30 -0400
Subject: [PATCH] e2pg/config: declarative indexing (#172)

Breaking change. The old config file format is no longer valid.
The new config file format enables declarative indexing.
---
 abi2/abi2.go                      | 257 ++++++++++++++++--------------
 abi2/integration_test.go          | 155 ------------------
 cmd/e2pg/config.json              |  78 ++++++---
 cmd/e2pg/dashboard.go             |   2 +-
 cmd/e2pg/main.go                  |  40 +----
 e2pg/config/config.go             | 167 +++++++++++--------
 e2pg/config/integration_test.go   | 113 +++++++++++++
 e2pg/config/testdata/erc721.json  |  44 +++++
 e2pg/config/testdata/seaport.json | 122 ++++++++++++++
 e2pg/config/testdata/txinput.json |  18 +++
 10 files changed, 601 insertions(+), 395 deletions(-)
 delete mode 100644 abi2/integration_test.go
 create mode 100644 e2pg/config/integration_test.go
 create mode 100644 e2pg/config/testdata/erc721.json
 create mode 100644 e2pg/config/testdata/seaport.json
 create mode 100644 e2pg/config/testdata/txinput.json

diff --git a/abi2/abi2.go b/abi2/abi2.go
index 5647f207..0a0b3b17 100644
--- a/abi2/abi2.go
+++ b/abi2/abi2.go
@@ -5,11 +5,9 @@ import (
 	"bytes"
 	"context"
 	"encoding/hex"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"log/slog"
-	"slices"
 	"strconv"
 	"strings"
 
@@ -195,12 +193,6 @@ func sel(pos int, t atype) atype {
 
 type row [][]byte
 
-func (r *row) write(i int, d []byte) {
-	(*r)[i] = slices.Grow((*r)[i], len(d))
-	(*r)[i] = (*r)[i][:len(d)]
-	copy((*r)[i], d)
-}
-
 func (r *Result) Len() int {
 	return r.n
 }
@@ -258,7 +250,7 @@ func (r *Result) Scan(input []byte) error {
 	for i := 0; i < r.Len(); i++ {
 		for j := 0; j < len(r.singleton); j++ {
 			if len(r.singleton[j]) > 0 {
-				r.collection[i].write(j, r.singleton[j])
+				r.collection[i][j] = r.singleton[j]
 			}
 		}
 	}
@@ -372,6 +364,32 @@ type Input struct {
 	Column string `json:"column"`
 	Pos    int    `json:"column_pos"`
+
+	Filter
+}
+
+type Filter struct {
+	Op  string   `json:"filter_op"`
+	Arg []string `json:"filter_arg"`
+}
+
+func (f Filter) Accept(d []byte) bool {
+	switch {
+	case strings.HasSuffix(f.Op, "contains"):
+		var res bool
+		for i := range f.Arg {
+			hb, _ := hex.DecodeString(f.Arg[i])
+			if bytes.Equal(hb, d) {
+				res = true
+				break
+			}
+		}
+		if strings.HasPrefix(f.Op, "!") {
+			return !res
+		}
+		return res
+	default:
+		return true
+	}
 }
 
 func parseArray(elm atype, s string) atype {
@@ -459,9 +477,15 @@ func (inp Input) Selected() []Input {
 	return res
 }
 
-type Extra struct {
-	Table    Table    `json:"table"`
-	Metadata []string `json:"metadata"`
+type BlockData struct {
+	Name   string `json:"name"`
+	Column string `json:"column"`
+	Pos    int    `json:"column_pos"`
+	Filter
+}
+
+func (bd BlockData) Empty() bool {
+	return len(bd.Name) == 0
 }
 
 type Event struct {
@@ -469,7 +493,6 @@ type Event struct {
 	Name   string  `json:"name"`
 	Type   string  `json:"type"`
 	Inputs []Input `json:"inputs"`
-	Extra  Extra   `json:"extra"`
 }
 
 func (e Event) ABIType() atype {
@@ -523,19 +546,21 @@ func (e Event) numIndexed() int {
 }
 
 type coldef struct {
-	Input    Input
-	Column   Column
-	Metadata bool
+	Input     Input
+	BlockData BlockData
+	Column    Column
 }
 
 // Implements the [e2pg.Integration] interface
 type Integration struct {
 	Event   Event
+	Table   Table
 	Columns []string
 	coldefs []coldef
 
-	numIndexed  int
-	numSelected int
+	numIndexed    int
+	numSelected   int
+	numBDSelected int
 
 	resultCache *Result
 	sighash     []byte
@@ -548,18 +573,6 @@ type Conn interface {
 	Query(context.Context, string, ...any) (pgx.Rows, error)
 }
 
-// Queries the e2pg.events table using the event id.
-// Same as calling [New] except the js is loaded from the database.
-func Load(ctx context.Context, pgp *pgxpool.Pool, id uint64) (Integration, error) {
-	const q = `select abi from e2pg.events where id = $1`
-	var js []byte
-	err := pgp.QueryRow(ctx, q, id).Scan(&js)
-	if err != nil {
-		return Integration{}, fmt.Errorf("loading integration %d: %w", id, err)
-	}
-	return New(js)
-}
-
 // ev must be an abi event decoded from json. For example:
 //
 // {"name": "MyEvent", "type": "event", "inputs": [{"indexed": true, "name": "f", "type": "t"}]}
 //
 // Each input may include a "column" field that maps the input onto a
 // column in the table, and a column may carry an optional filter.
 // For example:
 //
 // {"name": "my_column", "type": "db_type", "filter_op": "contains", "filter_arg": ["0x000"]}
@@ -579,47 +592,56 @@ func Load(ctx context.Context, pgp *pgxpool.Pool, id uint64) (Integration, error
-func New(js []byte) (Integration, error) {
-	ig := Integration{}
-	if err := json.Unmarshal(js, &ig.Event); err != nil {
-		return ig, fmt.Errorf("parsing event json: %w", err)
+func New(ev Event, bd []BlockData, table Table) (Integration, error) {
+	ig := Integration{
+		Event:      ev,
+		Table:      table,
+		numIndexed: ev.numIndexed(),
 	}
-	ig.resultCache = NewResult(ig.Event.ABIType())
-	ig.sighash = ig.Event.SignatureHash()
+	ig.resultCache = NewResult(ev.ABIType())
+	ig.sighash = ev.SignatureHash()
 
-	var (
-		selected = ig.Event.Selected()
-		md       = ig.Event.Extra.Metadata
-		cols     = ig.Event.Extra.Table.Cols
-	)
-	if len(cols) != len(selected)+len(md) {
-		return ig, fmt.Errorf("number of columns in table definitino must equal number of selected columns + metadata columns")
+	selected := ev.Selected()
+	if len(table.Cols) != len(selected)+len(bd) {
+		return ig, fmt.Errorf("number of columns in table definition must equal number of selected columns + block data columns")
 	}
-	ig.numIndexed = ig.Event.numIndexed()
-	ig.numSelected = len(selected)
-
-	var colCount int
 	for _, input := range selected {
-		ig.Columns = append(ig.Columns, cols[colCount].Name)
+		c, err := col(table, input.Column)
+		if err != nil {
+			return ig, err
+		}
+		ig.Columns = append(ig.Columns, c.Name)
 		ig.coldefs = append(ig.coldefs, coldef{
 			Input:  input,
-			Column: cols[colCount],
+			Column: c,
 		})
-		colCount++
+		ig.numSelected++
 	}
-	for range md {
-		ig.Columns = append(ig.Columns, cols[colCount].Name)
+	for _, data := range bd {
+		c, err := col(table, data.Column)
+		if err != nil {
+			return ig, err
+		}
+		ig.Columns = append(ig.Columns, c.Name)
 		ig.coldefs = append(ig.coldefs, coldef{
-			Metadata: true,
-			Column:   cols[colCount],
+			BlockData: data,
+			Column:    c,
 		})
-		colCount++
+		ig.numBDSelected++
 	}
+
 	return ig, nil
 }
 
-func (ig Integration) Table() Table { return ig.Event.Extra.Table }
+func col(t Table, name string) (Column, error) {
+	for i := range t.Cols {
+		if t.Cols[i].Name == name {
+			return t.Cols[i], nil
+		}
+	}
+	return Column{}, fmt.Errorf("table %q doesn't contain column %q", t.Name, name)
+}
 
 func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} }
 
@@ -628,6 +650,7 @@ func (ig Integration) Delete(context.Context, e2pg.PG, uint64) error { return ni
 func (ig Integration) Insert(ctx context.Context, pg e2pg.PG, blocks []eth.Block) (int64, error) {
 	var (
 		err  error
+		skip bool
 		rows [][]any
 		lwc  = &logWithCtx{ctx: ctx}
 	)
@@ -637,16 +660,17 @@ func (ig Integration) Insert(ctx context.Context, pg e2pg.PG, blocks []eth.Block
 		lwc.r = &lwc.b.Receipts[ridx]
 		lwc.t = &lwc.b.Txs[ridx]
 		lwc.ridx = ridx
+		rows, skip, err = ig.processTx(rows, lwc)
+		if err != nil {
+			return 0, fmt.Errorf("processing tx: %w", err)
+		}
+		if skip {
+			continue
+		}
 		for lidx := range blocks[bidx].Receipts[ridx].Logs {
 			lwc.l = &lwc.r.Logs[lidx]
 			lwc.lidx = lidx
-			if len(lwc.l.Topics)-1 != ig.numIndexed {
-				continue
-			}
-			if !bytes.Equal(ig.sighash, 
lwc.l.Topics[0]) {
-				continue
-			}
-			rows, err = ig.process(rows, lwc)
+			rows, err = ig.processLog(rows, lwc)
 			if err != nil {
 				return 0, fmt.Errorf("processing log: %w", err)
 			}
@@ -655,7 +679,7 @@ func (ig Integration) Insert(ctx context.Context, pg e2pg.PG, blocks []eth.Block
 	}
 	return pg.CopyFrom(
 		ctx,
-		pgx.Identifier{ig.Table().Name},
+		pgx.Identifier{ig.Table.Name},
 		ig.Columns,
 		pgx.CopyFromRows(rows),
 	)
@@ -693,6 +717,8 @@ func (lwc *logWithCtx) get(name string) any {
 		return lwc.t.To.Bytes()
 	case "tx_value":
 		return lwc.t.Value.Dec()
+	case "tx_input":
+		return lwc.t.Data.Bytes()
 	case "log_idx":
 		return lwc.lidx
 	case "log_addr":
@@ -702,30 +728,36 @@ func (lwc *logWithCtx) get(name string) any {
 	}
 }
 
-func (ig Integration) process(rows [][]any, lwc *logWithCtx) ([][]any, error) {
+func (ig Integration) processTx(rows [][]any, lwc *logWithCtx) ([][]any, bool, error) {
 	switch {
-	case ig.numIndexed == ig.numSelected:
+	case ig.numSelected > 0:
+		return rows, false, nil
+	case ig.numBDSelected > 0:
 		row := make([]any, len(ig.coldefs))
 		for i, def := range ig.coldefs {
 			switch {
-			case def.Input.Indexed:
-				d := dbtype(def.Input.Type, lwc.l.Topics[1+i])
-				if b, ok := d.([]byte); ok && !def.Column.Accept(b) {
-					return nil, nil
-				}
-				row[i] = d
-			case def.Metadata:
-				d := lwc.get(def.Column.Name)
-				if b, ok := d.([]byte); ok && !def.Column.Accept(b) {
-					return nil, nil
+			case !def.BlockData.Empty():
+				d := lwc.get(def.BlockData.Name)
+				if b, ok := d.([]byte); ok && !def.BlockData.Accept(b) {
+					return rows, true, nil
 				}
 				row[i] = d
 			default:
-				return nil, fmt.Errorf("no rows for un-indexed data")
+				return rows, false, fmt.Errorf("expected only blockdata coldef")
 			}
 		}
 		rows = append(rows, row)
-	default:
+	}
+	return rows, true, nil
+}
+
+func (ig Integration) processLog(rows [][]any, lwc *logWithCtx) ([][]any, error) {
+	switch {
+	case len(lwc.l.Topics)-1 != ig.numIndexed:
+		return rows, nil
+	case !bytes.Equal(ig.sighash, lwc.l.Topics[0]):
+		return rows, nil
+	case ig.numSelected > ig.numIndexed:
 		err := ig.resultCache.Scan(lwc.l.Data)
 		if err != nil {
 			return nil, fmt.Errorf("scanning abi data: %w", err)
 		}
@@ -739,16 +771,16 @@ func (ig Integration) process(rows [][]any, lwc *logWithCtx) ([][]any, error) {
 					d := lwc.l.Topics[ictr]
 					row[j] = dbtype(def.Input.Type, d)
 					ictr++
-				case def.Metadata:
-					d := lwc.get(def.Column.Name)
-					if b, ok := d.([]byte); ok && !def.Column.Accept(b) {
-						return nil, nil
+				case !def.BlockData.Empty():
+					d := lwc.get(def.BlockData.Name)
+					if b, ok := d.([]byte); ok && !def.BlockData.Accept(b) {
+						return rows, nil
 					}
 					row[j] = d
 				default:
 					d := ig.resultCache.At(i)[actr]
-					if !def.Column.Accept(d) {
-						return nil, nil
+					if !def.Input.Accept(d) {
+						return rows, nil
 					}
 					row[j] = dbtype(def.Input.Type, d)
 					actr++
 				}
 			}
 			rows = append(rows, row)
 		}
@@ -756,6 +788,27 @@ func (ig Integration) process(rows [][]any, lwc *logWithCtx) ([][]any, error) {
+	default:
+		row := make([]any, len(ig.coldefs))
+		for i, def := range ig.coldefs {
+			switch {
+			case def.Input.Indexed:
+				d := dbtype(def.Input.Type, lwc.l.Topics[1+i])
+				if b, ok := d.([]byte); ok && !def.Input.Accept(b) {
+					return rows, nil
+				}
+				row[i] = d
+			case !def.BlockData.Empty():
+				d := lwc.get(def.BlockData.Name)
+				if b, ok := d.([]byte); ok && !def.BlockData.Accept(b) {
+					return rows, nil
+				}
+				row[i] = d
+			default:
+				return nil, fmt.Errorf("no rows for un-indexed data")
+			}
+		}
+		rows = append(rows, row)
 	}
 	return rows, nil
 }
@@ -793,7 +846,7 @@ func (ig Integration) Count(ctx context.Context, pg *pgxpool.Pool, chainID uint6
 	`
 	var (
 		res string
-		fq  = fmt.Sprintf(q, ig.Table().Name)
+		fq  = fmt.Sprintf(q, ig.Table.Name)
 	)
err := pg.QueryRow(ctx, fq, chainID).Scan(&res) if err != nil { @@ -820,7 +873,7 @@ func (ig Integration) RecentRows(ctx context.Context, pgp *pgxpool.Pool, chainID } } q.WriteString(" from ") - q.WriteString(ig.Table().Name) + q.WriteString(ig.Table.Name) q.WriteString(" where chain_id = $1") q.WriteString(" order by block_num desc limit 10") @@ -835,32 +888,8 @@ func (ig Integration) RecentRows(ctx context.Context, pgp *pgxpool.Pool, chainID } type Column struct { - Name string `json:"name"` - Type string `json:"type"` - FilterOp string `json:"filter_op"` - FilterArg []string `json:"filter_arg"` -} - -// Uses FilterOp and FilterArg to check if d passes the filter. -// Current filter_ops include: contains and !contains -func (c Column) Accept(d []byte) bool { - switch { - case strings.HasSuffix(c.FilterOp, "contains"): - var res bool - for i := range c.FilterArg { - hb, _ := hex.DecodeString(c.FilterArg[i]) - if bytes.Equal(hb, d) { - res = true - break - } - } - if strings.HasPrefix(c.FilterOp, "!") { - return !res - } - return res - default: - return true - } + Name string `json:"name"` + Type string `json:"type"` } type Table struct { @@ -868,16 +897,6 @@ type Table struct { Cols []Column `json:"columns"` } -func (t Table) Filters() []Column { - var res []Column - for i := range t.Cols { - if t.Cols[i].FilterOp != "" { - res = append(res, t.Cols[i]) - } - } - return res -} - func CreateTable(ctx context.Context, pg Conn, t Table) error { var s strings.Builder s.WriteString(fmt.Sprintf("create table if not exists %s(", t.Name)) diff --git a/abi2/integration_test.go b/abi2/integration_test.go deleted file mode 100644 index 7e8ef04a..00000000 --- a/abi2/integration_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package abi2 - -import ( - "database/sql" - "testing" - - "github.com/indexsupply/x/integrations/testhelper" - - "blake.io/pqx/pqxtest" - "github.com/jackc/pgx/v5/stdlib" - "kr.dev/diff" -) - -func TestMain(m *testing.M) { - sql.Register("postgres", stdlib.GetDefaultDriver()) - pqxtest.TestMain(m) -} - -func TestInsert(t *testing.T) { - th := testhelper.New(t) - defer th.Done() - cases := []struct { - blockNum uint64 - query string - }{ - { - 17943843, - ` - select true from seaport_test - where order_hash = '\x796820863892f449ec5dc02582e6708292255cb1ed21f5c3b0ff4bff04328ff7' - `, - }, - } - for _, tc := range cases { - th.Reset() - c, err := New(j) - diff.Test(t, t.Fatalf, nil, err) - diff.Test(t, t.Fatalf, nil, CreateTable(th.Context(), th.PG, c.Table())) - th.Process(c, tc.blockNum) - var found bool - diff.Test(t, t.Errorf, nil, th.PG.QueryRow(th.Context(), tc.query).Scan(&found)) - } -} - -var j = []byte(`{ - "anonymous": false, - "inputs": [ - { - "indexed": false, - "internalType": "bytes32", - "name": "orderHash", - "type": "bytes32", - "column": "order_hash" - }, - { - "indexed": true, - "internalType": "address", - "name": "offerer", - "type": "address", - "column": "offerer" - }, - { - "indexed": true, - "internalType": "address", - "name": "zone", - "type": "address", - "column": "zone" - }, - { - "indexed": false, - "internalType": "address", - "name": "recipient", - "type": "address", - "column": "recipient" - }, - { - "components": [ - { - "internalType": "enum ItemType", - "name": "itemType", - "type": "uint8" - }, - { - "internalType": "address", - "name": "token", - "type": "address", - "column": "offer_token" - }, - { - "internalType": "uint256", - "name": "identifier", - "type": "uint256" - }, - { - "internalType": "uint256", - "name": "amount", - "type": "uint256" - } 
-      ],
-      "indexed": false,
-      "internalType": "struct SpentItem[]",
-      "name": "offer",
-      "type": "tuple[]"
-    },
-    {
-      "components": [
-        {
-          "internalType": "enum ItemType",
-          "name": "itemType",
-          "type": "uint8"
-        },
-        {
-          "internalType": "address",
-          "name": "token",
-          "type": "address"
-        },
-        {
-          "internalType": "uint256",
-          "name": "identifier",
-          "type": "uint256"
-        },
-        {
-          "internalType": "uint256",
-          "name": "amount",
-          "type": "uint256"
-        },
-        {
-          "internalType": "address payable",
-          "name": "recipient",
-          "type": "address",
-          "column": "consideration_recipient"
-        }
-      ],
-      "indexed": false,
-      "internalType": "struct ReceivedItem[]",
-      "name": "consideration",
-      "type": "tuple[]"
-    }
-  ],
-  "extra": {
-    "table": {
-      "name": "seaport_test",
-      "columns": [
-        {"name": "order_hash", "type": "bytea"},
-        {"name": "consideration_recipient", "type": "bytea"},
-        {"name": "offer_token", "type": "bytea"},
-        {"name": "offerer", "type": "bytea"},
-        {"name": "zone", "type": "bytea"},
-        {"name": "recipient", "type": "bytea"}
-      ]
-    }
-  },
-  "name": "OrderFulfilled",
-  "type": "event"
-}`)
diff --git a/cmd/e2pg/config.json b/cmd/e2pg/config.json
index 9b588cc8..f62cc705 100644
--- a/cmd/e2pg/config.json
+++ b/cmd/e2pg/config.json
@@ -1,22 +1,56 @@
-[
-  {
-    "name": "mainnet",
-    "id": 1,
-    "chain": 1,
-    "concurrency": 8,
-    "batch": 128,
-    "eth": "https://1.rlps.indexsupply.net",
-    "pg": "postgres:///e2pg",
-    "integrations": ["erc721", "erc1155"]
-  },
-  {
-    "name": "bedrock",
-    "id": 2,
-    "chain": 10,
-    "concurrency": 8,
-    "batch": 1024,
-    "eth": "http://10.rlps.indexsupply.net",
-    "pg": "postgres:///e2pg",
-    "integrations": ["erc20"]
-  }
-]
+{
+  "pg_url": "postgres:///e2pg",
+  "eth_sources": [
+    {"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"}
+  ],
+  "integrations": [
+    {
+      "name": "ERC20 Transfers",
+      "enabled": true,
+      "sources": [{"name": "goerli"}],
+      "table": {
+        "name": "e20_transfers",
+        "columns": [
+          {"name": "chain_id", "type": "numeric"},
+          {"name": "block_num", "type": "numeric"},
+          {"name": "tx_hash", "type": "bytea"},
+          {"name": "contract", "type": "bytea"},
+          {"name": "f", "type": "bytea"},
+          {"name": "t", "type": "bytea"},
+          {"name": "amt", "type": "numeric"}
+        ]
+      },
+      "block": [
+        {"name": "chain_id", "column": "chain_id"},
+        {"name": "block_num", "column": "block_num"},
+        {"name": "tx_hash", "column": "tx_hash"},
+        {"name": "log_addr", "column": "contract"}
+      ],
+      "event": {
+        "name": "Transfer",
+        "type": "event",
+        "anonymous": false,
+        "inputs": [
+          {
+            "indexed": true,
+            "name": "from",
+            "type": "address",
+            "column": "f"
+          },
+          {
+            "indexed": true,
+            "name": "to",
+            "type": "address",
+            "column": "t"
+          },
+          {
+            "indexed": false,
+            "name": "amount",
+            "type": "uint256",
+            "column": "amt"
+          }
+        ]
+      }
+    }
+  ]
+}
diff --git a/cmd/e2pg/dashboard.go b/cmd/e2pg/dashboard.go
index 5a510d3f..150d0253 100644
--- a/cmd/e2pg/dashboard.go
+++ b/cmd/e2pg/dashboard.go
@@ -164,7 +164,7 @@ func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) {
 	}
 	snaps := make(map[uint64]e2pg.StatusSnapshot)
 	for _, task := range dh.tasks {
-		snaps[task.ChainID] = task.Status()
+		snaps[task.ID] = task.Status()
 	}
 	err = tmpl.Execute(w, snaps)
 	if err != nil {
diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go
index 0b75f170..affa5b43 100644
--- a/cmd/e2pg/main.go
+++ b/cmd/e2pg/main.go
@@ -13,7 +13,6 @@ import (
 	"os"
 	"runtime/debug"
 	"runtime/pprof"
-	"strings"
 	"time"
 
 	"github.com/indexsupply/x/e2pg"
@@ -37,9 +36,6 @@ func main() {
 		ctx   = context.Background()
 		cfile string
 
-		conf 
config.Config - intgs string - skipMigrate bool listen string notx bool @@ -47,16 +43,6 @@ func main() { version bool ) flag.StringVar(&cfile, "config", "", "task config file") - flag.Uint64Var(&conf.ID, "id", 0, "task id") - flag.Uint64Var(&conf.ChainID, "chain", 0, "task id") - flag.StringVar(&conf.FreezerPath, "ef", "/storage/geth/geth/chaindata/ancient/chain/", "path to freezer files") - flag.StringVar(&conf.PGURL, "pg", "", "postgres url") - flag.StringVar(&conf.ETHURL, "e", "", "address or socket for rpc server") - flag.Uint64Var(&conf.Concurrency, "c", 2, "number of concurrent workers") - flag.Uint64Var(&conf.Batch, "b", 128, "batch size") - flag.Uint64Var(&conf.Begin, "begin", 0, "starting block. 0 starts at latest") - flag.Uint64Var(&conf.End, "end", 0, "ending block. 0 never ends") - flag.StringVar(&intgs, "i", "", "list of integrations") flag.BoolVar(&skipMigrate, "skip-migrate", false, "do not run db migrations on startup") flag.StringVar(&listen, "l", ":8546", "dashboard server listen address") @@ -86,44 +72,30 @@ func main() { slog.String("v", Commit), }))) - if len(intgs) > 0 { - for _, s := range strings.Split(intgs, ",") { - conf.Integrations = append(conf.Integrations, s) - } - } - if version { fmt.Printf("v%s-%s\n", Version, Commit) os.Exit(0) } - var confs []config.Config + var conf config.Config switch { - case cfile != "" && !conf.Empty(): - fmt.Printf("unable to use config file and command line args\n") + case cfile == "": + fmt.Printf("missing config file\n") os.Exit(1) case cfile != "": f, err := os.Open(cfile) check(err) - confs = []config.Config{} - check(json.NewDecoder(f).Decode(&confs)) - case !conf.Empty(): - confs = []config.Config{conf} - } - - if len(confs) == 0 { - fmt.Println("Must specify at least 1 task configuration") - os.Exit(1) + check(json.NewDecoder(f).Decode(&conf)) } if !skipMigrate { - migdb, err := pgxpool.New(ctx, config.Env(confs[0].PGURL)) + migdb, err := pgxpool.New(ctx, config.Env(conf.PGURL)) check(err) check(pgmig.Migrate(migdb, e2pg.Migrations)) migdb.Close() } - tasks, err := config.NewTasks(confs...) 
+ tasks, err := config.NewTasks(conf) check(err) var ( diff --git a/e2pg/config/config.go b/e2pg/config/config.go index 1c17e7f5..1adb280c 100644 --- a/e2pg/config/config.go +++ b/e2pg/config/config.go @@ -2,10 +2,12 @@ package config import ( "context" + "encoding/json" "fmt" "os" "strings" + "github.com/indexsupply/x/abi2" "github.com/indexsupply/x/e2pg" "github.com/indexsupply/x/integrations/erc1155" "github.com/indexsupply/x/integrations/erc20" @@ -17,7 +19,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -var Integrations = map[string]e2pg.Integration{ +var compiled = map[string]e2pg.Integration{ "erc20": erc20.Integration, "erc721": erc721.Integration, "erc1155": erc1155.Integration, @@ -25,22 +27,44 @@ var Integrations = map[string]e2pg.Integration{ "txinputs": txinputs.Integration, } +type EthSource struct { + Name string `json:"name"` + ChainID uint64 `json:"chain_id"` + URL string `json:"url"` +} + +type Source struct { + Name string `json:"name"` + Start uint64 `json:"start"` + Stop uint64 `json:"stop"` +} + +type Compiled struct { + Name string `json:"name"` + Config json.RawMessage `json:"config"` +} + +type Integration struct { + Name string `json:"name"` + Enabled bool `json:"enabled"` + Start string `json:"start"` + Stop string `json:"stop"` + Backfill bool `json:"backfill"` + Sources []Source `json:"sources"` + Table abi2.Table `json:"table"` + Compiled Compiled `json:"compiled"` + Block []abi2.BlockData `json:"block"` + Event abi2.Event `json:"event"` +} + type Config struct { - Name string `json:"name"` - ID uint64 `json:"id"` - ChainID uint64 `json:"chain"` - Concurrency uint64 `json:"concurrency"` - Batch uint64 `json:"batch"` - ETHURL string `json:"eth"` - PGURL string `json:"pg"` - FreezerPath string `json:"freezer"` - Integrations []string `json:"integrations"` - Begin uint64 `json:"begin"` - End uint64 `json:"end"` + PGURL string `json:"pg_url"` + EthSources []EthSource `json:"eth_sources"` + Integrations []Integration `json:"integrations"` } func (conf Config) Empty() bool { - return conf.ETHURL == "" || conf.PGURL == "" + return conf.PGURL == "" } // If s has a $ prefix then we assume @@ -62,71 +86,86 @@ func Env(s string) string { return s } -func NewTasks(confs ...Config) ([]*e2pg.Task, error) { - var ( - err error - tasks []*e2pg.Task - nodes = map[string]e2pg.Node{} - dbs = map[string]*pgxpool.Pool{} - ) - for _, conf := range confs { - pgp, ok := dbs[conf.PGURL] - if !ok { - pgp, err = pgxpool.New(context.Background(), Env(conf.PGURL)) - if err != nil { - return nil, fmt.Errorf("%s dburl invalid: %w", conf.Name, err) - } - dbs[conf.PGURL] = pgp +func NewTasks(conf Config) ([]*e2pg.Task, error) { + pgp, err := pgxpool.New(context.Background(), Env(conf.PGURL)) + if err != nil { + return nil, fmt.Errorf("dburl invalid: %w", err) + } + intgsBySource := map[Source][]e2pg.Integration{} + for _, ig := range conf.Integrations { + if !ig.Enabled { + continue } - node, ok := nodes[conf.ETHURL] - if !ok { - node, err = parseNode(Env(conf.ETHURL), conf.FreezerPath) - if err != nil { - return nil, fmt.Errorf("%s ethurl invalid: %w", conf.Name, err) - } - nodes[conf.ETHURL] = node + eig, err := getIntegration(pgp, ig) + if err != nil { + return nil, fmt.Errorf("unable to build integration: %s", ig.Name) } - - var intgs []e2pg.Integration - switch { - case len(conf.Integrations) == 0: - for _, ig := range Integrations { - intgs = append(intgs, ig) - } - default: - for _, name := range conf.Integrations { - ig, ok := Integrations[name] - if !ok { - return nil, 
fmt.Errorf("unable to find integration: %q", name) - } - intgs = append(intgs, ig) - } + for _, src := range ig.Sources { + intgsBySource[src] = append(intgsBySource[src], eig) } + } + var ( + taskID uint64 = 1 + tasks []*e2pg.Task + ) + // Start per-source main tasks + for src, intgs := range intgsBySource { + chainID, node, err := getNode(conf.EthSources, src.Name) + if err != nil { + return nil, fmt.Errorf("unkown source: %s", src.Name) + } tasks = append(tasks, e2pg.NewTask( - conf.ID, - conf.ChainID, - conf.Name, - conf.Batch, - conf.Concurrency, + taskID, + chainID, + src.Name, + 1, + 1, node, pgp, - conf.Begin, - conf.End, + src.Start, + 0, intgs..., )) + taskID++ } return tasks, nil } -func parseNode(url, fpath string) (e2pg.Node, error) { +func getIntegration(pgp *pgxpool.Pool, ig Integration) (e2pg.Integration, error) { switch { - case strings.Contains(url, "rlps"): - return rlps.NewClient(url), nil - case strings.HasPrefix(url, "http"): - return jrpc2.New(url), nil + case len(ig.Compiled.Name) > 0: + cig, ok := compiled[ig.Name] + if !ok { + return nil, fmt.Errorf("unable to find compiled integration: %s", ig.Name) + } + return cig, nil default: - // TODO add back support for local node - return nil, fmt.Errorf("unable to create node for %s", url) + aig, err := abi2.New(ig.Event, ig.Block, ig.Table) + if err != nil { + return nil, fmt.Errorf("building abi integration: %w", err) + } + if err := abi2.CreateTable(context.Background(), pgp, aig.Table); err != nil { + return nil, fmt.Errorf("setting up table for abi integration: %w", err) + } + return aig, nil + } +} + +func getNode(srcs []EthSource, name string) (uint64, e2pg.Node, error) { + for _, src := range srcs { + if src.Name != name { + continue + } + switch { + case strings.Contains(src.URL, "rlps"): + return src.ChainID, rlps.NewClient(src.URL), nil + case strings.HasPrefix(src.URL, "http"): + return src.ChainID, jrpc2.New(src.URL), nil + default: + // TODO add back support for local node + return 0, nil, fmt.Errorf("unsupported src type: %v", src) + } } + return 0, nil, fmt.Errorf("unable to find src for %s", name) } diff --git a/e2pg/config/integration_test.go b/e2pg/config/integration_test.go new file mode 100644 index 00000000..facc235d --- /dev/null +++ b/e2pg/config/integration_test.go @@ -0,0 +1,113 @@ +package config + +import ( + "database/sql" + "encoding/json" + "os" + "testing" + + "blake.io/pqx/pqxtest" + "github.com/indexsupply/x/integrations/testhelper" + "github.com/jackc/pgx/v5/stdlib" + "kr.dev/diff" +) + +func TestMain(m *testing.M) { + sql.Register("postgres", stdlib.GetDefaultDriver()) + pqxtest.TestMain(m) +} + +func TestIntegrations(t *testing.T) { + th := testhelper.New(t) + defer th.Done() + cases := []struct { + blockNum uint64 + config string + queries []string + }{ + { + 17943843, + "txinput.json", + []string{ + ` + select count(*) = 1 from txinput_test + where tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' + `, + }, + }, + { + 17943843, + "erc721.json", + []string{ + ` + select count(*) = 4 from erc721_test + where tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' + `, + }, + }, + { + 17943843, + "seaport.json", + []string{ + ` + select true from seaport_test + where order_hash = '\xdaf50b59a508ee06e269125af28e796477ebf55d22a3c6a24e42d038d9d8d8ee' + and tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' + and log_idx = 3 + and offer_token is null + and consideration_recipient = 
'\x5e97a8773122bde31d44756f271c87893991a6ea' + `, + ` + select true from seaport_test + where order_hash = '\xdaf50b59a508ee06e269125af28e796477ebf55d22a3c6a24e42d038d9d8d8ee' + and tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' + and log_idx = 3 + and offer_token is null + and consideration_recipient = '\x0000a26b00c1f0df003000390027140000faa719' + `, + ` + select true from seaport_test + where order_hash = '\xdaf50b59a508ee06e269125af28e796477ebf55d22a3c6a24e42d038d9d8d8ee' + and tx_hash = '\x713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42' + and log_idx = 3 + and offer_token = '\x57f1887a8bf19b14fc0df6fd9b2acc9af147ea85' + and consideration_recipient is null + `, + }, + }, + } + for _, tc := range cases { + th.Reset() + ig := Integration{} + decode(t, read(t, tc.config), &ig) + eig, err := getIntegration(th.PG, ig) + diff.Test(t, t.Errorf, nil, err) + th.Process(eig, tc.blockNum) + for i, q := range tc.queries { + var found bool + err := th.PG.QueryRow(th.Context(), q).Scan(&found) + diff.Test(t, t.Errorf, nil, err) + if err != nil { + t.Logf("failing test query: %d", i) + } + if !found { + t.Errorf("test %s failed", tc.config) + } + } + } +} + +func read(tb testing.TB, name string) []byte { + path := "testdata/" + name + b, err := os.ReadFile(path) + if err != nil { + tb.Fatalf("unable to read file %s", path) + } + return b +} + +func decode(tb testing.TB, js []byte, dest any) { + if err := json.Unmarshal(js, dest); err != nil { + tb.Fatalf("decoding json: %.4s error: %s", string(js), err.Error()) + } +} diff --git a/e2pg/config/testdata/erc721.json b/e2pg/config/testdata/erc721.json new file mode 100644 index 00000000..332a28c2 --- /dev/null +++ b/e2pg/config/testdata/erc721.json @@ -0,0 +1,44 @@ +{ + "name": "erc721", + "table": { + "name": "erc721_test", + "columns": [ + {"name": "chain_id", "type": "numeric"}, + {"name": "block_num", "type": "numeric"}, + {"name": "tx_hash", "type": "bytea"}, + {"name": "f", "type": "bytea"}, + {"name": "t", "type": "bytea"}, + {"name": "token", "type": "numeric"} + ] + }, + "block": [ + {"name": "chain_id", "column": "chain_id"}, + {"name": "block_num", "column": "block_num"}, + {"name": "tx_hash", "column": "tx_hash", "filter_op": "contains", "filter_arg": ["713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]} + ], + "event": { + "name": "Transfer", + "type": "event", + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "from", + "type": "address", + "column": "f" + }, + { + "indexed": true, + "name": "to", + "type": "address", + "column": "t" + }, + { + "indexed": true, + "name": "tokenId", + "type": "uint256", + "column": "token" + } + ] + } +} diff --git a/e2pg/config/testdata/seaport.json b/e2pg/config/testdata/seaport.json new file mode 100644 index 00000000..5d638f8b --- /dev/null +++ b/e2pg/config/testdata/seaport.json @@ -0,0 +1,122 @@ +{ + "name": "seaport_test", + "table": { + "name": "seaport_test", + "columns": [ + {"name": "chain_id", "type": "numeric"}, + {"name": "order_hash", "type": "bytea"}, + {"name": "consideration_recipient", "type": "bytea"}, + {"name": "offer_token", "type": "bytea"}, + {"name": "offerer", "type": "bytea"}, + {"name": "zone", "type": "bytea"}, + {"name": "recipient", "type": "bytea"}, + {"name": "block_num", "type": "numeric"}, + {"name": "tx_hash", "type": "bytea"}, + {"name": "log_idx", "type": "numeric"} + ] + }, + "block": [ + {"name": "chain_id", "column": "chain_id"}, + {"name": "block_num", "column": "block_num"}, + 
{"name": "tx_hash", "column": "tx_hash", "filter_op": "contains", "filter_arg": ["713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]}, + {"name": "log_idx", "column": "log_idx"} + ], + "event": { + "anonymous": false, + "name": "OrderFulfilled", + "type": "event", + "inputs": [ + { + "indexed": false, + "internalType": "bytes32", + "name": "orderHash", + "type": "bytes32", + "column": "order_hash" + }, + { + "indexed": true, + "internalType": "address", + "name": "offerer", + "type": "address", + "column": "offerer" + }, + { + "indexed": true, + "internalType": "address", + "name": "zone", + "type": "address", + "column": "zone" + }, + { + "indexed": false, + "internalType": "address", + "name": "recipient", + "type": "address", + "column": "recipient" + }, + { + "components": [ + { + "internalType": "enum ItemType", + "name": "itemType", + "type": "uint8" + }, + { + "internalType": "address", + "name": "token", + "type": "address", + "column": "offer_token" + }, + { + "internalType": "uint256", + "name": "identifier", + "type": "uint256" + }, + { + "internalType": "uint256", + "name": "amount", + "type": "uint256" + } + ], + "indexed": false, + "internalType": "struct SpentItem[]", + "name": "offer", + "type": "tuple[]" + }, + { + "components": [ + { + "internalType": "enum ItemType", + "name": "itemType", + "type": "uint8" + }, + { + "internalType": "address", + "name": "token", + "type": "address" + }, + { + "internalType": "uint256", + "name": "identifier", + "type": "uint256" + }, + { + "internalType": "uint256", + "name": "amount", + "type": "uint256" + }, + { + "internalType": "address payable", + "name": "recipient", + "type": "address", + "column": "consideration_recipient" + } + ], + "indexed": false, + "internalType": "struct ReceivedItem[]", + "name": "consideration", + "type": "tuple[]" + } + ] + } +} diff --git a/e2pg/config/testdata/txinput.json b/e2pg/config/testdata/txinput.json new file mode 100644 index 00000000..a2339286 --- /dev/null +++ b/e2pg/config/testdata/txinput.json @@ -0,0 +1,18 @@ +{ + "name": "txinput", + "table": { + "name": "txinput_test", + "columns": [ + {"name": "chain_id", "type": "numeric"}, + {"name": "block_num", "type": "numeric"}, + {"name": "tx_hash", "type": "bytea"}, + {"name": "tx_input", "type": "bytea"} + ] + }, + "block": [ + {"name": "chain_id", "column": "chain_id"}, + {"name": "block_num", "column": "block_num"}, + {"name": "tx_hash", "column": "tx_hash", "filter_op": "contains", "filter_arg": ["713df81a2ab53db1d01531106fc5de43012a401ddc3e0586d522e5c55a162d42"]}, + {"name": "tx_input", "column": "tx_input"} + ] +}