diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index c4db6a3a..0dcd770f 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -107,6 +107,7 @@ func WithDestinations(dests ...Destination) Option { t.filter = filter t.dstat = dstat t.iub = newIUB(len(t.dests)) + t.destRanges = make([]destRange, len(t.dests)) } } @@ -140,6 +141,7 @@ type Task struct { pgp *pgxpool.Pool dests []Destination + destRanges []destRange start, stop uint64 dstatMut sync.Mutex @@ -163,25 +165,39 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) { t.dstat[name] = s } -func (task *Task) Setup() error { +func (t *Task) Setup() error { switch { - case task.start > 0: - h, err := task.src.Hash(task.start - 1) + case t.start > 0: + h, err := t.src.Hash(t.start - 1) if err != nil { return err } - return task.initRows(task.start-1, h) + if err := t.initRows(t.start-1, h); err != nil { + return fmt.Errorf("init rows for user start: %w", err) + } default: - gethNum, _, err := task.src.Latest() + gethNum, _, err := t.src.Latest() if err != nil { return err } - h, err := task.src.Hash(gethNum - 1) + h, err := t.src.Hash(gethNum - 1) if err != nil { return fmt.Errorf("getting hash for %d: %w", gethNum-1, err) } - return task.initRows(gethNum-1, h) + if err := t.initRows(gethNum-1, h); err != nil { + return fmt.Errorf("init rows for latest: %w", err) + } + } + if !t.backfill { + return nil + } + for i, d := range t.dests { + err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcName) + if err != nil { + return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcName, err) + } } + return nil } // inserts an e2pg.task unless one with {src_name,backfill} already exists @@ -439,12 +455,13 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 j := j eg3.Go(func() error { start := time.Now() + blks = task.destRanges[j].filter(blks) count, err := task.dests[j].Insert(task.ctx, pg, blks) task.dstatw(task.dests[j].Name(), count, time.Since(start)) task.iub.updates[j].Name = task.dests[j].Name() task.iub.updates[j].SrcName = task.srcName task.iub.updates[j].Backfill = task.backfill - task.iub.updates[j].Num = task.batch[delta-1].Num() + task.iub.updates[j].Num = blks[len(blks)-1].Num() task.iub.updates[j].NRows = count task.iub.updates[j].Latency = time.Since(start) nrows += count @@ -457,6 +474,60 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 return nrows, eg2.Wait() } +type destRange struct{ start, stop uint64 } + +func (r *destRange) load(ctx context.Context, pg wpg.Conn, name, srcName string) error { + const startQuery = ` + select num + from e2pg.intg + where name = $1 + and src_name = $2 + and backfill = true + order by num desc + limit 1 + ` + err := pg.QueryRow(ctx, startQuery, name, srcName).Scan(&r.start) + if err != nil { + return fmt.Errorf("start for %s/%s: %w", name, srcName, err) + } + const stopQuery = ` + select num + from e2pg.intg + where name = $1 + and src_name = $2 + and backfill = false + order by num asc + limit 1 + ` + err = pg.QueryRow(ctx, stopQuery, name, srcName).Scan(&r.stop) + if err != nil { + return fmt.Errorf("stop for %s/%s: %w", name, srcName, err) + } + return nil +} + +func (r *destRange) filter(blks []eth.Block) []eth.Block { + switch { + case r.stop == 0: + return blks + case len(blks) == 0: + return blks + case blks[0].Num() >= r.start && blks[len(blks)-1].Num() <= r.stop: + return blks + default: + var n, m = 0, len(blks) + for i := range blks { + switch blks[i].Num() { + case r.start: + n = i + case r.stop: + m = i + 1 + } + } + return blks[n:m] + } +} + type intgUpdate struct { Name string `db:"name"` SrcName string `db:"src_name"` diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go index 5628e71b..1dadaf5a 100644 --- a/e2pg/e2pg_test.go +++ b/e2pg/e2pg_test.go @@ -411,7 +411,6 @@ func TestPruneIntg(t *testing.T) { func TestInitRows(t *testing.T) { ctx := context.Background() - pqxtest.CreateDB(t, Schema) pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t)) diff.Test(t, t.Fatalf, err, nil) @@ -492,3 +491,95 @@ func TestInitRows(t *testing.T) { and backfill = false `) } + +func TestDestRanges_Load(t *testing.T) { + ctx := context.Background() + pqxtest.CreateDB(t, Schema) + pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t)) + diff.Test(t, t.Fatalf, err, nil) + + task1 := NewTask( + WithPG(pg), + WithSource(0, "foo", &testGeth{}), + WithDestinations(newTestDestination("bar")), + ) + task2 := NewTask( + WithPG(pg), + WithBackfill(true), + WithSource(0, "foo", &testGeth{}), + WithDestinations(newTestDestination("bar")), + ) + err = task1.initRows(42, hash(42)) + diff.Test(t, t.Fatalf, err, nil) + err = task2.initRows(10, hash(10)) + diff.Test(t, t.Fatalf, err, nil) + + diff.Test(t, t.Fatalf, len(task2.destRanges), 1) + err = task2.destRanges[0].load(ctx, pg, "bar", "foo") + diff.Test(t, t.Fatalf, err, nil) + diff.Test(t, t.Errorf, task2.destRanges[0].start, uint64(10)) + diff.Test(t, t.Errorf, task2.destRanges[0].stop, uint64(42)) +} + +func TestDestRanges_Filter(t *testing.T) { + br := func(l, h uint64) (res []eth.Block) { + for i := l; i <= h; i++ { + res = append(res, eth.Block{Header: eth.Header{Number: eth.Uint64(i)}}) + } + return + } + + cases := []struct { + desc string + input []eth.Block + r destRange + want []eth.Block + }{ + { + desc: "empty input", + input: []eth.Block{}, + r: destRange{}, + want: []eth.Block{}, + }, + { + desc: "empty range", + input: []eth.Block{ + eth.Block{Header: eth.Header{Number: 42}}, + eth.Block{Header: eth.Header{Number: 43}}, + }, + r: destRange{}, + want: []eth.Block{ + eth.Block{Header: eth.Header{Number: 42}}, + eth.Block{Header: eth.Header{Number: 43}}, + }, + }, + { + desc: "[0, 10] -> [1,9]", + input: br(0, 10), + r: destRange{start: 1, stop: 9}, + want: br(1, 9), + }, + { + desc: "[0, 10] -> [0,5]", + input: br(0, 10), + r: destRange{start: 0, stop: 5}, + want: br(0, 5), + }, + { + desc: "[0, 10] -> [5,10]", + input: br(0, 10), + r: destRange{start: 5, stop: 10}, + want: br(5, 10), + }, + { + desc: "[0, 10] -> [10, 15]", + input: br(0, 10), + r: destRange{start: 10, stop: 15}, + want: br(10, 10), + }, + } + for _, tc := range cases { + got := tc.r.filter(tc.input) + diff.Test(t, t.Errorf, got, tc.want) + } +}