diff --git a/cmd/e2pg/dashboard.go b/cmd/e2pg/dashboard.go index 4b236168..b14bd1fc 100644 --- a/cmd/e2pg/dashboard.go +++ b/cmd/e2pg/dashboard.go @@ -12,14 +12,14 @@ import ( ) type dashHandler struct { - tasks []*e2pg.Task + mgr *e2pg.Manager clientsMutex sync.Mutex clients map[string]chan e2pg.StatusSnapshot } -func newDashHandler(tasks []*e2pg.Task, snaps <-chan e2pg.StatusSnapshot) *dashHandler { +func newDashHandler(mgr *e2pg.Manager, snaps <-chan e2pg.StatusSnapshot) *dashHandler { dh := &dashHandler{ - tasks: tasks, + mgr: mgr, clients: make(map[string]chan e2pg.StatusSnapshot), } go func() { @@ -163,7 +163,7 @@ func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) { return } snaps := make(map[uint64]e2pg.StatusSnapshot) - for _, task := range dh.tasks { + for _, task := range dh.mgr.Tasks() { s := task.Status() snaps[s.ChainID] = s } diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go index 06857fc3..f380d9a5 100644 --- a/cmd/e2pg/main.go +++ b/cmd/e2pg/main.go @@ -16,13 +16,12 @@ import ( "time" "github.com/indexsupply/x/e2pg" - "github.com/indexsupply/x/e2pg/config" "github.com/indexsupply/x/pgmig" "github.com/indexsupply/x/wctx" + "github.com/indexsupply/x/wos" "github.com/indexsupply/x/wslog" "github.com/jackc/pgx/v5/pgxpool" - "golang.org/x/sync/errgroup" ) func check(err error) { @@ -78,7 +77,7 @@ func main() { os.Exit(0) } - var conf config.Config + var conf e2pg.Config switch { case cfile == "": fmt.Printf("missing config file\n") @@ -90,19 +89,20 @@ func main() { } if !skipMigrate { - migdb, err := pgxpool.New(ctx, config.Env(conf.PGURL)) + migdb, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL)) check(err) check(pgmig.Migrate(migdb, e2pg.Migrations)) migdb.Close() } - tasks, err := config.NewTasks(conf) + pg, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL)) check(err) var ( - pbuf bytes.Buffer - snaps = make(chan e2pg.StatusSnapshot) - dh = newDashHandler(tasks, snaps) + pbuf bytes.Buffer + snaps = make(chan e2pg.StatusSnapshot) + tskmgr = e2pg.NewManager(pg, snaps, conf) + dh = newDashHandler(tskmgr, snaps) ) mux := http.NewServeMux() mux.HandleFunc("/", dh.Index) @@ -120,13 +120,7 @@ func main() { if profile == "cpu" { check(pprof.StartCPUProfile(&pbuf)) } - var eg errgroup.Group - for i := range tasks { - i := i - check(tasks[i].Setup()) - eg.Go(func() error { tasks[i].Run(snaps, notx); return nil }) - } - eg.Wait() + tskmgr.Run() switch profile { case "cpu": pprof.StopCPUProfile() diff --git a/e2pg/config/config.go b/e2pg/config/config.go deleted file mode 100644 index 9b9ecf12..00000000 --- a/e2pg/config/config.go +++ /dev/null @@ -1,151 +0,0 @@ -package config - -import ( - "context" - "encoding/json" - "fmt" - "os" - "strings" - - "github.com/indexsupply/x/abi2" - "github.com/indexsupply/x/e2pg" - "github.com/indexsupply/x/jrpc2" - "github.com/indexsupply/x/rlps" - "github.com/jackc/pgx/v5/pgxpool" -) - -var compiled = map[string]e2pg.Destination{} - -type EthSource struct { - Name string `json:"name"` - ChainID uint64 `json:"chain_id"` - URL string `json:"url"` -} - -type Source struct { - Name string `json:"name"` - Start uint64 `json:"start"` - Stop uint64 `json:"stop"` -} - -type Compiled struct { - Name string `json:"name"` - Config json.RawMessage `json:"config"` -} - -type Integration struct { - Name string `json:"name"` - Enabled bool `json:"enabled"` - Start string `json:"start"` - Stop string `json:"stop"` - Backfill bool `json:"backfill"` - Sources []Source `json:"sources"` - Table abi2.Table `json:"table"` - Compiled Compiled `json:"compiled"` - Block []abi2.BlockData `json:"block"` - Event abi2.Event `json:"event"` -} - -type Config struct { - PGURL string `json:"pg_url"` - EthSources []EthSource `json:"eth_sources"` - Integrations []Integration `json:"integrations"` -} - -func (conf Config) Empty() bool { - return conf.PGURL == "" -} - -// If s has a $ prefix then we assume -// that it is a placeholder url and the actual -// url is in an env variable. -// -// # If there is no env var for s then the program will crash with an error -// -// if there is no $ prefix then s is returned -func Env(s string) string { - if strings.HasPrefix(s, "$") { - v := os.Getenv(strings.ToUpper(strings.TrimPrefix(s, "$"))) - if v == "" { - fmt.Printf("expected database url in env: %q\n", s) - os.Exit(1) - } - return v - } - return s -} - -func NewTasks(conf Config) ([]*e2pg.Task, error) { - pgp, err := pgxpool.New(context.Background(), Env(conf.PGURL)) - if err != nil { - return nil, fmt.Errorf("dburl invalid: %w", err) - } - destsBySource := map[Source][]e2pg.Destination{} - for _, ig := range conf.Integrations { - if !ig.Enabled { - continue - } - eig, err := getIntegration(pgp, ig) - if err != nil { - return nil, fmt.Errorf("unable to build integration %s: %w", ig.Name, err) - } - for _, src := range ig.Sources { - destsBySource[src] = append(destsBySource[src], eig) - } - } - - // Start per-source main tasks - var tasks []*e2pg.Task - for src, dests := range destsBySource { - node, err := getNode(conf.EthSources, src.Name) - if err != nil { - return nil, fmt.Errorf("unkown source: %s", src.Name) - } - tasks = append(tasks, e2pg.NewTask( - e2pg.WithName(src.Name), - e2pg.WithSource(node), - e2pg.WithPG(pgp), - e2pg.WithRange(src.Start, src.Stop), - e2pg.WithDestinations(dests...), - )) - } - return tasks, nil -} - -func getIntegration(pgp *pgxpool.Pool, ig Integration) (e2pg.Destination, error) { - switch { - case len(ig.Compiled.Name) > 0: - cig, ok := compiled[ig.Name] - if !ok { - return nil, fmt.Errorf("unable to find compiled integration: %s", ig.Name) - } - return cig, nil - default: - aig, err := abi2.New(ig.Event, ig.Block, ig.Table) - if err != nil { - return nil, fmt.Errorf("building abi integration: %w", err) - } - if err := abi2.CreateTable(context.Background(), pgp, aig.Table); err != nil { - return nil, fmt.Errorf("setting up table for abi integration: %w", err) - } - return aig, nil - } -} - -func getNode(srcs []EthSource, name string) (e2pg.Source, error) { - for _, src := range srcs { - if src.Name != name { - continue - } - switch { - case strings.Contains(src.URL, "rlps"): - return rlps.NewClient(src.ChainID, src.URL), nil - case strings.HasPrefix(src.URL, "http"): - return jrpc2.New(src.ChainID, src.URL), nil - default: - // TODO add back support for local node - return nil, fmt.Errorf("unsupported src type: %v", src) - } - } - return nil, fmt.Errorf("unable to find src for %s", name) -} diff --git a/e2pg/config/testhelper/helper.go b/e2pg/config/testhelper/helper.go deleted file mode 100644 index 2169c666..00000000 --- a/e2pg/config/testhelper/helper.go +++ /dev/null @@ -1,82 +0,0 @@ -// Easily test e2pg integrations -package testhelper - -import ( - "context" - "testing" - - "github.com/indexsupply/x/e2pg" - "github.com/indexsupply/x/geth/gethtest" - - "blake.io/pqx/pqxtest" - "github.com/jackc/pgx/v5/pgxpool" - "kr.dev/diff" -) - -func check(t testing.TB, err error) { - t.Helper() - if err != nil { - t.Fatal(err) - } -} - -type H struct { - tb testing.TB - ctx context.Context - PG *pgxpool.Pool - gt *gethtest.Helper -} - -// jrpc.Client is required when testdata isn't -// available in the integrations/testdata directory. -func New(tb testing.TB) *H { - ctx := context.Background() - - pqxtest.CreateDB(tb, e2pg.Schema) - pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(tb)) - diff.Test(tb, tb.Fatalf, err, nil) - - return &H{ - tb: tb, - ctx: ctx, - PG: pg, - gt: gethtest.New(tb, "http://hera:8545"), - } -} - -// Reset the task table. Call this in-between test cases -func (th *H) Reset() { - _, err := th.PG.Exec(context.Background(), "truncate table e2pg.task") - check(th.tb, err) -} - -func (th *H) Context() context.Context { - return th.ctx -} - -func (th *H) Done() { - th.gt.Done() -} - -// Process will download the header,bodies, and receipts data -// if it doesn't exist in: integrations/testdata -// In the case that it needs to fetch the data, an RPC -// client will be used. The RPC endpoint needs to support -// the debug_dbAncient and debug_dbGet methods. -func (th *H) Process(dest e2pg.Destination, n uint64) { - var ( - geth = e2pg.NewGeth(th.gt.FileCache, th.gt.Client) - task = e2pg.NewTask( - e2pg.WithSource(geth), - e2pg.WithPG(th.PG), - e2pg.WithDestinations(dest), - ) - ) - cur, err := geth.Hash(n) - check(th.tb, err) - prev, err := geth.Hash(n - 1) - check(th.tb, err) - th.gt.SetLatest(n, cur) - check(th.tb, task.Insert(n-1, prev)) - check(th.tb, task.Converge(true)) -} diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index bac9003a..a9428f05 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -4,14 +4,20 @@ import ( "bytes" "context" _ "embed" + "encoding/json" "errors" "fmt" "log/slog" + "strings" + "sync" "sync/atomic" "time" + "github.com/indexsupply/x/abi2" "github.com/indexsupply/x/eth" "github.com/indexsupply/x/geth" + "github.com/indexsupply/x/jrpc2" + "github.com/indexsupply/x/rlps" "github.com/indexsupply/x/wctx" "github.com/indexsupply/x/wpg" @@ -237,26 +243,24 @@ func (task *Task) Setup() error { return task.Insert(gethNum-1, h) } -func (task *Task) Run(snaps chan<- StatusSnapshot, notx bool) { - for { - switch err := task.Converge(notx); { - case err == nil: - go func() { - snap := task.Status() - slog.InfoContext(task.ctx, "", "n", snap.Num, "h", snap.Hash) - select { - case snaps <- snap: - default: - } - }() - case errors.Is(err, ErrDone): - return - case errors.Is(err, ErrNothingNew): - time.Sleep(time.Second) - default: - time.Sleep(time.Second) - slog.ErrorContext(task.ctx, "error", err) - } +func (task *Task) Run1(snaps chan<- StatusSnapshot, notx bool) { + switch err := task.Converge(notx); { + case err == nil: + go func() { + snap := task.Status() + slog.InfoContext(task.ctx, "", "n", snap.Num, "h", snap.Hash) + select { + case snaps <- snap: + default: + } + }() + case errors.Is(err, ErrDone): + return + case errors.Is(err, ErrNothingNew): + time.Sleep(time.Second) + default: + time.Sleep(time.Second) + slog.ErrorContext(task.ctx, "error", err) } } @@ -423,3 +427,276 @@ func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error task.stat.ihash = last.Hash() return nil } + +var compiled = map[string]Destination{} + +// Loads, Starts, and provides method for Restarting tasks +// based on config stored in the DB and in the config file. +type Manager struct { + running sync.Mutex + restart chan struct{} + snaps chan<- StatusSnapshot + tasks []*Task + pgp *pgxpool.Pool + conf Config +} + +func NewManager(pgp *pgxpool.Pool, snaps chan<- StatusSnapshot, conf Config) *Manager { + return &Manager{ + restart: make(chan struct{}), + snaps: snaps, + pgp: pgp, + conf: conf, + } +} + +// TODO(r): remove once old dashboard is gone +func (tm *Manager) Tasks() []*Task { + return tm.tasks +} + +func (tm *Manager) runTask(t *Task) error { + if err := t.Setup(); err != nil { + return fmt.Errorf("setting up task: %w", err) + } + for { + select { + case <-tm.restart: + slog.Info("restart-task", "name", t.Name) + return nil + default: + t.Run1(tm.snaps, false) + } + } +} + +// Ensures all running tasks stop +// and calls [Manager.Run] in a new go routine. +func (tm *Manager) Restart() { + close(tm.restart) + go tm.Run() +} + +// Loads ethereum sources and integrations from both the config file +// and the database and assembles the nessecary tasks and runs all +// tasks in a loop. +// +// Acquires a lock to ensure only on routine is running. +// Releases lock on return +func (tm *Manager) Run() error { + tm.running.Lock() + defer tm.running.Unlock() + tm.restart = make(chan struct{}) + var err error + tm.tasks, err = loadTasks(context.Background(), tm.pgp, tm.conf) + if err != nil { + return fmt.Errorf("loading tasks: %w", err) + } + var eg errgroup.Group + for i := range tm.tasks { + i := i + eg.Go(func() error { + return tm.runTask(tm.tasks[i]) + }) + } + return eg.Wait() +} + +func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, error) { + var ( + sourcesByName = map[string]SourceConfig{} + allSources []SourceConfig + ) + dbSources, err := SourceConfigs(ctx, pgp) + if err != nil { + return nil, fmt.Errorf("loading sources: %w", err) + } + for _, sc := range dbSources { + sourcesByName[sc.Name] = sc + } + for _, sc := range conf.SourceConfigs { + sourcesByName[sc.Name] = sc + } + for _, sc := range sourcesByName { + allSources = append(allSources, sc) + } + + intgsByName := map[string]Integration{} + dbIntgs, err := Integrations(ctx, pgp) + if err != nil { + return nil, fmt.Errorf("loading integrations: %w", err) + } + for _, intg := range dbIntgs { + intgsByName[intg.Name] = intg + } + for _, intg := range conf.Integrations { + intgsByName[intg.Name] = intg + } + destsBySource := map[SourceConfig][]Destination{} + for _, ig := range intgsByName { + if !ig.Enabled { + continue + } + dest, err := getDest(pgp, ig) + if err != nil { + return nil, fmt.Errorf("unable to build integration %s: %w", ig.Name, err) + } + for _, sc := range ig.SourceConfigs { + destsBySource[sc] = append(destsBySource[sc], dest) + } + } + // Start per-source main tasks + var tasks []*Task + for sc, dests := range destsBySource { + src, err := getSource(allSources, sc.Name) + if err != nil { + return nil, fmt.Errorf("unkown source: %s", sc.Name) + } + tasks = append(tasks, NewTask( + WithName(sc.Name), + WithSource(src), + WithPG(pgp), + WithRange(sc.Start, sc.Stop), + WithDestinations(dests...), + )) + } + return tasks, nil +} + +func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) { + switch { + case len(ig.Compiled.Name) > 0: + cig, ok := compiled[ig.Name] + if !ok { + return nil, fmt.Errorf("unable to find compiled integration: %s", ig.Name) + } + return cig, nil + default: + aig, err := abi2.New(ig.Event, ig.Block, ig.Table) + if err != nil { + return nil, fmt.Errorf("building abi integration: %w", err) + } + if err := abi2.CreateTable(context.Background(), pgp, aig.Table); err != nil { + return nil, fmt.Errorf("setting up table for abi integration: %w", err) + } + return aig, nil + } +} + +func getSource(scs []SourceConfig, name string) (Source, error) { + for _, sc := range scs { + if sc.Name != name { + continue + } + switch { + case strings.Contains(sc.URL, "rlps"): + return rlps.NewClient(sc.ChainID, sc.URL), nil + case strings.HasPrefix(sc.URL, "http"): + return jrpc2.New(sc.ChainID, sc.URL), nil + default: + // TODO add back support for local geth + return nil, fmt.Errorf("unsupported src type: %v", sc) + } + } + return nil, fmt.Errorf("unable to find src for %s", name) +} + +func Integrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) { + var res []Integration + const q = `select conf from e2pg.integrations` + rows, err := pgp.Query(ctx, q) + if err != nil { + return nil, fmt.Errorf("querying integrations: %w", err) + } + for rows.Next() { + var buf = []byte{} + if err := rows.Scan(&buf); err != nil { + return nil, fmt.Errorf("scanning integration: %w", err) + } + var intg Integration + if err := json.Unmarshal(buf, &intg); err != nil { + return nil, fmt.Errorf("unmarshaling integration: %w", err) + } + res = append(res, intg) + } + return res, nil +} + +func SourceConfigs(ctx context.Context, pgp *pgxpool.Pool) ([]SourceConfig, error) { + var res []SourceConfig + const q = `select name, chain_id, url from e2pg.sources` + rows, err := pgp.Query(ctx, q) + if err != nil { + return nil, fmt.Errorf("querying sources: %w", err) + } + for rows.Next() { + var s SourceConfig + if err := rows.Scan(&s.Name, &s.ChainID, &s.URL); err != nil { + return nil, fmt.Errorf("scanning source: %w", err) + } + res = append(res, s) + } + return res, nil +} + +type SourceConfig struct { + Name string `json:"name"` + ChainID uint64 `json:"chain_id"` + URL string `json:"url"` + Start uint64 `json:"start"` + Stop uint64 `json:"stop"` +} + +type Compiled struct { + Name string `json:"name"` + Config json.RawMessage `json:"config"` +} + +type Integration struct { + Name string `json:"name"` + Enabled bool `json:"enabled"` + Start string `json:"start"` + Stop string `json:"stop"` + Backfill bool `json:"backfill"` + SourceConfigs []SourceConfig `json:"sources"` + Table abi2.Table `json:"table"` + Compiled Compiled `json:"compiled"` + Block []abi2.BlockData `json:"block"` + Event abi2.Event `json:"event"` +} + +type Config struct { + PGURL string `json:"pg_url"` + SourceConfigs []SourceConfig `json:"eth_sources"` + Integrations []Integration `json:"integrations"` +} + +func (conf Config) Empty() bool { + return conf.PGURL == "" +} + +func (conf Config) Valid(intg Integration) error { + return nil +} + +func (conf Config) AllIntegrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) { + res, err := Integrations(ctx, pgp) + if err != nil { + return nil, fmt.Errorf("loading db integrations: %w", err) + } + for i := range conf.Integrations { + res = append(res, conf.Integrations[i]) + } + return res, nil +} + +func (conf Config) AllSourceConfigs(ctx context.Context, pgp *pgxpool.Pool) ([]SourceConfig, error) { + res, err := SourceConfigs(ctx, pgp) + if err != nil { + return nil, fmt.Errorf("loading db integrations: %w", err) + } + for i := range conf.SourceConfigs { + res = append(res, conf.SourceConfigs[i]) + } + return res, nil +} diff --git a/e2pg/config/integration_test.go b/e2pg/integration_test.go similarity index 60% rename from e2pg/config/integration_test.go rename to e2pg/integration_test.go index 7ab5bd94..4e5c5b7a 100644 --- a/e2pg/config/integration_test.go +++ b/e2pg/integration_test.go @@ -1,24 +1,87 @@ -package config +package e2pg import ( - "database/sql" + "context" "encoding/json" "os" "testing" "blake.io/pqx/pqxtest" - "github.com/indexsupply/x/e2pg/config/testhelper" - "github.com/jackc/pgx/v5/stdlib" + "github.com/indexsupply/x/geth/gethtest" + "github.com/jackc/pgx/v5/pgxpool" "kr.dev/diff" ) -func TestMain(m *testing.M) { - sql.Register("postgres", stdlib.GetDefaultDriver()) - pqxtest.TestMain(m) +func check(t testing.TB, err error) { + t.Helper() + if err != nil { + t.Fatal(err) + } +} + +type Helper struct { + tb testing.TB + ctx context.Context + PG *pgxpool.Pool + gt *gethtest.Helper +} + +// jrpc.Client is required when testdata isn't +// available in the integrations/testdata directory. +func NewHelper(tb testing.TB) *Helper { + ctx := context.Background() + + pqxtest.CreateDB(tb, Schema) + pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(tb)) + diff.Test(tb, tb.Fatalf, err, nil) + + return &Helper{ + tb: tb, + ctx: ctx, + PG: pg, + gt: gethtest.New(tb, "http://hera:8545"), + } +} + +// Reset the task table. Call this in-between test cases +func (th *Helper) Reset() { + _, err := th.PG.Exec(context.Background(), "truncate table e2pg.task") + check(th.tb, err) +} + +func (th *Helper) Context() context.Context { + return th.ctx +} + +func (th *Helper) Done() { + th.gt.Done() +} + +// Process will download the header,bodies, and receipts data +// if it doesn't exist in: integrations/testdata +// In the case that it needs to fetch the data, an RPC +// client will be used. The RPC endpoint needs to support +// the debug_dbAncient and debug_dbGet methods. +func (th *Helper) Process(dest Destination, n uint64) { + var ( + geth = NewGeth(th.gt.FileCache, th.gt.Client) + task = NewTask( + WithSource(geth), + WithPG(th.PG), + WithDestinations(dest), + ) + ) + cur, err := geth.Hash(n) + check(th.tb, err) + prev, err := geth.Hash(n - 1) + check(th.tb, err) + th.gt.SetLatest(n, cur) + check(th.tb, task.Insert(n-1, prev)) + check(th.tb, task.Converge(true)) } func TestIntegrations(t *testing.T) { - th := testhelper.New(t) + th := NewHelper(t) defer th.Done() cases := []struct { blockNum uint64 @@ -82,9 +145,9 @@ func TestIntegrations(t *testing.T) { th.Reset() ig := Integration{} decode(t, read(t, tc.config), &ig) - eig, err := getIntegration(th.PG, ig) + dest, err := getDest(th.PG, ig) diff.Test(t, t.Errorf, nil, err) - th.Process(eig, tc.blockNum) + th.Process(dest, tc.blockNum) for i, q := range tc.queries { var found bool err := th.PG.QueryRow(th.Context(), q).Scan(&found) diff --git a/e2pg/migrations.go b/e2pg/migrations.go index fb0b09a4..6a1e272e 100644 --- a/e2pg/migrations.go +++ b/e2pg/migrations.go @@ -14,4 +14,13 @@ var Migrations = map[int]pgmig.Migration{ create index on e2pg.task(id, number desc); `, }, + 9: pgmig.Migration{ + SQL: ` + create table e2pg.sources(name text, chain_id int, url text); + create unique index on e2pg.sources(name, chain_id); + + create table e2pg.integrations(name text, conf jsonb); + create unique index on e2pg.sources(name); + `, + }, } diff --git a/e2pg/schema.sql b/e2pg/schema.sql index f12129b2..d29b63bf 100644 --- a/e2pg/schema.sql +++ b/e2pg/schema.sql @@ -20,6 +20,13 @@ SET default_tablespace = ''; SET default_table_access_method = heap; +CREATE TABLE e2pg.integrations ( + name text, + conf jsonb +); + + + CREATE TABLE e2pg.migrations ( idx integer NOT NULL, hash bytea NOT NULL, @@ -28,6 +35,14 @@ CREATE TABLE e2pg.migrations ( +CREATE TABLE e2pg.sources ( + name text, + chain_id integer, + url text +); + + + CREATE TABLE e2pg.task ( id text NOT NULL, number bigint, @@ -42,6 +57,14 @@ ALTER TABLE ONLY e2pg.migrations +CREATE UNIQUE INDEX sources_name_chain_id_idx ON e2pg.sources USING btree (name, chain_id); + + + +CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name); + + + CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, number DESC); diff --git a/e2pg/config/testdata/erc721.json b/e2pg/testdata/erc721.json similarity index 100% rename from e2pg/config/testdata/erc721.json rename to e2pg/testdata/erc721.json diff --git a/e2pg/config/testdata/seaport.json b/e2pg/testdata/seaport.json similarity index 100% rename from e2pg/config/testdata/seaport.json rename to e2pg/testdata/seaport.json diff --git a/e2pg/config/testdata/txinput.json b/e2pg/testdata/txinput.json similarity index 100% rename from e2pg/config/testdata/txinput.json rename to e2pg/testdata/txinput.json diff --git a/wos/os.go b/wos/os.go new file mode 100644 index 00000000..61b41d0d --- /dev/null +++ b/wos/os.go @@ -0,0 +1,26 @@ +package wos + +import ( + "fmt" + "os" + "strings" +) + +// If s has a $ prefix then we assume +// that it is a placeholder url and the actual +// url is in an env variable. +// +// # If there is no env var for s then the program will crash with an error +// +// if there is no $ prefix then s is returned +func Getenv(s string) string { + if strings.HasPrefix(s, "$") { + v := os.Getenv(strings.ToUpper(strings.TrimPrefix(s, "$"))) + if v == "" { + fmt.Printf("expected database url in env: %q\n", s) + os.Exit(1) + } + return v + } + return s +}