Skip to content

Commit

Permalink
e2pg: introduce a destRange for backfill filtering
Browse files Browse the repository at this point in the history
This is needed because there will soon be a backfill task per source
that will write to destinations that may or may not have the same level
of progress. E.g. it is likely that there is an integration that has
been backfilled from block: 0, 100 but on e2pg restart, a new
integration has been added that requires a backfill of 0,200 and so the
backfill task will work on both integrations but starting at 0 again. So
the first integration will not be asked to insert data until the
backfill task reaches 100 again. Similarly, an integration may not accept
new Insert calls if it is already up to date with its main task while
other backfill tasks still have blocks to process.
  • Loading branch information
ryandotsmith committed Oct 27, 2023
1 parent dee33aa commit b335f3f
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 9 deletions.
87 changes: 79 additions & 8 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func WithDestinations(dests ...Destination) Option {
t.filter = filter
t.dstat = dstat
t.iub = newIUB(len(t.dests))
t.destRanges = make([]destRange, len(t.dests))
}
}

Expand Down Expand Up @@ -140,6 +141,7 @@ type Task struct {

pgp *pgxpool.Pool
dests []Destination
destRanges []destRange
start, stop uint64

dstatMut sync.Mutex
Expand All @@ -163,25 +165,39 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
t.dstat[name] = s
}

