diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 123ce86..efbbd8f 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -1,6 +1,7 @@ package broadcaster import ( + "encoding/hex" "fmt" "slices" "sync" @@ -147,15 +148,44 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las } pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC() + // if we have pending txs, wait until timeout if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) { waitingTime := timeoutTime.Sub(lastBlockTime) timer := time.NewTimer(waitingTime) + defer timer.Stop() + ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime)) - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: + + pollingTimer := time.NewTicker(ctx.PollingInterval()) + defer pollingTimer.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + break + case <-pollingTimer.C: + } + + if len(pendingTxs) == 0 { + return nil + } + + txHash, err := hex.DecodeString(pendingTxs[0].TxHash) + if err != nil { + return err + } + + res, err := b.rpcClient.QueryTx(ctx, txHash) + if err == nil && res != nil { + err = DeletePendingTx(b.db, pendingTxs[0]) + if err != nil { + return err + } + pendingTxs = pendingTxs[1:] + } } }