Skip to content

Commit

Permalink
Merge pull request #2325 from steemit/2018-04-11-cleanup-signal-handling
Browse files Browse the repository at this point in the history
Clean up signal handling
  • Loading branch information
Michael Vandeberg authored Apr 11, 2018
2 parents be04206 + cf020bc commit ab3bc5e
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 227 deletions.
152 changes: 74 additions & 78 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,15 @@ void database::open( const open_args& args )

uint32_t database::reindex( const open_args& args )
{
bool reindex_success = false;
uint32_t last_block_number = 0; // result
reindex_notification note;

BOOST_SCOPE_EXIT(this_,&reindex_success,&last_block_number) {
STEEM_TRY_NOTIFY(this_->_on_reindex_done, reindex_success, last_block_number);
BOOST_SCOPE_EXIT(this_,&note) {
STEEM_TRY_NOTIFY(this_->_post_reindex_signal, note);
} BOOST_SCOPE_EXIT_END

try
{
STEEM_TRY_NOTIFY(_on_reindex_start);
STEEM_TRY_NOTIFY(_pre_reindex_signal, note);

ilog( "Reindexing Blockchain" );
wipe( args.data_dir, args.shared_mem_dir, false );
Expand Down Expand Up @@ -215,10 +214,10 @@ uint32_t database::reindex( const open_args& args )
}

apply_block( itr.first, skip_flags );
last_block_number = itr.first.block_num();
note.last_block_number = itr.first.block_num();

if( (args.benchmark.first > 0) && (last_block_number % args.benchmark.first == 0) )
args.benchmark.second( last_block_number, get_abstract_index_cntr() );
if( (args.benchmark.first > 0) && (note.last_block_number % args.benchmark.first == 0) )
args.benchmark.second( note.last_block_number, get_abstract_index_cntr() );
set_revision( head_block_num() );
_block_log.set_locking( true );
});
Expand All @@ -229,9 +228,9 @@ uint32_t database::reindex( const open_args& args )
auto end = fc::time_point::now();
ilog( "Done reindexing, elapsed time: ${t} sec", ("t",double((end-start).count())/1000000.0 ) );

reindex_success = true;
note.reindex_success = true;

return last_block_number;
return note.last_block_number;
}
FC_CAPTURE_AND_RETHROW( (args.data_dir)(args.shared_mem_dir) )

Expand Down Expand Up @@ -769,9 +768,6 @@ void database::_push_transaction( const signed_transaction& trx )
notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.squash();

// notify anyone listening to pending transactions
notify_on_pending_transaction( trx );
}

signed_block database::generate_block(
Expand Down Expand Up @@ -958,21 +954,6 @@ void database::clear_pending()
FC_CAPTURE_AND_RETHROW()
}

void database::notify_pre_apply_operation( operation_notification& note )
{
note.trx_id = _current_trx_id;
note.block = _current_block_num;
note.trx_in_block = _current_trx_in_block;
note.op_in_trx = _current_op_in_trx;

STEEM_TRY_NOTIFY( _pre_apply_operation, note )
}

void database::notify_post_apply_operation( const operation_notification& note )
{
STEEM_TRY_NOTIFY( _post_apply_operation, note )
}

