Skip to content

Commit

Permalink
Merge pull request #1 from wukongcheng/silei/v0.32.1
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
Tri-stone authored Jan 7, 2020
2 parents 54b7925 + 0be442e commit f9b7fd8
Show file tree
Hide file tree
Showing 23 changed files with 213 additions and 45 deletions.
1 change: 1 addition & 0 deletions abci/types/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion abci/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ message RequestBeginBlock {
Header header = 2 [(gogoproto.nullable)=false];
LastCommitInfo last_commit_info = 3 [(gogoproto.nullable)=false];
repeated Evidence byzantine_validators = 4 [(gogoproto.nullable)=false];
repeated bytes txs = 5;
}

enum CheckTxType {
Expand Down Expand Up @@ -198,7 +199,7 @@ message ResponseDeliverTx {
uint32 code = 1;
bytes data = 2;
string log = 3; // nondeterministic
repeated VMEvent events = 9;
repeated VMEvent vmevents = 9;
string info = 4; // nondeterministic
int64 gas_wanted = 5;
int64 gas_used = 6;
Expand Down
9 changes: 9 additions & 0 deletions abci/types/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,12 @@ func GetDefaultTags(events []Event) []common.KVPair {
}
return nil
}

func GetAllTags(events []Event) (pairs []common.KVPair) {
for _, v := range events {
ps := make([]common.KVPair, len(v.Attributes))
copy(ps, v.Attributes)
pairs = append(pairs, ps...)
}
return pairs
}
2 changes: 1 addition & 1 deletion blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ FOR_LOOP:
bcR.pool.PopRequest()

// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
bcR.store.SaveBlock(first, firstParts, second.LastCommit, true)

// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
Expand Down
49 changes: 42 additions & 7 deletions blockchain/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
return block
}

// LoadBlockData returns all the block datas with the given height.
// If no block is found for that height, it returns false.
func (bs *BlockStore) LoadWholeBlock(height int64) (found bool, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
// load block meta
var blockMeta = bs.LoadBlockMeta(height)
if blockMeta == nil {
found = false
return
}

// build partset
blockParts = types.NewPartSetFromHeader(blockMeta.BlockID.PartsHeader)

// load block
block = new(types.Block)
buf := []byte{}
for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ {
part := bs.LoadBlockPart(height, i)
buf = append(buf, part.Bytes...)

blockParts.AddPart(part)
}
err := cdc.UnmarshalBinaryLengthPrefixed(buf, block)
if err != nil {
// NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved.
panic(cmn.ErrorWrap(err, "Error reading block"))
}

seenCommit = bs.LoadSeenCommit(height)

found = true
return
}

// LoadBlockPart returns the Part at the given index
// from the block at the given height.
// If no part is found for the given height and index, it returns nil.
Expand Down Expand Up @@ -142,12 +177,12 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
// If all the nodes restart after committing a block,
// we need this to reload the precommits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit, checkContiguous bool) {
if block == nil {
panic("BlockStore can only save a non-nil block")
}
height := block.Height
if g, w := height, bs.Height()+1; g != w {
if g, w := height, bs.Height()+1; checkContiguous && g != w {
panic(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g))
}
if !blockParts.IsComplete() {
Expand All @@ -162,7 +197,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
// Save block parts
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
bs.saveBlockPart(height, i, part, checkContiguous)
}

