Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2pg: remove task id #187

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions abi2/abi2.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (ig Integration) Insert(ctx context.Context, pg wpg.Conn, blocks []eth.Bloc
err error
skip bool
rows [][]any
lwc = &logWithCtx{ctx: ctx}
lwc = &logWithCtx{ctx: wctx.WithIntgName(ctx, ig.Name())}
)
for bidx := range blocks {
lwc.b = &blocks[bidx]
Expand Down Expand Up @@ -693,8 +693,10 @@ type logWithCtx struct {

func (lwc *logWithCtx) get(name string) any {
switch name {
case "task_id":
return wctx.TaskID(lwc.ctx)
case "src_name":
return wctx.SrcName(lwc.ctx)
case "intg_name":
return wctx.IntgName(lwc.ctx)
case "chain_id":
return wctx.ChainID(lwc.ctx)
case "block_hash":
Expand Down
9 changes: 1 addition & 8 deletions cmd/e2pg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ func main() {
}
return "chain", fmt.Sprintf("%.5d", id)
})
lh.RegisterContext(func(ctx context.Context) (string, any) {
id := wctx.TaskID(ctx)
if id == "" {
return "", nil
}
return "task", id
})
slog.SetDefault(slog.New(lh.WithAttrs([]slog.Attr{
slog.Int("p", os.Getpid()),
slog.String("v", Commit),
Expand Down Expand Up @@ -126,7 +119,7 @@ func main() {
check(pprof.StartCPUProfile(&pbuf))
}

go mgr.Run()
check(mgr.Run())

switch profile {
case "cpu":
Expand Down
174 changes: 87 additions & 87 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
var Schema string

type Source interface {
ChainID() uint64
LoadBlocks([][]byte, []geth.Buffer, []eth.Block) error
Latest() (uint64, []byte, error)
Hash(uint64) ([]byte, error)
Expand All @@ -47,34 +46,22 @@ type Destination interface {

type Option func(t *Task)

func WithName(name string) Option {
return func(t *Task) {
t.Name = name
}
}

func WithSource(s Source) Option {
func WithSource(chainID uint64, name string, src Source) Option {
return func(t *Task) {
if t.src != nil {
panic("task can only have 1 src")
}
t.src = s
t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
t.id = fmt.Sprintf("%d-main", t.src.ChainID())
t.ctx = wctx.WithTaskID(t.ctx, t.id)
t.src = src
t.srcName = name
t.chainID = chainID
t.ctx = wctx.WithChainID(t.ctx, chainID)
t.ctx = wctx.WithSrcName(t.ctx, name)
}
}

func WithBackfillSource(s Source, name string) Option {
func WithBackfill(b bool) Option {
return func(t *Task) {
if t.src != nil {
panic("task can only have 1 src")
}
t.backfill = true
t.src = s
t.ctx = wctx.WithChainID(t.ctx, s.ChainID())
t.id = fmt.Sprintf("%d-backfill-%s", t.src.ChainID(), name)
t.ctx = wctx.WithTaskID(t.ctx, t.id)
t.backfill = b
}
}

Expand Down Expand Up @@ -143,12 +130,13 @@ type Dstat struct {
}

type Task struct {
Name string
ctx context.Context
id string
backfill bool

src Source
src Source
srcName string
chainID uint64

pgp *pgxpool.Pool
dests []Destination
start, stop uint64
Expand All @@ -174,15 +162,30 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
}

func (task *Task) Insert(n uint64, h []byte) error {
const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)`
_, err := task.pgp.Exec(context.Background(), q, task.id, n, h)
const q = `
insert into e2pg.task (src_name, backfill, num, hash)
values ($1, $2, $3, $4)
`
_, err := task.pgp.Exec(task.ctx, q,
task.srcName,
task.backfill,
n,
h,
)
return err
}

func (task *Task) Latest() (uint64, []byte, error) {
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
const q = `
select num, hash
from e2pg.task
where src_name = $1
and backfill = $2
order by num desc
limit 1
`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h)
err := task.pgp.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
return n, nil, nil
}
Expand Down Expand Up @@ -216,7 +219,7 @@ func (task *Task) Setup() error {
return task.Insert(gethNum-1, h)
}

func (task *Task) Run1(updates chan<- string, notx bool) {
func (task *Task) Run1(updates chan<- uint64, notx bool) {
switch err := task.Converge(notx); {
case errors.Is(err, ErrDone):
return
Expand All @@ -230,7 +233,7 @@ func (task *Task) Run1(updates chan<- string, notx bool) {
// try out best to deliver update
// but don't stack up work
select {
case updates <- task.id:
case updates <- task.chainID:
default:
}
}()
Expand Down Expand Up @@ -266,15 +269,22 @@ func (task *Task) Converge(notx bool) error {
pg = wpg.NewTxLocker(pgTx)
//crc32(task) == 1384045349
const lockq = `select pg_advisory_xact_lock(1384045349, $1)`
_, err = pg.Exec(task.ctx, lockq, wctx.ChainID(task.ctx))
_, err = pg.Exec(task.ctx, lockq, task.chainID)
if err != nil {
return fmt.Errorf("task lock %s: %w", task.id, err)
return fmt.Errorf("task lock %d: %w", task.chainID, err)
}
}
for reorgs := 0; reorgs <= 10; {
localNum, localHash := uint64(0), []byte{}
const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash)
const q = `
select num, hash
from e2pg.task
where src_name = $1
and backfill = $2
order by num desc
limit 1
`
err := pg.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
}
Expand Down Expand Up @@ -307,8 +317,13 @@ func (task *Task) Converge(notx bool) error {
case errors.Is(err, ErrReorg):
reorgs++
slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
const dq = "delete from e2pg.task where id = $1 AND num >= $2"
_, err := pg.Exec(task.ctx, dq, task.id, localNum)
const dq = `
delete from e2pg.task
where src_name = $1
and backfill = $2
and num >= $3
`
_, err := pg.Exec(task.ctx, dq, task.srcName, task.backfill, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
}
Expand All @@ -324,7 +339,8 @@ func (task *Task) Converge(notx bool) error {
var last = task.batch[delta-1]
const uq = `
insert into e2pg.task (
id,
src_name,
backfill,
num,
hash,
src_num,
Expand All @@ -334,10 +350,11 @@ func (task *Task) Converge(notx bool) error {
latency,
dstat
)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`
_, err := pg.Exec(task.ctx, uq,
task.id,
task.srcName,
task.backfill,
last.Num(),
last.Hash(),
gethNum,
Expand Down Expand Up @@ -447,56 +464,40 @@ func (d jsonDuration) String() string {
}

type TaskUpdate struct {
ID string `db:"id"`
Num uint64 `db:"num"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
NBlocks uint64 `db:"nblocks"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
Dstat map[string]Dstat `db:"dstat"`
}

func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
const q = `
select
id,
num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from e2pg.task
where id = $1
order by num desc
limit 1;
`
row, _ := pg.Query(ctx, q, id)
return pgx.CollectOneRow(row, pgx.RowToStructByName[TaskUpdate])
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Hash eth.Bytes `db:"hash"`
SrcNum uint64 `db:"src_num"`
SrcHash eth.Bytes `db:"src_hash"`
NBlocks uint64 `db:"nblocks"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
Dstat map[string]Dstat `db:"dstat"`
}

func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
rows, _ := pg.Query(ctx, `
with f as (
select id, max(num) num
from e2pg.task group by 1
select src_name, backfill, max(num) num
from e2pg.task group by 1, 2
)
select
f.id,
f.num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
f.src_name,
f.backfill,
f.num,
hash,
coalesce(src_num, 0) src_num,
coalesce(src_hash, '\x00') src_hash,
coalesce(nblocks, 0) nblocks,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency,
coalesce(dstat, '{}') dstat
from f
left join e2pg.task on e2pg.task.id = f.id and e2pg.task.num = f.num;
left join e2pg.task
on e2pg.task.src_name = f.src_name
and e2pg.task.backfill = f.backfill
and e2pg.task.num = f.num;
`)
return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
}
Expand All @@ -509,21 +510,21 @@ type Manager struct {
running sync.Mutex
restart chan struct{}
tasks []*Task
updates chan string
updates chan uint64
pgp *pgxpool.Pool
conf Config
}

func NewManager(pgp *pgxpool.Pool, conf Config) *Manager {
return &Manager{
restart: make(chan struct{}),
updates: make(chan string),
updates: make(chan uint64),
pgp: pgp,
conf: conf,
}
}

func (tm *Manager) Updates() string {
func (tm *Manager) Updates() uint64 {
return <-tm.updates
}

Expand All @@ -534,7 +535,7 @@ func (tm *Manager) runTask(t *Task) error {
for {
select {
case <-tm.restart:
slog.Info("restart-task", "name", t.Name)
slog.Info("restart-task", "chain", t.chainID)
return nil
default:
t.Run1(tm.updates, false)
Expand Down Expand Up @@ -613,8 +614,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er
}
dests := destBySourceName[sc.Name]
tasks = append(tasks, NewTask(
WithName(sc.Name),
WithSource(src),
WithSource(sc.ChainID, sc.Name, src),
WithPG(pgp),
WithRange(sc.Start, sc.Stop),
WithConcurrency(1, 512),
Expand Down
Loading