inline const void database::push_virtual_operation( const operation& op, bool force )
{
/*
Expand All @@ -990,24 +971,39 @@ inline const void database::push_virtual_operation( const operation& op, bool fo
notify_post_apply_operation( note );
}

void database::notify_applied_block( const signed_block& block )
void database::notify_pre_apply_operation( operation_notification& note )
{
STEEM_TRY_NOTIFY( _applied_block, block )
note.trx_id = _current_trx_id;
note.block = _current_block_num;
note.trx_in_block = _current_trx_in_block;
note.op_in_trx = _current_op_in_trx;

STEEM_TRY_NOTIFY( _pre_apply_operation_signal, note )
}

void database::notify_on_pending_transaction( const signed_transaction& tx )
void database::notify_post_apply_operation( const operation_notification& note )
{
STEEM_TRY_NOTIFY( _on_pending_transaction, tx )
STEEM_TRY_NOTIFY( _post_apply_operation_signal, note )
}

void database::notify_on_pre_apply_transaction( const signed_transaction& tx )
void database::notify_pre_apply_block( const block_notification& note )
{
STEEM_TRY_NOTIFY( _on_pre_apply_transaction, tx )
STEEM_TRY_NOTIFY( _pre_apply_block_signal, note )
}

void database::notify_on_applied_transaction( const signed_transaction& tx )
void database::notify_post_apply_block( const block_notification& note )
{
STEEM_TRY_NOTIFY( _on_applied_transaction, tx )
STEEM_TRY_NOTIFY( _post_apply_block_signal, note )
}

void database::notify_pre_apply_transaction( const transaction_notification& note )
{
STEEM_TRY_NOTIFY( _pre_apply_transaction_signal, note )
}

void database::notify_post_apply_transaction( const transaction_notification& note )
{
STEEM_TRY_NOTIFY( _post_apply_transaction_signal, note )
}

account_name_type database::get_scheduled_witness( uint32_t slot_num )const
Expand Down Expand Up @@ -2772,8 +2768,17 @@ void database::show_free_memory( bool force, uint32_t current_block_num )

void database::_apply_block( const signed_block& next_block )
{ try {
uint32_t next_block_num = next_block.block_num();
//block_id_type next_block_id = next_block.id();
block_notification note( next_block );

notify_pre_apply_block( note );

const uint32_t next_block_num = note.block_num;

BOOST_SCOPE_EXIT( this_ )
{
this_->_currently_processing_block_id.reset();
} BOOST_SCOPE_EXIT_END
_currently_processing_block_id = note.block_id;

uint32_t skip = get_node_properties().skip_flags;

Expand Down Expand Up @@ -2912,7 +2917,7 @@ void database::_apply_block( const signed_block& next_block )
process_hardforks();

// notify observers that the block has been applied
notify_applied_block( next_block );
notify_post_apply_block( note );

notify_changed_objects();
} //FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) }
Expand Down Expand Up @@ -3048,20 +3053,20 @@ try {
void database::apply_transaction(const signed_transaction& trx, uint32_t skip)
{
detail::with_skip_flags( *this, skip, [&]() { _apply_transaction(trx); });
notify_on_applied_transaction( trx );
}

void database::_apply_transaction(const signed_transaction& trx)
{ try {
_current_trx_id = trx.id();
transaction_notification note(trx);
_current_trx_id = note.transaction_id;
const transaction_id_type& trx_id = note.transaction_id;
uint32_t skip = get_node_properties().skip_flags;

if( !(skip&skip_validate) ) /* issue #505 explains why this skip_flag is disabled */
trx.validate();

auto& trx_idx = get_index<transaction_index>();
const chain_id_type& chain_id = get_chain_id();
auto trx_id = trx.id();
// idump((trx_id)(skip&skip_transaction_dupe_check));
FC_ASSERT( (skip & skip_transaction_dupe_check) ||
trx_idx.indices().get<by_trx_id>().find(trx_id) == trx_idx.indices().get<by_trx_id>().end(),
Expand Down Expand Up @@ -3116,7 +3121,7 @@ void database::_apply_transaction(const signed_transaction& trx)
});
}

notify_on_pre_apply_transaction( trx );
notify_pre_apply_transaction( note );

//Finally process the operations
_current_op_in_trx = 0;
Expand All @@ -3128,6 +3133,8 @@ void database::_apply_transaction(const signed_transaction& trx)
}
_current_trx_id = transaction_id_type();

notify_post_apply_transaction( note );

} FC_CAPTURE_AND_RETHROW( (trx) ) }

void database::apply_operation(const operation& op)
Expand All @@ -3139,7 +3146,7 @@ void database::apply_operation(const operation& op)
_benchmark_dumper.begin();

_my->_evaluator_registry.get_evaluator( op ).apply( op );

if( _benchmark_dumper.is_enabled() )
_benchmark_dumper.end< true/*APPLY_CONTEXT*/ >( _my->_evaluator_registry.get_evaluator( op ).get_name( op ) );

