Skip to content

Commit

Permalink
Merge pull request #2385 from AntelopeIO/GH-2102-vote-processing
Browse files Browse the repository at this point in the history
IF: Populated fork db ASAP; Vote processing off the main thread
  • Loading branch information
heifner authored Apr 10, 2024
2 parents 232e95b + 5cc1b87 commit f9786e1
Show file tree
Hide file tree
Showing 20 changed files with 480 additions and 275 deletions.
30 changes: 1 addition & 29 deletions libraries/chain/block_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ block_state_ptr block_state::create_if_genesis_block(const block_state_legacy& b
// TODO: https://github.com/AntelopeIO/leap/issues/2057
// TODO: Do not aggregate votes on blocks created from block_state_legacy. This can be removed when #2057 complete.
result.pending_qc = pending_quorum_certificate{result.active_finalizer_policy->finalizers.size(), result.active_finalizer_policy->threshold, result.active_finalizer_policy->max_weak_sum_before_weak_final()};
result.valid_qc = {}; // best qc received from the network inside block extension, empty until first savanna proper IF block

// build leaf_node and validation_tree
valid_t::finality_leaf_node_t leaf_node {
Expand All @@ -101,7 +100,7 @@ block_state_ptr block_state::create_if_genesis_block(const block_state_legacy& b
.validation_mroots = { validation_tree.get_root() }
};

result.validated = bsp.is_valid();
result.validated.store(bsp.is_valid());
result.pub_keys_recovered = bsp._pub_keys_recovered;
result.cached_trxs = bsp._cached_trxs;
result.action_mroot = *bsp.action_mroot_savanna;
Expand Down Expand Up @@ -248,33 +247,6 @@ void block_state::verify_qc(const valid_quorum_certificate& qc) const {
invalid_qc_claim, "signature validation failed" );
}

std::optional<quorum_certificate> block_state::get_best_qc() const {
// if pending_qc does not have a valid QC, consider valid_qc only
if( !pending_qc.is_quorum_met() ) {
if( valid_qc ) {
return quorum_certificate{ block_num(), *valid_qc };
} else {
return std::nullopt;
}
}

// extract valid QC from pending_qc
valid_quorum_certificate valid_qc_from_pending = pending_qc.to_valid_quorum_certificate();

// if valid_qc does not have value, consider valid_qc_from_pending only
if( !valid_qc ) {
return quorum_certificate{ block_num(), valid_qc_from_pending };
}

// Both valid_qc and valid_qc_from_pending have value. Compare them and select a better one.
// Strong beats weak. Tie break by valid_qc.
const auto& best_qc =
valid_qc->is_strong() == valid_qc_from_pending.is_strong() ?
*valid_qc : // tie broke by valid_qc
valid_qc->is_strong() ? *valid_qc : valid_qc_from_pending; // strong beats weak
return quorum_certificate{ block_num(), best_qc };
}

valid_t block_state::new_valid(const block_header_state& next_bhs, const digest_type& action_mroot, const digest_type& strong_digest) const {
assert(valid);
assert(next_bhs.core.last_final_block_num() >= core.last_final_block_num());
Expand Down
281 changes: 183 additions & 98 deletions libraries/chain/controller.cpp

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace eosio::chain {

struct block_state_accessor {
static bool is_valid(const block_state& bs) { return bs.is_valid(); }
static void set_valid(block_state& bs, bool v) { bs.validated = v; }
static void set_valid(block_state& bs, bool v) { bs.validated.store(v); }
};

struct block_state_legacy_accessor {
Expand Down Expand Up @@ -128,6 +128,7 @@ namespace eosio::chain {

bsp_t get_block_impl( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
bool block_exists_impl( const block_id_type& id ) const;
bool validated_block_exists_impl( const block_id_type& id ) const;
void reset_root_impl( const bsp_t& root_bs );
void rollback_head_to_root_impl();
void advance_root_impl( const block_id_type& id );
Expand Down Expand Up @@ -664,7 +665,19 @@ namespace eosio::chain {
return index.find( id ) != index.end();
}

// ------------------ fork_database -------------------------
template<class BSP>
bool fork_database_t<BSP>::validated_block_exists(const block_id_type& id) const {
std::lock_guard g( my->mtx );
return my->validated_block_exists_impl(id);
}

template<class BSP>
bool fork_database_impl<BSP>::validated_block_exists_impl(const block_id_type& id) const {
auto itr = index.find( id );
return itr != index.end() && bs_accessor_t::is_valid(*(*itr));
}

// ------------------ fork_database -------------------------

fork_database::fork_database(const std::filesystem::path& data_dir)
: data_dir(data_dir)
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/hotstuff/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
} else {
sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()});
}
return vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig };
return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }};
}
return {};
}
Expand Down
108 changes: 75 additions & 33 deletions libraries/chain/hotstuff/hotstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,51 @@ inline std::vector<uint32_t> bitset_to_vector(const hs_bitset& bs) {
}

