diff --git a/abi2/abi2.go b/abi2/abi2.go
index 803dca08..4c487e48 100644
--- a/abi2/abi2.go
+++ b/abi2/abi2.go
@@ -649,7 +649,7 @@ func (ig Integration) Insert(ctx context.Context, pg wpg.Conn, blocks []eth.Bloc
 		err  error
 		skip bool
 		rows [][]any
-		lwc  = &logWithCtx{ctx: ctx}
+		lwc  = &logWithCtx{ctx: wctx.WithIntgName(ctx, ig.Name())}
 	)
 	for bidx := range blocks {
 		lwc.b = &blocks[bidx]
@@ -693,8 +693,10 @@ type logWithCtx struct {
 
 func (lwc *logWithCtx) get(name string) any {
 	switch name {
-	case "task_id":
-		return wctx.TaskID(lwc.ctx)
+	case "src_name":
+		return wctx.SrcName(lwc.ctx)
+	case "intg_name":
+		return wctx.IntgName(lwc.ctx)
 	case "chain_id":
 		return wctx.ChainID(lwc.ctx)
 	case "block_hash":
diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go
index 339e9722..d7f18cb0 100644
--- a/cmd/e2pg/main.go
+++ b/cmd/e2pg/main.go
@@ -60,13 +60,6 @@ func main() {
 		}
 		return "chain", fmt.Sprintf("%.5d", id)
 	})
-	lh.RegisterContext(func(ctx context.Context) (string, any) {
-		id := wctx.TaskID(ctx)
-		if id == "" {
-			return "", nil
-		}
-		return "task", id
-	})
 	slog.SetDefault(slog.New(lh.WithAttrs([]slog.Attr{
 		slog.Int("p", os.Getpid()),
 		slog.String("v", Commit),
@@ -126,7 +119,7 @@ func main() {
 		check(pprof.StartCPUProfile(&pbuf))
 	}
 
-	go mgr.Run()
+	check(mgr.Run())
 
 	switch profile {
 	case "cpu":
diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go
index e79a15f9..2209df23 100644
--- a/e2pg/e2pg.go
+++ b/e2pg/e2pg.go
@@ -32,7 +32,6 @@ import (
 var Schema string
 
 type Source interface {
-	ChainID() uint64
 	LoadBlocks([][]byte, []geth.Buffer, []eth.Block) error
 	Latest() (uint64, []byte, error)
 	Hash(uint64) ([]byte, error)
@@ -47,34 +46,22 @@ type Destination interface {
 
 type Option func(t *Task)
 
-func WithName(name string) Option {
-	return func(t *Task) {
-		t.Name = name
-	}
-}
-
-func WithSource(s Source) Option {
+func WithSource(chainID uint64, name string, src Source) Option {
 	return func(t *Task) {
 		if t.src != nil {
 			panic("task can only have 1 src")
 		}
-		t.src = s
-		t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
-		t.id = fmt.Sprintf("%d-main", t.src.ChainID())
-		t.ctx = wctx.WithTaskID(t.ctx, t.id)
+		t.src = src
+		t.srcName = name
+		t.chainID = chainID
+		t.ctx = wctx.WithChainID(t.ctx, chainID)
+		t.ctx = wctx.WithSrcName(t.ctx, name)
 	}
 }
 
-func WithBackfillSource(s Source, name string) Option {
+func WithBackfill(b bool) Option {
 	return func(t *Task) {
-		if t.src != nil {
-			panic("task can only have 1 src")
-		}
-		t.backfill = true
-		t.src = s
-		t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
-		t.id = fmt.Sprintf("%d-backfill-%s", t.src.ChainID(), name)
-		t.ctx = wctx.WithTaskID(t.ctx, t.id)
+		t.backfill = b
 	}
 }
 
@@ -143,12 +130,13 @@ type Dstat struct {
 }
 
 type Task struct {
-	Name     string
 	ctx      context.Context
-	id       string
 	backfill bool
 
-	src Source
+	src     Source
+	srcName string
+	chainID uint64
+
 	pgp         *pgxpool.Pool
 	dests       []Destination
 	start, stop uint64
@@ -174,15 +162,30 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
 }
 
 func (task *Task) Insert(n uint64, h []byte) error {
-	const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)`
-	_, err := task.pgp.Exec(context.Background(), q, task.id, n, h)
+	const q = `
+		insert into e2pg.task (src_name, backfill, num, hash)
+		values ($1, $2, $3, $4)
+	`
+	_, err := task.pgp.Exec(task.ctx, q,
+		task.srcName,
+		task.backfill,
+		n,
+		h,
+	)
 	return err
 }
 
 func (task *Task) Latest() (uint64, []byte, error) {
-	const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
+	const q = `
+		select num, hash
+		from e2pg.task
+		where src_name = $1
+		and backfill = $2
+		order by num desc
+		limit 1
+	`
 	var n, h = uint64(0), []byte{}
-	err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h)
+	err := task.pgp.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&n, &h)
 	if errors.Is(err, pgx.ErrNoRows) {
 		return n, nil, nil
 	}
@@ -216,7 +219,7 @@ func (task *Task) Setup() error {
 	return task.Insert(gethNum-1, h)
 }
 
-func (task *Task) Run1(updates chan<- string, notx bool) {
+func (task *Task) Run1(updates chan<- uint64, notx bool) {
 	switch err := task.Converge(notx); {
 	case errors.Is(err, ErrDone):
 		return
@@ -230,7 +233,7 @@ func (task *Task) Run1(updates chan<- string, notx bool) {
 		// try out best to deliver update
 		// but don't stack up work
 		select {
-		case updates <- task.id:
+		case updates <- task.chainID:
 		default:
 		}
 	}()
@@ -266,15 +269,22 @@ func (task *Task) Converge(notx bool) error {
 		pg = wpg.NewTxLocker(pgTx)
 		//crc32(task) == 1384045349
 		const lockq = `select pg_advisory_xact_lock(1384045349, $1)`
-		_, err = pg.Exec(task.ctx, lockq, wctx.ChainID(task.ctx))
+		_, err = pg.Exec(task.ctx, lockq, task.chainID)
 		if err != nil {
-			return fmt.Errorf("task lock %s: %w", task.id, err)
+			return fmt.Errorf("task lock %d: %w", task.chainID, err)
 		}
 	}
 	for reorgs := 0; reorgs <= 10; {
 		localNum, localHash := uint64(0), []byte{}
-		const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
-		err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash)
+		const q = `
+			select num, hash
+			from e2pg.task
+			where src_name = $1
+			and backfill = $2
+			order by num desc
+			limit 1
+		`
+		err := pg.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&localNum, &localHash)
 		if err != nil && !errors.Is(err, pgx.ErrNoRows) {
 			return fmt.Errorf("getting latest from task: %w", err)
 		}
@@ -307,8 +317,13 @@ func (task *Task) Converge(notx bool) error {
 		case errors.Is(err, ErrReorg):
 			reorgs++
 			slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
-			const dq = "delete from e2pg.task where id = $1 AND num >= $2"
-			_, err := pg.Exec(task.ctx, dq, task.id, localNum)
+			const dq = `
+				delete from e2pg.task
+				where src_name = $1
+				and backfill = $2
+				and num >= $3
+			`
+			_, err := pg.Exec(task.ctx, dq, task.srcName, task.backfill, localNum)
 			if err != nil {
 				return fmt.Errorf("deleting block from task table: %w", err)
 			}
@@ -324,7 +339,8 @@ func (task *Task) Converge(notx bool) error {
 			var last = task.batch[delta-1]
 			const uq = `
 				insert into e2pg.task (
-					id,
+					src_name,
+					backfill,
 					num,
 					hash,
 					src_num,
 					src_hash,
 					nblocks,
 					nrows,
 					latency,
 					dstat
 				)
-				values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+				values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
 			`
 			_, err := pg.Exec(task.ctx, uq,
-				task.id,
+				task.srcName,
+				task.backfill,
 				last.Num(),
 				last.Hash(),
 				gethNum,
@@ -447,56 +464,40 @@ func (d jsonDuration) String() string {
 }
 
 type TaskUpdate struct {
-	ID      string           `db:"id"`
-	Num     uint64           `db:"num"`
-	Hash    eth.Bytes        `db:"hash"`
-	SrcNum  uint64           `db:"src_num"`
-	SrcHash eth.Bytes        `db:"src_hash"`
-	NBlocks uint64           `db:"nblocks"`
-	NRows   uint64           `db:"nrows"`
-	Latency jsonDuration     `db:"latency"`
-	Dstat   map[string]Dstat `db:"dstat"`
-}
-
-func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
-	const q = `
-		select
-			id,
-			num,
-			hash,
-			coalesce(src_num, 0) src_num,
-			coalesce(src_hash, '\x00') src_hash,
-			coalesce(nblocks, 0) nblocks,
-			coalesce(nrows, 0) nrows,
-			coalesce(latency, '0')::interval latency,
-			coalesce(dstat, '{}') dstat
-		from e2pg.task
-		where id = $1
-		order by num desc
-		limit 1;
-	`
-	row, _ := pg.Query(ctx, q, id)
-	return pgx.CollectOneRow(row, pgx.RowToStructByName[TaskUpdate])
+	SrcName  string           `db:"src_name"`
+	Backfill bool             `db:"backfill"`
+	Num      uint64           `db:"num"`
+	Hash     eth.Bytes        `db:"hash"`
+	SrcNum   uint64           `db:"src_num"`
+	SrcHash  eth.Bytes        `db:"src_hash"`
+	NBlocks  uint64           `db:"nblocks"`
+	NRows    uint64           `db:"nrows"`
+	Latency  jsonDuration     `db:"latency"`
+	Dstat    map[string]Dstat `db:"dstat"`
 }
 
 func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
 	rows, _ := pg.Query(ctx, `
 		with f as (
-			select id, max(num) num
-			from e2pg.task group by 1
+			select src_name, backfill, max(num) num
+			from e2pg.task group by 1, 2
 		)
 		select
-			f.id,
-			f.num,
-			hash,
-			coalesce(src_num, 0) src_num,
-			coalesce(src_hash, '\x00') src_hash,
-			coalesce(nblocks, 0) nblocks,
-			coalesce(nrows, 0) nrows,
-			coalesce(latency, '0')::interval latency,
-			coalesce(dstat, '{}') dstat
+			f.src_name,
+			f.backfill,
+			f.num,
+			hash,
+			coalesce(src_num, 0) src_num,
+			coalesce(src_hash, '\x00') src_hash,
+			coalesce(nblocks, 0) nblocks,
+			coalesce(nrows, 0) nrows,
+			coalesce(latency, '0')::interval latency,
+			coalesce(dstat, '{}') dstat
 		from f
-		left join e2pg.task on e2pg.task.id = f.id and e2pg.task.num = f.num;
+		left join e2pg.task
+		on e2pg.task.src_name = f.src_name
+		and e2pg.task.backfill = f.backfill
+		and e2pg.task.num = f.num;
 	`)
 	return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
 }
@@ -509,7 +510,7 @@ type Manager struct {
 	running sync.Mutex
 	restart chan struct{}
 	tasks   []*Task
-	updates chan string
+	updates chan uint64
 	pgp     *pgxpool.Pool
 	conf    Config
 }
@@ -517,13 +518,13 @@ type Manager struct {
 func NewManager(pgp *pgxpool.Pool, conf Config) *Manager {
 	return &Manager{
 		restart: make(chan struct{}),
-		updates: make(chan string),
+		updates: make(chan uint64),
 		pgp:     pgp,
 		conf:    conf,
 	}
 }
 
-func (tm *Manager) Updates() string {
+func (tm *Manager) Updates() uint64 {
 	return <-tm.updates
 }
@@ -534,7 +535,7 @@ func (tm *Manager) runTask(t *Task) error {
 	for {
 		select {
 		case <-tm.restart:
-			slog.Info("restart-task", "name", t.Name)
+			slog.Info("restart-task", "chain", t.chainID)
 			return nil
 		default:
 			t.Run1(tm.updates, false)
@@ -613,8 +614,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
 		}
 		dests := destBySourceName[sc.Name]
 		tasks = append(tasks, NewTask(
-			WithName(sc.Name),
-			WithSource(src),
+			WithSource(sc.ChainID, sc.Name, src),
 			WithPG(pgp),
 			WithRange(sc.Start, sc.Stop),
 			WithConcurrency(1, 512),
diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go
index 064264ef..edb1fc7a 100644
--- a/e2pg/e2pg_test.go
+++ b/e2pg/e2pg_test.go
@@ -86,8 +86,6 @@ type testGeth struct {
 	blocks []eth.Block
 }
 
-func (tg *testGeth) ChainID() uint64 { return 0 }
-
 func (tg *testGeth) Hash(n uint64) ([]byte, error) {
 	for j := range tg.blocks {
 		if uint64(tg.blocks[j].Header.Number) == n {
@@ -144,7 +142,7 @@ func TestSetup(t *testing.T) {
 		tg   = &testGeth{}
 		pg   = testpg(t)
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithDestinations(newTestDestination()),
 		)
@@ -165,7 +163,7 @@ func TestConverge_Zero(t *testing.T) {
 		tg   = &testGeth{}
 		pg   = testpg(t)
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithDestinations(newTestDestination()),
 		)
@@ -179,7 +177,7 @@ func TestConverge_EmptyDestination(t *testing.T) {
 		tg   = &testGeth{}
 		dest = newTestDestination()
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithDestinations(dest),
 		)
@@ -198,7 +196,7 @@ func TestConverge_Reorg(t *testing.T) {
 		tg   = &testGeth{}
 		dest = newTestDestination()
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithDestinations(dest),
 		)
@@ -229,7 +227,7 @@ func TestConverge_DeltaBatchSize(t *testing.T) {
 		tg   = &testGeth{}
 		dest = newTestDestination()
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithConcurrency(workers, batchSize),
 			WithDestinations(dest),
@@ -258,13 +256,14 @@ func TestConverge_MultipleTasks(t *testing.T) {
 		dest1 = newTestDestination()
 		dest2 = newTestDestination()
 		task1 = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithConcurrency(1, 3),
 			WithDestinations(dest1),
 		)
 		task2 = NewTask(
-			WithBackfillSource(tg, "foo"),
+			WithSource(0, "1", tg),
+			WithBackfill(true),
 			WithPG(pg),
 			WithConcurrency(1, 3),
 			WithDestinations(dest2),
@@ -289,7 +288,7 @@ func TestConverge_LocalAhead(t *testing.T) {
 		pg   = testpg(t)
 		dest = newTestDestination()
 		task = NewTask(
-			WithSource(tg),
+			WithSource(0, "1", tg),
 			WithPG(pg),
 			WithConcurrency(1, 3),
 			WithDestinations(dest),
diff --git a/e2pg/integration_test.go b/e2pg/integration_test.go
index e852e494..94f8e5aa 100644
--- a/e2pg/integration_test.go
+++ b/e2pg/integration_test.go
@@ -66,7 +66,7 @@ func (th *Helper) Process(dest Destination, n uint64) {
 	var (
 		geth = NewGeth(th.gt.FileCache, th.gt.Client)
 		task = NewTask(
-			WithSource(geth),
+			WithSource(0, "", geth),
 			WithPG(th.PG),
 			WithDestinations(dest),
 		)
diff --git a/e2pg/migrations.go b/e2pg/migrations.go
index 01b9cbde..de6b6a0c 100644
--- a/e2pg/migrations.go
+++ b/e2pg/migrations.go
@@ -34,4 +34,15 @@ var Migrations = map[int]pgmig.Migration{
 		alter table e2pg.task add column dstat jsonb;
 		`,
 	},
+	11: pgmig.Migration{
+		SQL: `
+			delete from e2pg.task where insert_at < now() - '2 hours'::interval;
+			alter table e2pg.task add column backfill bool default false;
+			alter table e2pg.task add column src_name text;
+			update e2pg.task set src_name = split_part(id, '-', 1);
+			alter table e2pg.task drop column id;
+			create unique index on e2pg.task(src_name, num desc) where backfill = true;
+			create unique index on e2pg.task(src_name, num desc) where backfill = false;
+		`,
+	},
 }
diff --git a/e2pg/schema.sql b/e2pg/schema.sql
index 556552fa..19aa7c5c 100644
--- a/e2pg/schema.sql
+++ b/e2pg/schema.sql
@@ -44,7 +44,6 @@ CREATE TABLE e2pg.sources (
 
 
 CREATE TABLE e2pg.task (
-    id text NOT NULL,
     num bigint,
     hash bytea,
    insert_at timestamp with time zone DEFAULT now(),
@@ -53,7 +52,9 @@ CREATE TABLE e2pg.task (
     nblocks numeric,
     nrows numeric,
     latency interval,
-    dstat jsonb
+    dstat jsonb,
+    backfill boolean DEFAULT false,
+    src_name text
 );
 
 
@@ -71,7 +72,11 @@ CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name);
 
 
 
-CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, num DESC);
+CREATE UNIQUE INDEX task_src_name_num_idx ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = true);
+
+
+
+CREATE UNIQUE INDEX task_src_name_num_idx1 ON e2pg.task USING btree (src_name, num DESC) WHERE (backfill = false);
diff --git a/e2pg/web/index.html b/e2pg/web/index.html
index cbcb4b77..37695694 100644
--- a/e2pg/web/index.html
+++ b/e2pg/web/index.html
@@ -111,9 +111,8 @@

 E2PG
 
 {{ range $sc := .SourceConfigs -}}
-{{ $tid := (printf "%d-main" $sc.ChainID) -}}
-{{ with $tu := (index $.TaskUpdates $tid) -}}
-
+{{ with $tu := (index $.TaskUpdates $sc.Name) -}}
+
 {{ $sc.Name }}
 {{ $tu.NRows }}
@@ -123,7 +122,7 @@
 
 E2PG
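
Taken together, the e2pg.go hunks replace the synthetic task id string with an explicit (chain id, source name, backfill) triple: WithSource now receives the chain id and source name directly, since Source no longer exposes ChainID(), and a backfill task is an ordinary task with WithBackfill(true) instead of a separate WithBackfillSource option. Below is a minimal sketch of the resulting call site, mirroring TestConverge_MultipleTasks above. The import paths, the chain id 1, and the name "mainnet" are illustrative assumptions; NewTask, WithSource, WithBackfill, WithPG, WithConcurrency, and WithDestinations are the options defined in this diff.

package example

import (
	"github.com/indexsupply/x/e2pg" // import path assumed for illustration
	"github.com/jackc/pgx/v5/pgxpool" // pgx major version assumed
)

// newTasks wires one source into a head-following task and a backfill
// task. Both record progress in e2pg.task keyed by (src_name, backfill)
// rather than the old id column.
func newTasks(pgp *pgxpool.Pool, src e2pg.Source, dest e2pg.Destination) []*e2pg.Task {
	head := e2pg.NewTask(
		// chain id and source name are now passed explicitly,
		// where they were previously derived from src.ChainID()
		e2pg.WithSource(1, "mainnet", src),
		e2pg.WithPG(pgp),
		e2pg.WithConcurrency(1, 512),
		e2pg.WithDestinations(dest),
	)
	backfill := e2pg.NewTask(
		e2pg.WithSource(1, "mainnet", src), // same source, same name
		e2pg.WithBackfill(true),            // rows land in the backfill = true partition
		e2pg.WithPG(pgp),
		e2pg.WithConcurrency(1, 512),
		e2pg.WithDestinations(dest),
	)
	return []*e2pg.Task{head, backfill}
}

The two partial unique indexes added by migration 11 then allow at most one row per (src_name, num) within each of the backfill and non-backfill partitions, which is what lets Latest, Converge, and TaskUpdates select progress with a plain (src_name, backfill) filter.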