e2pg: add integration update table
This table will be used to control backfill tasks. The task table keeps
a history of processed blocks, but those rows exist to detect
re-organizations; they do not track an integration's individual progress.

By introducing the intg table, we can now know the precise set of blocks
that an integration has processed.
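
For example, the latest block an integration has processed can be read
straight from the new table. A minimal sketch (latestNum is a
hypothetical helper, not part of this commit; it reuses the wpg.Conn
QueryRow pattern found elsewhere in the e2pg package):

// latestNum is a hypothetical helper: it returns the most recent block
// number recorded in e2pg.intg for one integration on one source.
func latestNum(ctx context.Context, pg wpg.Conn, name, srcName string) (uint64, error) {
	const q = `
		select coalesce(max(num), 0)::bigint
		from e2pg.intg
		where name = $1
		and src_name = $2
	`
	var num int64
	err := pg.QueryRow(ctx, q, name, srcName).Scan(&num)
	return uint64(num), err
}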

This will enable a backfill task to group the integrations that need
backfilling and to process block history according to each
integration's needs.

This commit also introduces the notion of e2pg.{task,intg} pruning (see
PruneTask and PruneIntg below). We keep around the last 200 blocks for
each src_name in e2pg.task and for each (src_name, name) pair in
e2pg.intg. This should keep queries on these tables fast while
preserving enough history for reorg detection and debugging.
ryandotsmith committed Oct 25, 2023
1 parent 5fe392d commit 0ce4970
Showing 5 changed files with 230 additions and 12 deletions.
7 changes: 7 additions & 0 deletions cmd/e2pg/main.go
@@ -119,6 +119,13 @@ func main() {
check(pprof.StartCPUProfile(&pbuf))
}

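// Prune e2pg.{task,intg} every 10 minutes so per-source history stays small.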
go func() {
for {
check(e2pg.PruneIntg(ctx, pg))
check(e2pg.PruneTask(ctx, pg))
time.Sleep(time.Minute * 10)
}
}()
check(mgr.Run())

switch profile {
117 changes: 115 additions & 2 deletions e2pg/e2pg.go
@@ -106,6 +106,7 @@ func WithDestinations(dests ...Destination) Option {
t.dests = dests
t.filter = filter
t.dstat = dstat
t.iub = newIUB(len(t.dests))
}
}

@@ -143,6 +144,7 @@ type Task struct {

dstatMut sync.Mutex
dstat map[string]Dstat
iub *intgUpdateBuf

filter [][]byte
batch []eth.Block
@@ -367,6 +369,9 @@ func (task *Task) Converge(notx bool) error {
if err != nil {
return fmt.Errorf("updating task table: %w", err)
}
if err := task.iub.write(task.ctx, pg); err != nil {
return fmt.Errorf("updating integrations: %w", err)
}
if err := commit(); err != nil {
return fmt.Errorf("commit converge tx: %w", err)
}
@@ -425,9 +430,15 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64
for j := range task.dests {
j := j
eg3.Go(func() error {
start := time.Now()
count, err := task.dests[j].Insert(task.ctx, pg, blks)
task.dstatw(task.dests[j].Name(), count, time.Since(start))
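// Record this destination's progress; Converge flushes the
// buffer to e2pg.intg in the same transaction as the task update.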
task.iub.updates[j].Name = task.dests[j].Name()
task.iub.updates[j].SrcName = task.srcName
task.iub.updates[j].Backfill = task.backfill
task.iub.updates[j].Num = task.batch[delta-1].Num()
task.iub.updates[j].NRows = count
task.iub.updates[j].Latency = time.Since(start)
nrows += count
return err
})
@@ -438,6 +449,108 @@
return nrows, eg2.Wait()
}

type intgUpdate struct {
Name string `db:"name"`
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Latency time.Duration `db:"latency"`
NRows int64 `db:"nrows"`
}

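// newIUB sizes the buffer to one update per destination. The cols
// list must stay in the same order as the fields packed by Values.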
func newIUB(n int) *intgUpdateBuf {
iub := &intgUpdateBuf{}
iub.updates = make([]intgUpdate, n)
iub.table = pgx.Identifier{"e2pg", "intg"}
iub.cols = []string{"name", "src_name", "backfill", "num", "latency", "nrows"}
return iub
}

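// intgUpdateBuf buffers one row per destination and satisfies pgx's
// CopyFromSource interface (Next, Values, Err), so write can stream
// the whole batch to e2pg.intg with a single COPY.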
type intgUpdateBuf struct {
i int
updates []intgUpdate
out [6]any
table pgx.Identifier
cols []string
}

func (b *intgUpdateBuf) Next() bool {
return b.i < len(b.updates)
}

func (b *intgUpdateBuf) Err() error {
return nil
}

func (b *intgUpdateBuf) Values() ([]any, error) {
if b.i >= len(b.updates) {
return nil, fmt.Errorf("no intg_update at idx %d len=%d", b.i, len(b.updates))
}
b.out[0] = b.updates[b.i].Name
b.out[1] = b.updates[b.i].SrcName
b.out[2] = b.updates[b.i].Backfill
b.out[3] = b.updates[b.i].Num
b.out[4] = b.updates[b.i].Latency
b.out[5] = b.updates[b.i].NRows
b.i++
return b.out[:], nil
}

func (b *intgUpdateBuf) write(ctx context.Context, pg wpg.Conn) error {
_, err := pg.CopyFrom(ctx, b.table, b.cols, b)
b.i = 0 // reset
return err
}

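Since write passes the buffer itself to CopyFrom, the Next/Values/Err
methods above are what make it a valid pgx.CopyFromSource. A
compile-time assertion (a sketch, not in this commit) makes that
dependency explicit:

// Hypothetical compile-time check: *intgUpdateBuf must implement
// pgx.CopyFromSource for pg.CopyFrom to accept it.
var _ pgx.CopyFromSource = (*intgUpdateBuf)(nil)
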
func PruneTask(ctx context.Context, pg wpg.Conn) error {
const q = `
delete from e2pg.task
where (src_name, backfill, num) not in (
select src_name, backfill, num
from (
select
src_name,
backfill,
num,
row_number() over(partition by src_name, backfill order by num desc) as rn
from e2pg.intg
) as s
where rn <= 10
)
`
cmd, err := pg.Exec(ctx, q)
if err != nil {
return fmt.Errorf("deleting e2pg.task: %w", err)
}
slog.InfoContext(ctx, "prune-task", "n", cmd.RowsAffected())
return nil
}

func PruneIntg(ctx context.Context, pg wpg.Conn) error {
const q = `
delete from e2pg.intg
where (name, src_name, backfill, num) not in (
select name, src_name, backfill, num
from (
select
name,
src_name,
backfill,
num,
row_number() over(partition by name, src_name, backfill order by num desc) as rn
from e2pg.intg
) as s
where rn <= 10
)
`
cmd, err := pg.Exec(ctx, q)
if err != nil {
return fmt.Errorf("deleting e2pg.intg: %w", err)
}
slog.InfoContext(ctx, "prune-intg", "n", cmd.RowsAffected())
return nil
}

type jsonDuration time.Duration

func (d *jsonDuration) ScanInterval(i pgtype.Interval) error {
65 changes: 65 additions & 0 deletions e2pg/iub_test.go
@@ -0,0 +1,65 @@
package e2pg

import (
"context"
"strings"
"testing"

"blake.io/pqx/pqxtest"
"github.com/indexsupply/x/wpg"
"github.com/jackc/pgx/v5/pgxpool"
"kr.dev/diff"
)

func TestPruneIntg(t *testing.T) {
ctx := context.Background()

pqxtest.CreateDB(t, Schema)
pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t))
diff.Test(t, t.Fatalf, err, nil)

iub := newIUB(1)
iub.updates[0].Name = "foo"
iub.updates[0].SrcName = "bar"
iub.updates[0].Num = 1

err = iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 1 from e2pg.intg`)

err = iub.write(ctx, pg)
if err == nil || !strings.Contains(err.Error(), "intg_name_src_name_num_idx1") {
t.Errorf("expected 2nd write to return unique error")
}

for i := 0; i < 10; i++ {
iub.updates[0].Num = uint64(i + 2)
err := iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
}
checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`)
err = PruneIntg(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 10 from e2pg.intg`)

iub.updates[0].Name = "foo"
iub.updates[0].SrcName = "baz"
iub.updates[0].Num = 1
err = iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 1 from e2pg.intg where src_name = 'baz'`)
checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`)

err = PruneIntg(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`)
}

func checkQuery(tb testing.TB, pg wpg.Conn, query string) {
var found bool
err := pg.QueryRow(context.Background(), query).Scan(&found)
diff.Test(tb, tb.Fatalf, err, nil)
if !found {
tb.Errorf("query\n%s\nreturned false", query)
}
}
30 changes: 22 additions & 8 deletions e2pg/migrations.go
@@ -36,13 +36,27 @@ var Migrations = map[int]pgmig.Migration{
},
11: pgmig.Migration{
SQL: `
delete from e2pg.task where insert_at < now() - '2 hours'::interval;
alter table e2pg.task add column backfill bool default false;
alter table e2pg.task add column src_name text;
update e2pg.task set src_name = split_part(id, '-', 1);
alter table e2pg.task drop column id;
create unique index on e2pg.task(src_name, num desc) where backfill = true;
create unique index on e2pg.task(src_name, num desc) where backfill = false;
`,
},
12: pgmig.Migration{
SQL: `
create table e2pg.intg (
name text not null,
src_name text not null,
backfill bool default false,
num numeric not null,
latency interval,
nrows numeric
);
create unique index on e2pg.intg(name, src_name, num desc) where backfill;
create unique index on e2pg.intg(name, src_name, num desc) where not backfill;
`,
},
}
23 changes: 21 additions & 2 deletions e2pg/schema.sql
@@ -27,6 +27,17 @@ CREATE TABLE e2pg.integrations (



CREATE TABLE e2pg.intg (
name text NOT NULL,
src_name text NOT NULL,
backfill boolean DEFAULT false,
num numeric NOT NULL,
latency interval,
nrows numeric
);



CREATE TABLE e2pg.migrations (
idx integer NOT NULL,
hash bytea NOT NULL,
@@ -64,6 +75,14 @@ ALTER TABLE ONLY e2pg.migrations



CREATE UNIQUE INDEX intg_name_src_name_num_idx ON e2pg.intg USING btree (name, src_name, num DESC) WHERE backfill;



CREATE UNIQUE INDEX intg_name_src_name_num_idx1 ON e2pg.intg USING btree (name, src_name, num DESC) WHERE (NOT backfill);



CREATE UNIQUE INDEX sources_name_chain_id_idx ON e2pg.sources USING btree (name, chain_id);


@@ -72,11 +91,11 @@ CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name);



CREATE UNIQUE INDEX task_src_name_num_idx ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = true);



CREATE UNIQUE INDEX task_src_name_num_idx1 ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = false);


