Skip to content

Commit

Permalink
e2pg: record task stop values for e2pg.task and e2pg.intg
Browse files Browse the repository at this point in the history
These values will be used to determine the progress of a task.
  • Loading branch information
ryandotsmith committed Oct 31, 2023
1 parent d7c54ec commit d09bd5c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
19 changes: 14 additions & 5 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (task *Task) Converge(notx bool) error {
src_name,
backfill,
num,
stop,
hash,
src_num,
src_hash,
Expand All @@ -399,12 +400,13 @@ func (task *Task) Converge(notx bool) error {
latency,
dstat
)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
`
_, err := pg.Exec(task.ctx, uq,
task.srcName,
task.backfill,
last.Num(),
task.stop,
last.Hash(),
gethNum,
gethHash,
Expand Down Expand Up @@ -489,6 +491,7 @@ func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64
task.srcName,
task.backfill,
blks[len(blks)-1].Num(),
task.stop,
time.Since(start),
count,
)
Expand Down Expand Up @@ -564,6 +567,7 @@ type intgUpdate struct {
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Stop uint64 `db:"stop"`
Latency time.Duration `db:"latency"`
NRows int64 `db:"nrows"`
}
Expand All @@ -572,14 +576,14 @@ func newIUB(n int) *intgUpdateBuf {
iub := &intgUpdateBuf{}
iub.updates = make([]intgUpdate, n)
iub.table = pgx.Identifier{"e2pg", "intg"}
iub.cols = []string{"name", "src_name", "backfill", "num", "latency", "nrows"}
iub.cols = []string{"name", "src_name", "backfill", "num", "stop", "latency", "nrows"}
return iub
}

type intgUpdateBuf struct {
i int
updates []intgUpdate
out [6]any
out [7]any
table pgx.Identifier
cols []string
}
Expand All @@ -590,6 +594,7 @@ func (b *intgUpdateBuf) update(
srcName string,
backfill bool,
num uint64,
stop uint64,
lat time.Duration,
nrows int64,
) {
Expand All @@ -598,6 +603,7 @@ func (b *intgUpdateBuf) update(
b.updates[j].SrcName = srcName
b.updates[j].Backfill = backfill
b.updates[j].Num = num
b.updates[j].Stop = stop
b.updates[j].Latency = lat
b.updates[j].NRows = nrows
}
Expand All @@ -624,8 +630,9 @@ func (b *intgUpdateBuf) Values() ([]any, error) {
b.out[1] = b.updates[b.i].SrcName
b.out[2] = b.updates[b.i].Backfill
b.out[3] = b.updates[b.i].Num
b.out[4] = b.updates[b.i].Latency
b.out[5] = b.updates[b.i].NRows
b.out[4] = b.updates[b.i].Stop
b.out[5] = b.updates[b.i].Latency
b.out[6] = b.updates[b.i].NRows
b.updates[b.i].changed = false
b.i++
return b.out[:], nil
Expand Down Expand Up @@ -711,6 +718,7 @@ type TaskUpdate struct {
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Stop uint64 `db:"stop"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
Expand All @@ -730,6 +738,7 @@ func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
f.src_name,
f.backfill,
f.num,
coalesce(stop, 0) stop,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
Expand Down
6 changes: 3 additions & 3 deletions e2pg/e2pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,13 @@ func TestPruneIntg(t *testing.T) {
diff.Test(t, t.Fatalf, err, nil)

iub := newIUB(1)
iub.update(0, "foo", "bar", true, 1, 0, 0)
iub.update(0, "foo", "bar", true, 1, 0, 0, 0)
err = iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 1 from e2pg.intg`)

for i := 0; i < 10; i++ {
iub.update(0, "foo", "bar", true, uint64(i+2), 0, 0)
iub.update(0, "foo", "bar", true, uint64(i+2), 0, 0, 0)
err := iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
}
Expand All @@ -388,7 +388,7 @@ func TestPruneIntg(t *testing.T) {
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 2 from e2pg.intg`)

iub.update(0, "foo", "baz", true, 1, 0, 0)
iub.update(0, "foo", "baz", true, 1, 0, 0, 0)
err = iub.write(ctx, pg)
diff.Test(t, t.Fatalf, err, nil)
checkQuery(t, pg, `select count(*) = 1 from e2pg.intg where src_name = 'baz'`)
Expand Down
6 changes: 6 additions & 0 deletions e2pg/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,10 @@ var Migrations = map[int]pgmig.Migration{
create unique index on e2pg.intg(name, src_name, backfill, num desc);
`,
},
14: pgmig.Migration{
SQL: `
alter table e2pg.intg add column stop numeric;
alter table e2pg.task add column stop numeric;
`,
},
}
6 changes: 4 additions & 2 deletions e2pg/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ CREATE TABLE e2pg.intg (
backfill boolean DEFAULT false,
num numeric NOT NULL,
latency interval,
nrows numeric
nrows numeric,
stop numeric
);


Expand Down Expand Up @@ -65,7 +66,8 @@ CREATE TABLE e2pg.task (
latency interval,
dstat jsonb,
backfill boolean DEFAULT false,
src_name text
src_name text,
stop numeric
);


Expand Down

0 comments on commit d09bd5c

Please sign in to comment.