Skip to content

Commit

Permalink
simplify task update scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandotsmith committed Oct 14, 2023
1 parent abb0dcc commit c875f3a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 73 deletions.
108 changes: 35 additions & 73 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
}

func (task *Task) Insert(n uint64, h []byte) error {
const q = `insert into e2pg.task (id, number, hash) values ($1, $2, $3)`
const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)`
_, err := task.pgp.Exec(context.Background(), q, task.id, n, h)
return err
}

func (task *Task) Latest() (uint64, []byte, error) {
const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down Expand Up @@ -275,7 +275,7 @@ func (task *Task) Converge(notx bool) error {
}
for reorgs := 0; reorgs <= 10; {
localNum, localHash := uint64(0), []byte{}
const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
Expand Down Expand Up @@ -309,7 +309,7 @@ func (task *Task) Converge(notx bool) error {
case errors.Is(err, ErrReorg):
reorgs++
slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
const dq = "delete from e2pg.task where id = $1 AND number >= $2"
const dq = "delete from e2pg.task where id = $1 AND num >= $2"
_, err := pg.Exec(task.ctx, dq, task.id, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
Expand All @@ -327,7 +327,7 @@ func (task *Task) Converge(notx bool) error {
const uq = `
insert into e2pg.task (
id,
number,
num,
hash,
src_num,
src_hash,
Expand Down Expand Up @@ -449,92 +449,54 @@ func (d jsonDuration) String() string {
}

type TaskUpdate struct {
ID string
Num uint64
Hash eth.Bytes
SrcNum uint64
SrcHash eth.Bytes
NBlocks uint64
NRows uint64
Latency jsonDuration
Dstat map[string]Dstat
ID string `db:"id"`
Num uint64 `db:"num"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
NBlocks uint64 `db:"nblocks"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
Dstat map[string]Dstat `db:"dstat"`
}

func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
const q = `
select
id,
number,
num,
hash,
coalesce(src_num, 0),
coalesce(src_hash, '\x00'),
coalesce(nblocks, 0),
coalesce(nrows, 0),
coalesce(latency, '0')::interval,
coalesce(dstat, '{}')
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from e2pg.task
where id = $1
order by number desc
order by num desc
limit 1;
`
var tu = TaskUpdate{}
err := pg.QueryRow(ctx, q, id).Scan(
&tu.ID,
&tu.Num,
&tu.Hash,
&tu.SrcNum,
&tu.SrcHash,
&tu.NBlocks,
&tu.NRows,
&tu.Latency,
&tu.Dstat,
)
if err != nil {
return tu, fmt.Errorf("querying for task updates: %w", err)
}
return tu, nil
row, _ := pg.Query(ctx, q, id)
return pgx.CollectOneRow(row, pgx.RowToStructByName[TaskUpdate])
}

func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
const q = `
rows, _ := pg.Query(ctx, `
select distinct on (id)
id,
number,
num,
hash,
coalesce(src_num, 0),
coalesce(src_hash, '\x00'),
coalesce(nblocks, 0),
coalesce(nrows, 0),
coalesce(latency, '0')::interval,
coalesce(dstat, '{}')
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from e2pg.task
order by id, number desc;
`
rows, err := pg.Query(ctx, q)
if err != nil {
return nil, fmt.Errorf("querying for task updates: %w", err)
}
defer rows.Close()
var tus []TaskUpdate
for rows.Next() {
var tu TaskUpdate
err := rows.Scan(
&tu.ID,
&tu.Num,
&tu.Hash,
&tu.SrcNum,
&tu.SrcHash,
&tu.NBlocks,
&tu.NRows,
&tu.Latency,
&tu.Dstat,
)
if err != nil {
return nil, fmt.Errorf("scanning task update: %w", err)
}
tus = append(tus, tu)
}
return tus, nil
order by id, num desc;
`)
return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
}

var compiled = map[string]Destination{}
Expand Down
1 change: 1 addition & 0 deletions e2pg/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var Migrations = map[int]pgmig.Migration{
},
10: pgmig.Migration{
SQL: `
alter table e2pg.task rename column number to num;
alter table e2pg.task add column src_hash bytea;
alter table e2pg.task add column src_num numeric;
alter table e2pg.task add column nblocks numeric;
Expand Down

0 comments on commit c875f3a

Please sign in to comment.