Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up signal handling #2325

Merged
merged 6 commits into from
Apr 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 74 additions & 78 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,15 @@ void database::open( const open_args& args )

uint32_t database::reindex( const open_args& args )
{
bool reindex_success = false;
uint32_t last_block_number = 0; // result
reindex_notification note;

BOOST_SCOPE_EXIT(this_,&reindex_success,&last_block_number) {
STEEM_TRY_NOTIFY(this_->_on_reindex_done, reindex_success, last_block_number);
BOOST_SCOPE_EXIT(this_,&note) {
STEEM_TRY_NOTIFY(this_->_post_reindex_signal, note);
} BOOST_SCOPE_EXIT_END

try
{
STEEM_TRY_NOTIFY(_on_reindex_start);
STEEM_TRY_NOTIFY(_pre_reindex_signal, note);

ilog( "Reindexing Blockchain" );
wipe( args.data_dir, args.shared_mem_dir, false );
Expand Down Expand Up @@ -215,10 +214,10 @@ uint32_t database::reindex( const open_args& args )
}

apply_block( itr.first, skip_flags );
last_block_number = itr.first.block_num();
note.last_block_number = itr.first.block_num();

if( (args.benchmark.first > 0) && (last_block_number % args.benchmark.first == 0) )
args.benchmark.second( last_block_number, get_abstract_index_cntr() );
if( (args.benchmark.first > 0) && (note.last_block_number % args.benchmark.first == 0) )
args.benchmark.second( note.last_block_number, get_abstract_index_cntr() );
set_revision( head_block_num() );
_block_log.set_locking( true );
});
Expand All @@ -229,9 +228,9 @@ uint32_t database::reindex( const open_args& args )
auto end = fc::time_point::now();
ilog( "Done reindexing, elapsed time: ${t} sec", ("t",double((end-start).count())/1000000.0 ) );

reindex_success = true;
note.reindex_success = true;

return last_block_number;
return note.last_block_number;
}
FC_CAPTURE_AND_RETHROW( (args.data_dir)(args.shared_mem_dir) )

Expand Down Expand Up @@ -769,9 +768,6 @@ void database::_push_transaction( const signed_transaction& trx )
notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.squash();

// notify anyone listening to pending transactions
notify_on_pending_transaction( trx );
}

signed_block database::generate_block(
Expand Down Expand Up @@ -958,21 +954,6 @@ void database::clear_pending()
FC_CAPTURE_AND_RETHROW()
}

void database::notify_pre_apply_operation( operation_notification& note )
{
note.trx_id = _current_trx_id;
note.block = _current_block_num;
note.trx_in_block = _current_trx_in_block;
note.op_in_trx = _current_op_in_trx;

STEEM_TRY_NOTIFY( _pre_apply_operation, note )
}

void database::notify_post_apply_operation( const operation_notification& note )
{
STEEM_TRY_NOTIFY( _post_apply_operation, note )
}

