fix reorg pg tx bug. increase reorg depth limit
As reported in #272, there was an issue where the system would become
unresponsive during a reorg. One clue was a pg error given to shovel
indicating an idle-in-transaction timeout.

I reproduced this issue by running shovel on Polygon for a few days.
Eventually my system encountered a reorg, attempted to recover, and then
deadlocked. I used a live debugger to inspect the running process and,
to my surprise, saw that the shovel.Task was waiting on a semaphore in
the pool. Here is the relevant stack:

```
golang.org/x/sync/semaphore.(*Weighted).Acquire(0x140001802d0, {0x10590bd78, 0x1400012a450}, 0x1)
	/Users/r/go/pkg/mod/golang.org/x/[email protected]/semaphore/semaphore.go:74 +0x414
github.com/jackc/puddle/v2.(*Pool[...]).acquire(0x105918e20, {0x10590bd78, 0x1400012a450})
	/Users/r/go/pkg/mod/github.com/jackc/puddle/[email protected]/pool.go:350 +0x78
github.com/jackc/puddle/v2.(*Pool[...]).Acquire(0x105918e20, {0x10590bd78, 0x1400012a450})
	/Users/r/go/pkg/mod/github.com/jackc/puddle/[email protected]/pool.go:337 +0xc0
github.com/jackc/pgx/v5/pgxpool.(*Pool).Acquire(0x140001f00c0, {0x10590bd78?, 0x1400012a450?})
	/Users/r/go/pkg/mod/github.com/jackc/pgx/[email protected]/pgxpool/pool.go:525 +0xf4
github.com/jackc/pgx/v5/pgxpool.(*Pool).QueryRow(0x140001ac1e0?, {0x10590bd78, 0x1400012a450}, {0x105694449, 0x77}, {0x14000112100, 0x2, 0x2})
	/Users/r/go/pkg/mod/github.com/jackc/pgx/[email protected]/pgxpool/pool.go:652 +0x38
github.com/indexsupply/shovel/shovel.(*Task).latest(0x1400013e248, {0x10590bd78, 0x1400012a630}, {0x10590bcd0, 0x140001f00c0})
	/Users/r/src/shovel/shovel/task.go:298 +0x14c
github.com/indexsupply/shovel/shovel.(*Task).Converge(0x1400013e248)
	/Users/r/src/shovel/shovel/task.go:375 +0x29c
github.com/indexsupply/shovel/shovel.(*Manager).runTask(0x140001a5720, 0x1400013e248)
	/Users/r/src/shovel/shovel/task.go:726 +0x44
github.com/indexsupply/shovel/shovel.(*Manager).Run.func1()
	/Users/r/src/shovel/shovel/task.go:782 +0x44
created by github.com/indexsupply/shovel/shovel.(*Manager).Run in goroutine 42
	/Users/r/src/shovel/shovel/task.go:781 +0x230
```

I then used pg_stat_activity to inspect the state of shovel's PG
connections and learned that there were none. I then inspected the PG
logs and found:

```
2024-11-06 18:03:23.610 PST [74045] FATAL:  terminating connection due to idle-in-transaction timeout
```

Upon inspecting the code that starts database transactions for a reorg,
I noticed a critical bug. We defer a rollback in case there is an error
deleting data during a reorg, but the defer pushes the noop function
onto the stack instead of the mutated rollback function.
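
This is the usual Go defer pitfall: `defer f()` evaluates the function
value of `f` at the point of the defer statement, so reassigning the
variable afterwards never reaches the deferred call. A minimal sketch of
the pattern, separate from the actual shovel code:

```
package main

import "fmt"

func main() {
	rollback := func() { fmt.Println("noop rollback") }

	// The function value of `rollback` is evaluated *here*, so the noop
	// is what runs when main returns, regardless of the reassignment below.
	defer rollback()

	// This reassignment has no effect on the deferred call above.
	rollback = func() { fmt.Println("real rollback") }

	// Prints "noop rollback" on exit. A closure such as
	// `defer func() { rollback() }()` would look the variable up at exit
	// time and call the reassigned function instead.
}
```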

This whole business of starting a database transaction only when we are
in a reorg is overly complicated. I don't see any downside (other than
maybe a millisecond to start a tx) to always using a database
transaction for our task_updates bookkeeping, as the diff below does,
and it makes the code much simpler to reason about.

After this change, I was able to restart Shovel and it worked through
the reorg and recovered back to a normal converge loop.

There is still an issue where the database pool does not time out or
return an error when all of its connections have been terminated due to
idle-in-transaction timeouts. I will need to look into this further.
Perhaps we need to configure the pool differently.
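
One possible direction (a sketch only, not part of this commit; the
durations are made up and shovel's actual pool wiring may differ) is to
tune pgxpool's health checking and idle settings so that connections
killed by the server are noticed and replaced sooner:

```
package dbpool

import (
	"context"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// New sketches a pool configured to detect and replace connections that
// the server terminated (e.g. via idle_in_transaction_session_timeout).
// The values are illustrative, not recommendations.
func New(ctx context.Context) (*pgxpool.Pool, error) {
	cfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		return nil, err
	}
	cfg.HealthCheckPeriod = 30 * time.Second // periodically check idle conns
	cfg.MaxConnIdleTime = time.Minute        // close conns idle longer than this
	cfg.MaxConnLifetime = time.Hour          // recycle conns regardless of use
	return pgxpool.NewWithConfig(ctx, cfg)
}
```

Whether this actually surfaces the failure earlier in shovel's case
still needs to be verified.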
ryandotsmith committed Nov 7, 2024
1 parent 0648522 commit 51f0111
Showing 1 changed file with 14 additions and 25 deletions.
shovel/task.go:

```
@@ -353,26 +353,14 @@ func (task *Task) Converge() error {
 	ctx = wctx.WithSrcHost(ctx, nextURL.Hostname())
 	ctx = wctx.WithCounter(ctx, &nrpc)

-	var (
-		pg wpg.Conn = task.pgp
-		commit = func() error { return nil }
-		rollback = func() error { return nil }
-
-		startReorgTx = sync.OnceFunc(func() {
-			slog.ErrorContext(ctx, "starting reorg db tx")
-			pgtx, err := task.pgp.Begin(ctx)
-			if err != nil {
-				panic("unable to start pg tx during reorg")
-			}
-			commit = func() error { return pgtx.Commit(ctx) }
-			rollback = func() error { return pgtx.Rollback(ctx) }
-			pg = pgtx
-		})
-	)
-	defer rollback()
+	pgtx, err := task.pgp.Begin(ctx)
+	if err != nil {
+		return fmt.Errorf("unable to start tx: %w", err)
+	}
+	defer pgtx.Rollback(ctx)

-	for reorgs := 0; reorgs <= 10; reorgs++ {
-		localNum, localHash, err := task.latest(ctx, pg)
+	for reorgs := 0; reorgs <= 1000; reorgs++ {
+		localNum, localHash, err := task.latest(ctx, pgtx)
 		if err != nil {
 			return fmt.Errorf("getting latest from task: %w", err)
 		}
@@ -389,7 +377,7 @@ func (task *Task) Converge() error {
 		)
 		switch {
 		case len(task.destConfig.Dependencies) > 0:
-			depNum, depHash, err := task.latestDependency(pg)
+			depNum, depHash, err := task.latestDependency(pgtx)
 			if err != nil {
 				return fmt.Errorf("getting latest from dependencies: %w", err)
 			}
@@ -431,18 +419,19 @@ func (task *Task) Converge() error {
 				"n", localNum,
 				"h", fmt.Sprintf("%.4x", localHash),
 			)
-			startReorgTx()
-			if err := task.Delete(pg, localNum); err != nil {
+			if err := task.Delete(pgtx, localNum); err != nil {
 				return fmt.Errorf("deleting during reorg: %w", err)
 			}
 			continue
 		}
 		if err != nil {
 			return fmt.Errorf("loading data: %w", err)
 		}
-		commit()
+		if err := pgtx.Commit(ctx); err != nil {
+			return fmt.Errorf("comitting task_updates tx: %w", err)
+		}

-		pgtx, err := task.pgp.Begin(ctx)
+		pgtx, err = task.pgp.Begin(ctx)
 		if err != nil {
 			return fmt.Errorf("starting insert pg tx: %w", err)
 		}
@@ -458,7 +447,7 @@ func (task *Task) Converge() error {
 			return fmt.Errorf("updating task: %w", err)
 		}
 		if err := pgtx.Commit(ctx); err != nil {
-			return fmt.Errorf("committing tx: %w", err)
+			return fmt.Errorf("committing task tx: %w", err)
 		}
 		slog.InfoContext(ctx, "converge",
 			"n", last.Num(),
```
