Skip to content

Commit

Permalink
check if tx is inserted to block during waiting time
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-cha committed Dec 10, 2024
1 parent e252c7f commit eb9c37c
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package broadcaster

import (
"encoding/hex"
"fmt"
"slices"
"sync"
Expand Down Expand Up @@ -147,15 +148,44 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
}

pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC()

// if we have pending txs, wait until timeout
if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) {
waitingTime := timeoutTime.Sub(lastBlockTime)
timer := time.NewTimer(waitingTime)
defer timer.Stop()

ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime))
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:

pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
break
case <-pollingTimer.C:
}

if len(pendingTxs) == 0 {
return nil
}

txHash, err := hex.DecodeString(pendingTxs[0].TxHash)
if err != nil {
return err
}

res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil {
err = DeletePendingTx(b.db, pendingTxs[0])
if err != nil {
return err
}
pendingTxs = pendingTxs[1:]
}
}
}

Expand Down

0 comments on commit eb9c37c

Please sign in to comment.