diff --git a/shovel/task.go b/shovel/task.go index 6a534d9..4bc6c6a 100644 --- a/shovel/task.go +++ b/shovel/task.go @@ -353,26 +353,14 @@ func (task *Task) Converge() error { ctx = wctx.WithSrcHost(ctx, nextURL.Hostname()) ctx = wctx.WithCounter(ctx, &nrpc) - var ( - pg wpg.Conn = task.pgp - commit = func() error { return nil } - rollback = func() error { return nil } - - startReorgTx = sync.OnceFunc(func() { - slog.ErrorContext(ctx, "starting reorg db tx") - pgtx, err := task.pgp.Begin(ctx) - if err != nil { - panic("unable to start pg tx during reorg") - } - commit = func() error { return pgtx.Commit(ctx) } - rollback = func() error { return pgtx.Rollback(ctx) } - pg = pgtx - }) - ) - defer rollback() + pgtx, err := task.pgp.Begin(ctx) + if err != nil { + return fmt.Errorf("unable to start tx: %w", err) + } + defer pgtx.Rollback(ctx) - for reorgs := 0; reorgs <= 10; reorgs++ { - localNum, localHash, err := task.latest(ctx, pg) + for reorgs := 0; reorgs <= 1000; reorgs++ { + localNum, localHash, err := task.latest(ctx, pgtx) if err != nil { return fmt.Errorf("getting latest from task: %w", err) } @@ -389,7 +377,7 @@ func (task *Task) Converge() error { ) switch { case len(task.destConfig.Dependencies) > 0: - depNum, depHash, err := task.latestDependency(pg) + depNum, depHash, err := task.latestDependency(pgtx) if err != nil { return fmt.Errorf("getting latest from dependencies: %w", err) } @@ -431,8 +419,7 @@ func (task *Task) Converge() error { "n", localNum, "h", fmt.Sprintf("%.4x", localHash), ) - startReorgTx() - if err := task.Delete(pg, localNum); err != nil { + if err := task.Delete(pgtx, localNum); err != nil { return fmt.Errorf("deleting during reorg: %w", err) } continue @@ -440,9 +427,11 @@ func (task *Task) Converge() error { if err != nil { return fmt.Errorf("loading data: %w", err) } - commit() + if err := pgtx.Commit(ctx); err != nil { + return fmt.Errorf("comitting task_updates tx: %w", err) + } - pgtx, err := task.pgp.Begin(ctx) + pgtx, err = task.pgp.Begin(ctx) if err != nil { return fmt.Errorf("starting insert pg tx: %w", err) } @@ -458,7 +447,7 @@ func (task *Task) Converge() error { return fmt.Errorf("updating task: %w", err) } if err := pgtx.Commit(ctx); err != nil { - return fmt.Errorf("committing tx: %w", err) + return fmt.Errorf("committing task tx: %w", err) } slog.InfoContext(ctx, "converge", "n", last.Num(),