bool pending_quorum_certificate::has_voted(size_t index) const {
std::lock_guard g(*_mtx);
return _strong_votes._bitset.at(index) || _weak_votes._bitset.at(index);
return _strong_votes.has_voted(index) || _weak_votes.has_voted(index);
}

bool pending_quorum_certificate::has_voted_no_lock(bool strong, size_t index) const {
if (strong) {
return _strong_votes._bitset[index];
return _strong_votes.has_voted(index);
}
return _weak_votes.has_voted(index);
}

void pending_quorum_certificate::votes_t::reflector_init() {
_processed = std::vector<std::atomic<bool>>(_bitset.size());
for (size_t i = 0; i < _bitset.size(); ++i) {
if (_bitset[i]) {
_processed[i].store(true, std::memory_order_relaxed);
}
}
return _weak_votes._bitset[index];
}

bool pending_quorum_certificate::votes_t::has_voted(size_t index) const {
assert(index < _processed.size());
return _processed[index].load(std::memory_order_relaxed);
}


vote_status pending_quorum_certificate::votes_t::add_vote(size_t index, const bls_signature& sig) {
if (_bitset[index]) { // check here as could have come in while unlocked
return vote_status::duplicate; // shouldn't be already present
}
_processed[index].store(true, std::memory_order_relaxed);
_bitset.set(index);
_sig.aggregate(sig); // works even if _sig is default initialized (fp2::zero())
return vote_status::success;
}

void pending_quorum_certificate::votes_t::reset(size_t num_finalizers) {
if (num_finalizers != _bitset.size())
_bitset.resize(num_finalizers);
_bitset.reset();
_sig = bls_aggregate_signature();
}

pending_quorum_certificate::pending_quorum_certificate()
: _mtx(std::make_unique<std::mutex>()) {
}

