From 005a2f78552d8695f48a9065d1a723433b86318d Mon Sep 17 00:00:00 2001 From: galaio <12880651+galaio@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:45:17 +0800 Subject: [PATCH] pevm: opt slot trigger mechanism; (#24) * pevm: opt slot trigger mechanism; * txdag: opt execute stat; * pevm: opt slot trigger mechanism; * txdag: add txdag more validation logic; --------- Co-authored-by: galaio --- core/parallel_state_processor.go | 38 ++++++++++++++------------------ core/state/statedb.go | 6 ++++- core/types/dag.go | 36 ++++++++++++++++++++++++++++++ tests/block_test.go | 1 - 4 files changed, 58 insertions(+), 23 deletions(-) diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 27e1b5eb05..018e66f19a 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -267,7 +267,7 @@ func (p *ParallelStateProcessor) switchSlot(slotIndex int) { func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult { mIndex := p.mergedTxIndex.Load() conflictIndex := txReq.conflictIndex.Load() - if mIndex <= conflictIndex { + if mIndex < conflictIndex { // The conflicted TX has not been finished executing, skip execution. // the transaction failed at check(nonce or balance), actually it has not been executed yet. atomic.CompareAndSwapInt32(&txReq.runnable, 0, 1) @@ -657,6 +657,19 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga } p.mergedTxIndex.Store(int32(resultTxIndex)) + // trigger all slot to run left conflicted txs + for _, slot := range p.slotState { + var wakeupChan chan struct{} + if slot.activatedType == parallelPrimarySlot { + wakeupChan = slot.primaryWakeUpChan + } else { + wakeupChan = slot.shadowWakeUpChan + } + select { + case wakeupChan <- struct{}{}: + default: + } + } return result } @@ -724,28 +737,11 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat if txDAG == nil && len(p.bc.txDAGMapping) > 0 { txDAG = p.bc.txDAGMapping[block.NumberU64()] } - if txDAG != nil && txDAG.TxCount() != len(block.Transactions()) { - log.Warn("parallel process cannot apply the TxDAG with wrong txs length", - "block", block.NumberU64(), "txs", len(block.Transactions()), "txdag", txDAG.TxCount()) + if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil { + log.Warn("pevm cannot apply wrong txdag", + "block", block.NumberU64(), "txs", len(block.Transactions()), "err", err) txDAG = nil } - // TODO(galaio): check TxDAG validation & excludedTxs in head and continuous - // we only support this format - // convert to normal plain txdag - //parallelIndex := -1 - //if txDAG != nil && txDAG.Type() == types.PlainTxDAGType { - // for i := range allTxs { - // if !txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) { - // if parallelIndex == -1 { - // parallelIndex = i - // } - // continue - // } - // if i > 0 && !txDAG.TxDep(i-1).CheckFlag(types.ExcludedTxFlag) { - // return nil, nil, 0, errors.New("cannot support non-continuous excludedTxs") - // } - // } - //} } txNum := len(allTxs) diff --git a/core/state/statedb.go b/core/state/statedb.go index 158df5e163..e48473467a 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -2409,7 +2409,11 @@ func (s *StateDB) StopTxStat(usedGas uint64) { } // record stat first if metrics.EnabledExpensive && s.stat != nil { - s.stat.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet())) + s.stat.Done().WithGas(usedGas) + rwSet := s.mvStates.RWSet(s.txIndex) + if rwSet != nil { + s.stat.WithRead(len(rwSet.ReadSet())) + } } } diff --git a/core/types/dag.go b/core/types/dag.go index 79e00b3680..76d543aff6 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -78,6 +78,42 @@ func DecodeTxDAG(enc []byte) (TxDAG, error) { } } +func ValidateTxDAG(d TxDAG, txCnt int) error { + if d == nil { + return nil + } + + switch d.Type() { + case EmptyTxDAGType: + return nil + case PlainTxDAGType: + return ValidatePlainTxDAG(d, txCnt) + default: + return fmt.Errorf("unsupported TxDAG type: %v", d.Type()) + } +} + +func ValidatePlainTxDAG(d TxDAG, txCnt int) error { + if d.TxCount() != txCnt { + return fmt.Errorf("PlainTxDAG contains wrong txs count, expect: %v, actual: %v", txCnt, d.TxCount()) + } + for i := 0; i < txCnt; i++ { + dep := d.TxDep(i) + if dep == nil { + return fmt.Errorf("PlainTxDAG contains nil txdep, tx: %v", i) + } + for j, tx := range dep.TxIndexes { + if tx >= uint64(i) || tx >= uint64(txCnt) { + return fmt.Errorf("PlainTxDAG contains the exceed range dependency, tx: %v", i) + } + if j > 0 && dep.TxIndexes[j] <= dep.TxIndexes[j-1] { + return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i) + } + } + } + return nil +} + // EmptyTxDAG indicate that execute txs in sequence // It means no transactions or need timely distribute transaction fees // it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence diff --git a/tests/block_test.go b/tests/block_test.go index 1decda1d21..18bb00e98e 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -69,7 +69,6 @@ func TestBlockchainWithTxDAG(t *testing.T) { // t.Skip("test (randomly) skipped on 32-bit windows") // } // execBlockTestWithTxDAG(t, bt, test) - // //execBlockTest(t, bt, test) // }) //}) }