Skip to content

Commit

Permalink
e2pg: use factory for creating task source
Browse files Browse the repository at this point in the history
This is groundwork for a future commit where we will have multiple
sources per task. A task can have concurrent "parts" and each part will
have its own source.
  • Loading branch information
ryandotsmith committed Nov 6, 2023
1 parent ea1eba1 commit af851d5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 59 deletions.
87 changes: 43 additions & 44 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ type Destination interface {

type Option func(t *Task)

func WithSource(chainID uint64, name string, src Source) Option {
func WithSourceFactory(f func(SourceConfig) Source) Option {
return func(t *Task) {
if t.src != nil {
panic("task can only have 1 src")
}
t.src = src
t.srcName = name
t.chainID = chainID
t.ctx = wctx.WithChainID(t.ctx, chainID)
t.ctx = wctx.WithSrcName(t.ctx, name)
t.srcFactory = f
}
}

// WithSourceConfig returns an Option that stores sc on the task and
// passes the task's context through wctx.WithChainID and
// wctx.WithSrcName using sc.ChainID and sc.Name.
func WithSourceConfig(sc SourceConfig) Option {
	apply := func(t *Task) {
		t.srcConfig = sc

		// Layer both values onto the existing context before storing it back.
		ctx := wctx.WithChainID(t.ctx, sc.ChainID)
		ctx = wctx.WithSrcName(ctx, sc.Name)
		t.ctx = ctx
	}
	return apply
}

Expand Down Expand Up @@ -106,14 +107,16 @@ func WithDestinations(dests ...Destination) Option {

func NewTask(opts ...Option) *Task {
t := &Task{
ctx: context.Background(),
batch: make([]eth.Block, 1),
batchSize: 1,
workers: 1,
ctx: context.Background(),
batch: make([]eth.Block, 1),
batchSize: 1,
workers: 1,
srcFactory: getSource,
}
for _, opt := range opts {
opt(t)
}
t.src = t.srcFactory(t.srcConfig)
slog.InfoContext(t.ctx, "starting task", "dest-count", len(t.dests))
return t
}
Expand All @@ -122,9 +125,9 @@ type Task struct {
ctx context.Context
backfill bool

src Source
srcName string
chainID uint64
srcFactory func(SourceConfig) Source
srcConfig SourceConfig
src Source

pgp *pgxpool.Pool
dests []Destination
Expand All @@ -144,9 +147,9 @@ func (t *Task) Setup() error {
case t.backfill:
var maxStart uint64
for i, d := range t.dests {
err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcName)
err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcConfig.Name)
if err != nil {
return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcName, err)
return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcConfig.Name, err)
}
if maxStart == 0 || maxStart < t.destRanges[i].start {
maxStart = t.destRanges[i].start
Expand All @@ -160,14 +163,14 @@ func (t *Task) Setup() error {
return fmt.Errorf("getting hash for %d: %w", maxStart, err)
}
const dq = `delete from e2pg.task where src_name = $1 and backfill`
if _, err := t.pgp.Exec(t.ctx, dq, t.srcName); err != nil {
return fmt.Errorf("resetting backfill task %s: %q", t.srcName, err)
if _, err := t.pgp.Exec(t.ctx, dq, t.srcConfig.Name); err != nil {
return fmt.Errorf("resetting backfill task %s: %q", t.srcConfig.Name, err)
}
const iq = `
insert into e2pg.task(src_name, backfill, num, hash)
values ($1, $2, $3, $4)
`
_, err = t.pgp.Exec(t.ctx, iq, t.srcName, t.backfill, maxStart, h)
_, err = t.pgp.Exec(t.ctx, iq, t.srcConfig.Name, t.backfill, maxStart, h)
if err != nil {
return fmt.Errorf("inserting into task table: %w", err)
}
Expand Down Expand Up @@ -212,14 +215,14 @@ func (t *Task) initRows(n uint64, h []byte) error {
and backfill = $2
limit 1
`
err := t.pgp.QueryRow(t.ctx, eq, t.srcName, t.backfill).Scan(&exists)
err := t.pgp.QueryRow(t.ctx, eq, t.srcConfig.Name, t.backfill).Scan(&exists)
switch {
case errors.Is(err, pgx.ErrNoRows):
const iq = `
insert into e2pg.task(src_name, backfill, num, hash)
values ($1, $2, $3, $4)
`
_, err := t.pgp.Exec(t.ctx, iq, t.srcName, t.backfill, n, h)
_, err := t.pgp.Exec(t.ctx, iq, t.srcConfig.Name, t.backfill, n, h)
if err != nil {
return fmt.Errorf("inserting into task table: %w", err)
}
Expand All @@ -235,14 +238,14 @@ func (t *Task) initRows(n uint64, h []byte) error {
and backfill = $3
limit 1
`
err := t.pgp.QueryRow(t.ctx, eq, d.Name(), t.srcName, t.backfill).Scan(&exists)
err := t.pgp.QueryRow(t.ctx, eq, d.Name(), t.srcConfig.Name, t.backfill).Scan(&exists)
switch {
case errors.Is(err, pgx.ErrNoRows):
const iq = `
insert into e2pg.intg(name, src_name, backfill, num)
values ($1, $2, $3, $4)
`
_, err := t.pgp.Exec(t.ctx, iq, d.Name(), t.srcName, t.backfill, n)
_, err := t.pgp.Exec(t.ctx, iq, d.Name(), t.srcConfig.Name, t.backfill, n)
if err != nil {
return fmt.Errorf("inserting into intg table: %w", err)
}
Expand Down Expand Up @@ -297,9 +300,9 @@ func (task *Task) Converge(notx bool) error {
pg = wpg.NewTxLocker(pgTx)
//crc32(task) == 1384045349
const lockq = `select pg_advisory_xact_lock(1384045349, $1)`
_, err = pg.Exec(task.ctx, lockq, lockid(task.chainID, task.backfill))
_, err = pg.Exec(task.ctx, lockq, lockid(task.srcConfig.ChainID, task.backfill))
if err != nil {
return fmt.Errorf("task lock %d: %w", task.chainID, err)
return fmt.Errorf("task lock %d: %w", task.srcConfig.ChainID, err)
}
}
for reorgs := 0; reorgs <= 10; {
Expand All @@ -312,7 +315,7 @@ func (task *Task) Converge(notx bool) error {
order by num desc
limit 1
`
err := pg.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&localNum, &localHash)
err := pg.QueryRow(task.ctx, q, task.srcConfig.Name, task.backfill).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
}
Expand Down Expand Up @@ -350,7 +353,7 @@ func (task *Task) Converge(notx bool) error {
and backfill = $2
and num >= $3
`
_, err := pg.Exec(task.ctx, rq1, task.srcName, task.backfill, localNum)
_, err := pg.Exec(task.ctx, rq1, task.srcConfig.Name, task.backfill, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
}
Expand All @@ -360,7 +363,7 @@ func (task *Task) Converge(notx bool) error {
and backfill = $2
and num >= $3
`
_, err = pg.Exec(task.ctx, rq2, task.srcName, task.backfill, localNum)
_, err = pg.Exec(task.ctx, rq2, task.srcConfig.Name, task.backfill, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
}
Expand Down Expand Up @@ -390,7 +393,7 @@ func (task *Task) Converge(notx bool) error {
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`
_, err := pg.Exec(task.ctx, uq,
task.srcName,
task.srcConfig.Name,
task.backfill,
last.Num(),
task.stop,
Expand Down Expand Up @@ -470,7 +473,7 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64
count, err := task.dests[j].Insert(task.ctx, pg, blks)
task.iub.update(j,
task.dests[j].Name(),
task.srcName,
task.srcConfig.Name,
task.backfill,
blks[len(blks)-1].Num(),
task.stop,
Expand Down Expand Up @@ -874,7 +877,7 @@ func (tm *Manager) runTask(t *Task) {
for {
select {
case <-tm.restart:
slog.Info("restart-task", "chain", t.chainID)
slog.Info("restart-task", "chain", t.srcConfig.ChainID)
return
default:
switch err := t.Converge(false); {
Expand All @@ -891,7 +894,7 @@ func (tm *Manager) runTask(t *Task) {
// try out best to deliver update
// but don't stack up work
select {
case tm.updates <- t.chainID:
case tm.updates <- t.srcConfig.ChainID:
default:
}
}()
Expand Down Expand Up @@ -1002,12 +1005,8 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
}
var tasks []*Task
for _, sc := range allSources {
src, err := getSource(sc)
if err != nil {
return nil, fmt.Errorf("unkown source: %s", sc.Name)
}
tasks = append(tasks, NewTask(
WithSource(sc.ChainID, sc.Name, src),
WithSourceConfig(sc),
WithPG(pgp),
WithRange(sc.Start, sc.Stop),
WithConcurrency(1, 100),
Expand All @@ -1018,7 +1017,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
}
tasks = append(tasks, NewTask(
WithBackfill(true),
WithSource(sc.ChainID, sc.Name, src),
WithSourceConfig(sc),
WithPG(pgp),
WithRange(startBF[sc.Name], 0),
WithConcurrency(1, 100),
Expand Down Expand Up @@ -1051,15 +1050,15 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) {
}
}

func getSource(sc SourceConfig) (Source, error) {
func getSource(sc SourceConfig) Source {
switch {
case strings.Contains(sc.URL, "rlps"):
return rlps.NewClient(sc.ChainID, sc.URL), nil
return rlps.NewClient(sc.ChainID, sc.URL)
case strings.HasPrefix(sc.URL, "http"):
return jrpc2.New(sc.ChainID, sc.URL), nil
return jrpc2.New(sc.ChainID, sc.URL)
default:
// TODO add back support for local geth
return nil, fmt.Errorf("unsupported src type: %v", sc)
panic(fmt.Sprintf("unsupported src type: %v", sc))
}
}

Expand Down
40 changes: 26 additions & 14 deletions e2pg/e2pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func TestSetup(t *testing.T) {
tg = &testGeth{}
pg = testpg(t)
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithDestinations(newTestDestination("foo")),
)
Expand All @@ -166,10 +167,10 @@ func TestSetup(t *testing.T) {

func TestConverge_Zero(t *testing.T) {
var (
tg = &testGeth{}
pg = testpg(t)
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithPG(pg),
WithDestinations(newTestDestination("foo")),
)
Expand All @@ -183,7 +184,8 @@ func TestConverge_EmptyDestination(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination("foo")
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithDestinations(dest),
)
Expand All @@ -202,7 +204,8 @@ func TestConverge_Reorg(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination("foo")
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithDestinations(dest),
)
Expand Down Expand Up @@ -262,7 +265,8 @@ func TestConverge_DeltaBatchSize(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination("foo")
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithConcurrency(workers, batchSize),
WithDestinations(dest),
Expand Down Expand Up @@ -291,13 +295,15 @@ func TestConverge_MultipleTasks(t *testing.T) {
dest1 = newTestDestination("foo")
dest2 = newTestDestination("bar")
task1 = NewTask(
WithSource(0, "a", tg),
WithSourceConfig(SourceConfig{Name: "a"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest1),
)
task2 = NewTask(
WithSource(0, "b", tg),
WithSourceConfig(SourceConfig{Name: "b"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest2),
Expand All @@ -322,7 +328,8 @@ func TestConverge_LocalAhead(t *testing.T) {
pg = testpg(t)
dest = newTestDestination("foo")
task = NewTask(
WithSource(0, "foo", tg),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return tg }),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest),
Expand Down Expand Up @@ -407,7 +414,8 @@ func TestInitRows(t *testing.T) {

task := NewTask(
WithPG(pg),
WithSource(0, "foo", &testGeth{}),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithDestinations(newTestDestination("bar")),
)
err = task.initRows(42, hash(42))
Expand All @@ -422,7 +430,8 @@ func TestInitRows(t *testing.T) {

task = NewTask(
WithPG(pg),
WithSource(0, "foo", &testGeth{}),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithDestinations(newTestDestination("bar"), newTestDestination("baz")),
)
err = task.initRows(42, hash(42))
Expand All @@ -433,7 +442,8 @@ func TestInitRows(t *testing.T) {
task = NewTask(
WithPG(pg),
WithBackfill(true),
WithSource(0, "foo", &testGeth{}),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithDestinations(newTestDestination("bar"), newTestDestination("baz")),
)
err = task.initRows(42, hash(42))
Expand Down Expand Up @@ -490,13 +500,15 @@ func TestDestRanges_Load(t *testing.T) {

task1 := NewTask(
WithPG(pg),
WithSource(0, "foo", &testGeth{}),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithDestinations(newTestDestination("bar")),
)
task2 := NewTask(
WithPG(pg),
WithBackfill(true),
WithSource(0, "foo", &testGeth{}),
WithSourceConfig(SourceConfig{Name: "foo"}),
WithSourceFactory(func(SourceConfig) Source { return &testGeth{} }),
WithDestinations(newTestDestination("bar")),
)
err = task1.initRows(42, hash(42))
Expand Down
3 changes: 2 additions & 1 deletion e2pg/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (th *Helper) Process(dest Destination, n uint64) {
var (
geth = NewGeth(th.gt.FileCache, th.gt.Client)
task = NewTask(
WithSource(0, "testhelper", geth),
WithSourceConfig(SourceConfig{Name: "testhelper"}),
WithSourceFactory(func(SourceConfig) Source { return geth }),
WithPG(th.PG),
WithDestinations(dest),
WithRange(n, n+1),
Expand Down

0 comments on commit af851d5

Please sign in to comment.