Skip to content

Commit

Permalink
e2pg/config: declarative based indexing (#172)
Browse files Browse the repository at this point in the history
Breaking change. Old config file format is no longer valid.

New config file format enables declarative based indexing.
  • Loading branch information
ryandotsmith authored Oct 6, 2023
1 parent a999b7c commit 0edb55f
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 395 deletions.
257 changes: 138 additions & 119 deletions abi2/abi2.go

Large diffs are not rendered by default.

155 changes: 0 additions & 155 deletions abi2/integration_test.go

This file was deleted.

78 changes: 56 additions & 22 deletions cmd/e2pg/config.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,56 @@
[
{
"name": "mainnet",
"id": 1,
"chain": 1,
"concurrency": 8,
"batch": 128,
"eth": "https://1.rlps.indexsupply.net",
"pg": "postgres:///e2pg",
"integrations": ["erc721", "erc1155"]
},
{
"name": "bedrock",
"id": 2,
"chain": 10,
"concurrency": 8,
"batch": 1024,
"eth": "http://10.rlps.indexsupply.net",
"pg": "postgres:///e2pg",
"integrations": ["erc20"]
}
]
{
"pg_url": "postgres:///e2pg",
"eth_sources": [
{"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"}
],
"integrations": [
{
"name": "ERC20 Transfers",
"enabled": true,
"sources": [{"name": "goerli"}],
"table": {
"name": "e20_transfers",
"columns": [
{"name": "chain_id", "type": "numeric"},
{"name": "block_num", "type": "numeric"},
{"name": "tx_hash", "type": "bytea"},
{"name": "contract", "type": "bytea"},
{"name": "f", "type": "bytea"},
{"name": "t", "type": "bytea"},
{"name": "amt", "type": "numeric"}
]
},
"block": [
{"name": "chain_id", "column": "chain_id"},
{"name": "block_num", "column": "block_num"},
{"name": "tx_hash", "column": "tx_hash"},
{"name": "log_addr", "column": "contract"}
],
"event": {
"name": "Transfer",
"type": "event",
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "from",
"type": "address",
"column": "f"
},
{
"indexed": true,
"name": "to",
"type": "address",
"column": "t"
},
{
"indexed": true,
"name": "amount",
"type": "uint256",
"column": "amt"
}
]
}
}
]
}
2 changes: 1 addition & 1 deletion cmd/e2pg/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) {
}
snaps := make(map[uint64]e2pg.StatusSnapshot)
for _, task := range dh.tasks {
snaps[task.ChainID] = task.Status()
snaps[task.ID] = task.Status()
}
err = tmpl.Execute(w, snaps)
if err != nil {
Expand Down
40 changes: 6 additions & 34 deletions cmd/e2pg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"os"
"runtime/debug"
"runtime/pprof"
"strings"
"time"

"github.com/indexsupply/x/e2pg"
Expand All @@ -37,26 +36,13 @@ func main() {
ctx = context.Background()
cfile string

conf config.Config
intgs string

skipMigrate bool
listen string
notx bool
profile string
version bool
)
flag.StringVar(&cfile, "config", "", "task config file")
flag.Uint64Var(&conf.ID, "id", 0, "task id")
flag.Uint64Var(&conf.ChainID, "chain", 0, "task id")
flag.StringVar(&conf.FreezerPath, "ef", "/storage/geth/geth/chaindata/ancient/chain/", "path to freezer files")
flag.StringVar(&conf.PGURL, "pg", "", "postgres url")
flag.StringVar(&conf.ETHURL, "e", "", "address or socket for rpc server")
flag.Uint64Var(&conf.Concurrency, "c", 2, "number of concurrent workers")
flag.Uint64Var(&conf.Batch, "b", 128, "batch size")
flag.Uint64Var(&conf.Begin, "begin", 0, "starting block. 0 starts at latest")
flag.Uint64Var(&conf.End, "end", 0, "ending block. 0 never ends")
flag.StringVar(&intgs, "i", "", "list of integrations")

flag.BoolVar(&skipMigrate, "skip-migrate", false, "do not run db migrations on startup")
flag.StringVar(&listen, "l", ":8546", "dashboard server listen address")
Expand Down Expand Up @@ -86,44 +72,30 @@ func main() {
slog.String("v", Commit),
})))

if len(intgs) > 0 {
for _, s := range strings.Split(intgs, ",") {
conf.Integrations = append(conf.Integrations, s)
}
}

if version {
fmt.Printf("v%s-%s\n", Version, Commit)
os.Exit(0)
}

var confs []config.Config
var conf config.Config
switch {
case cfile != "" && !conf.Empty():
fmt.Printf("unable to use config file and command line args\n")
case cfile == "":
fmt.Printf("missing config file\n")
os.Exit(1)
case cfile != "":
f, err := os.Open(cfile)
check(err)
confs = []config.Config{}
check(json.NewDecoder(f).Decode(&confs))
case !conf.Empty():
confs = []config.Config{conf}
}

if len(confs) == 0 {
fmt.Println("Must specify at least 1 task configuration")
os.Exit(1)
check(json.NewDecoder(f).Decode(&conf))
}

if !skipMigrate {
migdb, err := pgxpool.New(ctx, config.Env(confs[0].PGURL))
migdb, err := pgxpool.New(ctx, config.Env(conf.PGURL))
check(err)
check(pgmig.Migrate(migdb, e2pg.Migrations))
migdb.Close()
}

tasks, err := config.NewTasks(confs...)
tasks, err := config.NewTasks(conf)
check(err)

var (
Expand Down
Loading

0 comments on commit 0edb55f

Please sign in to comment.