// Save block commit (duplicate and separate from the Block)
Expand All @@ -189,9 +224,9 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
func (bs *BlockStore) RetreatLastBlock() {
height := bs.height
bs.db.Delete(calcBlockMetaKey(height))
bs.db.Delete(calcBlockCommitKey(height-1))
bs.db.Delete(calcBlockCommitKey(height - 1))
bs.db.Delete(calcSeenCommitKey(height))
BlockStoreStateJSON{Height: height-1 }.Save(bs.db)
BlockStoreStateJSON{Height: height - 1}.Save(bs.db)
// Done!
bs.mtx.Lock()
bs.height = height
Expand All @@ -200,8 +235,8 @@ func (bs *BlockStore) RetreatLastBlock() {
bs.db.SetSync(nil, nil)
}

func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) {
if height != bs.Height()+1 {
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part, checkContiguous bool) {
if checkContiguous && height != bs.Height()+1 {
panic(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
}
partBytes := cdc.MustMarshalBinaryBare(part)
Expand Down
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,14 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Recheck: true,
Recheck: false,
Broadcast: true,
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
Size: 20000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
CacheSize: 20000,
}
}

Expand Down Expand Up @@ -706,7 +706,7 @@ type ConsensusConfig struct {
func DefaultConsensusConfig() *ConsensusConfig {
return &ConsensusConfig{
WalPath: filepath.Join(defaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutPropose: 1000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
TimeoutPrevote: 1000 * time.Millisecond,
TimeoutPrevoteDelta: 500 * time.Millisecond,
Expand Down
11 changes: 9 additions & 2 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ var (
msgQueueSize = 1000
)

var (
Switch *p2p.Switch
)

// msgs from the reactor which may update the state
type msgInfo struct {
Msg ConsensusMessage `json:"msg"`
Expand Down Expand Up @@ -136,7 +140,7 @@ type ConsensusState struct {
evsw tmevents.EventSwitch

// for reporting metrics
metrics *Metrics
metrics *Metrics
Deprecated bool
}

Expand Down Expand Up @@ -1267,6 +1271,9 @@ func (cs *ConsensusState) tryFinalizeCommit(height int64) {

// go
cs.finalizeCommit(height)

// check if peer needs to be removed
Switch.CheckPeers()
}

// Increment height and goto cstypes.RoundStepNewHeight
Expand Down Expand Up @@ -1304,7 +1311,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
cs.blockStore.SaveBlock(block, blockParts, seenCommit, true)
} else {
// Happens during replay if we already saved the block but didn't commit
cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height)
Expand Down
32 changes: 27 additions & 5 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID})
return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID}, false)
}

func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo, reactor bool) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
Expand Down Expand Up @@ -279,8 +279,24 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t
return err
}

reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb))
if !reactor {
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb))
} else {
memTx := &mempoolTx{
height: mem.height,
gasWanted: 0,
tx: tx,
}
memTx.senders.Store(txInfo.SenderID, true)
mem.addTx(memTx)
mem.logger.Debug("Added good transaction",
"tx", txID(tx),
"height", memTx.height,
"total", mem.Size(),
)
mem.notifyTxsAvailable()
}

return nil
}
Expand Down Expand Up @@ -374,7 +390,7 @@ func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Resp
}
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
mem.logger.Info("Added good transaction",
mem.logger.Debug("Added good transaction",
"tx", txID(tx),
"res", r,
"height", memTx.height,
Expand Down Expand Up @@ -467,6 +483,9 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
time.Sleep(time.Millisecond * 10)
}

//fmt.Println("\n")
//fmt.Printf("TM Propose mem.size(): %d \n", mem.Size())

var totalBytes int64
var totalGas int64
// TODO: we will get a performance boost if we have a good estimate of avg
Expand Down Expand Up @@ -534,6 +553,8 @@ func (mem *CListMempool) Update(
mem.postCheck = postCheck
}

//fmt.Printf("\n TM Update txs/ mem.size() : %d \n", mem.Size())

for i, tx := range txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
// Add valid committed tx to the cache (if missing).
Expand Down Expand Up @@ -574,6 +595,7 @@ func (mem *CListMempool) Update(

// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))
//fmt.Printf("TM after recheck txs/ mem.size() : %d \n", mem.Size())

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) {
tx := txs[int(txNum)]

// this will err with ErrTxInCache many times ...
mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)})
mempool.CheckTxWithInfo(tx, nil, TxInfo{SenderID: uint16(peerID)}, false)
}
err := mempool.FlushAppConn()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Mempool interface {
// meta data about the tx.
// Currently this metadata is the peer who sent it, used to prevent the tx
// from being gossiped back to them.
CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error
CheckTxWithInfo(tx types.Tx, callback func(*abci.Response), txInfo TxInfo, reactor bool) error

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
Expand Down
4 changes: 2 additions & 2 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *TxMessage:
peerID := memR.ids.GetForPeer(src)
err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID})
err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}, true)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
memR.Logger.Debug("Could not check tx", "tx", txID(msg.Tx), "err", err)
}
// broadcasting happens from go routines per peer
default:
Expand Down
2 changes: 1 addition & 1 deletion mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
return nil
}
func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
_ mempl.TxInfo) error {
_ mempl.TxInfo, _ bool) error {
return nil
}
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
Expand Down
Loading

0 comments on commit f9b7fd8

Please sign in to comment.