e2pg: remove task id
This change is needed for the new backfill model. The goal is to remove
the notion of creating tasks from E2PG's API. Instead, E2PG will use the
config to determine which tasks need to be created. Each ETH source will
have a main task and a backfill task. A task will be identified by its
src_name and backfill columns.
ryandotsmith committed Oct 24, 2023
1 parent ad967d4 commit 62d17de
Showing 10 changed files with 167 additions and 146 deletions.
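Of the 10 changed files, three are expanded below; the schema change this commit implies is presumably in one of the files not shown. As a rough sketch of the model the commit moves to: the task table is keyed by (src_name, backfill) rather than a synthetic id. The column names and composite key below are inferred from the queries in this diff, while the types are assumptions, written in the style of e2pg.go's embedded Schema:

const taskDDL = `
create table if not exists e2pg.task (
	src_name text not null, -- ETH source name, taken from the config
	backfill bool not null, -- false = main task, true = backfill task
	num      int8 not null, -- block number processed by the task
	hash     bytea,         -- hash of that block
	src_num  int8,          -- latest block number reported by the source
	src_hash bytea,         -- hash of the source's latest block
	nblocks  int8,          -- blocks processed in the last batch
	nrows    int8,          -- rows written in the last batch
	latency  interval,      -- time spent converging the last batch
	dstat    jsonb,         -- per-destination stats (see Dstat below)
	primary key (src_name, backfill, num)
);`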
8 changes: 5 additions & 3 deletions abi2/abi2.go
@@ -649,7 +649,7 @@ func (ig Integration) Insert(ctx context.Context, pg wpg.Conn, blocks []eth.Bloc
err error
skip bool
rows [][]any
lwc = &logWithCtx{ctx: ctx}
lwc = &logWithCtx{ctx: wctx.WithIntgName(ctx, ig.Name())}
)
for bidx := range blocks {
lwc.b = &blocks[bidx]
@@ -693,8 +693,10 @@ type logWithCtx struct {

func (lwc *logWithCtx) get(name string) any {
switch name {
case "task_id":
return wctx.TaskID(lwc.ctx)
case "src_name":
return wctx.SrcName(lwc.ctx)
case "intg_name":
return wctx.IntgName(lwc.ctx)
case "chain_id":
return wctx.ChainID(lwc.ctx)
case "block_hash":
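With task_id gone, abi2 resolves src_name and intg_name from the request context instead. A minimal sketch of how those context helpers compose, using only the wctx functions that appear in this diff (the import path and the example values are assumptions):

package main

import (
	"context"
	"fmt"

	"github.com/indexsupply/x/wctx" // import path assumed
)

func main() {
	// A task sets chain id and source name once (see WithSource below);
	// abi2's Insert adds the integration name per integration.
	ctx := context.Background()
	ctx = wctx.WithChainID(ctx, 1)
	ctx = wctx.WithSrcName(ctx, "mainnet")          // hypothetical source name
	ctx = wctx.WithIntgName(ctx, "erc20-transfers") // hypothetical integration name
	fmt.Println(wctx.ChainID(ctx), wctx.SrcName(ctx), wctx.IntgName(ctx))
}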
9 changes: 1 addition & 8 deletions cmd/e2pg/main.go
@@ -60,13 +60,6 @@ func main() {
}
return "chain", fmt.Sprintf("%.5d", id)
})
lh.RegisterContext(func(ctx context.Context) (string, any) {
id := wctx.TaskID(ctx)
if id == "" {
return "", nil
}
return "task", id
})
slog.SetDefault(slog.New(lh.WithAttrs([]slog.Attr{
slog.Int("p", os.Getpid()),
slog.String("v", Commit),
@@ -126,7 +119,7 @@ func main() {
check(pprof.StartCPUProfile(&pbuf))
}

go mgr.Run()
check(mgr.Run())

switch profile {
case "cpu":
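Two caller-visible changes in main.go: the task log attribute is dropped (tasks no longer carry a synthetic id), and mgr.Run() now runs on the main goroutine with its error checked rather than being fired off with go. The check helper is not shown in this diff; a sketch of its assumed fail-fast shape:

// Assumed shape of main.go's check helper (not part of this diff):
// turn a startup error into an immediate exit.
func check(err error) {
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}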
174 changes: 87 additions & 87 deletions e2pg/e2pg.go
@@ -32,7 +32,6 @@ import (
var Schema string

type Source interface {
ChainID() uint64
LoadBlocks([][]byte, []geth.Buffer, []eth.Block) error
Latest() (uint64, []byte, error)
Hash(uint64) ([]byte, error)
@@ -47,34 +46,22 @@ type Destination interface {

type Option func(t *Task)

func WithName(name string) Option {
return func(t *Task) {
t.Name = name
}
}

func WithSource(s Source) Option {
func WithSource(chainID uint64, name string, src Source) Option {
return func(t *Task) {
if t.src != nil {
panic("task can only have 1 src")
}
t.src = s
t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
t.id = fmt.Sprintf("%d-main", t.src.ChainID())
t.ctx = wctx.WithTaskID(t.ctx, t.id)
t.src = src
t.srcName = name
t.chainID = chainID
t.ctx = wctx.WithChainID(t.ctx, chainID)
t.ctx = wctx.WithSrcName(t.ctx, name)
}
}

func WithBackfillSource(s Source, name string) Option {
func WithBackfill(b bool) Option {
return func(t *Task) {
if t.src != nil {
panic("task can only have 1 src")
}
t.backfill = true
t.src = s
t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
t.id = fmt.Sprintf("%d-backfill-%s", t.src.ChainID(), name)
t.ctx = wctx.WithTaskID(t.ctx, t.id)
t.backfill = b
}
}
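Taken together, the removed and added options change where a task's identity comes from: constructor variants give way to config-driven arguments. A before/after sketch, as if inside the e2pg package ("mainnet", "usdc", and chain id 1 are hypothetical; src is any Source):

// Before this commit, identity came from constructor variants:
//
//	NewTask(WithName("mainnet"), WithSource(src))  // main task
//	NewTask(WithBackfillSource(src, "usdc"))       // backfill task
//
// After, both legs share one constructor path and differ only in the
// backfill flag; (src_name, backfill) is the task's identity.
mainTask := NewTask(WithSource(1, "mainnet", src), WithPG(pgp))
backfillTask := NewTask(WithSource(1, "mainnet", src), WithBackfill(true), WithPG(pgp))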

@@ -143,12 +130,13 @@ type Dstat struct {
}

type Task struct {
Name string
ctx context.Context
id string
backfill bool

src Source
src Source
srcName string
chainID uint64

pgp *pgxpool.Pool
dests []Destination
start, stop uint64
@@ -174,15 +162,30 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
}

func (task *Task) Insert(n uint64, h []byte) error {
const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)`
_, err := task.pgp.Exec(context.Background(), q, task.id, n, h)
const q = `
insert into e2pg.task (src_name, backfill, num, hash)
values ($1, $2, $3, $4)
`
_, err := task.pgp.Exec(task.ctx, q,
task.srcName,
task.backfill,
n,
h,
)
return err
}

func (task *Task) Latest() (uint64, []byte, error) {
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
const q = `
select num, hash
from e2pg.task
where src_name = $1
and backfill = $2
order by num desc
limit 1
`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h)
err := task.pgp.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
return n, nil, nil
}
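Note the no-rows case: when e2pg.task has no row yet for a given (src_name, backfill) pair, Latest swallows pgx.ErrNoRows and returns (0, nil, nil). A sketch, as if inside package e2pg:

// Sketch: distinguishing "no checkpoint yet" from a query failure.
func resumePoint(task *Task) (uint64, error) {
	n, h, err := task.Latest()
	if err != nil {
		return 0, err // a real query failure
	}
	if h == nil {
		// no row yet for this (src_name, backfill) pair; Setup (below)
		// seeds e2pg.task from the source's latest block
	}
	return n, nil // 0 on a fresh table
}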
@@ -216,7 +219,7 @@ func (task *Task) Setup() error {
return task.Insert(gethNum-1, h)
}

func (task *Task) Run1(updates chan<- string, notx bool) {
func (task *Task) Run1(updates chan<- uint64, notx bool) {
switch err := task.Converge(notx); {
case errors.Is(err, ErrDone):
return
@@ -230,7 +233,7 @@ func (task *Task) Run1(updates chan<- string, notx bool) {
// try our best to deliver the update
// but don't stack up work
select {
case updates <- task.id:
case updates <- task.chainID:
default:
}
}()
@@ -266,15 +269,22 @@ func (task *Task) Converge(notx bool) error {
pg = wpg.NewTxLocker(pgTx)
//crc32(task) == 1384045349
const lockq = `select pg_advisory_xact_lock(1384045349, $1)`
_, err = pg.Exec(task.ctx, lockq, wctx.ChainID(task.ctx))
_, err = pg.Exec(task.ctx, lockq, task.chainID)
if err != nil {
return fmt.Errorf("task lock %s: %w", task.id, err)
return fmt.Errorf("task lock %d: %w", task.chainID, err)
}
}
for reorgs := 0; reorgs <= 10; {
localNum, localHash := uint64(0), []byte{}
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash)
const q = `
select num, hash
from e2pg.task
where src_name = $1
and backfill = $2
order by num desc
limit 1
`
err := pg.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
}
@@ -307,8 +317,13 @@ func (task *Task) Converge(notx bool) error {
case errors.Is(err, ErrReorg):
reorgs++
slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
const dq = "delete from e2pg.task where id = $1 AND num >= $2"
_, err := pg.Exec(task.ctx, dq, task.id, localNum)
const dq = `
delete from e2pg.task
where src_name = $1
and backfill = $2
and num >= $3
`
_, err := pg.Exec(task.ctx, dq, task.srcName, task.backfill, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
}
@@ -324,7 +339,8 @@ func (task *Task) Converge(notx bool) error {
var last = task.batch[delta-1]
const uq = `
insert into e2pg.task (
id,
src_name,
backfill,
num,
hash,
src_num,
Expand All @@ -334,10 +350,11 @@ func (task *Task) Converge(notx bool) error {
latency,
dstat
)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`
_, err := pg.Exec(task.ctx, uq,
task.id,
task.srcName,
task.backfill,
last.Num(),
last.Hash(),
gethNum,
@@ -447,56 +464,40 @@ func (d jsonDuration) String() string {
}

type TaskUpdate struct {
ID string `db:"id"`
Num uint64 `db:"num"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
NBlocks uint64 `db:"nblocks"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
Dstat map[string]Dstat `db:"dstat"`
}

func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
const q = `
select
id,
num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from e2pg.task
where id = $1
order by num desc
limit 1;
`
row, _ := pg.Query(ctx, q, id)
return pgx.CollectOneRow(row, pgx.RowToStructByName[TaskUpdate])
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
NBlocks uint64 `db:"nblocks"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
Dstat map[string]Dstat `db:"dstat"`
}

func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
rows, _ := pg.Query(ctx, `
with f as (
select id, max(num) num
from e2pg.task group by 1
select src_name, backfill, max(num) num
from e2pg.task group by 1, 2
)
select
f.id,
f.num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
f.src_name,
f.backfill,
f.num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from f
left join e2pg.task on e2pg.task.id = f.id and e2pg.task.num = f.num;
left join e2pg.task
on e2pg.task.src_name = f.src_name
and e2pg.task.backfill = f.backfill
and e2pg.task.num = f.num;
`)
return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
}
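TaskUpdate1, the per-id lookup, is gone; TaskUpdates now returns the latest row for every (src_name, backfill) pair. A sketch of consuming it for a status view, as if inside package e2pg (the log format is illustrative):

// Sketch: one status line per task, using the TaskUpdate fields above.
func logTasks(ctx context.Context, pg wpg.Conn) error {
	updates, err := TaskUpdates(ctx, pg)
	if err != nil {
		return err
	}
	for _, u := range updates {
		slog.Info("task",
			"src", u.SrcName,
			"backfill", u.Backfill,
			"num", u.Num,
			"nrows", u.NRows,
			"latency", u.Latency.String(),
		)
	}
	return nil
}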
@@ -509,21 +510,21 @@ type Manager struct {
running sync.Mutex
restart chan struct{}
tasks []*Task
updates chan string
updates chan uint64
pgp *pgxpool.Pool
conf Config
}

func NewManager(pgp *pgxpool.Pool, conf Config) *Manager {
return &Manager{
restart: make(chan struct{}),
updates: make(chan string),
updates: make(chan uint64),
pgp: pgp,
conf: conf,
}
}

func (tm *Manager) Updates() string {
func (tm *Manager) Updates() uint64 {
return <-tm.updates
}
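Updates() now yields the chain id of whichever task just converged, rather than a task-id string. A sketch of a consumer, e.g. to notify API clients that a chain advanced (mgr is a *Manager from NewManager):

// Sketch: consume convergence notifications outside the manager.
go func() {
	for {
		chainID := mgr.Updates() // blocks until some task converges
		slog.Info("chain-updated", "chain", chainID)
	}
}()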

@@ -534,7 +535,7 @@ func (tm *Manager) runTask(t *Task) error {
for {
select {
case <-tm.restart:
slog.Info("restart-task", "name", t.Name)
slog.Info("restart-task", "chain", t.chainID)
return nil
default:
t.Run1(tm.updates, false)
@@ -613,8 +614,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
}
dests := destBySourceName[sc.Name]
tasks = append(tasks, NewTask(
WithName(sc.Name),
WithSource(src),
WithSource(sc.ChainID, sc.Name, src),
WithPG(pgp),
WithRange(sc.Start, sc.Stop),
WithConcurrency(1, 512),
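This hunk shows only the main task's construction; per the commit message, each ETH source also gets a backfill task. Presumably the continuation of loadTasks, which is not expanded in this view, mirrors the call above with the backfill flag set, along these lines:

// Assumed continuation of loadTasks (not shown in this view):
tasks = append(tasks, NewTask(
	WithSource(sc.ChainID, sc.Name, src),
	WithBackfill(true), // the backfill leg of the same source
	WithPG(pgp),
	WithRange(sc.Start, sc.Stop),
	WithConcurrency(1, 512),
	// ...remaining options as in the main task
))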