Skip to content

Commit

Permalink
Merge branch 'silei/v0.32.1' of https://github.com/wukongcheng/tender…
Browse files Browse the repository at this point in the history
…mint into silei/mempool
  • Loading branch information
chengsilei committed Sep 3, 2019
2 parents 6017360 + a7dbf0f commit d910cf0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
40 changes: 25 additions & 15 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type CListMempool struct {
}

var _ Mempool = &CListMempool{}
var txNum = 0
var cliNum = 0

// CListMempoolOption sets an optional parameter on the mempool.
type CListMempoolOption func(*CListMempool)
Expand Down Expand Up @@ -209,25 +211,11 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
cliNum = cliNum + 1
return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}, false)
}

func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo, reactor bool) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)
if memSize >= mem.config.Size ||
int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, mem.config.Size,
txsBytes, mem.config.MaxTxsBytes}
}

// The size of the corresponding amino-encoded TxMessage
// can't be larger than the maxMsgSize, otherwise we can't
// relay it to peers.
Expand All @@ -241,6 +229,21 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t
}
}

//mem.proxyMtx.Lock()
//// use defer to unlock mutex because application (*local client*) might panic
//defer mem.proxyMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)
if memSize >= mem.config.Size ||
int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, mem.config.Size,
txsBytes, mem.config.MaxTxsBytes}
}

// CACHE
if !mem.cache.Push(tx) {
// Record a new sender for a tx we've already seen.
Expand Down Expand Up @@ -345,6 +348,8 @@ func (mem *CListMempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abc
if externalCb != nil {
externalCb(res)
}

txNum = txNum + 1
}
}

Expand Down Expand Up @@ -475,6 +480,11 @@ func (mem *CListMempool) notifyTxsAvailable() {
}

func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
mem.logger.Info(fmt.Sprintf("TM got txs: %d", txNum))
mem.logger.Info(fmt.Sprintf("TM got cli: %d", cliNum))
txNum = 0
cliNum = 0

mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion p2p/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type NodeKey struct {
PrivKey crypto.PrivKey `json:"priv_key"` // our priv key
RSAPrivKey string `json:"rsa_priv_key"`
RSAPubkKey string `json:"rsa_pub_key"`
OrgKeys map[string]map[string]string `json:"org_keys"`
GroupKeys map[string]map[string]string `json:"group_keys"`
}

// ID returns the peer's canonical ID - the hash of its public key.
Expand Down
4 changes: 4 additions & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, blockExec.db)
endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
blockExec.logger.Info(fmt.Sprintf("Block Processing Time: %f", float64(endTime-startTime) / 1000000))
if err != nil {
return state, ErrProxyAppConn(err)
}
Expand Down Expand Up @@ -306,13 +307,16 @@ func execBlockOnProxyApp(
return nil, err
}

startTime := time.Now().UnixNano()
// Run txs of block.
for _, tx := range block.Txs {
proxyAppConn.DeliverTxAsync(abci.RequestDeliverTx{Tx: tx})
if err := proxyAppConn.Error(); err != nil {
return nil, err
}
}
endTime := time.Now().UnixNano()
logger.Info(fmt.Sprintf("Run txs of block Time: %f, len(Txs): %d", float64(endTime-startTime) / 1000000, len(block.Txs)))

// End block.
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
Expand Down

0 comments on commit d910cf0

Please sign in to comment.