Skip to content

Commit

Permalink
Merge pull request #2190 from AntelopeIO/controller-self
Browse files Browse the repository at this point in the history
IF: Shutdown thread pool in controller destructor
  • Loading branch information
heifner authored Feb 6, 2024
2 parents ed4209f + e97a9ff commit 4e92a59
Show file tree
Hide file tree
Showing 23 changed files with 262 additions and 221 deletions.
345 changes: 195 additions & 150 deletions libraries/chain/controller.cpp

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ namespace eosio::chain {

template<class bs>
bool first_preferred( const bs& lhs, const bs& rhs ) {
// dpos_irreversible_blocknum == std::numeric_limits<uint32_t>::max() after hotstuff activation
// hotstuff block considered preferred over dpos
// hotstuff blocks compared by block_num as both lhs & rhs dpos_irreversible_blocknum is max uint32_t
// This can be simplified in a future release that assumes hotstuff already activated
return std::pair(lhs.irreversible_blocknum(), lhs.block_num()) > std::pair(rhs.irreversible_blocknum(), rhs.block_num());
}

Expand Down
12 changes: 6 additions & 6 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,12 @@ namespace eosio::chain {

static std::optional<uint64_t> convert_exception_to_error_code( const fc::exception& e );

signal<void(uint32_t)> block_start;
signal<void(const block_signal_params&)> accepted_block_header;
signal<void(const block_signal_params&)> accepted_block;
signal<void(const block_signal_params&)> irreversible_block;
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)> applied_transaction;
signal<void(const vote_message&)> voted_block;
signal<void(uint32_t)>& block_start();
signal<void(const block_signal_params&)>& accepted_block_header();
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();
signal<void(const vote_message&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
wasm_interface& get_wasm_interface();
Expand Down
2 changes: 1 addition & 1 deletion libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ namespace eosio { namespace testing {
control->add_indices();
if (lambda) lambda();
chain_transactions.clear();
control->accepted_block.connect([this]( block_signal_params t ){
control->accepted_block().connect([this]( block_signal_params t ){
const auto& [ block, id ] = t;
FC_ASSERT( block );
for( auto receipt : block->transactions ) {
Expand Down
10 changes: 5 additions & 5 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,12 +1012,12 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
} );

// relay signals to channels
accepted_block_header_connection = chain->accepted_block_header.connect(
accepted_block_header_connection = chain->accepted_block_header().connect(
[this]( const block_signal_params& t ) {
accepted_block_header_channel.publish( priority::medium, t );
} );

accepted_block_connection = chain->accepted_block.connect( [this]( const block_signal_params& t ) {
accepted_block_connection = chain->accepted_block().connect( [this]( const block_signal_params& t ) {
const auto& [ block, id ] = t;
if (_account_query_db) {
_account_query_db->commit_block(block);
Expand All @@ -1034,7 +1034,7 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
accepted_block_channel.publish( priority::high, t );
} );

irreversible_block_connection = chain->irreversible_block.connect( [this]( const block_signal_params& t ) {
irreversible_block_connection = chain->irreversible_block().connect( [this]( const block_signal_params& t ) {
const auto& [ block, id ] = t;

if (_trx_retry_db) {
Expand All @@ -1048,7 +1048,7 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
irreversible_block_channel.publish( priority::low, t );
} );

applied_transaction_connection = chain->applied_transaction.connect(
applied_transaction_connection = chain->applied_transaction().connect(
[this]( std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t ) {
const auto& [ trace, ptrx ] = t;
if (_account_query_db) {
Expand All @@ -1067,7 +1067,7 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
} );

if (_trx_finality_status_processing || _trx_retry_db) {
block_start_connection = chain->block_start.connect(
block_start_connection = chain->block_start().connect(
[this]( uint32_t block_num ) {
if (_trx_retry_db) {
_trx_retry_db->on_block_start(block_num);
Expand Down
10 changes: 5 additions & 5 deletions plugins/chain_plugin/test/test_account_query_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ BOOST_FIXTURE_TEST_CASE(newaccount_test, validating_tester) { try {
auto aq_db = account_query_db(*control);

//link aq_db to the `accepted_block` signal on the controller
auto c2 = control->accepted_block.connect([&](const block_signal_params& t) {
auto c2 = control->accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
aq_db.commit_block( block );
});
Expand All @@ -63,7 +63,7 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test, validating_tester) { try {
auto aq_db = account_query_db(*control);

//link aq_db to the `accepted_block` signal on the controller
auto c = control->accepted_block.connect([&](const block_signal_params& t) {
auto c = control->accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
aq_db.commit_block( block );
});
Expand Down Expand Up @@ -98,7 +98,7 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, validating_tester) { try
auto aq_db = account_query_db(*control);

//link aq_db to the `accepted_block` signal on the controller
auto c = control->accepted_block.connect([&](const block_signal_params& t) {
auto c = control->accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
aq_db.commit_block( block );
});
Expand Down Expand Up @@ -151,7 +151,7 @@ BOOST_AUTO_TEST_CASE(future_fork_test) { try {
auto aq_db = account_query_db(*node_a.control);

//link aq_db to the `accepted_block` signal on the controller
auto c = node_a.control->accepted_block.connect([&](const block_signal_params& t) {
auto c = node_a.control->accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
aq_db.commit_block( block );
});
Expand Down Expand Up @@ -199,7 +199,7 @@ BOOST_AUTO_TEST_CASE(fork_test) { try {
auto aq_db = account_query_db(*node_a.control);

//link aq_db to the `accepted_block` signal on the controller
auto c = node_a.control->accepted_block.connect([&](const block_signal_params& t) {
auto c = node_a.control->accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
aq_db.commit_block( block );
});
Expand Down
8 changes: 4 additions & 4 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4392,20 +4392,20 @@ namespace eosio {

{
chain::controller& cc = chain_plug->chain();
cc.accepted_block_header.connect( [my = shared_from_this()]( const block_signal_params& t ) {
cc.accepted_block_header().connect( [my = shared_from_this()]( const block_signal_params& t ) {
const auto& [ block, id ] = t;
my->on_accepted_block_header( block, id );
} );

cc.accepted_block.connect( [my = shared_from_this()]( const block_signal_params& t ) {
cc.accepted_block().connect( [my = shared_from_this()]( const block_signal_params& t ) {
my->on_accepted_block();
} );
cc.irreversible_block.connect( [my = shared_from_this()]( const block_signal_params& t ) {
cc.irreversible_block().connect( [my = shared_from_this()]( const block_signal_params& t ) {
const auto& [ block, id ] = t;
my->on_irreversible_block( id, block->block_num() );
} );

cc.voted_block.connect( [my = shared_from_this()]( const vote_message& vote ) {
cc.voted_block().connect( [my = shared_from_this()]( const vote_message& vote ) {
my->on_voted_block(vote);
} );
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1343,20 +1343,20 @@ void producer_plugin_impl::plugin_startup() {

chain.set_node_finalizer_keys(_finalizer_keys);

_accepted_block_connection.emplace(chain.accepted_block.connect([this](const block_signal_params& t) {
_accepted_block_connection.emplace(chain.accepted_block().connect([this](const block_signal_params& t) {
const auto& [ block, id ] = t;
on_block(block);
}));
_accepted_block_header_connection.emplace(chain.accepted_block_header.connect([this](const block_signal_params& t) {
_accepted_block_header_connection.emplace(chain.accepted_block_header().connect([this](const block_signal_params& t) {
const auto& [ block, id ] = t;
on_block_header(block->producer, block->block_num(), block->timestamp);
}));
_irreversible_block_connection.emplace(chain.irreversible_block.connect([this](const block_signal_params& t) {
_irreversible_block_connection.emplace(chain.irreversible_block().connect([this](const block_signal_params& t) {
const auto& [ block, id ] = t;
on_irreversible_block(block);
}));

_block_start_connection.emplace(chain.block_start.connect([this, &chain](uint32_t bs) {
_block_start_connection.emplace(chain.block_start().connect([this, &chain](uint32_t bs) {
try {
_snapshot_scheduler.on_start_block(bs, chain);
} catch (const snapshot_execution_exception& e) {
Expand Down
4 changes: 2 additions & 2 deletions plugins/producer_plugin/test/test_trx_full.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ BOOST_AUTO_TEST_CASE(producer) {
std::deque<signed_block_ptr> all_blocks;
std::promise<void> empty_blocks_promise;
std::future<void> empty_blocks_fut = empty_blocks_promise.get_future();
auto ab = chain_plug->chain().accepted_block.connect( [&](const chain::block_signal_params& t) {
auto ab = chain_plug->chain().accepted_block().connect( [&](const chain::block_signal_params& t) {
const auto& [ block, id ] = t;
static int num_empty = std::numeric_limits<int>::max();
all_blocks.push_back( block );
Expand All @@ -140,7 +140,7 @@ BOOST_AUTO_TEST_CASE(producer) {
num_empty = 10;
}
} );
auto bs = chain_plug->chain().block_start.connect( [&]( uint32_t bn ) {
auto bs = chain_plug->chain().block_start().connect( [&]( uint32_t bn ) {
} );

std::atomic<size_t> num_acked = 0;
Expand Down
6 changes: 3 additions & 3 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,17 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options)
chain.set_disable_replay_opts(true);
}

applied_transaction_connection.emplace(chain.applied_transaction.connect(
applied_transaction_connection.emplace(chain.applied_transaction().connect(
[&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t) {
on_applied_transaction(std::get<0>(t), std::get<1>(t));
}));
accepted_block_connection.emplace(
chain.accepted_block.connect([&](const block_signal_params& t) {
chain.accepted_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
on_accepted_block(block, id);
}));
block_start_connection.emplace(
chain.block_start.connect([&](uint32_t block_num) { on_block_start(block_num); }));
chain.block_start().connect([&](uint32_t block_num) { on_block_start(block_num); }));

auto dir_option = options.at("state-history-dir").as<std::filesystem::path>();
std::filesystem::path state_history_dir;
Expand Down
4 changes: 2 additions & 2 deletions plugins/test_control_plugin/test_control_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class test_control_plugin_impl {

void test_control_plugin_impl::connect() {
_irreversible_block_connection.emplace(
_chain.irreversible_block.connect( [&]( const chain::block_signal_params& t ) {
_chain.irreversible_block().connect( [&]( const chain::block_signal_params& t ) {
const auto& [ block, id ] = t;
applied_irreversible_block( id );
} ));
_accepted_block_connection =
_chain.accepted_block.connect( [&]( const chain::block_signal_params& t ) {
_chain.accepted_block().connect( [&]( const chain::block_signal_params& t ) {
const auto& [ block, id ] = t;
accepted_block( id );
} );
Expand Down
8 changes: 4 additions & 4 deletions plugins/trace_api_plugin/trace_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,29 +363,29 @@ struct trace_api_plugin_impl {
auto& chain = app().find_plugin<chain_plugin>()->chain();

applied_transaction_connection.emplace(
chain.applied_transaction.connect([this](std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t) {
chain.applied_transaction().connect([this](std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t) {
emit_killer([&](){
extraction->signal_applied_transaction(std::get<0>(t), std::get<1>(t));
});
}));

block_start_connection.emplace(
chain.block_start.connect([this](uint32_t block_num) {
chain.block_start().connect([this](uint32_t block_num) {
emit_killer([&](){
extraction->signal_block_start(block_num);
});
}));

accepted_block_connection.emplace(
chain.accepted_block.connect([this](const chain::block_signal_params& t) {
chain.accepted_block().connect([this](const chain::block_signal_params& t) {
emit_killer([&](){
const auto& [ block, id ] = t;
extraction->signal_accepted_block(block, id);
});
}));

irreversible_block_connection.emplace(
chain.irreversible_block.connect([this](const chain::block_signal_params& t) {
chain.irreversible_block().connect([this](const chain::block_signal_params& t) {
const auto& [ block, id ] = t;
emit_killer([&](){
extraction->signal_irreversible_block(block->block_num());
Expand Down
2 changes: 1 addition & 1 deletion tests/test_snapshot_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ BOOST_AUTO_TEST_CASE(snapshot_scheduler_test) {
chain_plugin* chain_plug = app->find_plugin<chain_plugin>();
plugin_promise.set_value({prod_plug, chain_plug});

auto bs = chain_plug->chain().block_start.connect([&prod_plug, &at_block_20_promise](uint32_t bn) {
auto bs = chain_plug->chain().block_start().connect([&prod_plug, &at_block_20_promise](uint32_t bn) {
if(bn == 20u)
at_block_20_promise.set_value();
// catching pending snapshot
Expand Down
22 changes: 11 additions & 11 deletions unittests/api_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ BOOST_AUTO_TEST_CASE(light_validation_skip_cfa) try {
other.execute_setup_policy( setup_policy::full );

transaction_trace_ptr other_trace;
auto cc = other.control->applied_transaction.connect( [&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto cc = other.control->applied_transaction().connect( [&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if( t && t->id == trace->id ) {
other_trace = t;
Expand Down Expand Up @@ -1467,12 +1467,12 @@ void transaction_tests(T& chain) {
{
chain.produce_blocks(10);
transaction_trace_ptr trace;
auto c = chain.control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = chain.control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t && t->receipt && t->receipt->status != transaction_receipt::executed) { trace = t; }
} );
signed_block_ptr block;
auto c2 = chain.control->accepted_block.connect([&](block_signal_params t) {
auto c2 = chain.control->accepted_block().connect([&](block_signal_params t) {
const auto& [ b, id ] = t;
block = b; });

Expand Down Expand Up @@ -1652,7 +1652,7 @@ BOOST_AUTO_TEST_CASE(deferred_inline_action_limit) { try {
chain2.push_block(block);

transaction_trace_ptr trace;
auto c = chain.control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = chain.control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t->scheduled) { trace = t; }
} );
Expand Down Expand Up @@ -1687,7 +1687,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, validating_tester_no_disable
//schedule
{
transaction_trace_ptr trace;
auto c = control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t->scheduled) { trace = t; }
} );
Expand All @@ -1710,7 +1710,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, validating_tester_no_disable
{
transaction_trace_ptr trace;
uint32_t count = 0;
auto c = control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t && t->scheduled) { trace = t; ++count; }
} );
Expand All @@ -1736,7 +1736,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, validating_tester_no_disable
{
transaction_trace_ptr trace;
uint32_t count = 0;
auto c = control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t && t->scheduled) { trace = t; ++count; }
} );
Expand All @@ -1762,7 +1762,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, validating_tester_no_disable
//schedule and cancel
{
transaction_trace_ptr trace;
auto c = control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t && t->scheduled) { trace = t; }
} );
Expand All @@ -1785,7 +1785,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, validating_tester_no_disable
//repeated deferred transactions
{
vector<transaction_trace_ptr> traces;
auto c = control->applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto c = control->applied_transaction().connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> x) {
auto& t = std::get<0>(x);
if (t && t->scheduled) {
traces.push_back( t );
Expand Down Expand Up @@ -3863,7 +3863,7 @@ BOOST_AUTO_TEST_CASE(set_finalizer_test) { try {
validating_tester t;

uint32_t lib = 0;
t.control->irreversible_block.connect([&](const block_signal_params& t) {
t.control->irreversible_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
lib = block->block_num();
});
Expand Down Expand Up @@ -3921,7 +3921,7 @@ void test_finality_transition(const vector<account_name>& accounts, const base_t
validating_tester t;

uint32_t lib = 0;
t.control->irreversible_block.connect([&](const block_signal_params& t) {
t.control->irreversible_block().connect([&](const block_signal_params& t) {
const auto& [ block, id ] = t;
lib = block->block_num();
});
Expand Down
Loading

0 comments on commit 4e92a59

Please sign in to comment.