diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index fa9a0830..8c18ca40 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -45,16 +45,17 @@ type Destination interface { type Option func(t *Task) -func WithSource(chainID uint64, name string, src Source) Option { +func WithSourceFactory(f func(SourceConfig) Source) Option { return func(t *Task) { - if t.src != nil { - panic("task can only have 1 src") - } - t.src = src - t.srcName = name - t.chainID = chainID - t.ctx = wctx.WithChainID(t.ctx, chainID) - t.ctx = wctx.WithSrcName(t.ctx, name) + t.srcFactory = f + } +} + +func WithSourceConfig(sc SourceConfig) Option { + return func(t *Task) { + t.srcConfig = sc + t.ctx = wctx.WithChainID(t.ctx, sc.ChainID) + t.ctx = wctx.WithSrcName(t.ctx, sc.Name) } } @@ -106,14 +107,16 @@ func WithDestinations(dests ...Destination) Option { func NewTask(opts ...Option) *Task { t := &Task{ - ctx: context.Background(), - batch: make([]eth.Block, 1), - batchSize: 1, - workers: 1, + ctx: context.Background(), + batch: make([]eth.Block, 1), + batchSize: 1, + workers: 1, + srcFactory: getSource, } for _, opt := range opts { opt(t) } + t.src = t.srcFactory(t.srcConfig) slog.InfoContext(t.ctx, "starting task", "dest-count", len(t.dests)) return t } @@ -122,9 +125,9 @@ type Task struct { ctx context.Context backfill bool - src Source - srcName string - chainID uint64 + srcFactory func(SourceConfig) Source + srcConfig SourceConfig + src Source pgp *pgxpool.Pool dests []Destination @@ -144,9 +147,9 @@ func (t *Task) Setup() error { case t.backfill: var maxStart uint64 for i, d := range t.dests { - err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcName) + err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcConfig.Name) if err != nil { - return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcName, err) + return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcConfig.Name, err) } if maxStart == 0 || maxStart < t.destRanges[i].start { maxStart = t.destRanges[i].start @@ -160,14 +163,14 @@ func (t *Task) Setup() error { return fmt.Errorf("getting hash for %d: %w", maxStart, err) } const dq = `delete from e2pg.task where src_name = $1 and backfill` - if _, err := t.pgp.Exec(t.ctx, dq, t.srcName); err != nil { - return fmt.Errorf("resetting backfill task %s: %q", t.srcName, err) + if _, err := t.pgp.Exec(t.ctx, dq, t.srcConfig.Name); err != nil { + return fmt.Errorf("resetting backfill task %s: %q", t.srcConfig.Name, err) } const iq = ` insert into e2pg.task(src_name, backfill, num, hash) values ($1, $2, $3, $4) ` - _, err = t.pgp.Exec(t.ctx, iq, t.srcName, t.backfill, maxStart, h) + _, err = t.pgp.Exec(t.ctx, iq, t.srcConfig.Name, t.backfill, maxStart, h) if err != nil { return fmt.Errorf("inserting into task table: %w", err) } @@ -212,14 +215,14 @@ func (t *Task) initRows(n uint64, h []byte) error { and backfill = $2 limit 1 ` - err := t.pgp.QueryRow(t.ctx, eq, t.srcName, t.backfill).Scan(&exists) + err := t.pgp.QueryRow(t.ctx, eq, t.srcConfig.Name, t.backfill).Scan(&exists) switch { case errors.Is(err, pgx.ErrNoRows): const iq = ` insert into e2pg.task(src_name, backfill, num, hash) values ($1, $2, $3, $4) ` - _, err := t.pgp.Exec(t.ctx, iq, t.srcName, t.backfill, n, h) + _, err := t.pgp.Exec(t.ctx, iq, t.srcConfig.Name, t.backfill, n, h) if err != nil { return fmt.Errorf("inserting into task table: %w", err) } @@ -235,14 +238,14 @@ func (t *Task) initRows(n uint64, h []byte) error { and backfill = $3 limit 1 ` - err := t.pgp.QueryRow(t.ctx, eq, d.Name(), t.srcName, t.backfill).Scan(&exists) + err := t.pgp.QueryRow(t.ctx, eq, d.Name(), t.srcConfig.Name, t.backfill).Scan(&exists) switch { case errors.Is(err, pgx.ErrNoRows): const iq = ` insert into e2pg.intg(name, src_name, backfill, num) values ($1, $2, $3, $4) ` - _, err := t.pgp.Exec(t.ctx, iq, d.Name(), t.srcName, t.backfill, n) + _, err := t.pgp.Exec(t.ctx, iq, d.Name(), t.srcConfig.Name, t.backfill, n) if err != nil { return fmt.Errorf("inserting into intg table: %w", err) } @@ -297,9 +300,9 @@ func (task *Task) Converge(notx bool) error { pg = wpg.NewTxLocker(pgTx) //crc32(task) == 1384045349 const lockq = `select pg_advisory_xact_lock(1384045349, $1)` - _, err = pg.Exec(task.ctx, lockq, lockid(task.chainID, task.backfill)) + _, err = pg.Exec(task.ctx, lockq, lockid(task.srcConfig.ChainID, task.backfill)) if err != nil { - return fmt.Errorf("task lock %d: %w", task.chainID, err) + return fmt.Errorf("task lock %d: %w", task.srcConfig.ChainID, err) } } for reorgs := 0; reorgs <= 10; { @@ -312,7 +315,7 @@ func (task *Task) Converge(notx bool) error { order by num desc limit 1 ` - err := pg.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&localNum, &localHash) + err := pg.QueryRow(task.ctx, q, task.srcConfig.Name, task.backfill).Scan(&localNum, &localHash) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("getting latest from task: %w", err) } @@ -350,7 +353,7 @@ func (task *Task) Converge(notx bool) error { and backfill = $2 and num >= $3 ` - _, err := pg.Exec(task.ctx, rq1, task.srcName, task.backfill, localNum) + _, err := pg.Exec(task.ctx, rq1, task.srcConfig.Name, task.backfill, localNum) if err != nil { return fmt.Errorf("deleting block from task table: %w", err) } @@ -360,7 +363,7 @@ func (task *Task) Converge(notx bool) error { and backfill = $2 and num >= $3 ` - _, err = pg.Exec(task.ctx, rq2, task.srcName, task.backfill, localNum) + _, err = pg.Exec(task.ctx, rq2, task.srcConfig.Name, task.backfill, localNum) if err != nil { return fmt.Errorf("deleting block from task table: %w", err) } @@ -390,7 +393,7 @@ func (task *Task) Converge(notx bool) error { values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ` _, err := pg.Exec(task.ctx, uq, - task.srcName, + task.srcConfig.Name, task.backfill, last.Num(), task.stop, @@ -470,7 +473,7 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 count, err := task.dests[j].Insert(task.ctx, pg, blks) task.iub.update(j, task.dests[j].Name(), - task.srcName, + task.srcConfig.Name, task.backfill, blks[len(blks)-1].Num(), task.stop, @@ -874,7 +877,7 @@ func (tm *Manager) runTask(t *Task) { for { select { case <-tm.restart: - slog.Info("restart-task", "chain", t.chainID) + slog.Info("restart-task", "chain", t.srcConfig.ChainID) return default: switch err := t.Converge(false); { @@ -891,7 +894,7 @@ func (tm *Manager) runTask(t *Task) { // try out best to deliver update // but don't stack up work select { - case tm.updates <- t.chainID: + case tm.updates <- t.srcConfig.ChainID: default: } }() @@ -1002,12 +1005,8 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er } var tasks []*Task for _, sc := range allSources { - src, err := getSource(sc) - if err != nil { - return nil, fmt.Errorf("unkown source: %s", sc.Name) - } tasks = append(tasks, NewTask( - WithSource(sc.ChainID, sc.Name, src), + WithSourceConfig(sc), WithPG(pgp), WithRange(sc.Start, sc.Stop), WithConcurrency(1, 100), @@ -1018,7 +1017,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er } tasks = append(tasks, NewTask( WithBackfill(true), - WithSource(sc.ChainID, sc.Name, src), + WithSourceConfig(sc), WithPG(pgp), WithRange(startBF[sc.Name], 0), WithConcurrency(1, 100), @@ -1051,15 +1050,15 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) { } } -func getSource(sc SourceConfig) (Source, error) { +func getSource(sc SourceConfig) Source { switch { case strings.Contains(sc.URL, "rlps"): - return rlps.NewClient(sc.ChainID, sc.URL), nil + return rlps.NewClient(sc.ChainID, sc.URL) case strings.HasPrefix(sc.URL, "http"): - return jrpc2.New(sc.ChainID, sc.URL), nil + return jrpc2.New(sc.ChainID, sc.URL) default: // TODO add back support for local geth - return nil, fmt.Errorf("unsupported src type: %v", sc) + panic(fmt.Sprintf("unsupported src type: %v", sc)) } } diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go index 1f8a5b44..df9e4f3a 100644 --- a/e2pg/e2pg_test.go +++ b/e2pg/e2pg_test.go @@ -145,7 +145,8 @@ func TestSetup(t *testing.T) { tg = &testGeth{} pg = testpg(t) task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithDestinations(newTestDestination("foo")), ) @@ -166,10 +167,10 @@ func TestSetup(t *testing.T) { func TestConverge_Zero(t *testing.T) { var ( - tg = &testGeth{} pg = testpg(t) task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithPG(pg), WithDestinations(newTestDestination("foo")), ) @@ -183,7 +184,8 @@ func TestConverge_EmptyDestination(t *testing.T) { tg = &testGeth{} dest = newTestDestination("foo") task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithDestinations(dest), ) @@ -202,7 +204,8 @@ func TestConverge_Reorg(t *testing.T) { tg = &testGeth{} dest = newTestDestination("foo") task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithDestinations(dest), ) @@ -262,7 +265,8 @@ func TestConverge_DeltaBatchSize(t *testing.T) { tg = &testGeth{} dest = newTestDestination("foo") task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithConcurrency(workers, batchSize), WithDestinations(dest), @@ -291,13 +295,15 @@ func TestConverge_MultipleTasks(t *testing.T) { dest1 = newTestDestination("foo") dest2 = newTestDestination("bar") task1 = NewTask( - WithSource(0, "a", tg), + WithSourceConfig(SourceConfig{Name: "a"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest1), ) task2 = NewTask( - WithSource(0, "b", tg), + WithSourceConfig(SourceConfig{Name: "b"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest2), @@ -322,7 +328,8 @@ func TestConverge_LocalAhead(t *testing.T) { pg = testpg(t) dest = newTestDestination("foo") task = NewTask( - WithSource(0, "foo", tg), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return tg }), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest), @@ -407,7 +414,8 @@ func TestInitRows(t *testing.T) { task := NewTask( WithPG(pg), - WithSource(0, "foo", &testGeth{}), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithDestinations(newTestDestination("bar")), ) err = task.initRows(42, hash(42)) @@ -422,7 +430,8 @@ func TestInitRows(t *testing.T) { task = NewTask( WithPG(pg), - WithSource(0, "foo", &testGeth{}), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithDestinations(newTestDestination("bar"), newTestDestination("baz")), ) err = task.initRows(42, hash(42)) @@ -433,7 +442,8 @@ func TestInitRows(t *testing.T) { task = NewTask( WithPG(pg), WithBackfill(true), - WithSource(0, "foo", &testGeth{}), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithDestinations(newTestDestination("bar"), newTestDestination("baz")), ) err = task.initRows(42, hash(42)) @@ -490,13 +500,15 @@ func TestDestRanges_Load(t *testing.T) { task1 := NewTask( WithPG(pg), - WithSource(0, "foo", &testGeth{}), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithDestinations(newTestDestination("bar")), ) task2 := NewTask( WithPG(pg), WithBackfill(true), - WithSource(0, "foo", &testGeth{}), + WithSourceConfig(SourceConfig{Name: "foo"}), + WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }), WithDestinations(newTestDestination("bar")), ) err = task1.initRows(42, hash(42)) diff --git a/e2pg/integration_test.go b/e2pg/integration_test.go index 8ddfbea8..947bc0ee 100644 --- a/e2pg/integration_test.go +++ b/e2pg/integration_test.go @@ -67,7 +67,8 @@ func (th *Helper) Process(dest Destination, n uint64) { var ( geth = NewGeth(th.gt.FileCache, th.gt.Client) task = NewTask( - WithSource(0, "testhelper", geth), + WithSourceConfig(SourceConfig{Name: "testhelper"}), + WithSourceFactory(func(SourceConfig) Source { return geth }), WithPG(th.PG), WithDestinations(dest), WithRange(n, n+1),