diff --git a/Gopkg.lock b/Gopkg.lock index 530cd89dd4f..56c79ab6d54 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -170,9 +170,12 @@ revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" [[projects]] - digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7" + digest = "1:53e8c5c79716437e601696140e8b1801aae4204f4ec54a504333702a49572c4f" name = "github.com/magiconair/properties" - packages = ["."] + packages = [ + ".", + "assert", + ] pruneopts = "UT" revision = "c2353362d570a7bfa228149c62842019201cfb71" version = "v1.8.0" @@ -516,6 +519,7 @@ "github.com/golang/protobuf/ptypes/timestamp", "github.com/gorilla/websocket", "github.com/jmhodges/levigo", + "github.com/magiconair/properties/assert", "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", diff --git a/abci/types/types.pb.go b/abci/types/types.pb.go index b09213a5fd0..4a378151e23 100644 --- a/abci/types/types.pb.go +++ b/abci/types/types.pb.go @@ -2138,10 +2138,53 @@ func (m *ResponseCheckTx) GetCodespace() string { return "" } + +// TxEvent begin ---------------------------------------------------------------------------------------------------- + +type TxEvent struct { + Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Topics [][]byte `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` + BlockNumber uint64 `protobuf:"varint,4,opt,name=block_number,json=blockNumber,proto3" json:"block_number,omitempty"` + TxHash []byte `protobuf:"bytes,5,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + BlockHash []byte `protobuf:"bytes,6,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TxEvent) Reset() { *m = TxEvent{} } +func (m *TxEvent) String() string { return proto.CompactTextString(m) } +func (*TxEvent) ProtoMessage() {} +func (*TxEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_types_a177e47fab90f91d, []int{0} +} + +func (m *TxEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TxEvent.Unmarshal(m, b) +} +func (m *TxEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TxEvent.Marshal(b, m, deterministic) +} +func (m *TxEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_TxEvent.Merge(m, src) +} +func (m *TxEvent) XXX_Size() int { + return xxx_messageInfo_TxEvent.Size(m) +} +func (m *TxEvent) XXX_DiscardUnknown() { + xxx_messageInfo_TxEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_TxEvent proto.InternalMessageInfo + +// TxEvent end ---------------------------------------------------------------------------------------------------- + type ResponseDeliverTx struct { Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` Log string `protobuf:"bytes,3,opt,name=log,proto3" json:"log,omitempty"` + Events []TxEvent `protobuf:"bytes,9,opt,name=events,proto3" json:"events,omitempty"` Info string `protobuf:"bytes,4,opt,name=info,proto3" json:"info,omitempty"` GasWanted int64 `protobuf:"varint,5,opt,name=gas_wanted,json=gasWanted,proto3" json:"gas_wanted,omitempty"` GasUsed int64 `protobuf:"varint,6,opt,name=gas_used,json=gasUsed,proto3" json:"gas_used,omitempty"` diff --git a/abci/types/types.proto b/abci/types/types.proto index 8eeecb39202..6d2286483cd 100644 --- a/abci/types/types.proto +++ b/abci/types/types.proto @@ -179,10 +179,20 @@ message ResponseCheckTx { string codespace = 8; } +message TxEvent { + bytes address = 1; + bytes data = 2; + repeated bytes topics = 3; + uint64 block_number = 4; + bytes tx_hash = 5; + bytes block_hash = 6; +} + message ResponseDeliverTx { uint32 code = 1; bytes data = 2; string log = 3; // nondeterministic + repeated TxEvent events = 9; string info = 4; // nondeterministic int64 gas_wanted = 5; int64 gas_used = 6; diff --git a/abci/types/util.go b/abci/types/util.go index 3cde882320a..0644f625e1d 100644 --- a/abci/types/util.go +++ b/abci/types/util.go @@ -3,6 +3,7 @@ package types import ( "bytes" "sort" + common "github.com/tendermint/tendermint/libs/common" ) //------------------------------------------------------------------------------ @@ -32,3 +33,12 @@ func (v ValidatorUpdates) Swap(i, j int) { v[i] = v[j] v[j] = v1 } + +func GetTagByKey(tags []common.KVPair, key string) (common.KVPair, bool) { + for _, tag := range tags { + if bytes.Equal(tag.Key, []byte(key)) { + return tag, true + } + } + return common.KVPair{}, false +} diff --git a/blockchain/store.go b/blockchain/store.go index 498cca68dba..4b48eff4693 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -186,6 +186,20 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s bs.db.SetSync(nil, nil) } +func (bs *BlockStore) RetreatLastBlock() { + height := bs.height + bs.db.Delete(calcBlockMetaKey(height)) + bs.db.Delete(calcBlockCommitKey(height-1)) + bs.db.Delete(calcSeenCommitKey(height)) + BlockStoreStateJSON{Height: height-1 }.Save(bs.db) + // Done! + bs.mtx.Lock() + bs.height = height + bs.mtx.Unlock() + // Flush + bs.db.SetSync(nil, nil) +} + func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { if height != bs.Height()+1 { cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index fa63b4944e8..43b1c9bbf18 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -20,6 +20,9 @@ func AddNodeFlags(cmd *cobra.Command) { // node flags cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") + cmd.Flags().Bool("deprecated", config.Deprecated, "Mark blockchain as deprecated") + cmd.Flags().Int64("replay_height", config.ReplayHeight, "Specify which height to replay to, this is useful for exporting at any height") + // abci flags cmd.Flags().String("proxy_app", config.ProxyApp, "Proxy app address, or one of: 'kvstore', 'persistent_kvstore', 'counter', 'counter_serial' or 'noop' for local testing.") diff --git a/config/config.go b/config/config.go index 3ac22adbf40..d9951c38366 100644 --- a/config/config.go +++ b/config/config.go @@ -153,6 +153,13 @@ type BaseConfig struct { // and verifying their commits FastSync bool `mapstructure:"fast_sync"` + // If the blockchain is deprecated, run node with Deprecated will + // work in query only mode. Consensus engine and p2p gossip will be + // shutdown + Deprecated bool `mapstructure:"deprecated"` + + ReplayHeight int64 `mapstructure:"replay_height"` + // Database backend: leveldb | memdb | cleveldb DBBackend string `mapstructure:"db_backend"` @@ -189,7 +196,7 @@ type BaseConfig struct { // If true, query the ABCI app on connecting to a new peer // so the app can decide if we should keep the connection or not - FilterPeers bool `mapstructure:"filter_peers"` // false + //FilterPeers bool `mapstructure:"filter_peers"` // false } // DefaultBaseConfig returns a default base configuration for a Tendermint node @@ -206,7 +213,9 @@ func DefaultBaseConfig() BaseConfig { LogFormat: LogFormatPlain, ProfListenAddress: "", FastSync: true, - FilterPeers: false, + Deprecated: false, + ReplayHeight: -1, + //FilterPeers: false, DBBackend: "leveldb", DBPath: "data", } diff --git a/config/toml.go b/config/toml.go index 978255aba10..d88ce369886 100644 --- a/config/toml.go +++ b/config/toml.go @@ -81,6 +81,11 @@ moniker = "{{ .BaseConfig.Moniker }}" # and verifying their commits fast_sync = {{ .BaseConfig.FastSync }} +# If the blockchain is deprecated, run node with Deprecated will +# work in query only mode. Consensus engine and p2p gossip will be +# shutdown +deprecated = {{ .BaseConfig.Deprecated }} + # Database backend: leveldb | memdb | cleveldb db_backend = "{{ .BaseConfig.DBBackend }}" @@ -117,10 +122,6 @@ abci = "{{ .BaseConfig.ABCI }}" # TCP or UNIX socket address for the profiling server to listen on prof_laddr = "{{ .BaseConfig.ProfListenAddress }}" -# If true, query the ABCI app on connecting to a new peer -# so the app can decide if we should keep the connection or not -filter_peers = {{ .BaseConfig.FilterPeers }} - ##### advanced configuration options ##### ##### rpc server configuration options ##### diff --git a/consensus/replay.go b/consensus/replay.go index e47d4892a57..123008c63ba 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -6,12 +6,14 @@ import ( "hash/crc32" "io" "reflect" + "runtime" //"strconv" //"strings" "time" abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" //auto "github.com/tendermint/tendermint/libs/autofile" cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" @@ -235,7 +237,7 @@ func (h *Handshaker) NBlocks() int { } // TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { +func (h *Handshaker) Handshake(proxyApp proxy.AppConns, config *cfg.BaseConfig) error { // Handshake is done via ABCI Info on the query conn. res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) @@ -260,8 +262,12 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion) sm.SaveState(h.stateDB, h.initialState) + state := sm.LoadState(h.stateDB) + state.Version.Consensus.App = version.Protocol(res.AppVersion) + sm.SaveState(h.stateDB, state) + // Replay blocks up to the latest in the blockstore. - _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp, config) if err != nil { return fmt.Errorf("Error on replay: %v", err) } @@ -281,6 +287,7 @@ func (h *Handshaker) ReplayBlocks( appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns, + config *cfg.BaseConfig, ) ([]byte, error) { storeBlockHeight := h.store.Height() stateBlockHeight := state.LastBlockHeight @@ -355,7 +362,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false, config) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -368,7 +375,7 @@ func (h *Handshaker) ReplayBlocks( if appBlockHeight < stateBlockHeight { // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true, config) } else if appBlockHeight == stateBlockHeight { // We haven't run Commit (both the state and app are one block behind), @@ -385,7 +392,11 @@ func (h *Handshaker) ReplayBlocks( if err != nil { return nil, err } - mockApp := newMockProxyApp(appHash, abciResponses) + res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) + if err != nil { + return nil, fmt.Errorf("Error calling Info: %v", err) + } + mockApp := newMockProxyApp([]byte(res.Data), abciResponses) h.logger.Info("Replay last block using mock app") state, err = h.replayBlock(state, storeBlockHeight, mockApp) return state.AppHash, err @@ -397,7 +408,7 @@ func (h *Handshaker) ReplayBlocks( return nil, nil } -func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { +func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool, config *cfg.BaseConfig) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. // We replay all blocks from appBlockHeight+1. // @@ -409,6 +420,7 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl // If mutateState == true, the final block is replayed with h.replayBlock() var appHash []byte + var cid types.CommitID var err error finalBlock := storeBlockHeight if mutateState { @@ -417,10 +429,24 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl for i := appBlockHeight + 1; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB) + + if len(appHash) != 0 { + if !bytes.Equal(block.Header.AppHash, appHash) { + panic(fmt.Sprintf("AppHash mismatch: expected %s, actual %s", block.AppHash.String(), cmn.HexBytes(appHash).String())) + } + } + + bz, err := sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB) if err != nil { return nil, err } + cid = types.UnmarshalCommitID(bz) + appHash = cid.Hash + + if config.ReplayHeight > 0 && i >= config.ReplayHeight { + fmt.Printf("Replay from height %d to height %d successfully", appBlockHeight, config.ReplayHeight) + runtime.Goexit() + } h.nBlocks++ } @@ -434,7 +460,7 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl appHash = state.AppHash } - return appHash, checkAppHash(state, appHash) + return appHash, sm.CheckAppHashAndShardingHash(state, cid) } // ApplyBlock on the proxyApp with the last block. diff --git a/consensus/replay_file.go b/consensus/replay_file.go index d45f9c5a4f6..6589df252a7 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -307,7 +307,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo handshaker := NewHandshaker(stateDB, state, blockStore, gdoc) handshaker.SetEventBus(eventBus) - err = handshaker.Handshake(proxyApp) + err = handshaker.Handshake(proxyApp, &config) if err != nil { cmn.Exit(fmt.Sprintf("Error on handshake: %v", err)) } diff --git a/consensus/state.go b/consensus/state.go index 74ec092ff8d..dad76056823 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -26,6 +26,10 @@ import ( //----------------------------------------------------------------------------- // Errors +const ( + deprecatedToShutdownInterval = 30 +) + var ( ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round") @@ -133,6 +137,7 @@ type ConsensusState struct { // for reporting metrics metrics *Metrics + Deprecated bool } // StateOption sets an optional parameter on the ConsensusState. @@ -169,6 +174,7 @@ func NewConsensusState( cs.doPrevote = cs.defaultDoPrevote cs.setProposal = cs.defaultSetProposal + cs.Deprecated = state.Deprecated cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -622,6 +628,12 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { }() for { + if cs.Deprecated { + cs.Logger.Info(fmt.Sprintf("this blockchain has been deprecated. %d seconds later, this node will be shutdown", deprecatedToShutdownInterval)) + time.Sleep(deprecatedToShutdownInterval * time.Second) + cmn.Exit("Shutdown this blockchain node") + } + if maxSteps > 0 { if cs.nSteps >= maxSteps { cs.Logger.Info("reached max steps. exiting receive routine") diff --git a/libs/common/kvpair.go b/libs/common/kvpair.go index 54c3a58c061..3dd5a6841a6 100644 --- a/libs/common/kvpair.go +++ b/libs/common/kvpair.go @@ -2,7 +2,9 @@ package common import ( "bytes" + "encoding/hex" "sort" + "strings" ) //---------------------------------------- @@ -36,6 +38,43 @@ func (kvs KVPairs) Less(i, j int) bool { func (kvs KVPairs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] } func (kvs KVPairs) Sort() { sort.Sort(kvs) } +func (kvs KVPairs) ToString() (str string) { + kvs.Sort() + for _, pair := range kvs { + str += string(pair.Key) + str += ":" + str += hex.EncodeToString(pair.Value) + str += "|" + } + return +} + +func KVPairsFromString(str string) (kvs KVPairs) { + if len(str) == 0 { + return + } + + strs := strings.Split(str, "|") + for _, s := range strs { + if len(s) == 0 { + continue + } + + kv := strings.Split(s, ":") + hash, err := hex.DecodeString(kv[1]) + if err != nil { + panic("invalid hex bytes") + } + + kvp := KVPair{ + Key: []byte(kv[0]), + Value: hash, + } + kvs = append(kvs, kvp) + } + return +} + //---------------------------------------- // KI64Pair diff --git a/node/node.go b/node/node.go index c0a4d736e82..9ad9bf69bf5 100644 --- a/node/node.go +++ b/node/node.go @@ -263,7 +263,7 @@ func NewNode(config *cfg.Config, handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { + if err := handshaker.Handshake(proxyApp, &config.BaseConfig); err != nil { return nil, fmt.Errorf("Error during handshake: %v", err) } @@ -417,43 +417,42 @@ func NewNode(config *cfg.Config, // Filter peers by addr or pubkey with an ABCI query. // If the query return code is OK, add peer. - if config.FilterPeers { - connFilters = append( - connFilters, - // ABCI query for address filtering. - func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("Error querying abci app: %v", res) - } - - return nil - }, - ) - - peerFilters = append( - peerFilters, - // ABCI query for ID filtering. - func(_ p2p.IPeerSet, p p2p.Peer) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("Error querying abci app: %v", res) - } + //if config.FilterPeers { + connFilters = append( + connFilters, + // ABCI query for address filtering. + func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("Error querying abci app: %v", res) + } + + return nil + }, + ) - return nil - }, - ) - } + peerFilters = append( + peerFilters, + // ABCI query for ID filtering. + func(_ p2p.IPeerSet, p p2p.Peer) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("Error querying abci app: %v", res) + } + + return nil + }, + ) p2p.MultiplexTransportConnFilters(connFilters...)(transport) @@ -573,6 +572,11 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } + if n.consensusState.Deprecated || n.config.Deprecated { + n.Logger.Info("This blockchain was halt. The consensus engine and p2p gossip have been disabled. Only query rpc interfaces are available") + return nil + } + // Start the transport. addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress) if err != nil { diff --git a/p2p/key.go b/p2p/key.go index 4e662f9f702..3b2e65265ed 100644 --- a/p2p/key.go +++ b/p2p/key.go @@ -2,7 +2,10 @@ package p2p import ( "bytes" + "crypto/rsa" + "crypto/x509" "encoding/hex" + "errors" "fmt" "io/ioutil" @@ -24,8 +27,12 @@ const IDByteLength = crypto.AddressSize // NodeKey is the persistent peer key. // It contains the nodes private key for authentication. +// It also contains keys used in private txs type NodeKey struct { - PrivKey crypto.PrivKey `json:"priv_key"` // our priv key + PrivKey crypto.PrivKey `json:"priv_key"` // our priv key + RSAPrivKey string `json:"rsa_priv_key"` + RSAPubkKey string `json:"rsa_pub_key"` + OrgKeys map[string]map[string]string `json:"org_keys"` } // ID returns the peer's canonical ID - the hash of its public key. @@ -33,11 +40,37 @@ func (nodeKey *NodeKey) ID() ID { return PubKeyToID(nodeKey.PubKey()) } +// GetRSAPrivKey retrive rsa privkey +func (nodeKey *NodeKey) GetRSAPrivKey() (*rsa.PrivateKey, error) { + rk, err := hex.DecodeString(nodeKey.RSAPrivKey) + if err != nil { + return nil, errors.New("Decode RSAPrivKey failed:" + err.Error()) + } + privkey, err := x509.ParsePKCS1PrivateKey(rk) + if err != nil { + return nil, errors.New("Parse RSAPrivKey failed:" + err.Error()) + } + return privkey, nil +} + // PubKey returns the peer's PubKey func (nodeKey *NodeKey) PubKey() crypto.PubKey { return nodeKey.PrivKey.PubKey() } +// Save rewrite +func (nodeKey *NodeKey) Save(filePath string) error { + jsonBytes, err := cdc.MarshalJSONIndent(nodeKey, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(filePath, jsonBytes, 0600) + if err != nil { + return err + } + return nil +} + // PubKeyToID returns the ID corresponding to the given PubKey. // It's the hex-encoding of the pubKey.Address(). func PubKeyToID(pubKey crypto.PubKey) ID { @@ -72,11 +105,22 @@ func LoadNodeKey(filePath string) (*NodeKey, error) { func genNodeKey(filePath string) (*NodeKey, error) { privKey := ed25519.GenPrivKey() + + // gen rsk key pair + rsaSK, err := rsa.GenerateKey(crypto.CReader(), 1024) + if err != nil { + return nil, err + } + rsaSKbs := x509.MarshalPKCS1PrivateKey(rsaSK) + rsaPKbs := x509.MarshalPKCS1PublicKey(&rsaSK.PublicKey) + nodeKey := &NodeKey{ - PrivKey: privKey, + PrivKey: privKey, + RSAPrivKey: hex.EncodeToString(rsaSKbs), + RSAPubkKey: hex.EncodeToString(rsaPKbs), } - jsonBytes, err := cdc.MarshalJSON(nodeKey) + jsonBytes, err := cdc.MarshalJSONIndent(nodeKey, "", " ") if err != nil { return nil, err } diff --git a/state/execution.go b/state/execution.go index 3a11ecca446..2d83c32f206 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,6 +1,7 @@ package state import ( + "bytes" "fmt" "time" @@ -10,6 +11,13 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tendermint/libs/common" +) + +const ( + HaltTagKey = "halt_blockchain" + HaltTagValue = "true" + UpgradeFailureTagKey = "upgrade_failure" ) //----------------------------------------------------------------------------- @@ -124,6 +132,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b } fail.Fail() // XXX + preState := state.Copy() // Save the results before we commit. saveABCIResponses(blockExec.db, block.Height, abciResponses) @@ -150,8 +159,12 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b return state, fmt.Errorf("Commit failed for application: %v", err) } + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, HaltTagKey); ok && bytes.Equal(tag.Value, []byte(HaltTagValue)) { + state.Deprecated = true + } + // Lock mempool, commit app state, update mempoool. - appHash, err := blockExec.Commit(state, block) + bz, err := blockExec.Commit(state, block) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) } @@ -161,9 +174,14 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b fail.Fail() // XXX + cid := types.UnmarshalCommitID(bz) // Update the app hash and save the state. - state.AppHash = appHash + state.AppHash = cid.Hash + state.ShardingHash = make(cmn.KVPairs, len(cid.ShardingHash)) + copy(state.ShardingHash, cid.ShardingHash) + SaveState(blockExec.db, state) + SavePreState(blockExec.db, preState) fail.Fail() // XXX @@ -248,7 +266,9 @@ func execBlockOnProxyApp( // TODO: make use of res.Log // TODO: make use of this info // Blocks may include invalid txs. + txRes := r.DeliverTx + if txRes.Code == abci.CodeTypeOK { validTxs++ } else { @@ -291,6 +311,10 @@ func execBlockOnProxyApp( return nil, err } + if tag, ok := abci.GetTagByKey(abciResponses.EndBlock.Tags, UpgradeFailureTagKey); ok { + return nil, fmt.Errorf(string(tag.Value)) + } + logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs) return abciResponses, nil @@ -298,6 +322,16 @@ func execBlockOnProxyApp( func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) { + state := LoadState(stateDB) + // For replaying blocks, load history validator set + if block.Height > 1 && block.Height != state.LastBlockHeight+1 { + var err error + lastValSet, err = LoadValidators(stateDB, block.Height-1) + if err != nil { + panic(fmt.Sprintf("failed to load validatorset at heith %d", state.LastBlockHeight)) + } + } + // Sanity check that commit length matches validator set size - // only applies after first block if block.Height > 1 { diff --git a/state/services.go b/state/services.go index 07d12c5a10c..2b387319062 100644 --- a/state/services.go +++ b/state/services.go @@ -81,6 +81,7 @@ type BlockStoreRPC interface { type BlockStore interface { BlockStoreRPC SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + RetreatLastBlock() } //----------------------------------------------------------------------------------------------------- diff --git a/state/state.go b/state/state.go index b6253b64512..c31988754a1 100644 --- a/state/state.go +++ b/state/state.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "time" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/version" @@ -13,7 +14,8 @@ import ( // database keys var ( - stateKey = []byte("stateKey") + stateKey = []byte("stateKey") + statePreKey = []byte("statePreKey") ) //----------------------------------------------------------------------------- @@ -80,7 +82,9 @@ type State struct { LastResultsHash []byte // the latest AppHash we've received from calling abci.Commit() - AppHash []byte + AppHash []byte + ShardingHash cmn.KVPairs + Deprecated bool } // Copy makes a copy of the State for mutating. @@ -102,9 +106,11 @@ func (state State) Copy() State { ConsensusParams: state.ConsensusParams, LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged, - AppHash: state.AppHash, + AppHash: state.AppHash, + ShardingHash: state.ShardingHash, LastResultsHash: state.LastResultsHash, + Deprecated: state.Deprecated, } } @@ -154,8 +160,7 @@ func (state State) MakeBlock( state.Version.Consensus, state.ChainID, timestamp, state.LastBlockID, state.LastBlockTotalTx+block.NumTxs, state.Validators.Hash(), state.NextValidators.Hash(), - state.ConsensusParams.Hash(), state.AppHash, state.LastResultsHash, - proposerAddress, + state.ConsensusParams.Hash(), state.AppHash, proposerAddress, state.ShardingHash, ) return block, block.MakePartSet(types.BlockPartSizeBytes) diff --git a/state/store.go b/state/store.go index 0301bc7c358..d4df7147492 100644 --- a/state/store.go +++ b/state/store.go @@ -65,6 +65,10 @@ func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) ( return state, nil } +func LoadPreState(db dbm.DB) State { + return loadState(db, statePreKey) +} + // LoadState loads the State from the database. func LoadState(db dbm.DB) State { return loadState(db, stateKey) @@ -93,6 +97,10 @@ func SaveState(db dbm.DB, state State) { saveState(db, state, stateKey) } +func SavePreState(db dbm.DB, state State) { + saveState(db, state, statePreKey) +} + func saveState(db dbm.DB, state State, key []byte) { nextHeight := state.LastBlockHeight + 1 // If first block, save validators for block 1. diff --git a/state/validation.go b/state/validation.go index 3c63c35b78f..cd2169232ec 100644 --- a/state/validation.go +++ b/state/validation.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/libs/common" ) //----------------------------------------------------- @@ -19,6 +20,13 @@ func validateBlock(evidencePool EvidencePool, stateDB dbm.DB, state State, block return err } + if block.Version.Block != state.Version.Consensus.Block { + return fmt.Errorf("Wrong Block.Header.Version.Block Expected %v, got %v", + state.Version.Consensus.Block, + block.Version.Block, + ) + } + // Validate basic info. if block.Version != state.Version.Consensus { return fmt.Errorf("Wrong Block.Header.Version. Expected %v, got %v", @@ -62,18 +70,14 @@ func validateBlock(evidencePool EvidencePool, stateDB dbm.DB, state State, block block.AppHash, ) } + VerifyShardingHash(state, block) + if !bytes.Equal(block.ConsensusHash, state.ConsensusParams.Hash()) { return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), block.ConsensusHash, ) } - if !bytes.Equal(block.LastResultsHash, state.LastResultsHash) { - return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", - state.LastResultsHash, - block.LastResultsHash, - ) - } if !bytes.Equal(block.ValidatorsHash, state.Validators.Hash()) { return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), @@ -203,3 +207,39 @@ func VerifyEvidence(stateDB dbm.DB, state State, evidence types.Evidence) error return nil } + +func CheckAppHashAndShardingHash(state State, ci types.CommitID) error { + if !bytes.Equal(state.AppHash, ci.Hash) { + panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", ci.Hash, state.AppHash).Error()) + } + + shm := make(map[string][]byte) + for _, pair := range state.ShardingHash { + shm[string(pair.Key)] = pair.Value + } + for _, pair := range ci.ShardingHash { + if hash, ok := shm[string(pair.Key)]; ok { + if !bytes.Equal(hash, pair.Value) { + panic(fmt.Errorf("Tendermint state.ShardingHash does not match ShardingHash for sharding %s after replay. Got %X, expected %X", string(pair.Key), hash, pair.Value).Error()) + } + } + } + return nil +} + +func VerifyShardingHash(state State, block *types.Block) error { + kvs := common.KVPairsFromString(block.ShardingHash) + + shm := make(map[string][]byte) + for _, pair := range state.ShardingHash { + shm[string(pair.Key)] = pair.Value + } + for _, pair := range kvs { + if hash, ok := shm[string(pair.Key)]; ok { + if !bytes.Equal(hash, pair.Value) { + panic(fmt.Errorf("Tendermint state.ShardingHash does not match ShardingHash for sharding %s. Got %X, expected %X", string(pair.Key), hash, pair.Value).Error()) + } + } + } + return nil +} diff --git a/types/block.go b/types/block.go index 6616c0ee6ed..0e491eba19b 100644 --- a/types/block.go +++ b/types/block.go @@ -374,6 +374,7 @@ type Header struct { NextValidatorsHash cmn.HexBytes `json:"next_validators_hash"` // validators for the next block ConsensusHash cmn.HexBytes `json:"consensus_hash"` // consensus params for current block AppHash cmn.HexBytes `json:"app_hash"` // state after txs from the previous block + ShardingHash string `json:"sharding_hash"` LastResultsHash cmn.HexBytes `json:"last_results_hash"` // root hash of all results from the txs from the previous block // consensus info @@ -387,8 +388,8 @@ func (h *Header) Populate( version version.Consensus, chainID string, timestamp time.Time, lastBlockID BlockID, totalTxs int64, valHash, nextValHash []byte, - consensusHash, appHash, lastResultsHash []byte, - proposerAddress Address, + consensusHash, appHash, + proposerAddress Address, shardingHash cmn.KVPairs, ) { h.Version = version h.ChainID = chainID @@ -399,8 +400,8 @@ func (h *Header) Populate( h.NextValidatorsHash = nextValHash h.ConsensusHash = consensusHash h.AppHash = appHash - h.LastResultsHash = lastResultsHash h.ProposerAddress = proposerAddress + h.ShardingHash = shardingHash.ToString() } // Hash returns the hash of the header. @@ -427,7 +428,6 @@ func (h *Header) Hash() cmn.HexBytes { cdcEncode(h.NextValidatorsHash), cdcEncode(h.ConsensusHash), cdcEncode(h.AppHash), - cdcEncode(h.LastResultsHash), cdcEncode(h.EvidenceHash), cdcEncode(h.ProposerAddress), }) diff --git a/types/commit_id.go b/types/commit_id.go new file mode 100644 index 00000000000..e9bd77c1720 --- /dev/null +++ b/types/commit_id.go @@ -0,0 +1,25 @@ +package types + +import ( + "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tendermint/libs/common" +) + +// CommitID contains the tree version number and its merkle root. +type CommitID struct { + Version int64 `json:"version"` + Hash []byte `json:"hash"` + ShardingHash cmn.KVPairs `json:"sharding_hash"` +} + +func UnmarshalCommitID(bz []byte) CommitID { + cdc := amino.NewCodec() + + var ch CommitID + err := cdc.UnmarshalJSON(bz, &ch) + if err != nil { + panic(err.Error()) + } + + return ch +}