Skip to content

Commit

Permalink
e2pg: refactor. clean up task manager loading
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandotsmith committed Oct 12, 2023
1 parent 215f8f3 commit 6314fb2
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 87 deletions.
96 changes: 42 additions & 54 deletions cmd/e2pg/config.json
Original file line number Diff line number Diff line change
@@ -1,56 +1,44 @@
{
"pg_url": "postgres:///e2pg",
"eth_sources": [
{"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"}
],
"integrations": [
{
"name": "ERC20 Transfers",
"enabled": true,
"sources": [{"name": "goerli"}],
"table": {
"name": "e20_transfers",
"columns": [
{"name": "chain_id", "type": "numeric"},
{"name": "block_num", "type": "numeric"},
{"name": "tx_hash", "type": "bytea"},
{"name": "contract", "type": "bytea"},
{"name": "f", "type": "bytea"},
{"name": "t", "type": "bytea"},
{"name": "amt", "type": "numeric"}
]
},
"block": [
{"name": "chain_id", "column": "chain_id"},
{"name": "block_num", "column": "block_num"},
{"name": "tx_hash", "column": "tx_hash"},
{"name": "log_addr", "column": "contract"}
],
"event": {
"name": "Transfer",
"type": "event",
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "from",
"type": "address",
"column": "f"
},
{
"indexed": true,
"name": "to",
"type": "address",
"column": "t"
},
{
"indexed": true,
"name": "amount",
"type": "uint256",
"column": "amt"
}
]
}
}
]
"pg_url": "postgres:///e2pg",
"eth_sources": [
{"name": "goerli", "chain_id": 5, "url": "https://5.rlps.indexsupply.net"}
],
"integrations": [
{
"name": "t1",
"enabled": true,
"sources": [{"name": "goerli"}],
"table": {
"name": "t1",
"columns": [
{"name": "chain_id", "type": "numeric"},
{"name": "block_num", "type": "numeric"},
{"name": "tx_hash", "type": "bytea"}
]
},
"block": [
{"name": "chain_id", "column": "chain_id"},
{"name": "block_num", "column": "block_num"},
{"name": "tx_hash", "column": "tx_hash"}
]
},
{
"name": "t2",
"enabled": true,
"sources": [{"name": "goerli"}],
"table": {
"name": "t2",
"columns": [
{"name": "chain_id", "type": "numeric"},
{"name": "block_num", "type": "numeric"},
{"name": "tx_hash", "type": "bytea"}
]
},
"block": [
{"name": "chain_id", "column": "chain_id"},
{"name": "block_num", "column": "block_num"},
{"name": "tx_hash", "column": "tx_hash"}
]
}
]
}
56 changes: 23 additions & 33 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,37 +503,33 @@ func (tm *Manager) Run() error {
}

func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, error) {
var (
sourcesByName = map[string]SourceConfig{}
allSources []SourceConfig
)
allSources := map[string]SourceConfig{}
dbSources, err := SourceConfigs(ctx, pgp)
if err != nil {
return nil, fmt.Errorf("loading sources: %w", err)
}
for _, sc := range dbSources {
sourcesByName[sc.Name] = sc
allSources[sc.Name] = sc
}
for _, sc := range conf.SourceConfigs {
sourcesByName[sc.Name] = sc
}
for _, sc := range sourcesByName {
allSources = append(allSources, sc)
allSources[sc.Name] = sc
}

intgsByName := map[string]Integration{}
allIntgs := map[string]Integration{}
dbIntgs, err := Integrations(ctx, pgp)
if err != nil {
return nil, fmt.Errorf("loading integrations: %w", err)
}
for _, intg := range dbIntgs {
intgsByName[intg.Name] = intg
allIntgs[intg.Name] = intg
}
for _, intg := range conf.Integrations {
intgsByName[intg.Name] = intg
allIntgs[intg.Name] = intg
}
destsBySource := map[SourceConfig][]Destination{}
for _, ig := range intgsByName {

// Start per-source main tasks
destBySourceName := map[string][]Destination{}
for _, ig := range allIntgs {
if !ig.Enabled {
continue
}
Expand All @@ -542,16 +538,16 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
return nil, fmt.Errorf("unable to build integration %s: %w", ig.Name, err)
}
for _, sc := range ig.SourceConfigs {
destsBySource[sc] = append(destsBySource[sc], dest)
destBySourceName[sc.Name] = append(destBySourceName[sc.Name], dest)
}
}
// Start per-source main tasks
var tasks []*Task
for sc, dests := range destsBySource {
src, err := getSource(allSources, sc.Name)
for _, sc := range allSources {
src, err := getSource(sc)
if err != nil {
return nil, fmt.Errorf("unkown source: %s", sc.Name)
}
dests := destBySourceName[sc.Name]
tasks = append(tasks, NewTask(
WithName(sc.Name),
WithSource(src),
Expand Down Expand Up @@ -583,22 +579,16 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) {
}
}

func getSource(scs []SourceConfig, name string) (Source, error) {
for _, sc := range scs {
if sc.Name != name {
continue
}
switch {
case strings.Contains(sc.URL, "rlps"):
return rlps.NewClient(sc.ChainID, sc.URL), nil
case strings.HasPrefix(sc.URL, "http"):
return jrpc2.New(sc.ChainID, sc.URL), nil
default:
// TODO add back support for local geth
return nil, fmt.Errorf("unsupported src type: %v", sc)
}
func getSource(sc SourceConfig) (Source, error) {
switch {
case strings.Contains(sc.URL, "rlps"):
return rlps.NewClient(sc.ChainID, sc.URL), nil
case strings.HasPrefix(sc.URL, "http"):
return jrpc2.New(sc.ChainID, sc.URL), nil
default:
// TODO add back support for local geth
return nil, fmt.Errorf("unsupported src type: %v", sc)
}
return nil, fmt.Errorf("unable to find src for %s", name)
}

func Integrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) {
Expand Down

0 comments on commit 6314fb2

Please sign in to comment.