inline const void database::push_virtual_operation( const operation& op, bool force )
{
/*
Expand All @@ -990,24 +971,39 @@ inline const void database::push_virtual_operation( const operation& op, bool fo
notify_post_apply_operation( note );
}

void database::notify_applied_block( const signed_block& block )
void database::notify_pre_apply_operation( operation_notification& note )
{
STEEM_TRY_NOTIFY( _applied_block, block )
note.trx_id = _current_trx_id;
note.block = _current_block_num;
note.trx_in_block = _current_trx_in_block;
note.op_in_trx = _current_op_in_trx;

STEEM_TRY_NOTIFY( _pre_apply_operation_signal, note )
}

void database::notify_on_pending_transaction( const signed_transaction& tx )
void database::notify_post_apply_operation( const operation_notification& note )
{
STEEM_TRY_NOTIFY( _on_pending_transaction, tx )
STEEM_TRY_NOTIFY( _post_apply_operation_signal, note )
}

void database::notify_on_pre_apply_transaction( const signed_transaction& tx )
void database::notify_pre_apply_block( const block_notification& note )
{
STEEM_TRY_NOTIFY( _on_pre_apply_transaction, tx )
STEEM_TRY_NOTIFY( _pre_apply_block_signal, note )
}

void database::notify_on_applied_transaction( const signed_transaction& tx )
void database::notify_post_apply_block( const block_notification& note )
{
STEEM_TRY_NOTIFY( _on_applied_transaction, tx )
STEEM_TRY_NOTIFY( _post_apply_block_signal, note )
}

void database::notify_pre_apply_transaction( const transaction_notification& note )
{
STEEM_TRY_NOTIFY( _pre_apply_transaction_signal, note )
}

void database::notify_post_apply_transaction( const transaction_notification& note )
{
STEEM_TRY_NOTIFY( _post_apply_transaction_signal, note )
}

account_name_type database::get_scheduled_witness( uint32_t slot_num )const
Expand Down Expand Up @@ -2772,8 +2768,17 @@ void database::show_free_memory( bool force, uint32_t current_block_num )

void database::_apply_block( const signed_block& next_block )
{ try {
uint32_t next_block_num = next_block.block_num();
//block_id_type next_block_id = next_block.id();
block_notification note( next_block );

notify_pre_apply_block( note );

const uint32_t next_block_num = note.block_num;

BOOST_SCOPE_EXIT( this_ )
{
this_->_currently_processing_block_id.reset();
} BOOST_SCOPE_EXIT_END
_currently_processing_block_id = note.block_id;

uint32_t skip = get_node_properties().skip_flags;

Expand Down Expand Up @@ -2912,7 +2917,7 @@ void database::_apply_block( const signed_block& next_block )
process_hardforks();

// notify observers that the block has been applied
notify_applied_block( next_block );
notify_post_apply_block( note );

notify_changed_objects();
} //FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) }
Expand Down Expand Up @@ -3048,20 +3053,20 @@ try {
void database::apply_transaction(const signed_transaction& trx, uint32_t skip)
{
detail::with_skip_flags( *this, skip, [&]() { _apply_transaction(trx); });
notify_on_applied_transaction( trx );
}

void database::_apply_transaction(const signed_transaction& trx)
{ try {
_current_trx_id = trx.id();
transaction_notification note(trx);
_current_trx_id = note.transaction_id;
const transaction_id_type& trx_id = note.transaction_id;
uint32_t skip = get_node_properties().skip_flags;

if( !(skip&skip_validate) ) /* issue #505 explains why this skip_flag is disabled */
trx.validate();

auto& trx_idx = get_index<transaction_index>();
const chain_id_type& chain_id = get_chain_id();
auto trx_id = trx.id();
// idump((trx_id)(skip&skip_transaction_dupe_check));
FC_ASSERT( (skip & skip_transaction_dupe_check) ||
trx_idx.indices().get<by_trx_id>().find(trx_id) == trx_idx.indices().get<by_trx_id>().end(),
Expand Down Expand Up @@ -3116,7 +3121,7 @@ void database::_apply_transaction(const signed_transaction& trx)
});
}

notify_on_pre_apply_transaction( trx );
notify_pre_apply_transaction( note );

//Finally process the operations
_current_op_in_trx = 0;
Expand All @@ -3128,6 +3133,8 @@ void database::_apply_transaction(const signed_transaction& trx)
}
_current_trx_id = transaction_id_type();

notify_post_apply_transaction( note );

} FC_CAPTURE_AND_RETHROW( (trx) ) }

void database::apply_operation(const operation& op)
Expand All @@ -3139,7 +3146,7 @@ void database::apply_operation(const operation& op)
_benchmark_dumper.begin();

_my->_evaluator_registry.get_evaluator( op ).apply( op );

if( _benchmark_dumper.is_enabled() )
_benchmark_dumper.end< true/*APPLY_CONTEXT*/ >( _my->_evaluator_registry.get_evaluator( op ).get_name( op ) );

