Skip to content

Commit

Permalink
pevm: opt slot trigger mechanism; (bnb-chain#24)
Browse files Browse the repository at this point in the history
* pevm: opt slot trigger mechanism;

* txdag: opt execute stat;

* pevm: opt slot trigger mechanism;

* txdag: add txdag more validation logic;

---------

Co-authored-by: galaio <[email protected]>
  • Loading branch information
2 people authored and sunny2022da committed Aug 6, 2024
1 parent 3c49b34 commit 005a2f7
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 23 deletions.
38 changes: 17 additions & 21 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (p *ParallelStateProcessor) switchSlot(slotIndex int) {
func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult {
mIndex := p.mergedTxIndex.Load()
conflictIndex := txReq.conflictIndex.Load()
if mIndex <= conflictIndex {
if mIndex < conflictIndex {
// The conflicted TX has not been finished executing, skip execution.
// the transaction failed at check(nonce or balance), actually it has not been executed yet.
atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1)
Expand Down Expand Up @@ -657,6 +657,19 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga
}
p.mergedTxIndex.Store(int32(resultTxIndex))

// trigger all slot to run left conflicted txs
for _, slot := range p.slotState {
var wakeupChan chan struct{}
if slot.activatedType == parallelPrimarySlot {
wakeupChan = slot.primaryWakeUpChan
} else {
wakeupChan = slot.shadowWakeUpChan
}
select {
case wakeupChan <- struct{}{}:
default:
}
}
return result
}

Expand Down Expand Up @@ -724,28 +737,11 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
if txDAG == nil && len(p.bc.txDAGMapping) > 0 {
txDAG = p.bc.txDAGMapping[block.NumberU64()]
}
if txDAG != nil && txDAG.TxCount() != len(block.Transactions()) {
log.Warn("parallel process cannot apply the TxDAG with wrong txs length",
"block", block.NumberU64(), "txs", len(block.Transactions()), "txdag", txDAG.TxCount())
if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil {
log.Warn("pevm cannot apply wrong txdag",
"block", block.NumberU64(), "txs", len(block.Transactions()), "err", err)
txDAG = nil
}
// TODO(galaio): check TxDAG validation & excludedTxs in head and continuous
// we only support this format
// convert to normal plain txdag
//parallelIndex := -1
//if txDAG != nil && txDAG.Type() == types.PlainTxDAGType {
// for i := range allTxs {
// if !txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
// if parallelIndex == -1 {
// parallelIndex = i
// }
// continue
// }
// if i > 0 && !txDAG.TxDep(i-1).CheckFlag(types.ExcludedTxFlag) {
// return nil, nil, 0, errors.New("cannot support non-continuous excludedTxs")
// }
// }
//}
}

txNum := len(allTxs)
Expand Down
6 changes: 5 additions & 1 deletion core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2409,7 +2409,11 @@ func (s *StateDB) StopTxStat(usedGas uint64) {
}
// record stat first
if metrics.EnabledExpensive && s.stat != nil {
s.stat.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet()))
s.stat.Done().WithGas(usedGas)
rwSet := s.mvStates.RWSet(s.txIndex)
if rwSet != nil {
s.stat.WithRead(len(rwSet.ReadSet()))
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,42 @@ func DecodeTxDAG(enc []byte) (TxDAG, error) {
}
}

func ValidateTxDAG(d TxDAG, txCnt int) error {
if d == nil {
return nil
}

switch d.Type() {
case EmptyTxDAGType:
return nil
case PlainTxDAGType:
return ValidatePlainTxDAG(d, txCnt)
default:
return fmt.Errorf("unsupported TxDAG type: %v", d.Type())
}
}

func ValidatePlainTxDAG(d TxDAG, txCnt int) error {
if d.TxCount() != txCnt {
return fmt.Errorf("PlainTxDAG contains wrong txs count, expect: %v, actual: %v", txCnt, d.TxCount())
}
for i := 0; i < txCnt; i++ {
dep := d.TxDep(i)
if dep == nil {
return fmt.Errorf("PlainTxDAG contains nil txdep, tx: %v", i)
}
for j, tx := range dep.TxIndexes {
if tx >= uint64(i) || tx >= uint64(txCnt) {
return fmt.Errorf("PlainTxDAG contains the exceed range dependency, tx: %v", i)
}
if j > 0 && dep.TxIndexes[j] <= dep.TxIndexes[j-1] {
return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i)
}
}
}
return nil
}

// EmptyTxDAG indicate that execute txs in sequence
// It means no transactions or need timely distribute transaction fees
// it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence
Expand Down
1 change: 0 additions & 1 deletion tests/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func TestBlockchainWithTxDAG(t *testing.T) {
// t.Skip("test (randomly) skipped on 32-bit windows")
// }
// execBlockTestWithTxDAG(t, bt, test)
// //execBlockTest(t, bt, test)
// })
//})
}
Expand Down

0 comments on commit 005a2f7

Please sign in to comment.