pending_quorum_certificate::pending_quorum_certificate(size_t num_finalizers, uint64_t quorum, uint64_t max_weak_sum_before_weak_final)
: _mtx(std::make_unique<std::mutex>())
, _quorum(quorum)
, _max_weak_sum_before_weak_final(max_weak_sum_before_weak_final) {
_weak_votes.resize(num_finalizers);
_strong_votes.resize(num_finalizers);
, _max_weak_sum_before_weak_final(max_weak_sum_before_weak_final)
, _weak_votes(num_finalizers)
, _strong_votes(num_finalizers) {
}

bool pending_quorum_certificate::is_quorum_met() const {
Expand Down Expand Up @@ -133,34 +141,30 @@ vote_status pending_quorum_certificate::add_vote(block_num_type block_num, bool
const bls_public_key& pubkey, const bls_signature& sig, uint64_t weight) {
vote_status s = vote_status::success;

std::unique_lock g(*_mtx);
state_t pre_state = _state;
state_t post_state = pre_state;
if (has_voted_no_lock(strong, index)) {
s = vote_status::duplicate;
} else {
g.unlock();
if (!fc::crypto::blslib::verify(pubkey, proposal_digest, sig)) {
wlog( "signature from finalizer ${i} cannot be verified", ("i", index) );
s = vote_status::invalid_signature;
} else {
g.lock();
s = strong ? add_strong_vote(index, sig, weight)
: add_weak_vote(index, sig, weight);
post_state = _state;
g.unlock();
}
dlog("block_num: ${bn}, vote strong: ${sv}, duplicate", ("bn", block_num)("sv", strong));
return vote_status::duplicate;
}

if (!fc::crypto::blslib::verify(pubkey, proposal_digest, sig)) {
wlog( "signature from finalizer ${i} cannot be verified", ("i", index) );
return vote_status::invalid_signature;
}

std::unique_lock g(*_mtx);
state_t pre_state = _state;
s = strong ? add_strong_vote(index, sig, weight)
: add_weak_vote(index, sig, weight);
state_t post_state = _state;
g.unlock();

dlog("block_num: ${bn}, vote strong: ${sv}, status: ${s}, pre-state: ${pre}, post-state: ${state}, quorum_met: ${q}",
("bn", block_num)("sv", strong)("s", s)("pre", pre_state)("state", post_state)("q", is_quorum_met(post_state)));
return s;
}

// thread safe
// called by get_best_qc which acquires a mutex
valid_quorum_certificate pending_quorum_certificate::to_valid_quorum_certificate() const {
std::lock_guard g(*_mtx);

valid_quorum_certificate valid_qc;

if( _state == state_t::strong ) {
Expand All @@ -177,6 +181,44 @@ valid_quorum_certificate pending_quorum_certificate::to_valid_quorum_certificate
return valid_qc;
}

std::optional<quorum_certificate> pending_quorum_certificate::get_best_qc(block_num_type block_num) const {
std::lock_guard g(*_mtx);
// if pending_qc does not have a valid QC, consider valid_qc only
if( !is_quorum_met_no_lock() ) {
if( _valid_qc ) {
return std::optional{quorum_certificate{ block_num, *_valid_qc }};
} else {
return std::nullopt;
}
}

// extract valid QC from pending_qc
valid_quorum_certificate valid_qc_from_pending = to_valid_quorum_certificate();

// if valid_qc does not have value, consider valid_qc_from_pending only
if( !_valid_qc ) {
return std::optional{quorum_certificate{ block_num, valid_qc_from_pending }};
}

// Both valid_qc and valid_qc_from_pending have value. Compare them and select a better one.
// Strong beats weak. Tie break by valid_qc.
const auto& best_qc =
_valid_qc->is_strong() == valid_qc_from_pending.is_strong() ?
*_valid_qc : // tie broken by valid_qc
_valid_qc->is_strong() ? *_valid_qc : valid_qc_from_pending; // strong beats weak
return std::optional{quorum_certificate{ block_num, best_qc }};
}

void pending_quorum_certificate::set_valid_qc(const valid_quorum_certificate& qc) {
std::lock_guard g(*_mtx);
_valid_qc = qc;
}

bool pending_quorum_certificate::valid_qc_is_strong() const {
std::lock_guard g(*_mtx);
return _valid_qc && _valid_qc->is_strong();
}

bool pending_quorum_certificate::is_quorum_met_no_lock() const {
return is_quorum_met(_state);
}
Expand Down
12 changes: 7 additions & 5 deletions libraries/chain/include/eosio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/action_receipt.hpp>
#include <eosio/chain/incremental_merkle.hpp>
#include <eosio/chain/thread_utils.hpp>

namespace eosio::chain {

Expand Down Expand Up @@ -69,12 +70,11 @@ struct block_state : public block_header_state { // block_header_state provi
digest_type strong_digest; // finalizer_digest (strong, cached so we can quickly validate votes)
weak_digest_t weak_digest; // finalizer_digest (weak, cached so we can quickly validate votes)
pending_quorum_certificate pending_qc; // where we accumulate votes we receive
std::optional<valid_quorum_certificate> valid_qc; // best qc received from the network inside block extension
std::optional<valid_t> valid;

// ------ updated for votes, used for fork_db ordering ------------------------------
private:
bool validated = false; // We have executed the block's trxs and verified that action merkle root (block id) matches.
copyable_atomic<bool> validated{false}; // We have executed the block's trxs and verified that action merkle root (block id) matches.

// ------ data members caching information available elsewhere ----------------------
bool pub_keys_recovered = false;
Expand All @@ -83,7 +83,7 @@ struct block_state : public block_header_state { // block_header_state provi
std::optional<digest_type> base_digest; // For finality_data sent to SHiP, computed on demand in get_finality_data()

// ------ private methods -----------------------------------------------------------
bool is_valid() const { return validated; }
bool is_valid() const { return validated.load(); }
bool is_pub_keys_recovered() const { return pub_keys_recovered; }
deque<transaction_metadata_ptr> extract_trxs_metas();
void set_trxs_metas(deque<transaction_metadata_ptr>&& trxs_metas, bool keys_recovered);
Expand All @@ -103,7 +103,9 @@ struct block_state : public block_header_state { // block_header_state provi
const extensions_type& header_extensions() const { return block_header_state::header.header_extensions; }
uint32_t irreversible_blocknum() const { return core.last_final_block_num(); } // backwards compatibility
uint32_t last_final_block_num() const { return core.last_final_block_num(); }
std::optional<quorum_certificate> get_best_qc() const;
std::optional<quorum_certificate> get_best_qc() const { return pending_qc.get_best_qc(block_num()); } // thread safe
bool valid_qc_is_strong() const { return pending_qc.valid_qc_is_strong(); } // thread safe
void set_valid_qc(const valid_quorum_certificate& qc) { pending_qc.set_valid_qc(qc); }

protocol_feature_activation_set_ptr get_activated_protocol_features() const { return block_header_state::activated_protocol_features; }
uint32_t last_qc_block_num() const { return core.latest_qc_claim().block_num; }
Expand Down Expand Up @@ -164,4 +166,4 @@ using block_state_pair = std::pair<std::shared_ptr<block_state_legacy>, blo
FC_REFLECT( eosio::chain::valid_t::finality_leaf_node_t, (major_version)(minor_version)(block_num)(finality_digest)(action_mroot) )
FC_REFLECT( eosio::chain::valid_t, (validation_tree)(validation_mroots))
FC_REFLECT( eosio::chain::finality_data_t, (major_version)(minor_version)(active_finalizer_policy_generation)(action_mroot)(base_digest))
FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(strong_digest)(weak_digest)(pending_qc)(valid_qc)(valid)(validated) )
FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(strong_digest)(weak_digest)(pending_qc)(valid)(validated) )
7 changes: 5 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace eosio::chain {

void assemble_and_complete_block( block_report& br, const signer_callback_type& signer_callback );
void sign_block( const signer_callback_type& signer_callback );
void commit_block();
void commit_block(block_report& br);
void maybe_switch_forks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

// thread-safe
Expand Down Expand Up @@ -276,7 +276,8 @@ namespace eosio::chain {
// thread-safe
signed_block_ptr fetch_block_by_id( const block_id_type& id )const;
// thread-safe
bool block_exists( const block_id_type& id)const;
bool block_exists(const block_id_type& id) const;
bool validated_block_exists(const block_id_type& id) const;
// thread-safe
std::optional<signed_block_header> fetch_block_header_by_number( uint32_t block_num )const;
// thread-safe
Expand Down Expand Up @@ -372,6 +373,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace eosio::chain {

bsp_t get_block( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
bool block_exists( const block_id_type& id ) const;
bool validated_block_exists( const block_id_type& id ) const;

/**
* Purges any existing blocks from the fork database and resets the root block_header_state to the provided value.
Expand Down
Loading

0 comments on commit f9786e1

Please sign in to comment.