diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go index d7f18cb0..b257a214 100644 --- a/cmd/e2pg/main.go +++ b/cmd/e2pg/main.go @@ -119,6 +119,13 @@ func main() { check(pprof.StartCPUProfile(&pbuf)) } + go func() { + for { + check(e2pg.PruneIntg(ctx, pg)) + check(e2pg.PruneTask(ctx, pg)) + time.Sleep(time.Minute * 10) + } + }() check(mgr.Run()) switch profile { diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 2209df23..c259b336 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -106,6 +106,7 @@ func WithDestinations(dests ...Destination) Option { t.dests = dests t.filter = filter t.dstat = dstat + t.iub = newIUB(len(t.dests)) } } @@ -143,6 +144,7 @@ type Task struct { dstatMut sync.Mutex dstat map[string]Dstat + iub *intgUpdateBuf filter [][]byte batch []eth.Block @@ -367,6 +369,9 @@ func (task *Task) Converge(notx bool) error { if err != nil { return fmt.Errorf("updating task table: %w", err) } + if err := task.iub.write(task.ctx, pg); err != nil { + return fmt.Errorf("updating integrations: %w", err) + } if err := commit(); err != nil { return fmt.Errorf("commit converge tx: %w", err) } @@ -425,9 +430,15 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 for j := range task.dests { j := j eg3.Go(func() error { - t0 := time.Now() + start := time.Now() count, err := task.dests[j].Insert(task.ctx, pg, blks) - task.dstatw(task.dests[j].Name(), count, time.Since(t0)) + task.dstatw(task.dests[j].Name(), count, time.Since(start)) + task.iub.updates[j].Name = task.dests[j].Name() + task.iub.updates[j].SrcName = task.srcName + task.iub.updates[j].Backfill = task.backfill + task.iub.updates[j].Num = task.batch[delta-1].Num() + task.iub.updates[j].NRows = count + task.iub.updates[j].Latency = time.Since(start) nrows += count return err }) @@ -438,6 +449,108 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 return nrows, eg2.Wait() } +type intgUpdate struct { + Name string `db:"name"` + SrcName string 
`db:"src_name"` + Backfill bool `db:"backfill"` + Num uint64 `db:"num"` + Latency time.Duration `db:"latency"` + NRows int64 `db:"nrows"` +} + +func newIUB(n int) *intgUpdateBuf { + iub := &intgUpdateBuf{} + iub.updates = make([]intgUpdate, n) + iub.table = pgx.Identifier{"e2pg", "intg"} + iub.cols = []string{"name", "src_name", "backfill", "num", "latency", "nrows"} + return iub +} + +type intgUpdateBuf struct { + i int + updates []intgUpdate + out [6]any + table pgx.Identifier + cols []string +} + +func (b *intgUpdateBuf) Next() bool { + return b.i < len(b.updates) +} + +func (b *intgUpdateBuf) Err() error { + return nil +} + +func (b *intgUpdateBuf) Values() ([]any, error) { + if b.i >= len(b.updates) { + return nil, fmt.Errorf("no intg_update at idx %d len=%d", b.i, len(b.updates)) + } + b.out[0] = b.updates[b.i].Name + b.out[1] = b.updates[b.i].SrcName + b.out[2] = b.updates[b.i].Backfill + b.out[3] = b.updates[b.i].Num + b.out[4] = b.updates[b.i].Latency + b.out[5] = b.updates[b.i].NRows + b.i++ + return b.out[:], nil +} + +func (b *intgUpdateBuf) write(ctx context.Context, pg wpg.Conn) error { + _, err := pg.CopyFrom(ctx, b.table, b.cols, b) + b.i = 0 // reset + return err +} + +func PruneTask(ctx context.Context, pg wpg.Conn) error { + const q = ` + delete from e2pg.task + where (src_name, backfill, num) not in ( + select src_name, backfill, num + from ( + select + src_name, + backfill, + num, + row_number() over(partition by src_name, backfill order by num desc) as rn + from e2pg.task + ) as s + where rn <= 10 + ) + ` + cmd, err := pg.Exec(ctx, q) + if err != nil { + return fmt.Errorf("deleting e2pg.task: %w", err) + } + slog.InfoContext(ctx, "prune-task", "n", cmd.RowsAffected()) + return nil +} + +func PruneIntg(ctx context.Context, pg wpg.Conn) error { + const q = ` + delete from e2pg.intg + where (name, src_name, backfill, num) not in ( + select name, src_name, backfill, num + from ( + select + name, + src_name, + backfill, + num, + row_number() 
over(partition by name, src_name, backfill order by num desc) as rn + from e2pg.intg + ) as s + where rn <= 10 + ) + ` + cmd, err := pg.Exec(ctx, q) + if err != nil { + return fmt.Errorf("deleting e2pg.intg: %w", err) + } + slog.InfoContext(ctx, "prune-intg", "n", cmd.RowsAffected()) + return nil +} + type jsonDuration time.Duration func (d *jsonDuration) ScanInterval(i pgtype.Interval) error { diff --git a/e2pg/iub_test.go b/e2pg/iub_test.go new file mode 100644 index 00000000..d53aab19 --- /dev/null +++ b/e2pg/iub_test.go @@ -0,0 +1,65 @@ +package e2pg + +import ( + "context" + "strings" + "testing" + + "blake.io/pqx/pqxtest" + "github.com/indexsupply/x/wpg" + "github.com/jackc/pgx/v5/pgxpool" + "kr.dev/diff" +) + +func TestPruneIntg(t *testing.T) { + ctx := context.Background() + + pqxtest.CreateDB(t, Schema) + pg, err := pgxpool.New(ctx, pqxtest.DSNForTest(t)) + diff.Test(t, t.Fatalf, err, nil) + + iub := newIUB(1) + iub.updates[0].Name = "foo" + iub.updates[0].SrcName = "bar" + iub.updates[0].Num = 1 + + err = iub.write(ctx, pg) + diff.Test(t, t.Fatalf, err, nil) + checkQuery(t, pg, `select count(*) = 1 from e2pg.intg`) + + err = iub.write(ctx, pg) + if err == nil || !strings.Contains(err.Error(), "intg_name_src_name_num_idx1") { + t.Errorf("expected 2nd write to return unique error") + } + + for i := 0; i < 10; i++ { + iub.updates[0].Num = uint64(i + 2) + err := iub.write(ctx, pg) + diff.Test(t, t.Fatalf, err, nil) + } + checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`) + err = PruneIntg(ctx, pg) + diff.Test(t, t.Fatalf, err, nil) + checkQuery(t, pg, `select count(*) = 10 from e2pg.intg`) + + iub.updates[0].Name = "foo" + iub.updates[0].SrcName = "baz" + iub.updates[0].Num = 1 + err = iub.write(ctx, pg) + diff.Test(t, t.Fatalf, err, nil) + checkQuery(t, pg, `select count(*) = 1 from e2pg.intg where src_name = 'baz'`) + checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`) + + err = PruneIntg(ctx, pg) + diff.Test(t, t.Fatalf, err, nil) + 
checkQuery(t, pg, `select count(*) = 11 from e2pg.intg`) +} + +func checkQuery(tb testing.TB, pg wpg.Conn, query string) { + var found bool + err := pg.QueryRow(context.Background(), query).Scan(&found) + diff.Test(tb, tb.Fatalf, err, nil) + if !found { + tb.Errorf("query\n%s\nreturned false", query) + } +} diff --git a/e2pg/migrations.go b/e2pg/migrations.go index de6b6a0c..f7f63847 100644 --- a/e2pg/migrations.go +++ b/e2pg/migrations.go @@ -36,13 +36,27 @@ var Migrations = map[int]pgmig.Migration{ }, 11: pgmig.Migration{ SQL: ` - delete from e2pg.task where insert_at < now() - '2 hours'::interval; - alter table e2pg.task add column backfill bool default false; - alter table e2pg.task add column src_name text; - update e2pg.task set src_name = split_part(id, '-', 1); - alter table e2pg.task drop column id; - create unique index on e2pg.task(src_name, num desc) where backfill = true; - create unique index on e2pg.task(src_name, num desc) where backfill = false; - `, + delete from e2pg.task where insert_at < now() - '2 hours'::interval; + alter table e2pg.task add column backfill bool default false; + alter table e2pg.task add column src_name text; + update e2pg.task set src_name = split_part(id, '-', 1); + alter table e2pg.task drop column id; + create unique index on e2pg.task(src_name, num desc) where backfill = true; + create unique index on e2pg.task(src_name, num desc) where backfill = false; + `, + }, + 12: pgmig.Migration{ + SQL: ` + create table e2pg.intg ( + name text not null, + src_name text not null, + backfill bool default false, + num numeric not null, + latency interval, + nrows numeric + ); + create unique index on e2pg.intg(name, src_name, num desc) where backfill; + create unique index on e2pg.intg(name, src_name, num desc) where not backfill; + `, }, } diff --git a/e2pg/schema.sql b/e2pg/schema.sql index 19aa7c5c..e2764923 100644 --- a/e2pg/schema.sql +++ b/e2pg/schema.sql @@ -27,6 +27,17 @@ CREATE TABLE e2pg.integrations ( +CREATE TABLE 
e2pg.intg ( + name text NOT NULL, + src_name text NOT NULL, + backfill boolean DEFAULT false, + num numeric NOT NULL, + latency interval, + nrows numeric +); + + + CREATE TABLE e2pg.migrations ( idx integer NOT NULL, hash bytea NOT NULL, @@ -64,6 +75,14 @@ ALTER TABLE ONLY e2pg.migrations +CREATE UNIQUE INDEX intg_name_src_name_num_idx ON e2pg.intg USING btree (name, src_name, num DESC) WHERE backfill; + + + +CREATE UNIQUE INDEX intg_name_src_name_num_idx1 ON e2pg.intg USING btree (name, src_name, num DESC) WHERE (NOT backfill); + + + CREATE UNIQUE INDEX sources_name_chain_id_idx ON e2pg.sources USING btree (name, chain_id); @@ -72,11 +91,11 @@ CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name); -CREATE INDEX task_src_name_num_idx ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = true); +CREATE UNIQUE INDEX task_src_name_num_idx ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = true); -CREATE INDEX task_src_name_num_idx1 ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = false); +CREATE UNIQUE INDEX task_src_name_num_idx1 ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = false);