Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move broadcast_transaction_synchronous to condenser_api #2317

Merged
merged 5 commits into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 130 additions & 13 deletions libraries/plugins/apis/condenser_api/condenser_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,31 @@
#include <boost/range/iterator_range.hpp>
#include <boost/algorithm/string.hpp>

#include <boost/thread/future.hpp>
#include <boost/thread/lock_guard.hpp>

#define CHECK_ARG_SIZE( s ) \
FC_ASSERT( args.size() == s, "Expected #s argument(s), was ${n}", ("n", args.size()) );

namespace steem { namespace plugins { namespace condenser_api {

namespace detail
{
typedef std::function< void( const broadcast_transaction_synchronous_return& ) > confirmation_callback;

class condenser_api_impl
{
public:
condenser_api_impl() : _db( appbase::app().get_plugin< chain::chain_plugin >().db() ) {}
condenser_api_impl() :
_p2p( appbase::app().get_plugin< steem::plugins::p2p::p2p_plugin >() ),
_chain( appbase::app().get_plugin< steem::plugins::chain::chain_plugin >() ),
_db( _chain.db() )
{
_on_post_apply_block_conn = _db.add_post_apply_block_handler(
[&]( const block_notification& note ){ on_post_apply_block( note.block ); },
appbase::app().get_plugin< steem::plugins::condenser_api::condenser_api_plugin >(),
0 );
}

DECLARE_API_IMPL(
(get_version)
Expand Down Expand Up @@ -125,17 +139,28 @@ namespace detail

void set_pending_payout( discussion& d );

chain::database& _db;

std::shared_ptr< database_api::database_api > _database_api;
std::shared_ptr< block_api::block_api > _block_api;
std::shared_ptr< account_history::account_history_api > _account_history_api;
std::shared_ptr< account_by_key::account_by_key_api > _account_by_key_api;
std::shared_ptr< network_broadcast_api::network_broadcast_api > _network_broadcast_api;
std::shared_ptr< tags::tags_api > _tags_api;
std::shared_ptr< follow::follow_api > _follow_api;
std::shared_ptr< market_history::market_history_api > _market_history_api;
std::shared_ptr< witness::witness_api > _witness_api;
void on_post_apply_block( const signed_block& b );

steem::plugins::p2p::p2p_plugin& _p2p;
steem::plugins::chain::chain_plugin& _chain;

chain::database& _db;

std::shared_ptr< database_api::database_api > _database_api;
std::shared_ptr< block_api::block_api > _block_api;
std::shared_ptr< account_history::account_history_api > _account_history_api;
std::shared_ptr< account_by_key::account_by_key_api > _account_by_key_api;
std::shared_ptr< network_broadcast_api::network_broadcast_api > _network_broadcast_api;
std::shared_ptr< tags::tags_api > _tags_api;
std::shared_ptr< follow::follow_api > _follow_api;
std::shared_ptr< market_history::market_history_api > _market_history_api;
std::shared_ptr< witness::witness_api > _witness_api;

map< transaction_id_type, confirmation_callback > _callbacks;
map< time_point_sec, vector< transaction_id_type > > _callback_expirations;
boost::signals2::connection _on_post_apply_block_conn;

boost::mutex _mtx;
};

DEFINE_API_IMPL( condenser_api_impl, get_version )
Expand Down Expand Up @@ -1604,7 +1629,57 @@ namespace detail
CHECK_ARG_SIZE( 1 )
FC_ASSERT( _network_broadcast_api, "network_broadcast_api_plugin not enabled." );

return _network_broadcast_api->broadcast_transaction_synchronous( { signed_transaction( args[0].as< legacy_signed_transaction >() ) } );
signed_transaction trx = args[0].as< legacy_signed_transaction >();
auto txid = trx.id();
boost::promise< broadcast_transaction_synchronous_return > p;

{
boost::lock_guard< boost::mutex > guard( _mtx );
_callbacks[ txid ] = [&p]( const broadcast_transaction_synchronous_return& r )
{
p.set_value( r );
};
_callback_expirations[ trx.expiration ].push_back( txid );
}

try
{
/* It may look strange to call these without the lock and then lock again in the case of an exception,
* but it is correct and avoids deadlock. accept_transaction is trained along with all other writes, including
* pushing blocks. Pushing blocks do not originate from this code path and will never have this lock.
* However, it will trigger the on_post_apply_block callback and then attempt to acquire the lock. In this case,
* this thread will be waiting on accept_block so it can write and the block thread will be waiting on this
* thread for the lock.
*/
_chain.accept_transaction( trx );
_p2p.broadcast_transaction( trx );
}
catch( fc::exception& e )
{
boost::lock_guard< boost::mutex > guard( _mtx );

// The callback may have been cleared in the meantine, so we need to check for existence.
auto c_itr = _callbacks.find( txid );
if( c_itr != _callbacks.end() ) _callbacks.erase( c_itr );

// We do not need to clean up _callback_expirations because on_post_apply_block handles this case.

throw e;
}
catch( ... )
{
boost::lock_guard< boost::mutex > guard( _mtx );

// The callback may have been cleared in the meantine, so we need to check for existence.
auto c_itr = _callbacks.find( txid );
if( c_itr != _callbacks.end() ) _callbacks.erase( c_itr );

throw fc::unhandled_exception(
FC_LOG_MESSAGE( warn, "Unknown error occured when pushing transaction" ),
std::current_exception() );
}

return p.get_future().get();
}

DEFINE_API_IMPL( condenser_api_impl, broadcast_block )
Expand Down Expand Up @@ -1865,6 +1940,48 @@ namespace detail
d.url += "#@" + d.author + "/" + d.permlink;
}

void condenser_api_impl::on_post_apply_block( const signed_block& b )
{ try {
boost::lock_guard< boost::mutex > guard( _mtx );
int32_t block_num = int32_t(b.block_num());
if( _callbacks.size() )
{
for( size_t trx_num = 0; trx_num < b.transactions.size(); ++trx_num )
{
const auto& trx = b.transactions[trx_num];
auto id = trx.id();
auto itr = _callbacks.find( id );
if( itr == _callbacks.end() ) continue;
itr->second( broadcast_transaction_synchronous_return( id, block_num, int32_t( trx_num ), false ) );
_callbacks.erase( itr );
}
}

/// clear all expirations
while( true )
{
auto exp_it = _callback_expirations.begin();
if( exp_it == _callback_expirations.end() )
break;
if( exp_it->first >= b.timestamp )
break;
for( const transaction_id_type& txid : exp_it->second )
{
auto cb_it = _callbacks.find( txid );
// If it's empty, that means the transaction has been confirmed and has been deleted by the above check.
if( cb_it == _callbacks.end() )
continue;

confirmation_callback callback = cb_it->second;
transaction_id_type txid_byval = txid; // can't pass in by reference as it's going to be deleted
callback( broadcast_transaction_synchronous_return( txid_byval, block_num, -1, true ) );

_callbacks.erase( cb_it );
}
_callback_expirations.erase( exp_it );
}
} FC_LOG_AND_RETHROW() }

} // detail

condenser_api::condenser_api()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,20 @@ struct get_version_return

typedef map< uint32_t, api_operation_object > get_account_history_return_type;

typedef vector< variant > broadcast_transaction_synchronous_args;

struct broadcast_transaction_synchronous_return
{
broadcast_transaction_synchronous_return() {}
broadcast_transaction_synchronous_return( transaction_id_type txid, int32_t bn, int32_t tn, bool ex )
: id(txid), block_num(bn), trx_num(tn), expired(ex) {}

transaction_id_type id;
int32_t block_num = 0;
int32_t trx_num = 0;
bool expired = false;
};

struct ticker
{
ticker() {}
Expand Down Expand Up @@ -912,7 +926,6 @@ DEFINE_API_ARGS( get_replies_by_last_update, vector< variant >, ve
DEFINE_API_ARGS( get_discussions_by_author_before_date, vector< variant >, vector< discussion > )
DEFINE_API_ARGS( get_account_history, vector< variant >, get_account_history_return_type )
DEFINE_API_ARGS( broadcast_transaction, vector< variant >, json_rpc::void_type )
DEFINE_API_ARGS( broadcast_transaction_synchronous, vector< variant >, network_broadcast_api::broadcast_transaction_synchronous_return )
DEFINE_API_ARGS( broadcast_block, vector< variant >, json_rpc::void_type )
DEFINE_API_ARGS( get_followers, vector< variant >, vector< follow::api_follow_object > )
DEFINE_API_ARGS( get_following, vector< variant >, vector< follow::api_follow_object > )
Expand Down Expand Up @@ -1187,6 +1200,9 @@ FC_REFLECT_ENUM( steem::plugins::condenser_api::withdraw_route_type, (incoming)(
FC_REFLECT( steem::plugins::condenser_api::get_version_return,
(blockchain_version)(steem_revision)(fc_revision) )

FC_REFLECT( steem::plugins::condenser_api::broadcast_transaction_synchronous_return,
(id)(block_num)(trx_num)(expired) )

FC_REFLECT( steem::plugins::condenser_api::ticker,
(latest)(lowest_ask)(highest_bid)(percent_change)(steem_volume)(sbd_volume) )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@ typedef vector< legacy_block_header_extensions > legacy_block_header_extensions_
struct legacy_signed_transaction
{
legacy_signed_transaction() {}

legacy_signed_transaction( const signed_transaction& t ) :
ref_block_num( t.ref_block_num ),
ref_block_prefix( t.ref_block_prefix ),
expiration( t.expiration )
{
for( const auto& o : t.operations )
{
legacy_operation op;
o.visit( legacy_operation_conversion_visitor( op ) );
operations.push_back( op );
}

// Signed transaction extensions field exists, but must be empty
// Don't worry about copying them.

signatures.insert( signatures.end(), t.signatures.begin(), t.signatures.end() );
}

legacy_signed_transaction( const annotated_signed_transaction& t ) :
ref_block_num( t.ref_block_num ),
ref_block_prefix( t.ref_block_prefix ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,13 @@ struct broadcast_transaction_args

typedef void_type broadcast_transaction_return;

typedef broadcast_transaction_args broadcast_transaction_synchronous_args;

struct broadcast_transaction_synchronous_return
{
broadcast_transaction_synchronous_return() {}
broadcast_transaction_synchronous_return( transaction_id_type txid, int32_t bn, int32_t tn, bool ex )
: id(txid), block_num(bn), trx_num(tn), expired(ex) {}

transaction_id_type id;
int32_t block_num = 0;
int32_t trx_num = 0;
bool expired = false;
};

struct broadcast_block_args
{
signed_block block;
};

typedef void_type broadcast_block_return;

typedef std::function< void( const broadcast_transaction_synchronous_return& ) > confirmation_callback;

namespace detail{ class network_broadcast_api_impl; }

class network_broadcast_api
Expand All @@ -63,7 +47,6 @@ class network_broadcast_api

DECLARE_API(
(broadcast_transaction)
(broadcast_transaction_synchronous)
(broadcast_block)
)

Expand All @@ -78,6 +61,3 @@ FC_REFLECT( steem::plugins::network_broadcast_api::broadcast_transaction_args,

FC_REFLECT( steem::plugins::network_broadcast_api::broadcast_block_args,
(block) )

FC_REFLECT( steem::plugins::network_broadcast_api::broadcast_transaction_synchronous_return,
(id)(block_num)(trx_num)(expired) )
Loading