From 6314fb229f7d75aea4495a5b4e8128866f2e9320 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Thu, 12 Oct 2023 16:31:13 -0700 Subject: [PATCH] e2pg: refactor. clean up task manager loading --- cmd/e2pg/config.json | 96 +++++++++++++++++++------------------------- e2pg/e2pg.go | 56 +++++++++++--------------- 2 files changed, 65 insertions(+), 87 deletions(-) diff --git a/cmd/e2pg/config.json b/cmd/e2pg/config.json index f62cc705..2a0faae0 100644 --- a/cmd/e2pg/config.json +++ b/cmd/e2pg/config.json @@ -1,56 +1,44 @@ { - "pg_url": "postgres:///e2pg", - "eth_sources": [ - {"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"} - ], - "integrations": [ - { - "name": "ERC20 Transfers", - "enabled": true, - "sources": [{"name": "goerli"}], - "table": { - "name": "e20_transfers", - "columns": [ - {"name": "chain_id", "type": "numeric"}, - {"name": "block_num", "type": "numeric"}, - {"name": "tx_hash", "type": "bytea"}, - {"name": "contract", "type": "bytea"}, - {"name": "f", "type": "bytea"}, - {"name": "t", "type": "bytea"}, - {"name": "amt", "type": "numeric"} - ] - }, - "block": [ - {"name": "chain_id", "column": "chain_id"}, - {"name": "block_num", "column": "block_num"}, - {"name": "tx_hash", "column": "tx_hash"}, - {"name": "log_addr", "column": "contract"} - ], - "event": { - "name": "Transfer", - "type": "event", - "anonymous": false, - "inputs": [ - { - "indexed": true, - "name": "from", - "type": "address", - "column": "f" - }, - { - "indexed": true, - "name": "to", - "type": "address", - "column": "t" - }, - { - "indexed": true, - "name": "amount", - "type": "uint256", - "column": "amt" - } - ] - } - } - ] + "pg_url": "postgres:///e2pg", + "eth_sources": [ + {"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"} + ], + "integrations": [ + { + "name": "t1", + "enabled": true, + "sources": [{"name": "goerli"}], + "table": { + "name": "t1", + "columns": [ + {"name": "chain_id", "type": "numeric"}, + {"name": "block_num", "type": "numeric"}, + {"name": "tx_hash", "type": "bytea"} + ] + }, + "block": [ + {"name": "chain_id", "column": "chain_id"}, + {"name": "block_num", "column": "block_num"}, + {"name": "tx_hash", "column": "tx_hash"} + ] + }, + { + "name": "t2", + "enabled": true, + "sources": [{"name": "goerli"}], + "table": { + "name": "t2", + "columns": [ + {"name": "chain_id", "type": "numeric"}, + {"name": "block_num", "type": "numeric"}, + {"name": "tx_hash", "type": "bytea"} + ] + }, + "block": [ + {"name": "chain_id", "column": "chain_id"}, + {"name": "block_num", "column": "block_num"}, + {"name": "tx_hash", "column": "tx_hash"} + ] + } + ] } diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index a9428f05..7029f921 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -503,37 +503,33 @@ func (tm *Manager) Run() error { } func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, error) { - var ( - sourcesByName = map[string]SourceConfig{} - allSources []SourceConfig - ) + allSources := map[string]SourceConfig{} dbSources, err := SourceConfigs(ctx, pgp) if err != nil { return nil, fmt.Errorf("loading sources: %w", err) } for _, sc := range dbSources { - sourcesByName[sc.Name] = sc + allSources[sc.Name] = sc } for _, sc := range conf.SourceConfigs { - sourcesByName[sc.Name] = sc - } - for _, sc := range sourcesByName { - allSources = append(allSources, sc) + allSources[sc.Name] = sc } - intgsByName := map[string]Integration{} + allIntgs := map[string]Integration{} dbIntgs, err := Integrations(ctx, pgp) if err != nil { return nil, fmt.Errorf("loading integrations: %w", err) } for _, intg := range dbIntgs { - intgsByName[intg.Name] = intg + allIntgs[intg.Name] = intg } for _, intg := range conf.Integrations { - intgsByName[intg.Name] = intg + allIntgs[intg.Name] = intg } - destsBySource := map[SourceConfig][]Destination{} - for _, ig := range intgsByName { + + // Start per-source main tasks + destBySourceName := map[string][]Destination{} + for _, ig := range allIntgs { if !ig.Enabled { continue } @@ -542,16 +538,16 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er return nil, fmt.Errorf("unable to build integration %s: %w", ig.Name, err) } for _, sc := range ig.SourceConfigs { - destsBySource[sc] = append(destsBySource[sc], dest) + destBySourceName[sc.Name] = append(destBySourceName[sc.Name], dest) } } - // Start per-source main tasks var tasks []*Task - for sc, dests := range destsBySource { - src, err := getSource(allSources, sc.Name) + for _, sc := range allSources { + src, err := getSource(sc) if err != nil { return nil, fmt.Errorf("unkown source: %s", sc.Name) } + dests := destBySourceName[sc.Name] tasks = append(tasks, NewTask( WithName(sc.Name), WithSource(src), @@ -583,22 +579,16 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) { } } -func getSource(scs []SourceConfig, name string) (Source, error) { - for _, sc := range scs { - if sc.Name != name { - continue - } - switch { - case strings.Contains(sc.URL, "rlps"): - return rlps.NewClient(sc.ChainID, sc.URL), nil - case strings.HasPrefix(sc.URL, "http"): - return jrpc2.New(sc.ChainID, sc.URL), nil - default: - // TODO add back support for local geth - return nil, fmt.Errorf("unsupported src type: %v", sc) - } +func getSource(sc SourceConfig) (Source, error) { + switch { + case strings.Contains(sc.URL, "rlps"): + return rlps.NewClient(sc.ChainID, sc.URL), nil + case strings.HasPrefix(sc.URL, "http"): + return jrpc2.New(sc.ChainID, sc.URL), nil + default: + // TODO add back support for local geth + return nil, fmt.Errorf("unsupported src type: %v", sc) } - return nil, fmt.Errorf("unable to find src for %s", name) } func Integrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) {