From 202c44d5b379996b256ff5724d817f14fb791dce Mon Sep 17 00:00:00 2001 From: Mattias Johansson Date: Fri, 24 Jan 2020 09:24:00 +0100 Subject: [PATCH] New additional asynchronous mechanism --- autobahn/wamp_async.hpp | 105 +++++ autobahn/wamp_async.ipp | 154 +++++++ autobahn/wamp_call.hpp | 12 +- autobahn/wamp_call.ipp | 8 +- autobahn/wamp_rawsocket_transport.hpp | 32 +- autobahn/wamp_rawsocket_transport.ipp | 63 ++- autobahn/wamp_register_request.hpp | 15 +- autobahn/wamp_register_request.ipp | 17 +- autobahn/wamp_session.hpp | 392 +++++++++++++++-- autobahn/wamp_session.ipp | 416 +++++++++++++++--- autobahn/wamp_subscribe_request.hpp | 15 +- autobahn/wamp_subscribe_request.ipp | 17 +- autobahn/wamp_tcp_transport.hpp | 3 + autobahn/wamp_tcp_transport.ipp | 15 + autobahn/wamp_transport.hpp | 37 +- autobahn/wamp_unregister_request.hpp | 11 +- autobahn/wamp_unregister_request.ipp | 10 +- autobahn/wamp_unsubscribe_request.hpp | 11 +- autobahn/wamp_unsubscribe_request.ipp | 10 +- autobahn/wamp_websocket_transport.hpp | 38 +- autobahn/wamp_websocket_transport.ipp | 41 +- .../wamp_websocketpp_websocket_transport.hpp | 2 +- .../wamp_websocketpp_websocket_transport.ipp | 4 +- examples/callee.cpp | 86 ++-- examples/caller.cpp | 34 +- examples/publisher.cpp | 67 +-- 26 files changed, 1355 insertions(+), 260 deletions(-) create mode 100644 autobahn/wamp_async.hpp create mode 100644 autobahn/wamp_async.ipp diff --git a/autobahn/wamp_async.hpp b/autobahn/wamp_async.hpp new file mode 100644 index 00000000..162e9d23 --- /dev/null +++ b/autobahn/wamp_async.hpp @@ -0,0 +1,105 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) Crossbar.io Technologies GmbH and contributors +// +// Boost Software License - Version 1.0 - August 17th, 2003 +// +// Permission is hereby granted, free of charge, to any person or organization +// obtaining a copy of the software and accompanying documentation covered by +// this license (the "Software") to use, reproduce, display, distribute, +// execute, and transmit the Software, and to prepare derivative works of the +// Software, and to permit third-parties to whom the Software is furnished to +// do so, all subject to the following: +// +// The copyright notices in the Software and this entire statement, including +// the above license grant, this restriction and the following disclaimer, +// must be included in all copies of the Software, in whole or in part, and +// all derivative works of the Software, unless such copies or derivative +// works are solely in the form of machine-executable object code generated by +// a source language processor. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +/////////////////////////////////////////////////////////////////////////////// + +#ifndef AUTOBAHN_WAMP_ASYNC_HPP +#define AUTOBAHN_WAMP_ASYNC_HPP + +#include "boost_config.hpp" + +#include +#include +#include + +namespace autobahn { + +/// Async mechanism, either a promise or handlers. +template +class wamp_async +{ +public: + using on_success_handler = std::function; + using on_exception_handler = std::function; + + /// Use promise + wamp_async(); + + /// Use handlers + wamp_async(on_success_handler && on_sucess, + on_exception_handler && on_exception); + + void set_value(const T &); + void set_value(T &&); + void set_exception(const boost::exception_ptr &); + + bool is_promise() const; + boost::promise& promise(); + on_success_handler& on_success(); + on_exception_handler& on_exception(); + +private: + using pair_type = std::pair< on_success_handler, on_exception_handler >; + + boost::variant< boost::promise, pair_type > m_async; +}; + +/// Async mechanism, either a promise or handlers, void specialization. +template <> +class wamp_async +{ +public: + using on_success_handler = std::function; + using on_exception_handler = std::function; + + /// Use promise + wamp_async(); + + /// Use handlers + wamp_async(on_success_handler && on_sucess, + on_exception_handler && on_exception); + + void set_value(); + void set_exception(const boost::exception_ptr &); + + bool is_promise() const; + boost::promise& promise(); + on_success_handler& on_success(); + on_exception_handler& on_exception(); + +private: + using pair_type = std::pair< on_success_handler, on_exception_handler >; + + boost::variant< boost::promise, pair_type > m_async; +}; + +} // namespace autobahn + +#include "wamp_async.ipp" + +#endif // AUTOBAHN_WAMP_ASYNC_HPP \ No newline at end of file diff --git a/autobahn/wamp_async.ipp b/autobahn/wamp_async.ipp new file mode 100644 index 00000000..51b5866d --- /dev/null +++ b/autobahn/wamp_async.ipp @@ -0,0 +1,154 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) Crossbar.io Technologies GmbH and contributors +// +// Boost Software License - Version 1.0 - August 17th, 2003 +// +// Permission is hereby granted, free of charge, to any person or organization +// obtaining a copy of the software and accompanying documentation covered by +// this license (the "Software") to use, reproduce, display, distribute, +// execute, and transmit the Software, and to prepare derivative works of the +// Software, and to permit third-parties to whom the Software is furnished to +// do so, all subject to the following: +// +// The copyright notices in the Software and this entire statement, including +// the above license grant, this restriction and the following disclaimer, +// must be included in all copies of the Software, in whole or in part, and +// all derivative works of the Software, unless such copies or derivative +// works are solely in the form of machine-executable object code generated by +// a source language processor. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +/////////////////////////////////////////////////////////////////////////////// + +namespace autobahn { + +template +inline wamp_async::wamp_async() + : m_async(boost::promise()) +{ +} + +template +inline wamp_async::wamp_async(on_success_handler && on_success, + on_exception_handler && on_exception) + : m_async(pair_type({std::move(on_success), std::move(on_exception)})) +{ +} + +template +inline void wamp_async::set_value(const T & t) +{ + if (is_promise()) { + promise().set_value(t); + } + else if (on_success()) { + on_success()(t); + } +} + +template +inline void wamp_async::set_value(T && t) +{ + if (is_promise()) { + promise().set_value(std::forward(t)); + } + else if (on_success()) { + on_success()(std::forward(t)); + } +} + +template +inline void wamp_async::set_exception(const boost::exception_ptr & eptr) +{ + if (is_promise()) { + promise().set_exception(eptr); + } + else if (on_exception()) { + on_exception()(eptr); + } +} + +template +inline bool wamp_async::is_promise() const +{ + return m_async.which() == 0; +} + +template +inline boost::promise& wamp_async::promise() +{ + return boost::get>(m_async); +} + +template +inline typename wamp_async::on_success_handler& wamp_async::on_success() +{ + return boost::get(m_async).first; +} + +template +inline typename wamp_async::on_exception_handler& wamp_async::on_exception() +{ + return boost::get(m_async).second; +} + +inline wamp_async::wamp_async() + : m_async(boost::promise()) +{ +} + +inline wamp_async::wamp_async(on_success_handler && on_success, + on_exception_handler && on_exception) + : m_async(pair_type({std::move(on_success), std::move(on_exception)})) +{ +} + +inline void wamp_async::set_value() +{ + if (is_promise()) { + promise().set_value(); + } + else if (on_success()) { + on_success()(); + } +} + +inline void wamp_async::set_exception(const boost::exception_ptr & eptr) +{ + if (is_promise()) { + promise().set_exception(eptr); + } + else if (on_exception()) { + on_exception()(eptr); + } +} + +inline bool wamp_async::is_promise() const +{ + return m_async.which() == 0; +} + +inline boost::promise& wamp_async::promise() +{ + return boost::get>(m_async); +} + +inline wamp_async::on_success_handler& wamp_async::on_success() +{ + return boost::get(m_async).first; +} + +inline wamp_async::on_exception_handler& wamp_async::on_exception() +{ + return boost::get(m_async).second; +} + +} // namespace autobahn diff --git a/autobahn/wamp_call.hpp b/autobahn/wamp_call.hpp index 1aa613c0..1f6e558e 100644 --- a/autobahn/wamp_call.hpp +++ b/autobahn/wamp_call.hpp @@ -31,8 +31,9 @@ #ifndef AUTOBAHN_WAMP_CALL_HPP #define AUTOBAHN_WAMP_CALL_HPP -#include "wamp_call_result.hpp" #include "boost_config.hpp" +#include "wamp_async.hpp" +#include "wamp_call_result.hpp" namespace autobahn { @@ -40,13 +41,18 @@ namespace autobahn { class wamp_call { public: + using on_success_handler = wamp_async::on_success_handler; + using on_exception_handler = wamp_async::on_exception_handler; + wamp_call(); + wamp_call(on_success_handler&& on_success, + on_exception_handler&& on_exception); - boost::promise& result(); + wamp_async& result(); void set_result(wamp_call_result&& value); private: - boost::promise m_result; + wamp_async m_result; }; } // namespace autobahn diff --git a/autobahn/wamp_call.ipp b/autobahn/wamp_call.ipp index 2bd680df..cb67149c 100644 --- a/autobahn/wamp_call.ipp +++ b/autobahn/wamp_call.ipp @@ -35,7 +35,13 @@ inline wamp_call::wamp_call() { } -inline boost::promise& wamp_call::result() +inline wamp_call::wamp_call(on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_result(std::move(on_success), std::move(on_exception)) +{ +} + +inline wamp_async& wamp_call::result() { return m_result; } diff --git a/autobahn/wamp_rawsocket_transport.hpp b/autobahn/wamp_rawsocket_transport.hpp index d5e93e19..95a0fae3 100644 --- a/autobahn/wamp_rawsocket_transport.hpp +++ b/autobahn/wamp_rawsocket_transport.hpp @@ -89,11 +89,25 @@ class wamp_rawsocket_transport : */ virtual boost::future connect() override; + /*! + * @copydoc wamp_transport::connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) + */ + virtual void connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) override; + /*! * @copydoc wamp_transport::disconnect() */ virtual boost::future disconnect() override; + /*! + * @copydoc wamp_transport::disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) + */ + virtual void disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) override; + /*! * @copydoc wamp_transport::is_connected() */ @@ -149,11 +163,21 @@ class wamp_rawsocket_transport : */ virtual bool has_handler() const override; +protected: + + wamp_async& connect_async() const; + + wamp_async& disconnect_async() const; + protected: socket_type& socket(); private: + void do_connect(); + + void do_disconnect(); + void handshake_reply_handler( const boost::system::error_code& error_code, std::size_t /* bytes_transferred */); @@ -180,14 +204,14 @@ class wamp_rawsocket_transport : endpoint_type m_remote_endpoint; /*! - * The promise that is fulfilled when the connect attempt is complete. + * The async operation that is fulfilled when the connect attempt is complete. */ - boost::promise m_connect; + wamp_async m_connect; /*! - * The promise that is fulfilled when the disconnect attempt is complete. + * The async operation that is fulfilled when the disconnect attempt is complete. */ - boost::promise m_disconnect; + wamp_async m_disconnect; /*! * The handler to be called when pausing. diff --git a/autobahn/wamp_rawsocket_transport.ipp b/autobahn/wamp_rawsocket_transport.ipp index 34ffbfce..beff54a2 100644 --- a/autobahn/wamp_rawsocket_transport.ipp +++ b/autobahn/wamp_rawsocket_transport.ipp @@ -59,13 +59,23 @@ wamp_rawsocket_transport::wamp_rawsocket_transport( } template -boost::future wamp_rawsocket_transport::connect() +wamp_async& wamp_rawsocket_transport::connect_async() const { - m_connect = boost::promise(); // reset the promise + return m_connect; +} +template +wamp_async& wamp_rawsocket_transport::disconnect_async() const +{ + return m_disconnect; +} + +template +void wamp_rawsocket_transport::do_connect() +{ if (m_socket.is_open()) { m_connect.set_exception(boost::copy_exception(network_error("network transport already connected"))); - return m_connect.get_future(); + return; } std::weak_ptr> weak_self = this->shared_from_this(); @@ -77,8 +87,9 @@ boost::future wamp_rawsocket_transport::connect() if (error_code) { m_socket.close(); // async_connect will leave it open - m_connect.set_exception(boost::copy_exception( - std::system_error(error_code.value(), std::system_category(), "connect"))); + auto eptr = boost::copy_exception( + std::system_error(error_code.value(), std::system_category(), "connect")); + m_connect.set_exception(eptr); return; } @@ -113,26 +124,58 @@ boost::future wamp_rawsocket_transport::connect() boost::asio::buffer(m_handshake_buffer, sizeof(m_handshake_buffer)), handshake_reply); } catch (const std::exception& e) { - m_connect.set_exception(boost::copy_exception(e)); + auto eptr = boost::copy_exception(e); + m_connect.set_exception(eptr); } }; m_socket.async_connect(m_remote_endpoint, connect_handler); +} - return m_connect.get_future(); +template +boost::future wamp_rawsocket_transport::connect() +{ + m_connect = wamp_async(); + do_connect(); + return m_connect.promise().get_future(); } template -boost::future wamp_rawsocket_transport::disconnect() +void wamp_rawsocket_transport::connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) +{ + m_connect = wamp_async(std::move(on_success), std::move(on_exception)); + std::cout << m_connect.is_promise() << std::endl; + do_connect(); +} + +template +void wamp_rawsocket_transport::do_disconnect() { if (!m_socket.is_open()) { - throw network_error("network transport already disconnected"); + m_disconnect.set_exception(boost::copy_exception(network_error("network transport already disconnected"))); + return; } m_socket.close(); m_disconnect.set_value(); - return m_disconnect.get_future(); +} + +template +boost::future wamp_rawsocket_transport::disconnect() +{ + m_disconnect = wamp_async(); + do_disconnect(); + return m_disconnect.promise().get_future(); +} + +template +void wamp_rawsocket_transport::disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) +{ + m_disconnect = wamp_async(std::move(on_success), std::move(on_exception)); + do_connect(); } template diff --git a/autobahn/wamp_register_request.hpp b/autobahn/wamp_register_request.hpp index aa6ea9bf..3a4db02c 100644 --- a/autobahn/wamp_register_request.hpp +++ b/autobahn/wamp_register_request.hpp @@ -31,9 +31,10 @@ #ifndef AUTOBAHN_WAMP_REGISTER_REQUEST_HPP #define AUTOBAHN_WAMP_REGISTER_REQUEST_HPP +#include "boost_config.hpp" +#include "wamp_async.hpp" #include "wamp_procedure.hpp" #include "wamp_registration.hpp" -#include "boost_config.hpp" namespace autobahn { @@ -41,18 +42,26 @@ namespace autobahn { class wamp_register_request { public: + using on_success_handler = wamp_async::on_success_handler; + using on_exception_handler = wamp_async::on_exception_handler; + wamp_register_request(); + wamp_register_request(on_success_handler&& on_success, + on_exception_handler&& on_exception); wamp_register_request(const wamp_procedure& procedure); + wamp_register_request(const wamp_procedure& procedure, + on_success_handler&& on_success, + on_exception_handler&& on_exception); wamp_register_request(wamp_register_request&& other); const wamp_procedure& procedure() const; - boost::promise& response(); + wamp_async& response(); void set_procedure(wamp_procedure procedure) const; void set_response(const wamp_registration& registration); private: wamp_procedure m_procedure; - boost::promise m_response; + wamp_async m_response; }; } // namespace autobahn diff --git a/autobahn/wamp_register_request.ipp b/autobahn/wamp_register_request.ipp index 0596e9cb..ba96e48a 100644 --- a/autobahn/wamp_register_request.ipp +++ b/autobahn/wamp_register_request.ipp @@ -36,12 +36,27 @@ inline wamp_register_request::wamp_register_request() { } +inline wamp_register_request::wamp_register_request(on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_procedure() + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + inline wamp_register_request::wamp_register_request(const wamp_procedure& procedure) : m_procedure(procedure) , m_response() { } +inline wamp_register_request::wamp_register_request(const wamp_procedure& procedure, + on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_procedure(procedure) + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + inline wamp_register_request::wamp_register_request(wamp_register_request&& other) : m_procedure(std::move(other.m_procedure)) , m_response(std::move(other.m_response)) @@ -53,7 +68,7 @@ inline const wamp_procedure& wamp_register_request::procedure() const return m_procedure; } -inline boost::promise& wamp_register_request::response() +inline wamp_async& wamp_register_request::response() { return m_response; } diff --git a/autobahn/wamp_session.hpp b/autobahn/wamp_session.hpp index 5c26a943..60d573cb 100644 --- a/autobahn/wamp_session.hpp +++ b/autobahn/wamp_session.hpp @@ -31,15 +31,19 @@ #ifndef AUTOBAHN_SESSION_HPP #define AUTOBAHN_SESSION_HPP +#include "boost_config.hpp" +#include "wamp_async.hpp" +#include "wamp_authenticate.hpp" #include "wamp_call_options.hpp" #include "wamp_call_result.hpp" #include "wamp_event_handler.hpp" #include "wamp_message.hpp" #include "wamp_procedure.hpp" #include "wamp_publish_options.hpp" +#include "wamp_registration.hpp" #include "wamp_subscribe_options.hpp" +#include "wamp_subscription.hpp" #include "wamp_transport_handler.hpp" -#include "boost_config.hpp" #include @@ -69,13 +73,10 @@ namespace autobahn { class wamp_call; class wamp_message; class wamp_register_request; -class wamp_registration; class wamp_subscribe_request; -class wamp_subscription; class wamp_transport; class wamp_unregister_request; class wamp_unsubscribe_request; -class wamp_authenticate; class wamp_challenge; /** \defgroup PUB Publishing events @@ -91,6 +92,33 @@ class wamp_session : public wamp_transport_handler, public std::enable_shared_from_this { +public: + + /** Handler aliases + */ + using start_on_success_handler = wamp_async::on_success_handler; + using start_on_exception_handler = wamp_async::on_exception_handler; + using stop_on_success_handler = wamp_async::on_success_handler; + using stop_on_exception_handler = wamp_async::on_exception_handler; + using join_on_success_handler = wamp_async::on_success_handler; + using join_on_exception_handler = wamp_async::on_exception_handler; + using leave_on_success_handler = wamp_async::on_success_handler; + using leave_on_exception_handler = wamp_async::on_exception_handler; + using publish_on_success_handler = wamp_async::on_success_handler; + using publish_on_exception_handler = wamp_async::on_exception_handler; + using subscribe_on_success_handler = wamp_async::on_success_handler; + using subscribe_on_exception_handler = wamp_async::on_exception_handler; + using unsubscribe_on_success_handler = wamp_async::on_success_handler; + using unsubscribe_on_exception_handler = wamp_async::on_exception_handler; + using call_on_success_handler = wamp_async::on_success_handler; + using call_on_exception_handler = wamp_async::on_exception_handler; + using provide_on_success_handler = wamp_async::on_success_handler; + using provide_on_exception_handler = wamp_async::on_exception_handler; + using unprovide_on_success_handler = wamp_async::on_success_handler; + using unprovide_on_exception_handler = wamp_async::on_exception_handler; + using challenge_on_success_handler = wamp_async::on_success_handler; + using challenge_on_exception_handler = wamp_async::on_exception_handler; + public: /*! @@ -106,21 +134,46 @@ class wamp_session : ~wamp_session(); /*! - * Establishes a session with the router. + * Establishes a session with the router, using boost::future as asynchronus + * mechanism. * * \return A future that indicates if the session was successfully started. */ boost::future start(); /*! - * Stops the session with the router. + * Establishes a session with the router, using handlers as asynchrous + * mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * + * \return A future that indicates if the session was successfully started. + */ + void start( + start_on_success_handler&& on_success, + start_on_exception_handler&& on_exception); + + /*! + * Stops the session with the router, using boost::future as asynchronus + * mechanism. * * \return A future that indicates if the session was successfully stopped. */ boost::future stop(); /*! - * Join a realm with the session. + * Stops the session with the router, using handlers as asynchrous + * mechanism. + * + * \return A future that indicates if the session was successfully stopped. + */ + void stop( + stop_on_success_handler&& on_success, + stop_on_exception_handler&& on_exception); + + /*! + * Join a realm with the session, using boost::future as asynchronus mechanism. * * \param realm The realm to join on the application router. * \param authmethods The authentication methods this instance support e.g. "wampcra","ticket" @@ -133,7 +186,24 @@ class wamp_session : const std::string& authid = ""); /*! - * Leave the realm. + * Join a realm with the session, using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param realm The realm to join on the application router. + * \param authmethods The authentication methods this instance support e.g. "wampcra","ticket" + * \param authid The username or maybe an other identifier for the user to join. + * \return A future that resolves with the session ID when the realm was joined. + */ + void join( + join_on_success_handler&& on_success, + join_on_exception_handler&& on_exception, + const std::string& realm, + const std::vector& authmethods = std::vector(), + const std::string& authid = ""); + + /*! + * Leave the realm, using boost::future as asynchronus mechanism. * * \param reason An optional WAMP URI providing a reason for leaving. * \return A future that resolves with the reason sent by the peer. @@ -141,32 +211,83 @@ class wamp_session : boost::future leave( const std::string& reason = std::string("wamp.error.close_realm")); + /*! + * Leave the realm, using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param reason An optional WAMP URI providing a reason for leaving. + * \return A future that resolves with the reason sent by the peer. + */ + void leave( + leave_on_success_handler&& on_success, + leave_on_exception_handler&& on_exception, + const std::string& reason = std::string("wamp.error.close_realm")); + /*! * \ingroup PUB - * Publish an event with empty payload to a topic. + * Publish an event with empty payload to a topic, using boost::future as asynchronus + * mechanism. * + * \param topic The URI of the topic to publish to. + * \return A future that resolves once the the topic has been published to. + */ + boost::future publish( + const std::string& topic, + const wamp_publish_options& options = wamp_publish_options()); + + /*! + * \ingroup PUB + * Publish an event with empty payload to a topic, using handlers as asynchrous + * mechanism. * + * \param on_success The success handler + * \param on_exception The exception handler * \param topic The URI of the topic to publish to. * \return A future that resolves once the the topic has been published to. */ - boost::future publish(const std::string& topic, - const wamp_publish_options& options = wamp_publish_options()); + void publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, + const wamp_publish_options& options = wamp_publish_options()); /*! * \ingroup PUB - * Publish an event with positional payload to a topic. + * Publish an event with positional payload to a topic, using boost::future as asynchronus + * mechanism. * * \param topic The URI of the topic to publish to. * \param arguments The positional payload for the event. * \return A future that resolves once the the topic has been published to. */ template - boost::future publish(const std::string& topic, const List& arguments, - const wamp_publish_options& options = wamp_publish_options()); + boost::future publish( + const std::string& topic, const List& arguments, + const wamp_publish_options& options = wamp_publish_options()); + + /*! + * \ingroup PUB + * Publish an event with positional payload to a topic, using handlers as asynchrous + * mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param topic The URI of the topic to publish to. + * \param arguments The positional payload for the event. + * \return A future that resolves once the the topic has been published to. + */ + template + void publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, const List& arguments, + const wamp_publish_options& options = wamp_publish_options()); /*! * \ingroup PUB - * Publish an event with both positional and keyword payload to a topic. + * Publish an event with both positional and keyword payload to a topic, + * using boost::future as asynchronus mechanism. * * \param topic The URI of the topic to publish to. * \param arguments The positional payload for the event. @@ -181,7 +302,29 @@ class wamp_session : const wamp_publish_options& options = wamp_publish_options()); /*! - * Subscribe a handler to a topic to receive events. + * \ingroup PUB + * Publish an event with both positional and keyword payload to a topic, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param topic The URI of the topic to publish to. + * \param arguments The positional payload for the event. + * \param kw_arguments The keyword payload for the event. + * \return A future that resolves once the the topic has been published to. + */ + template + void publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, + const List& arguments, + const Map& kw_arguments, + const wamp_publish_options& options = wamp_publish_options()); + + /*! + * Subscribe a handler to a topic to receive events, + * using boost::future as asynchronus mechanism. * * \param topic The URI of the topic to subscribe to. * \param handler The handler that will receive events under the subscription. @@ -194,15 +337,50 @@ class wamp_session : const wamp_subscribe_options& options = wamp_subscribe_options()); /*! - * Unubscribe a handler to previously subscribed topic. + * Subscribe a handler to a topic to receive events, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param topic The URI of the topic to subscribe to. + * \param handler The handler that will receive events under the subscription. + * \param options The options to pass in the subscribe request to the router. + * \return A future that resolves to the autobahn::subscription. + */ + void subscribe( + subscribe_on_success_handler&& on_success, + subscribe_on_exception_handler&& on_exception, + const std::string& topic, + const wamp_event_handler& handler, + const wamp_subscribe_options& options = wamp_subscribe_options()); + + /*! + * Unubscribe a handler to previously subscribed topic, + * using boost::future as asynchronus mechanism. + * + * \param subscription The subscription to unsubscribe from. + * \return A future that resolves to the unsubscribed response. + */ + boost::future unsubscribe( + const wamp_subscription& subscription); + + /*! + * Unubscribe a handler to previously subscribed topic, + * using handlers as asynchrous mechanism. * + * \param on_success The success handler + * \param on_exception The exception handler * \param subscription The subscription to unsubscribe from. * \return A future that resolves to the unsubscribed response. */ - boost::future unsubscribe(const wamp_subscription& subscription); + void unsubscribe( + unsubscribe_on_success_handler&& on_success, + unsubscribe_on_exception_handler&& on_exception, + const wamp_subscription& subscription); /*! - * Calls a remote procedure with no arguments. + * Calls a remote procedure with no arguments, + * using boost::future as asynchronus mechanism. * * \param procedure The URI of the remote procedure to call. * \param options The options to pass in the call to the router. @@ -213,7 +391,24 @@ class wamp_session : const wamp_call_options& options = wamp_call_options()); /*! - * Calls a remote procedure with positional arguments. + * Calls a remote procedure with no arguments, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param procedure The URI of the remote procedure to call. + * \param options The options to pass in the call to the router. + * \return A future that resolves to the result of the remote procedure call. + */ + void call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const wamp_call_options& options = wamp_call_options()); + + /*! + * Calls a remote procedure with positional arguments, + * using boost::future as asynchronus mechanism. * * \param procedure The URI of the remote procedure to call. * \param arguments The positional arguments for the call. @@ -227,7 +422,27 @@ class wamp_session : const wamp_call_options& options = wamp_call_options()); /*! - * Calls a remote procedure with positional and keyword arguments. + * Calls a remote procedure with positional arguments, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param procedure The URI of the remote procedure to call. + * \param arguments The positional arguments for the call. + * \param options The options to pass in the call to the router. + * \return A future that resolves to the result of the remote procedure call. + */ + template + void call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const List& arguments, + const wamp_call_options& options = wamp_call_options()); + + /*! + * Calls a remote procedure with positional and keyword arguments, + * using boost::future as asynchronus mechanism. * * \param procedure The URI of the remote procedure to call. * \param arguments The positional arguments for the call. @@ -242,7 +457,28 @@ class wamp_session : const wamp_call_options& options = wamp_call_options()); /*! - * Register a procedure that can be called remotely. + * Calls a remote procedure with positional and keyword arguments, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param procedure The URI of the remote procedure to call. + * \param arguments The positional arguments for the call. + * \param kw_arguments The keyword arguments for the call. + * \param options The options to pass in the call to the router. + * \return A future that resolves to the result of the remote procedure call. + */ + template + void call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const List& arguments, const Map& kw_arguments, + const wamp_call_options& options = wamp_call_options()); + + /*! + * Register a procedure that can be called remotely, + * using boost::future as asynchronus mechanism. * * \param uri The URI associated with the procedure. * \param procedure The procedure to be exposed as a remotely callable procedure. @@ -255,22 +491,74 @@ class wamp_session : const provide_options& options = provide_options()); /*! - * Unregister a handler to previosly registered service. + * Register a procedure that can be called remotely, + * using handlers as asynchrous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param uri The URI associated with the procedure. + * \param procedure The procedure to be exposed as a remotely callable procedure. + * \param options Options for registering the procedure. + * \return A future that resolves to a autobahn::registration + */ + void provide( + provide_on_success_handler&& on_success, + provide_on_exception_handler&& on_exception, + const std::string& uri, + const wamp_procedure& procedure, + const provide_options& options = provide_options()); + + /*! + * Unregister a handler to previosly registered service, + * using boost::future as asynchronus mechanism. + * + * \param registration The registration to unregister. + * \return A future that resolves to the unregistered response. + */ + boost::future unprovide( + const wamp_registration& registration); + + /*! + * Unregister a handler to previosly registered service, + * using handlers as asynchrous mechanism. * + * \param on_success The success handler + * \param on_exception The exception handler * \param registration The registration to unregister. * \return A future that resolves to the unregistered response. */ - boost::future unprovide(const wamp_registration& registration); + void unprovide( + unprovide_on_success_handler&& on_success, + unprovide_on_exception_handler&& on_exception, + const wamp_registration& registration); /*! * Function called by the session when authenticating. It always has to be * re-implemented (if authentication is part of the system). + * Uses boost::future as asynchronous mechanism. * * \param challenge The challenge from the router containing enough information * for the system to prove membership. * \return A future that resolves to an authentication response. */ - virtual boost::future on_challenge(const wamp_challenge& challenge); + virtual boost::future on_challenge( + const wamp_challenge& challenge); + + /*! + * Function called by the session when authenticating. It always has to be + * re-implemented (if authentication is part of the system). + * Uses handlers as asynchronous mechanism. + * + * \param on_success The success handler + * \param on_exception The exception handler + * \param challenge The challenge from the router containing enough information + * for the system to prove membership. + * \return A future that resolves to an authentication response. + */ + virtual void on_challenge( + challenge_on_success_handler&& on_success, + challenge_on_exception_handler&& on_exception, + const wamp_challenge& challenge); /*! * Accessor method to WELCOME DETAILS dictionary containing router roles @@ -312,6 +600,52 @@ class wamp_session : virtual void on_detach(bool was_clean, const std::string& reason) override; virtual void on_message(wamp_message&& message) override; + // WAMP operations + void do_start(); + void do_stop(); + void do_join(const std::string& realm, + const std::vector& authmethods, + const std::string& authid); + void do_leave(const std::string& reason); + void do_publish(const std::shared_ptr>& result, + const std::string& topic, + const wamp_publish_options& options); + template + void do_publish(const std::shared_ptr>& result, + const std::string& topic, const List& arguments, + const wamp_publish_options& options); + template + void do_publish(const std::shared_ptr>& result, + const std::string& topic, + const List& arguments, + const Map& kw_arguments, + const wamp_publish_options& options); + void do_subscribe(const std::shared_ptr& subscribe_request, + const std::string& topic, + const wamp_event_handler& handler, + const wamp_subscribe_options& options); + void do_unsubscribe(const std::shared_ptr& unsubscribe_request, + const wamp_subscription& subscription); + void do_call(const std::shared_ptr& call, + const std::string& procedure, + const wamp_call_options& options); + template + void do_call(const std::shared_ptr& call, + const std::string& procedure, + const List& arguments, + const wamp_call_options& options); + template + void do_call(const std::shared_ptr& call, + const std::string& procedure, + const List& arguments, const Map& kw_arguments, + const wamp_call_options& options); + void do_provide(const std::shared_ptr& register_request, + const std::string& uri, + const wamp_procedure& procedure, + const provide_options& options); + void do_unprovide(const std::shared_ptr& unregister_request, + const wamp_registration& registration); + // WAMP message processing void process_error(wamp_message&& message); void process_welcome(wamp_message&& message); @@ -349,21 +683,21 @@ class wamp_session : uint64_t m_session_id; // Synchronization for dealing with starting the session. - boost::promise m_session_start; + wamp_async m_session_start; // Future to be fired when session was joined. - boost::promise m_session_join; + wamp_async m_session_join; // Whether or not we have already sent a goodbye when leaving the session. bool m_goodbye_sent; - boost::promise m_session_leave; + wamp_async m_session_leave; // Set to true when the session is stopped. bool m_running; // Synchronization for dealing with stopping the session - boost::promise m_session_stop; + wamp_async m_session_stop; ////////////////////////////////////////////////////////////////////////////////////// // Caller diff --git a/autobahn/wamp_session.ipp b/autobahn/wamp_session.ipp index 4aeadb61..746fec32 100644 --- a/autobahn/wamp_session.ipp +++ b/autobahn/wamp_session.ipp @@ -35,14 +35,11 @@ #include "wamp_message.hpp" #include "wamp_message_type.hpp" #include "wamp_publication.hpp" -#include "wamp_registration.hpp" #include "wamp_register_request.hpp" #include "wamp_subscribe_request.hpp" -#include "wamp_subscription.hpp" #include "wamp_transport.hpp" #include "wamp_unregister_request.hpp" #include "wamp_unsubscribe_request.hpp" -#include "wamp_authenticate.hpp" #include "wamp_challenge.hpp" #include "wamp_auth_utils.hpp" @@ -78,7 +75,7 @@ inline wamp_session::~wamp_session() { } -inline boost::future wamp_session::start() +inline void wamp_session::do_start() { auto weak_self = std::weak_ptr(this->shared_from_this()); @@ -101,11 +98,24 @@ inline boost::future wamp_session::start() m_running = true; m_session_start.set_value(); }); +} - return m_session_start.get_future(); +inline boost::future wamp_session::start() +{ + m_session_start = wamp_async(); + do_start(); + return m_session_start.promise().get_future(); } -inline boost::future wamp_session::stop() +inline void wamp_session::start( + start_on_success_handler&& on_success, + start_on_exception_handler&& on_exception) +{ + m_session_start = wamp_async(std::move(on_success), std::move(on_exception)); + do_start(); +} + +inline void wamp_session::do_stop() { auto weak_self = std::weak_ptr(this->shared_from_this()); @@ -133,11 +143,24 @@ inline boost::future wamp_session::stop() m_running = false; m_session_stop.set_value(); }); +} - return m_session_stop.get_future(); +inline boost::future wamp_session::stop() +{ + m_session_stop = wamp_async(); + do_stop(); + return m_session_stop.promise().get_future(); } -inline boost::future wamp_session::join( +inline void wamp_session::stop( + stop_on_success_handler&& on_success, + stop_on_exception_handler&& on_exception) +{ + m_session_stop = wamp_async(std::move(on_success), std::move(on_exception)); + do_stop(); +} + +inline void wamp_session::do_join( const std::string& realm, const std::vector& authentication_methods, const std::string& authentication_id) @@ -192,11 +215,31 @@ inline boost::future wamp_session::join( m_session_join.set_exception(boost::copy_exception(e)); } }); +} - return m_session_join.get_future(); +inline boost::future wamp_session::join( + const std::string& realm, + const std::vector& authentication_methods, + const std::string& authentication_id) +{ + m_session_join = wamp_async(); + do_join(realm, authentication_methods, authentication_id); + return m_session_join.promise().get_future(); } -inline boost::future wamp_session::leave(const std::string& reason) +inline void wamp_session::join( + join_on_success_handler&& on_success, + join_on_exception_handler&& on_exception, + const std::string& realm, + const std::vector& authentication_methods, + const std::string& authentication_id) +{ + m_session_join = wamp_async(std::move(on_success), std::move(on_exception)); + do_join(realm, authentication_methods, authentication_id); +} + +inline void wamp_session::do_leave( + const std::string& reason) { auto message = std::make_shared(3); message->set_field(0, static_cast(message_type::GOODBYE)); @@ -224,11 +267,29 @@ inline boost::future wamp_session::leave(const std::string& reason) m_session_id = 0; }); +} + +inline boost::future wamp_session::leave( + const std::string& reason) +{ + m_session_leave = wamp_async(); + do_leave(reason); + return m_session_leave.promise().get_future(); +} - return m_session_leave.get_future(); +inline void wamp_session::leave( + leave_on_success_handler&& on_success, + leave_on_exception_handler&& on_exception, + const std::string& reason) +{ + m_session_leave = wamp_async(std::move(on_success), std::move(on_exception)); + do_leave(reason); } -inline boost::future wamp_session::publish(const std::string& topic,const wamp_publish_options& options) +inline void wamp_session::do_publish( + const std::shared_ptr>& result, + const std::string& topic, + const wamp_publish_options& options) { uint64_t request_id = ++m_request_id; @@ -238,7 +299,6 @@ inline boost::future wamp_session::publish(const std::string& topic,const message->set_field(2, options); message->set_field(3, topic); - auto result = std::make_shared>(); auto weak_self = std::weak_ptr(this->shared_from_this()); m_io_service.dispatch([=]() { @@ -254,12 +314,33 @@ inline boost::future wamp_session::publish(const std::string& topic,const result->set_exception(boost::copy_exception(e)); } }); +} - return result->get_future(); +inline boost::future wamp_session::publish( + const std::string& topic, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(); + do_publish(result, topic, options); + return result->promise().get_future(); +} + +inline void wamp_session::publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(std::move(on_success), std::move(on_exception)); + do_publish(result, topic, options); } template -inline boost::future wamp_session::publish(const std::string& topic, const List& arguments,const wamp_publish_options& options) +inline void wamp_session::do_publish( + const std::shared_ptr>& result, + const std::string& topic, + const List& arguments, + const wamp_publish_options& options) { uint64_t request_id = ++m_request_id; @@ -270,7 +351,6 @@ inline boost::future wamp_session::publish(const std::string& topic, const message->set_field(3, topic); message->set_field(4, arguments); - auto result = std::make_shared>(); auto weak_self = std::weak_ptr(this->shared_from_this()); m_io_service.dispatch([=]() { @@ -286,13 +366,38 @@ inline boost::future wamp_session::publish(const std::string& topic, const result->set_exception(boost::copy_exception(e)); } }); +} + +template +inline boost::future wamp_session::publish( + const std::string& topic, + const List& arguments, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(); + do_publish(result, topic, arguments, options); + return result->promise().get_future(); +} - return result->get_future(); +template +inline void wamp_session::publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, + const List& arguments, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(std::move(on_success), std::move(on_exception)); + do_publish(result, topic, arguments, options); } template -inline boost::future wamp_session::publish( - const std::string& topic, const List& arguments, const Map& kw_arguments,const wamp_publish_options& options) +inline void wamp_session::do_publish( + const std::shared_ptr>& result, + const std::string& topic, + const List& arguments, + const Map& kw_arguments, + const wamp_publish_options& options) { uint64_t request_id = ++m_request_id; @@ -304,7 +409,6 @@ inline boost::future wamp_session::publish( message->set_field(4, arguments); message->set_field(5, kw_arguments); - auto result = std::make_shared>(); auto weak_self = std::weak_ptr(this->shared_from_this()); m_io_service.dispatch([=]() { @@ -320,11 +424,35 @@ inline boost::future wamp_session::publish( result->set_exception(boost::copy_exception(e)); } }); +} - return result->get_future(); +template +inline boost::future wamp_session::publish( + const std::string& topic, + const List& arguments, + const Map& kw_arguments, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(); + do_publish(result, topic, arguments, kw_arguments, options); + return result->promise().get_future(); } -inline boost::future wamp_session::subscribe( +template +inline void wamp_session::publish( + publish_on_success_handler&& on_success, + publish_on_exception_handler&& on_exception, + const std::string& topic, + const List& arguments, + const Map& kw_arguments, + const wamp_publish_options& options) +{ + auto result = std::make_shared>(std::move(on_success), std::move(on_exception)); + do_publish(result, topic, arguments, kw_arguments, options); +} + +inline void wamp_session::do_subscribe( + const std::shared_ptr& subscribe_request, const std::string& topic, const wamp_event_handler& handler, const wamp_subscribe_options& options) @@ -338,7 +466,6 @@ inline boost::future wamp_session::subscribe( message->set_field(3, topic); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto subscribe_request = std::make_shared(handler); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -353,11 +480,35 @@ inline boost::future wamp_session::subscribe( subscribe_request->response().set_exception(boost::copy_exception(e)); } }); +} - return subscribe_request->response().get_future(); +inline boost::future wamp_session::subscribe( + const std::string& topic, + const wamp_event_handler& handler, + const wamp_subscribe_options& options) +{ + auto subscribe_request = std::make_shared(handler); + do_subscribe(subscribe_request, topic, handler, options); + return subscribe_request->response().promise().get_future(); } -inline boost::future wamp_session::unsubscribe(const wamp_subscription& subscription) +inline void wamp_session::subscribe( + subscribe_on_success_handler&& on_success, + subscribe_on_exception_handler&& on_exception, + const std::string& topic, + const wamp_event_handler& handler, + const wamp_subscribe_options& options) +{ + auto subscribe_request = std::make_shared( + handler, + std::move(on_success), + std::move(on_exception)); + do_subscribe(subscribe_request, topic, handler, options); +} + +inline void wamp_session::do_unsubscribe( + const std::shared_ptr& unsubscribe_request, + const wamp_subscription& subscription) { uint64_t request_id = ++m_request_id; @@ -367,7 +518,6 @@ inline boost::future wamp_session::unsubscribe(const wamp_subscription& su message->set_field(2, subscription.id()); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto unsubscribe_request = std::make_shared(subscription); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -382,11 +532,30 @@ inline boost::future wamp_session::unsubscribe(const wamp_subscription& su unsubscribe_request->response().set_exception(boost::copy_exception(e)); } }); +} - return unsubscribe_request->response().get_future(); +inline boost::future wamp_session::unsubscribe( + const wamp_subscription& subscription) +{ + auto unsubscribe_request = std::make_shared(subscription); + do_unsubscribe(unsubscribe_request, subscription); + return unsubscribe_request->response().promise().get_future(); } -inline boost::future wamp_session::call( +inline void wamp_session::unsubscribe( + unsubscribe_on_success_handler&& on_success, + unsubscribe_on_exception_handler&& on_exception, + const wamp_subscription& subscription) +{ + auto unsubscribe_request = std::make_shared( + subscription, + std::move(on_success), + std::move(on_exception)); + do_unsubscribe(unsubscribe_request, subscription); +} + +inline void wamp_session::do_call( + const std::shared_ptr& call, const std::string& procedure, const wamp_call_options& options) { @@ -399,7 +568,6 @@ inline boost::future wamp_session::call( message->set_field(3, procedure); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto call = std::make_shared(); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -414,12 +582,30 @@ inline boost::future wamp_session::call( call->result().set_exception(boost::copy_exception(e)); } }); +} + +inline boost::future wamp_session::call( + const std::string& procedure, + const wamp_call_options& options) +{ + auto call = std::make_shared(); + do_call(call, procedure, options); + return call->result().promise().get_future(); +} - return call->result().get_future(); +inline void wamp_session::call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const wamp_call_options& options) +{ + auto call = std::make_shared(std::move(on_success), std::move(on_exception)); + do_call(call, procedure, options); } template -inline boost::future wamp_session::call( +inline void wamp_session::do_call( + const std::shared_ptr& call, const std::string& procedure, const List& arguments, const wamp_call_options& options) @@ -434,7 +620,6 @@ inline boost::future wamp_session::call( message->set_field(4, arguments); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto call = std::make_shared(); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -449,12 +634,35 @@ inline boost::future wamp_session::call( call->result().set_exception(boost::copy_exception(e)); } }); +} - return call->result().get_future(); +template +inline boost::future wamp_session::call( + const std::string& procedure, + const List& arguments, + const wamp_call_options& options) +{ + auto call = std::make_shared(); + do_call(call, procedure, arguments, options); + return call->result().promise().get_future(); } +template +inline void wamp_session::call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const List& arguments, + const wamp_call_options& options) +{ + auto call = std::make_shared(std::move(on_success), std::move(on_exception)); + do_call(call, procedure, arguments, options); +} + + template -inline boost::future wamp_session::call( +inline void wamp_session::do_call( + const std::shared_ptr& call, const std::string& procedure, const List& arguments, const Map& kw_arguments, @@ -471,7 +679,6 @@ inline boost::future wamp_session::call( message->set_field(5, kw_arguments); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto call = std::make_shared(); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -486,11 +693,35 @@ inline boost::future wamp_session::call( call->result().set_exception(boost::copy_exception(e)); } }); +} - return call->result().get_future(); +template +inline boost::future wamp_session::call( + const std::string& procedure, + const List& arguments, + const Map& kw_arguments, + const wamp_call_options& options) +{ + auto call = std::make_shared(); + do_call(call, procedure, arguments, kw_arguments, options); + return call->result().promise().get_future(); } -inline boost::future wamp_session::provide( +template +inline void wamp_session::call( + call_on_success_handler&& on_success, + call_on_exception_handler&& on_exception, + const std::string& procedure, + const List& arguments, + const Map& kw_arguments, + const wamp_call_options& options) +{ + auto call = std::make_shared(std::move(on_success), std::move(on_exception)); + do_call(call, procedure, arguments, kw_arguments, options); +} + +inline void wamp_session::do_provide( + const std::shared_ptr& register_request, const std::string& name, const wamp_procedure& procedure, const provide_options& options) @@ -504,7 +735,6 @@ inline boost::future wamp_session::provide( message->set_field(3, name); auto weak_self = std::weak_ptr(this->shared_from_this()); - auto register_request = std::make_shared(procedure); m_io_service.dispatch([=]() { auto shared_self = weak_self.lock(); @@ -519,46 +749,98 @@ inline boost::future wamp_session::provide( register_request->response().set_exception(boost::copy_exception(e)); } }); +} + +inline boost::future wamp_session::provide( + const std::string& name, + const wamp_procedure& procedure, + const provide_options& options) +{ + auto register_request = std::make_shared(procedure); + do_provide(register_request, name, procedure, options); + return register_request->response().promise().get_future(); +} - return register_request->response().get_future(); +inline void wamp_session::provide( + provide_on_success_handler&& on_success, + provide_on_exception_handler&& on_exception, + const std::string& name, + const wamp_procedure& procedure, + const provide_options& options) +{ + auto register_request = std::make_shared( + procedure, + std::move(on_success), + std::move(on_exception)); + do_provide(register_request, name, procedure, options); } -inline boost::future wamp_session::unprovide(const wamp_registration& registration) +inline void wamp_session::do_unprovide( + const std::shared_ptr& unregister_request, + const wamp_registration& registration) { uint64_t request_id = ++m_request_id; - auto message = std::make_shared(3); - message->set_field(0, static_cast(message_type::UNREGISTER)); - message->set_field(1, request_id); - message->set_field(2, registration.id()); - - auto weak_self = std::weak_ptr(this->shared_from_this()); - auto unregister_request = std::make_shared(registration); - - m_io_service.dispatch([=]() { - auto shared_self = weak_self.lock(); - if (!shared_self) { - return; - } - - try { - send_message(std::move(*message)); - m_unregister_requests.emplace(request_id, unregister_request); - } - catch (const std::exception& e) { - unregister_request->response().set_exception(boost::copy_exception(e)); - } - }); - - return unregister_request->response().get_future(); + auto message = std::make_shared(3); + message->set_field(0, static_cast(message_type::UNREGISTER)); + message->set_field(1, request_id); + message->set_field(2, registration.id()); + + auto weak_self = std::weak_ptr(this->shared_from_this()); + + m_io_service.dispatch([=]() { + auto shared_self = weak_self.lock(); + if (!shared_self) { + return; + } + + try { + send_message(std::move(*message)); + m_unregister_requests.emplace(request_id, unregister_request); + } + catch (const std::exception& e) { + unregister_request->response().set_exception(boost::copy_exception(e)); + } + }); +} + +inline boost::future wamp_session::unprovide( + const wamp_registration& registration) +{ + auto unregister_request = std::make_shared(registration); + do_unprovide(unregister_request, registration); + return unregister_request->response().promise().get_future(); +} + +inline void wamp_session::unprovide( + unprovide_on_success_handler&& on_success, + unprovide_on_exception_handler&& on_exception, + const wamp_registration& registration) +{ + auto unregister_request = std::make_shared( + registration, + std::move(on_success), + std::move(on_exception)); + do_unprovide(unregister_request, registration); +} + +inline boost::future wamp_session::on_challenge( + const wamp_challenge& challenge) +{ + // a dummy implementation + wamp_async dummy; + dummy.set_value( wamp_authenticate( "" ) ); + return dummy.promise().get_future(); } -inline boost::future wamp_session::on_challenge(const wamp_challenge& challenge) +inline void wamp_session::on_challenge( + challenge_on_success_handler&& on_success, + challenge_on_exception_handler&& on_exception, + const wamp_challenge& challenge) { // a dummy implementation - boost::promise dummy; + wamp_async dummy(std::move(on_success), std::move(on_exception)); dummy.set_value( wamp_authenticate( "" ) ); - return dummy.get_future(); } inline void wamp_session::on_attach(const std::shared_ptr& transport) diff --git a/autobahn/wamp_subscribe_request.hpp b/autobahn/wamp_subscribe_request.hpp index 8870064c..56884bdf 100644 --- a/autobahn/wamp_subscribe_request.hpp +++ b/autobahn/wamp_subscribe_request.hpp @@ -31,9 +31,10 @@ #ifndef AUTOBAHN_WAMP_SUBSCRIBE_REQUEST_HPP #define AUTOBAHN_WAMP_SUBSCRIBE_REQUEST_HPP +#include "boost_config.hpp" +#include "wamp_async.hpp" #include "wamp_event_handler.hpp" #include "wamp_subscription.hpp" -#include "boost_config.hpp" namespace autobahn { @@ -41,17 +42,25 @@ namespace autobahn { class wamp_subscribe_request { public: + using on_success_handler = wamp_async::on_success_handler; + using on_exception_handler = wamp_async::on_exception_handler; + wamp_subscribe_request(); + wamp_subscribe_request(on_success_handler&& on_success, + on_exception_handler&& on_exception); wamp_subscribe_request(const wamp_event_handler& handler); + wamp_subscribe_request(const wamp_event_handler& handler, + on_success_handler&& on_success, + on_exception_handler&& on_exception); const wamp_event_handler& handler() const; - boost::promise& response(); + wamp_async& response(); void set_handler(const wamp_event_handler& handler) const; void set_response(const wamp_subscription& subscription); private: wamp_event_handler m_handler; - boost::promise m_response; + wamp_async m_response; }; } // namespace autobahn diff --git a/autobahn/wamp_subscribe_request.ipp b/autobahn/wamp_subscribe_request.ipp index 45fe8d88..ab94add2 100644 --- a/autobahn/wamp_subscribe_request.ipp +++ b/autobahn/wamp_subscribe_request.ipp @@ -36,18 +36,33 @@ inline wamp_subscribe_request::wamp_subscribe_request() { } +inline wamp_subscribe_request::wamp_subscribe_request(on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_handler() + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + inline wamp_subscribe_request::wamp_subscribe_request(const wamp_event_handler& handler) : m_handler(handler) , m_response() { } +inline wamp_subscribe_request::wamp_subscribe_request(const wamp_event_handler& handler, + on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_handler(handler) + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + inline const wamp_event_handler& wamp_subscribe_request::handler() const { return m_handler; } -inline boost::promise& wamp_subscribe_request::response() +inline wamp_async& wamp_subscribe_request::response() { return m_response; } diff --git a/autobahn/wamp_tcp_transport.hpp b/autobahn/wamp_tcp_transport.hpp index 83360b49..ac66926d 100644 --- a/autobahn/wamp_tcp_transport.hpp +++ b/autobahn/wamp_tcp_transport.hpp @@ -53,6 +53,9 @@ class wamp_tcp_transport : virtual ~wamp_tcp_transport() override; virtual boost::future connect() override; + + virtual void connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) override; }; } // namespace autobahn diff --git a/autobahn/wamp_tcp_transport.ipp b/autobahn/wamp_tcp_transport.ipp index a9767a93..9f61039b 100644 --- a/autobahn/wamp_tcp_transport.ipp +++ b/autobahn/wamp_tcp_transport.ipp @@ -61,4 +61,19 @@ inline boost::future wamp_tcp_transport::connect() ); } +inline void wamp_tcp_transport::connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) +{ + wamp_rawsocket_transport::connect( + [=]() { + on_success(); + // Disable naggle for improved performance. + boost::asio::ip::tcp::no_delay option(true); + socket().set_option(option); + }, + [=](const boost::exception_ptr& eptr) { + on_exception(eptr); + }); +} + } // namespace autobahn diff --git a/autobahn/wamp_transport.hpp b/autobahn/wamp_transport.hpp index f6d4da4a..3b52f89d 100644 --- a/autobahn/wamp_transport.hpp +++ b/autobahn/wamp_transport.hpp @@ -32,6 +32,7 @@ #define AUTOBAHN_WAMP_TRANSPORT_HPP #include "boost_config.hpp" +#include "wamp_async.hpp" #include #include @@ -59,6 +60,16 @@ class wamp_transport */ using resume_handler = std::function; + /*! + * Asynchronous success handler. + */ + using on_success_handler = wamp_async::on_success_handler; + + /*! + * Asynchronous exception handler. + */ + using on_exception_handler = wamp_async::on_exception_handler; + public: /*! * Default virtual destructor. @@ -69,7 +80,8 @@ class wamp_transport * CONNECTION INTERFACE */ /*! - * Attempts to connect the transport. + * Attempts to connect the transport, using boost::future as asynchronus + * mechanism. * * @return A future that will be satisfied when the connect attempt * has been made. @@ -77,13 +89,34 @@ class wamp_transport virtual boost::future connect() = 0; /*! - * Attempts to disconnect the transport. + * Attempts to connect the transport, using handlers as asynchrous + * mechanism. + * + * @param on_success The success handler + * @param on_exception The exception handler + */ + virtual void connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) = 0; + + /*! + * Attempts to disconnect the transport, using boost::future as asynchronus + * mechanism. * * @return A future that will be satisfied when the disconnect attempt * has been made. */ virtual boost::future disconnect() = 0; + /*! + * Attempts to disconnect the transport, using handlers as asynchronus + * mechanism. + * + * @param on_success The success handler + * @param on_exception The exception handler + */ + virtual void disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) = 0; + /*! * Determines if the transport is connected. * diff --git a/autobahn/wamp_unregister_request.hpp b/autobahn/wamp_unregister_request.hpp index 19233ce8..ea7cb4d6 100644 --- a/autobahn/wamp_unregister_request.hpp +++ b/autobahn/wamp_unregister_request.hpp @@ -20,6 +20,7 @@ #define AUTOBAHN_WAMP_UNREGISTER_REQUEST_HPP #include "boost_config.hpp" +#include "wamp_async.hpp" #include "wamp_registration.hpp" namespace autobahn { @@ -28,15 +29,21 @@ namespace autobahn { class wamp_unregister_request { public: + using on_success_handler = wamp_async::on_success_handler; + using on_exception_handler = wamp_async::on_exception_handler; + wamp_unregister_request(const wamp_registration& registration); + wamp_unregister_request(const wamp_registration& registration, + on_success_handler&& on_success, + on_exception_handler&& on_exception); - boost::promise& response(); + wamp_async& response(); void set_response(); wamp_registration& registration(); private: wamp_registration m_registration; - boost::promise m_response; + wamp_async m_response; }; } // namespace autobahn diff --git a/autobahn/wamp_unregister_request.ipp b/autobahn/wamp_unregister_request.ipp index 040540d1..a4c79dfa 100644 --- a/autobahn/wamp_unregister_request.ipp +++ b/autobahn/wamp_unregister_request.ipp @@ -24,7 +24,15 @@ inline wamp_unregister_request::wamp_unregister_request(const wamp_registration& { } -inline boost::promise& wamp_unregister_request::response() +inline wamp_unregister_request::wamp_unregister_request(const wamp_registration& registration, + on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_registration(registration) + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + +inline wamp_async& wamp_unregister_request::response() { return m_response; } diff --git a/autobahn/wamp_unsubscribe_request.hpp b/autobahn/wamp_unsubscribe_request.hpp index a5731281..9e1285e8 100644 --- a/autobahn/wamp_unsubscribe_request.hpp +++ b/autobahn/wamp_unsubscribe_request.hpp @@ -32,6 +32,7 @@ #define AUTOBAHN_WAMP_UNSUBSCRIBE_REQUEST_HPP #include "boost_config.hpp" +#include "wamp_async.hpp" #include "wamp_subscription.hpp" namespace autobahn { @@ -40,15 +41,21 @@ namespace autobahn { class wamp_unsubscribe_request { public: + using on_success_handler = wamp_async::on_success_handler; + using on_exception_handler = wamp_async::on_exception_handler; + wamp_unsubscribe_request(const wamp_subscription& subscription); + wamp_unsubscribe_request(const wamp_subscription& subscription, + on_success_handler&& on_success, + on_exception_handler&& on_exception); - boost::promise& response(); + wamp_async& response(); void set_response(); wamp_subscription &subscription(); private: wamp_subscription m_subscription; - boost::promise m_response; + wamp_async m_response; }; } // namespace autobahn diff --git a/autobahn/wamp_unsubscribe_request.ipp b/autobahn/wamp_unsubscribe_request.ipp index 6e442fc7..32788a46 100644 --- a/autobahn/wamp_unsubscribe_request.ipp +++ b/autobahn/wamp_unsubscribe_request.ipp @@ -36,7 +36,15 @@ inline wamp_unsubscribe_request::wamp_unsubscribe_request(const wamp_subscriptio { } -inline boost::promise& wamp_unsubscribe_request::response() +inline wamp_unsubscribe_request::wamp_unsubscribe_request(const wamp_subscription &subscription, + on_success_handler&& on_success, + on_exception_handler&& on_exception) + : m_subscription(subscription) + , m_response(std::move(on_success), std::move(on_exception)) +{ +} + +inline wamp_async& wamp_unsubscribe_request::response() { return m_response; } diff --git a/autobahn/wamp_websocket_transport.hpp b/autobahn/wamp_websocket_transport.hpp index 51ae25d5..c8eae2ff 100644 --- a/autobahn/wamp_websocket_transport.hpp +++ b/autobahn/wamp_websocket_transport.hpp @@ -71,14 +71,28 @@ namespace autobahn { * CONNECTION INTERFACE */ /*! - * @copydoc wamp_transport::connect() - */ + * @copydoc wamp_transport::connect() + */ virtual boost::future connect() override; - + /*! - * @copydoc wamp_transport::disconnect() - */ + * @copydoc wamp_transport::connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) + */ + virtual void connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) override; + + /*! + * @copydoc wamp_transport::disconnect() + */ virtual boost::future disconnect() override; + + /*! + * @copydoc wamp_transport::disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) + */ + virtual void disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) override; /*! * @copydoc wamp_transport::is_connected() @@ -140,7 +154,7 @@ namespace autobahn { virtual bool is_open() const = 0; - virtual void async_connect(const std::string& m_uri, boost::promise& connect_promise) = 0; + virtual void async_connect(const std::string& m_uri, wamp_async& connect_async) = 0; virtual void close() = 0; virtual void write(void const * payload, size_t len) = 0; @@ -148,14 +162,14 @@ namespace autobahn { void receive_message(const std::string& msg); /*! - * The promise that is fulfilled when the connect attempt is complete. - */ - boost::promise m_connect; + * The async operation that is fulfilled when the connect attempt is complete. + */ + wamp_async m_connect; /*! - * The promise that is fulfilled when the disconnect attempt is complete. - */ - boost::promise m_disconnect; + * The async operation that is fulfilled when the disconnect attempt is complete. + */ + wamp_async m_disconnect; private: diff --git a/autobahn/wamp_websocket_transport.ipp b/autobahn/wamp_websocket_transport.ipp index f4914525..1c6ef3c6 100644 --- a/autobahn/wamp_websocket_transport.ipp +++ b/autobahn/wamp_websocket_transport.ipp @@ -54,26 +54,57 @@ inline wamp_websocket_transport::wamp_websocket_transport( inline boost::future wamp_websocket_transport::connect() { + m_connect = wamp_async(); + if (is_open()) { m_connect.set_exception(boost::copy_exception(network_error("network transport already connected"))); - return m_connect.get_future(); + return m_connect.promise().get_future(); } async_connect(m_uri, m_connect); - return m_connect.get_future(); + return m_connect.promise().get_future(); +} + +inline void wamp_websocket_transport::connect(on_success_handler&& on_success, + on_exception_handler&& on_exception) +{ + m_connect = wamp_async(std::move(on_success), std::move(on_exception)); + + if (is_open()) { + m_connect.set_exception(boost::copy_exception(network_error("network transport already connected"))); + return; + } + + async_connect(m_uri, m_connect); } inline boost::future wamp_websocket_transport::disconnect() { + m_disconnect = wamp_async(); + if (!is_open()) { - throw network_error("network transport already disconnected"); + m_disconnect.set_exception(boost::copy_exception(network_error("network transport already disconnected"))); + return m_disconnect.promise().get_future(); } close(); - m_disconnect.set_value(); - return m_disconnect.get_future(); + m_disconnect.promise().set_value(); + return m_disconnect.promise().get_future(); +} + +inline void wamp_websocket_transport::disconnect(on_success_handler&& on_success, + on_exception_handler&& on_exception) +{ + m_disconnect = wamp_async(std::move(on_success), std::move(on_exception)); + + if (!is_open()) { + m_disconnect.set_exception(boost::copy_exception(network_error("network transport already disconnected"))); + return; + } + + close(); } inline bool wamp_websocket_transport::is_connected() const diff --git a/autobahn/wamp_websocketpp_websocket_transport.hpp b/autobahn/wamp_websocketpp_websocket_transport.hpp index 1f0a1cb8..c2eecf53 100644 --- a/autobahn/wamp_websocketpp_websocket_transport.hpp +++ b/autobahn/wamp_websocketpp_websocket_transport.hpp @@ -77,7 +77,7 @@ namespace autobahn { private: virtual bool is_open() const override; virtual void close() override; - virtual void async_connect(const std::string& uri, boost::promise& connect_promise) override; + virtual void async_connect(const std::string& uri, wamp_async& connect_async) override; virtual void write(void const * payload, size_t len) override; private: diff --git a/autobahn/wamp_websocketpp_websocket_transport.ipp b/autobahn/wamp_websocketpp_websocket_transport.ipp index 78398aa7..ae36ee72 100644 --- a/autobahn/wamp_websocketpp_websocket_transport.ipp +++ b/autobahn/wamp_websocketpp_websocket_transport.ipp @@ -116,13 +116,13 @@ namespace autobahn { } template - inline void wamp_websocketpp_websocket_transport::async_connect(const std::string& uri, boost::promise& connect_promise) + inline void wamp_websocketpp_websocket_transport::async_connect(const std::string& uri, wamp_async& connect_async) { websocketpp::lib::error_code ec; typename client_type::connection_ptr con = m_client.get_connection(uri, ec); if (ec) { //Log "Get Connection Error: " + ec.message()); - connect_promise.set_exception(boost::copy_exception(websocketpp::lib::system_error(ec.value(), ec.category(), "connect"))); + connect_async.set_exception(boost::copy_exception(websocketpp::lib::system_error(ec.value(), ec.category(), "connect"))); return; } diff --git a/examples/callee.cpp b/examples/callee.cpp index d3f2720d..e462927b 100644 --- a/examples/callee.cpp +++ b/examples/callee.cpp @@ -84,73 +84,45 @@ int main(int argc, char** argv) transport->attach(std::static_pointer_cast(session)); - // Make sure the continuation futures we use do not run out of scope prematurely. - // Since we are only using one thread here this can cause the io service to block - // as a future generated by a continuation will block waiting for its promise to be - // fulfilled when it goes out of scope. This would prevent the session from receiving - // responses from the router. - boost::future connect_future; - boost::future start_future; - boost::future join_future; - boost::future provide_future_add; - boost::future provide_future_longop; - - connect_future = transport->connect().then([&](boost::future connected) { + // Use callback asynchronous mechanism + auto on_exception = [&](boost::exception_ptr eptr) { try { - connected.get(); + boost::rethrow_exception(eptr); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; io.stop(); - return; } + }; + transport->connect([&]() { std::cerr << "transport connected" << std::endl; - start_future = session->start().then([&](boost::future started) { - try { - started.get(); - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } - + session->start([&]() { std::cerr << "session started" << std::endl; - join_future = session->join(parameters->realm()).then([&](boost::future joined) { - try { - std::cerr << "joined realm: " << joined.get() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } - - provide_future_add = session->provide("com.examples.calculator.add2", &add2).then( - [&](boost::future registration) { - try { - std::cerr << "registered procedure:" << registration.get().id() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } - }); - - provide_future_longop = session->provide("com.myapp.longop", &longop).then( - [&](boost::future registration) { - try { - std::cerr << "registered procedure:" << registration.get().id() << std::endl; - } - catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } - }); - }); - }); - }); + session->join([&](uint64_t joined) { + std::cerr << "joined realm: " << joined << std::endl; + + session->provide([&](const autobahn::wamp_registration& registration) { + std::cerr << "registered procedure:" << registration.id() << std::endl; + }, + on_exception, + "com.examples.calculator.add2", + &add2); + + session->provide([&](const autobahn::wamp_registration& registration) { + std::cerr << "registered procedure:" << registration.id() << std::endl; + }, + on_exception, + "com.myapp.longop", + &longop); + }, + on_exception, + parameters->realm()); + }, + on_exception); + }, + on_exception); std::cerr << "starting io service" << std::endl; io.run(); diff --git a/examples/caller.cpp b/examples/caller.cpp index 1a88d41d..c79dd17c 100644 --- a/examples/caller.cpp +++ b/examples/caller.cpp @@ -64,6 +64,7 @@ int main(int argc, char** argv) boost::future start_future; boost::future join_future; boost::future call_future; + boost::future call_future2; boost::future leave_future; boost::future stop_future; @@ -105,25 +106,42 @@ int main(int argc, char** argv) [&](boost::future result) { try { uint64_t sum = result.get().argument(0); - std::cerr << "call result: " << sum << std::endl; + std::cerr << "add2 result: " << sum << std::endl; } catch (const std::exception& e) { - std::cerr << "call failed: " << e.what() << std::endl; + std::cerr << "add2 failed: " << e.what() << std::endl; io.stop(); return; } - leave_future = session->leave().then([&](boost::future reason) { + autobahn::wamp_call_options call_options; + call_options.set_timeout(std::chrono::seconds(10)); + + std::tuple arguments(5); + + call_future2 = session->call("com.myapp.longop", arguments, call_options).then( + [&](boost::future result) { try { - std::cerr << "left session (" << reason.get() << ")" << std::endl; + uint64_t sum = result.get().argument(0); + std::cerr << "longop result: " << sum << std::endl; } catch (const std::exception& e) { - std::cerr << "failed to leave session: " << e.what() << std::endl; + std::cerr << "longop failed: " << e.what() << std::endl; io.stop(); return; } - stop_future = session->stop().then([&](boost::future stopped) { - std::cerr << "stopped session" << std::endl; - io.stop(); + leave_future = session->leave().then([&](boost::future reason) { + try { + std::cerr << "left session (" << reason.get() << ")" << std::endl; + } catch (const std::exception& e) { + std::cerr << "failed to leave session: " << e.what() << std::endl; + io.stop(); + return; + } + + stop_future = session->stop().then([&](boost::future stopped) { + std::cerr << "stopped session" << std::endl; + io.stop(); + }); }); }); }); diff --git a/examples/publisher.cpp b/examples/publisher.cpp index ad139e26..4236da47 100644 --- a/examples/publisher.cpp +++ b/examples/publisher.cpp @@ -58,69 +58,46 @@ int main(int argc, char** argv) transport->attach(std::static_pointer_cast(session)); - // Make sure the continuation futures we use do not run out of scope prematurely. - // Since we are only using one thread here this can cause the io service to block - // as a future generated by a continuation will block waiting for its promise to be - // fulfilled when it goes out of scope. This would prevent the session from receiving - // responses from the router. - boost::future connect_future; - boost::future start_future; - boost::future join_future; - boost::future leave_future; - boost::future stop_future; - - connect_future = transport->connect().then([&](boost::future connected) { + // Use callback asynchronous mechanism + auto on_exception = [&](boost::exception_ptr eptr) { try { - connected.get(); + boost::rethrow_exception(eptr); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; io.stop(); - return; } + }; + transport->connect([&]() { std::cerr << "transport connected" << std::endl; - start_future = session->start().then([&](boost::future started) { - try { - started.get(); - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } - + session->start([&]() { std::cerr << "session started" << std::endl; - join_future = session->join(parameters->realm()).then([&](boost::future joined) { - try { - std::cerr << "joined realm: " << joined.get() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - io.stop(); - return; - } + session->join([&](const uint64_t& joined) { + std::cerr << "joined realm: " << joined << std::endl; std::tuple arguments(std::string("hello")); session->publish("com.examples.subscriptions.topic1", arguments); std::cerr << "event published" << std::endl; - leave_future = session->leave().then([&](boost::future reason) { - try { - std::cerr << "left session (" << reason.get() << ")" << std::endl; - } catch (const std::exception& e) { - std::cerr << "failed to leave session: " << e.what() << std::endl; - io.stop(); - return; - } + session->leave([&](const std::string& reason) { + std::cerr << "left session (" << reason << ")" << std::endl; - stop_future = session->stop().then([&](boost::future stopped) { + session->stop([&]() { std::cerr << "stopped session" << std::endl; io.stop(); - }); - }); - }); - }); - }); + }, + on_exception); + }, + on_exception); + }, + on_exception, + parameters->realm()); + }, + on_exception); + }, + on_exception); std::cerr << "starting io service" << std::endl; io.run();