Expand Down Expand Up @@ -3193,14 +3200,11 @@ boost::signals2::connection database::connect_impl( TSignal& signal, const TNoti
{
fcall<TNotification> fcall_wrapper(func,_benchmark_dumper,plugin,item_name);

if (group == -1)
return signal.connect(fcall_wrapper);
else
return signal.connect(group, fcall_wrapper);
return signal.connect(group, fcall_wrapper);
}

template< bool IS_PRE_OPERATION >
boost::signals2::connection database::any_apply_operation_proxy_impl( const operation_notification_t& func,
boost::signals2::connection database::any_apply_operation_handler_impl( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
auto complex_func = [this, func, &plugin]( const operation_notification& o )
Expand All @@ -3224,67 +3228,57 @@ boost::signals2::connection database::any_apply_operation_proxy_impl( const oper
};

if( IS_PRE_OPERATION )
{
if (group == -1)
return _pre_apply_operation.connect(complex_func);
else
return _pre_apply_operation.connect(group, complex_func);
}
return _pre_apply_operation_signal.connect(group, complex_func);
else
{
if (group == -1)
return _post_apply_operation.connect(complex_func);
else
return _post_apply_operation.connect(group, complex_func);
}
return _post_apply_operation_signal.connect(group, complex_func);
}

boost::signals2::connection database::pre_apply_operation_proxy( const operation_notification_t& func,
boost::signals2::connection database::add_pre_apply_operation_handler( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return any_apply_operation_proxy_impl< true/*IS_PRE_OPERATION*/ >( func, plugin, group );
return any_apply_operation_handler_impl< true/*IS_PRE_OPERATION*/ >( func, plugin, group );
}

boost::signals2::connection database::post_apply_operation_proxy( const operation_notification_t& func,
boost::signals2::connection database::add_post_apply_operation_handler( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return any_apply_operation_proxy_impl< false/*IS_PRE_OPERATION*/ >( func, plugin, group );
return any_apply_operation_handler_impl< false/*IS_PRE_OPERATION*/ >( func, plugin, group );
}

boost::signals2::connection database::on_pending_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_pre_apply_transaction_handler( const apply_transaction_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pending_transaction, func, plugin, group, "@transaction");
return connect_impl(_pre_apply_transaction_signal, func, plugin, group, "->transaction");
}

boost::signals2::connection database::on_pre_apply_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_post_apply_transaction_handler( const apply_transaction_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pre_apply_transaction, func, plugin, group, "->transaction");
return connect_impl(_pre_apply_transaction_signal, func, plugin, group, "<-transaction");
}

boost::signals2::connection database::on_applied_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_pre_apply_block_handler( const apply_block_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pre_apply_transaction, func, plugin, group, "<-transaction");
return connect_impl(_pre_apply_block_signal, func, plugin, group, "->block");
}

boost::signals2::connection database::applied_block_proxy( const block_notification_t& func,
boost::signals2::connection database::add_post_apply_block_handler( const apply_block_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_applied_block, func, plugin, group, "<-block");
return connect_impl(_post_apply_block_signal, func, plugin, group, "<-block");
}

boost::signals2::connection database::on_reindex_start_proxy(const on_reindex_start_notification_t& func,
boost::signals2::connection database::add_pre_reindex_handler(const reindex_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_reindex_start, func, plugin, group, "->reindex");
return connect_impl(_pre_reindex_signal, func, plugin, group, "->reindex");
}

boost::signals2::connection database::on_reindex_done_proxy(const on_reindex_done_notification_t& func,
boost::signals2::connection database::add_post_reindex_handler(const reindex_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_reindex_done, func, plugin, group, "<-reindex");
return connect_impl(_post_reindex_signal, func, plugin, group, "<-reindex");
}

const witness_object& database::validate_block_header( uint32_t skip, const signed_block& next_block )const
Expand Down Expand Up @@ -3362,7 +3356,9 @@ void database::update_global_dynamic_data( const signed_block& b )
}

dgp.head_block_number = b.block_num();
dgp.head_block_id = b.id();
// Following FC_ASSERT should never fail, as _currently_processing_block_id is always set by caller
FC_ASSERT( _currently_processing_block_id.valid() );
dgp.head_block_id = *_currently_processing_block_id;
dgp.time = b.timestamp;
dgp.current_aslot += missed_blocks+1;
} );
Expand Down
20 changes: 20 additions & 0 deletions libraries/chain/include/steem/chain/block_notification.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <steem/protocol/block.hpp>

namespace steem { namespace chain {

struct block_notification
{
block_notification( const steem::protocol::signed_block& b ) : block(b)
{
block_id = b.id();
block_num = block_header::num_from_id( block_id );
}

steem::protocol::block_id_type block_id;
uint32_t block_num = 0;
const steem::protocol::signed_block& block;
};

} }
Loading

0 comments on commit ab3bc5e

Please sign in to comment.