Expand Down Expand Up @@ -3193,14 +3200,11 @@ boost::signals2::connection database::connect_impl( TSignal& signal, const TNoti
{
fcall<TNotification> fcall_wrapper(func,_benchmark_dumper,plugin,item_name);

if (group == -1)
return signal.connect(fcall_wrapper);
else
return signal.connect(group, fcall_wrapper);
return signal.connect(group, fcall_wrapper);
}

template< bool IS_PRE_OPERATION >
boost::signals2::connection database::any_apply_operation_proxy_impl( const operation_notification_t& func,
boost::signals2::connection database::any_apply_operation_handler_impl( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
auto complex_func = [this, func, &plugin]( const operation_notification& o )
Expand All @@ -3224,67 +3228,57 @@ boost::signals2::connection database::any_apply_operation_proxy_impl( const oper
};

if( IS_PRE_OPERATION )
{
if (group == -1)
return _pre_apply_operation.connect(complex_func);
else
return _pre_apply_operation.connect(group, complex_func);
}
return _pre_apply_operation_signal.connect(group, complex_func);
else
{
if (group == -1)
return _post_apply_operation.connect(complex_func);
else
return _post_apply_operation.connect(group, complex_func);
}
return _post_apply_operation_signal.connect(group, complex_func);
}

boost::signals2::connection database::pre_apply_operation_proxy( const operation_notification_t& func,
boost::signals2::connection database::add_pre_apply_operation_handler( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return any_apply_operation_proxy_impl< true/*IS_PRE_OPERATION*/ >( func, plugin, group );
return any_apply_operation_handler_impl< true/*IS_PRE_OPERATION*/ >( func, plugin, group );
}

boost::signals2::connection database::post_apply_operation_proxy( const operation_notification_t& func,
boost::signals2::connection database::add_post_apply_operation_handler( const apply_operation_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return any_apply_operation_proxy_impl< false/*IS_PRE_OPERATION*/ >( func, plugin, group );
return any_apply_operation_handler_impl< false/*IS_PRE_OPERATION*/ >( func, plugin, group );
}

boost::signals2::connection database::on_pending_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_pre_apply_transaction_handler( const apply_transaction_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pending_transaction, func, plugin, group, "@transaction");
return connect_impl(_pre_apply_transaction_signal, func, plugin, group, "->transaction");
}

boost::signals2::connection database::on_pre_apply_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_post_apply_transaction_handler( const apply_transaction_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pre_apply_transaction, func, plugin, group, "->transaction");
return connect_impl(_pre_apply_transaction_signal, func, plugin, group, "<-transaction");
}

boost::signals2::connection database::on_applied_transaction_proxy( const transaction_notification_t& func,
boost::signals2::connection database::add_pre_apply_block_handler( const apply_block_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_pre_apply_transaction, func, plugin, group, "<-transaction");
return connect_impl(_pre_apply_block_signal, func, plugin, group, "->block");
}

boost::signals2::connection database::applied_block_proxy( const block_notification_t& func,
boost::signals2::connection database::add_post_apply_block_handler( const apply_block_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_applied_block, func, plugin, group, "<-block");
return connect_impl(_post_apply_block_signal, func, plugin, group, "<-block");
}

boost::signals2::connection database::on_reindex_start_proxy(const on_reindex_start_notification_t& func,
boost::signals2::connection database::add_pre_reindex_handler(const reindex_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_reindex_start, func, plugin, group, "->reindex");
return connect_impl(_pre_reindex_signal, func, plugin, group, "->reindex");
}

boost::signals2::connection database::on_reindex_done_proxy(const on_reindex_done_notification_t& func,
boost::signals2::connection database::add_post_reindex_handler(const reindex_handler_t& func,
const abstract_plugin& plugin, int32_t group )
{
return connect_impl(_on_reindex_done, func, plugin, group, "<-reindex");
return connect_impl(_post_reindex_signal, func, plugin, group, "<-reindex");
}

const witness_object& database::validate_block_header( uint32_t skip, const signed_block& next_block )const
Expand Down Expand Up @@ -3362,7 +3356,9 @@ void database::update_global_dynamic_data( const signed_block& b )
}

dgp.head_block_number = b.block_num();
dgp.head_block_id = b.id();
// Following FC_ASSERT should never fail, as _currently_processing_block_id is always set by caller
FC_ASSERT( _currently_processing_block_id.valid() );
dgp.head_block_id = *_currently_processing_block_id;
dgp.time = b.timestamp;
dgp.current_aslot += missed_blocks+1;
} );
Expand Down
20 changes: 20 additions & 0 deletions libraries/chain/include/steem/chain/block_notification.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <steem/protocol/block.hpp>

namespace steem { namespace chain {

struct block_notification
{
block_notification( const steem::protocol::signed_block& b ) : block(b)
{
block_id = b.id();
block_num = block_header::num_from_id( block_id );
}

steem::protocol::block_id_type block_id;
uint32_t block_num = 0;
const steem::protocol::signed_block& block;
};

} }
Loading