From d09bd5c727939f898e61ab2366e87b85382f47ff Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Tue, 31 Oct 2023 10:52:37 -0700 Subject: [PATCH] e2pg: record task stop values for e2pg.task and e2pg.intg These values will be used to determine the progress of a task. --- e2pg/e2pg.go | 19 ++++++++++++++----- e2pg/e2pg_test.go | 6 +++--- e2pg/migrations.go | 6 ++++++ e2pg/schema.sql | 6 ++++-- 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 4d9b40f7..f3a69704 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -391,6 +391,7 @@ func (task *Task) Converge(notx bool) error { src_name, backfill, num, + stop, hash, src_num, src_hash, @@ -399,12 +400,13 @@ func (task *Task) Converge(notx bool) error { latency, dstat ) - values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ` _, err := pg.Exec(task.ctx, uq, task.srcName, task.backfill, last.Num(), + task.stop, last.Hash(), gethNum, gethHash, @@ -489,6 +491,7 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64 task.srcName, task.backfill, blks[len(blks)-1].Num(), + task.stop, time.Since(start), count, ) @@ -564,6 +567,7 @@ type intgUpdate struct { SrcName string `db:"src_name"` Backfill bool `db:"backfill"` Num uint64 `db:"num"` + Stop uint64 `db:"stop"` Latency time.Duration `db:"latency"` NRows int64 `db:"nrows"` } @@ -572,14 +576,14 @@ func newIUB(n int) *intgUpdateBuf { iub := &intgUpdateBuf{} iub.updates = make([]intgUpdate, n) iub.table = pgx.Identifier{"e2pg", "intg"} - iub.cols = []string{"name", "src_name", "backfill", "num", "latency", "nrows"} + iub.cols = []string{"name", "src_name", "backfill", "num", "stop", "latency", "nrows"} return iub } type intgUpdateBuf struct { i int updates []intgUpdate - out [6]any + out [7]any table pgx.Identifier cols []string } @@ -590,6 +594,7 @@ func (b *intgUpdateBuf) update( srcName string, backfill bool, num uint64, + stop uint64, lat time.Duration, nrows int64, ) { @@ -598,6 +603,7 @@ func (b *intgUpdateBuf) update( b.updates[j].SrcName = srcName b.updates[j].Backfill = backfill b.updates[j].Num = num + b.updates[j].Stop = stop b.updates[j].Latency = lat b.updates[j].NRows = nrows } @@ -624,8 +630,9 @@ func (b *intgUpdateBuf) Values() ([]any, error) { b.out[1] = b.updates[b.i].SrcName b.out[2] = b.updates[b.i].Backfill b.out[3] = b.updates[b.i].Num - b.out[4] = b.updates[b.i].Latency - b.out[5] = b.updates[b.i].NRows + b.out[4] = b.updates[b.i].Stop + b.out[5] = b.updates[b.i].Latency + b.out[6] = b.updates[b.i].NRows b.updates[b.i].changed = false b.i++ return b.out[:], nil @@ -711,6 +718,7 @@ type TaskUpdate struct { SrcName string `db:"src_name"` Backfill bool `db:"backfill"` Num uint64 `db:"num"` + Stop uint64 `db:"stop"` Hash eth.Bytes `db:"hash"` SrcNum uint64 `db:"src_num"` SrcHash eth.Bytes `db:"src_hash"` @@ -730,6 +738,7 @@ func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) { f.src_name, f.backfill, f.num, + coalesce(stop, 0) stop, hash, coalesce(src_num, 0) src_num, coalesce(src_hash, '\x00') src_hash, diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go index c228b981..62a22ee4 100644 --- a/e2pg/e2pg_test.go +++ b/e2pg/e2pg_test.go @@ -373,13 +373,13 @@ func TestPruneIntg(t *testing.T) { diff.Test(t, t.Fatalf, err, nil) iub := newIUB(1) - iub.update(0, "foo", "bar", true, 1, 0, 0) + iub.update(0, "foo", "bar", true, 1, 0, 0, 0) err = iub.write(ctx, pg) diff.Test(t, t.Fatalf, err, nil) checkQuery(t, pg, `select count(*) = 1 from e2pg.intg`) for i := 0; i < 10; i++ { - iub.update(0, "foo", "bar", true, uint64(i+2), 0, 0) + iub.update(0, "foo", "bar", true, uint64(i+2), 0, 0, 0) err := iub.write(ctx, pg) diff.Test(t, t.Fatalf, err, nil) } @@ -388,7 +388,7 @@ func TestPruneIntg(t *testing.T) { diff.Test(t, t.Fatalf, err, nil) checkQuery(t, pg, `select count(*) = 2 from e2pg.intg`) - iub.update(0, "foo", "baz", true, 1, 0, 0) + iub.update(0, "foo", "baz", true, 1, 0, 0, 0) err = iub.write(ctx, pg) diff.Test(t, t.Fatalf, err, nil) checkQuery(t, pg, `select count(*) = 1 from e2pg.intg where src_name = 'baz'`) diff --git a/e2pg/migrations.go b/e2pg/migrations.go index 9593ee77..af18135b 100644 --- a/e2pg/migrations.go +++ b/e2pg/migrations.go @@ -66,4 +66,10 @@ var Migrations = map[int]pgmig.Migration{ create unique index on e2pg.intg(name, src_name, backfill, num desc); `, }, + 14: pgmig.Migration{ + SQL: ` + alter table e2pg.intg add column stop numeric; + alter table e2pg.task add column stop numeric; + `, + }, } diff --git a/e2pg/schema.sql b/e2pg/schema.sql index 52601f9f..55ad7554 100644 --- a/e2pg/schema.sql +++ b/e2pg/schema.sql @@ -33,7 +33,8 @@ CREATE TABLE e2pg.intg ( backfill boolean DEFAULT false, num numeric NOT NULL, latency interval, - nrows numeric + nrows numeric, + stop numeric ); @@ -65,7 +66,8 @@ CREATE TABLE e2pg.task ( latency interval, dstat jsonb, backfill boolean DEFAULT false, - src_name text + src_name text, + stop numeric );