func (task *Task) Setup() error {
func (t *Task) Setup() error {
switch {
case task.start > 0:
h, err := task.src.Hash(task.start - 1)
case t.start > 0:
h, err := t.src.Hash(t.start - 1)
if err != nil {
return err
}
return task.initRows(task.start-1, h)
if err := t.initRows(t.start-1, h); err != nil {
return fmt.Errorf("init rows for user start: %w", err)
}
default:
gethNum, _, err := task.src.Latest()
gethNum, _, err := t.src.Latest()
if err != nil {
return err
}
h, err := task.src.Hash(gethNum - 1)
h, err := t.src.Hash(gethNum - 1)
if err != nil {
return fmt.Errorf("getting hash for %d: %w", gethNum-1, err)
}
return task.initRows(gethNum-1, h)
if err := t.initRows(gethNum-1, h); err != nil {
return fmt.Errorf("init rows for latest: %w", err)
}
}
if !t.backfill {
return nil
}
for i, d := range t.dests {
err := t.destRanges[i].load(t.ctx, t.pgp, d.Name(), t.srcName)
if err != nil {
return fmt.Errorf("loading dest range for %s/%s: %w", d.Name(), t.srcName, err)
}
}
return nil
}

// inserts an e2pg.task unless one with {src_name,backfill} already exists
Expand Down Expand Up @@ -439,12 +455,13 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64
j := j
eg3.Go(func() error {
start := time.Now()
blks = task.destRanges[j].filter(blks)
count, err := task.dests[j].Insert(task.ctx, pg, blks)
task.dstatw(task.dests[j].Name(), count, time.Since(start))
task.iub.updates[j].Name = task.dests[j].Name()
task.iub.updates[j].SrcName = task.srcName
task.iub.updates[j].Backfill = task.backfill
task.iub.updates[j].Num = task.batch[delta-1].Num()
task.iub.updates[j].Num = blks[len(blks)-1].Num()
task.iub.updates[j].NRows = count
task.iub.updates[j].Latency = time.Since(start)
nrows += count
Expand All @@ -457,6 +474,60 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64
return nrows, eg2.Wait()
}

type destRange struct{ start, stop uint64 }

func (r *destRange) load(ctx context.Context, pg wpg.Conn, name, srcName string) error {
const startQuery = `
select num
from e2pg.intg
where name = $1
and src_name = $2
and backfill = true
order by num desc
limit 1
`
err := pg.QueryRow(ctx, startQuery, name, srcName).Scan(&r.start)
if err != nil {
return fmt.Errorf("start for %s/%s: %w", name, srcName, err)
}
const stopQuery = `
select num
from e2pg.intg
where name = $1
and src_name = $2
and backfill = false
order by num asc
limit 1
`
err = pg.QueryRow(ctx, stopQuery, name, srcName).Scan(&r.stop)
if err != nil {
return fmt.Errorf("stop for %s/%s: %w", name, srcName, err)
}
return nil
}

func (r *destRange) filter(blks []eth.Block) []eth.Block {
switch {
case r.stop == 0:
return blks
case len(blks) == 0:
return blks
case blks[0].Num() >= r.start && blks[len(blks)-1].Num() <= r.stop:
return blks
default:
var n, m = 0, len(blks)
for i := range blks {
switch blks[i].Num() {
case r.start:
n = i
case r.stop:
m = i + 1
}
}
return blks[n:m]
}
}

type intgUpdate struct {
Name string `db:"name"`
SrcName string `db:"src_name"`
Expand Down
93 changes: 92 additions & 1 deletion e2pg/e2pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ func TestPruneIntg(t *testing.T) {

func TestInitRows(t *testing.T) {
ctx := context.Background()

pqxtest.CreateDB(t, Schema)
pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t))
diff.Test(t, t.Fatalf, err, nil)
Expand Down Expand Up @@ -492,3 +491,95 @@ func TestInitRows(t *testing.T) {
and backfill = false
`)
}

func TestDestRanges_Load(t *testing.T) {
ctx := context.Background()
pqxtest.CreateDB(t, Schema)
pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t))
diff.Test(t, t.Fatalf, err, nil)

task1 := NewTask(
WithPG(pg),
WithSource(0, "foo", &testGeth{}),
WithDestinations(newTestDestination("bar")),
)
task2 := NewTask(
WithPG(pg),
WithBackfill(true),
WithSource(0, "foo", &testGeth{}),
WithDestinations(newTestDestination("bar")),
)
err = task1.initRows(42, hash(42))
diff.Test(t, t.Fatalf, err, nil)
err = task2.initRows(10, hash(10))
diff.Test(t, t.Fatalf, err, nil)

diff.Test(t, t.Fatalf, len(task2.destRanges), 1)
err = task2.destRanges[0].load(ctx, pg, "bar", "foo")
diff.Test(t, t.Fatalf, err, nil)
diff.Test(t, t.Errorf, task2.destRanges[0].start, uint64(10))
diff.Test(t, t.Errorf, task2.destRanges[0].stop, uint64(42))
}

func TestDestRanges_Filter(t *testing.T) {
br := func(l, h uint64) (res []eth.Block) {
for i := l; i <= h; i++ {
res = append(res, eth.Block{Header: eth.Header{Number: eth.Uint64(i)}})
}
return
}

cases := []struct {
desc string
input []eth.Block
r destRange
want []eth.Block
}{
{
desc: "empty input",
input: []eth.Block{},
r: destRange{},
want: []eth.Block{},
},
{
desc: "empty range",
input: []eth.Block{
eth.Block{Header: eth.Header{Number: 42}},
eth.Block{Header: eth.Header{Number: 43}},
},
r: destRange{},
want: []eth.Block{
eth.Block{Header: eth.Header{Number: 42}},
eth.Block{Header: eth.Header{Number: 43}},
},
},
{
desc: "[0, 10] -> [1,9]",
input: br(0, 10),
r: destRange{start: 1, stop: 9},
want: br(1, 9),
},
{
desc: "[0, 10] -> [0,5]",
input: br(0, 10),
r: destRange{start: 0, stop: 5},
want: br(0, 5),
},
{
desc: "[0, 10] -> [5,10]",
input: br(0, 10),
r: destRange{start: 5, stop: 10},
want: br(5, 10),
},
{
desc: "[0, 10] -> [10, 15]",
input: br(0, 10),
r: destRange{start: 10, stop: 15},
want: br(10, 10),
},
}
for _, tc := range cases {
got := tc.r.filter(tc.input)
diff.Test(t, t.Errorf, got, tc.want)
}
}

0 comments on commit b335f3f

Please sign in to comment.