From b16ccfd6a4b7ae2aa1a30065f28ce6c56af5aa0d Mon Sep 17 00:00:00 2001 From: tilsche Date: Tue, 29 May 2018 17:42:03 +0200 Subject: [PATCH 1/6] Change examples to use SSL --- examples/libboostasio.cpp | 15 ++++++++++++--- examples/libev.cpp | 5 +++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/examples/libboostasio.cpp b/examples/libboostasio.cpp index d290382d..6c281f50 100644 --- a/examples/libboostasio.cpp +++ b/examples/libboostasio.cpp @@ -19,6 +19,9 @@ #include #include +#include +#include + /** * Main program * @return int @@ -32,9 +35,16 @@ int main() // handler for libev AMQP::LibBoostAsioHandler handler(service); - + + // init the SSL library +#if OPENSSL_VERSION_NUMBER < 0x10100000L + SSL_library_init(); +#else + OPENSSL_init_ssl(0, NULL); +#endif + // make a connection - AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + AMQP::TcpConnection connection(&handler, AMQP::Address("amqps://guest:guest@localhost/")); // we need a channel too AMQP::TcpChannel channel(&connection); @@ -44,7 +54,6 @@ int main() // report the name of the temporary queue std::cout << "declared queue " << name << std::endl; - // now we can close the connection connection.close(); }); diff --git a/examples/libev.cpp b/examples/libev.cpp index 43f8bb27..06507b81 100644 --- a/examples/libev.cpp +++ b/examples/libev.cpp @@ -144,10 +144,11 @@ int main() #endif // make a connection - AMQP::Address address("amqp://guest:guest@localhost/"); -// AMQP::Address address("amqps://guest:guest@localhost/"); +// AMQP::Address address("amqp://guest:guest@localhost/"); + AMQP::Address address("amqps://guest:guest@localhost/"); AMQP::TcpConnection connection(&handler, address); + // we need a channel too AMQP::TcpChannel channel(&connection); From c2a9b49609d5b2fe301290550b39355d500164db Mon Sep 17 00:00:00 2001 From: tilsche Date: Tue, 29 May 2018 18:06:26 +0200 Subject: [PATCH 2/6] Fix includes --- src/linux_tcp/sslconnected.h | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 07d575f8..5a065467 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -5,8 +5,8 @@ * * @copyright 2018 copernica BV */ - -/** + +/** * Include guard */ #pragma once @@ -16,10 +16,24 @@ */ #include "tcpoutbuffer.h" #include "tcpinbuffer.h" +#include "tcpstate.h" #include "wait.h" #include "sslwrapper.h" #include "sslshutdown.h" +#include +#include + +#include +#include + +#include + +extern "C" +{ +#include +} + /** * Set up namespace */ From 3994c9021797e22f1f6eb38f2ec06fb172b2b3b8 Mon Sep 17 00:00:00 2001 From: tilsche Date: Tue, 29 May 2018 18:06:41 +0200 Subject: [PATCH 3/6] Reorganize the SSL event loop always try to write if there is something in the buffer, always try to read remember why SSL failed to use as monitoring flags - and filter in process() --- src/linux_tcp/sslconnected.h | 285 +++++++++++++++-------------------- 1 file changed, 124 insertions(+), 161 deletions(-) diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 5a065467..14a09d26 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -39,10 +39,10 @@ extern "C" */ namespace AMQP { -/** +/** * Class definition - */ -class SslConnected : public TcpState, private Watchable + */ +class SslConnected : public TcpState, private Watchable { private: /** @@ -51,12 +51,12 @@ class SslConnected : public TcpState, private Watchable */ SslWrapper _ssl; - /** + /** * Socket file descriptor * @var int */ int _socket; - + /** * The outgoing buffer * @var TcpBuffer @@ -68,23 +68,13 @@ class SslConnected : public TcpState, private Watchable * @var TcpInBuffer */ TcpInBuffer _in; - - /** - * Are we now busy with sending or receiving? - * @var enum - */ - enum State { - state_idle, - state_sending, - state_receiving - } _state; - + /** * Should we close the connection after we've finished all operations? * @var bool */ bool _closed = false; - + /** * Have we reported the final instruction to the user? * @var bool @@ -96,7 +86,20 @@ class SslConnected : public TcpState, private Watchable * @var size_t */ size_t _reallocate = 0; - + + /** + * Remember what the last logical SSL write operation that failed wanted from the socket + * @var int + */ + int _write_want_flags; + + /** + * Remember what the last logical SSL read operation that failed wanted from the socket + * This is initialized to both readable and writable to ensure the first process() will + * try to read. + * @var int + */ + int _receive_want_flags = readable | writable; /** * Close the connection @@ -106,16 +109,16 @@ class SslConnected : public TcpState, private Watchable { // do nothing if already closed if (_socket < 0) return false; - + // and stop monitoring it _handler->monitor(_connection, _socket, 0); // close the socket ::close(_socket); - + // forget filedescriptor _socket = -1; - + // done return true; } @@ -129,12 +132,12 @@ class SslConnected : public TcpState, private Watchable { // close the socket if it is still open close(); - - // if the object is still in a valid state, we can move to the close-state, + + // if the object is still in a valid state, we can move to the close-state, // otherwise there is no point in moving to a next state return monitor.valid() ? new TcpClosed(this) : nullptr; } - + /** * Proceed with the next operation after the previous operation was * a success, possibly changing the filedescriptor-monitor @@ -142,89 +145,68 @@ class SslConnected : public TcpState, private Watchable */ TcpState *proceed() { - // if we still have an outgoing buffer we want to send out data - if (_out) - { - // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, readable | writable); - } - else if (_closed) + if (_closed && !_out) { // start the state that closes the connection auto *nextstate = new SslShutdown(_connection, _socket, std::move(_ssl), _finalized, _handler); // we forget the current socket to prevent that it gets destructed _socket = -1; - + // report the next state return nextstate; } - else + + if (_write_want_flags | _receive_want_flags) { - // let's wait until the socket becomes readable - _handler->monitor(_connection, _socket, readable); + // this is the only place where we setup the monitor + // setup the monitor when our async operations didn't immedeately complete + _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); } - - // done return this; } - + /** * Method to repeat the previous call\ * @param monitor monitor to check if connection object still exists - * @param state the state that we were in * @param result result of an earlier SSL_get_error call * @return TcpState* */ - TcpState *repeat(const Monitor &monitor, enum State state, int error) + TcpState *repeat(const Monitor &monitor, int &want_flags, int error) { // check the error switch (error) { case SSL_ERROR_WANT_READ: - // remember state - _state = state; - - // the operation must be repeated when readable - _handler->monitor(_connection, _socket, readable); - + want_flags = readable; + // allow chaining return monitor.valid() ? this : nullptr; - + case SSL_ERROR_WANT_WRITE: - // remember state - _state = state; - - // wait until socket becomes writable again - _handler->monitor(_connection, _socket, readable | writable); + want_flags = writable; // allow chaining return monitor.valid() ? this : nullptr; case SSL_ERROR_NONE: - // we're ready for the next instruction from userspace - _state = state_idle; - - // turns out no error occured, an no action has to be rescheduled - _handler->monitor(_connection, _socket, _out || _closed ? readable | writable : readable); + // can not happen since repeat is only called when an SSL function failed + assert(false); - // allow chaining - return monitor.valid() ? this : nullptr; - default: // if we have already reported an error to user space, we can go to the final state right away if (_finalized) return finalstate(monitor); - + // remember that we've sent out an error _finalized = true; - + // tell the handler _handler->onError(_connection, "ssl error"); - + // go to the final state return finalstate(monitor); } } - + /** * Parse the received buffer * @param monitor object to check the existance of the connection object @@ -236,107 +218,103 @@ class SslConnected : public TcpState, private Watchable // we need a local copy of the buffer - because it is possible that "this" // object gets destructed halfway through the call to the parse() method TcpInBuffer buffer(std::move(_in)); - + // parse the buffer auto processed = _connection->parse(buffer); - + // "this" could be removed by now, check this if (!monitor.valid()) return nullptr; - + // shrink buffer buffer.shrink(processed); - + // restore the buffer as member _in = std::move(buffer); - + // do we have to reallocate? if (!_reallocate) return this; - + // reallocate the buffer - _in.reallocate(_reallocate); - + _in.reallocate(_reallocate); + // we can remove the reallocate instruction _reallocate = 0; - + // done return this; } - + /** - * Perform a write operation - * @param monitor object to check the existance of the connection object - * @param readable is the connection also readable and should we call a read operation afterwards? + * Write as much from _out as we can. This modifies _write_want_flags such that it is + * 0 if everything was written or set to whatever the failed SSL_Write wanted from the socket + * @param monitor object to check the existence of the connection object * @return TcpState* */ - TcpState *write(const Monitor &monitor, bool readable) + TcpState *write(const Monitor &monitor) { - // assume default state - _state = state_idle; - - // we are going to check for errors after the openssl operations, so we make + // we are going to check for errors after the openssl operations, so we make // sure that the error queue is currently completely empty OpenSSL::ERR_clear_error(); - - // because the output buffer contains a lot of small buffers, we can do multiple + + // because the output buffer contains a lot of small buffers, we can do multiple // operations till the buffer is empty (but only if the socket is not also // readable, because then we want to read that data first instead of endless writes do { // try to send more data from the outgoing buffer auto result = _out.sendto(_ssl); - + // we may have to repeat the operation on failure if (result > 0) continue; - + // check for error auto error = OpenSSL::SSL_get_error(_ssl, result); // the operation failed, we may have to repeat our call - return repeat(monitor, state_sending, error); + return repeat(monitor, _write_want_flags, error); } - while (_out && !readable); - - // proceed with the read operation or the event loop - return readable ? receive(monitor, false) : proceed(); + while (_out); + + // Doesn't matter + _write_want_flags = 0; + return monitor.valid() ? this : nullptr; } /** - * Perform a receive operation + * Read as much as we can. This modifies _receive_want_flags such that it is to whatever the + * failed SSL_Read wanted from the socket * @param monitor object to check the existance of the connection object - * @param writable is the socket writable, and should we start a write operation after this operation? * @return TcpState */ - TcpState *receive(const Monitor &monitor, bool writable) + TcpState *receive(const Monitor &monitor) { - // we are going to check for errors after the openssl operations, so we make + // we are going to check for errors after the openssl operations, so we make // sure that the error queue is currently completely empty OpenSSL::ERR_clear_error(); - // start a loop do { - // assume default state - _state = state_idle; - // read data from ssl into the buffer auto result = _in.receivefrom(_ssl, _connection->expected()); - + // if this is a failure, we are going to repeat the operation - if (result <= 0) return repeat(monitor, state_receiving, OpenSSL::SSL_get_error(_ssl, result)); + if (result <= 0) + { + return repeat(monitor, _receive_want_flags, OpenSSL::SSL_get_error(_ssl, result)); + } // go process the received data auto *nextstate = parse(monitor, result); - + // leap out if we moved to a different state if (nextstate != this) return nextstate; } - while (OpenSSL::SSL_pending(_ssl) > 0); - - // proceed with the write operation or the event loop - return writable && _out ? write(monitor, false) : proceed(); + while (1); + // We **really** read until we fail so that we know for sure what SSL wants from the socket + // otherwise we can't know whether we have to monitor for readable or writable } - + public: /** * Constructor @@ -346,18 +324,17 @@ class SslConnected : public TcpState, private Watchable * @param buffer The buffer that was already built * @param handler User-supplied handler object */ - SslConnected(TcpConnection *connection, int socket, SslWrapper &&ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : + SslConnected(TcpConnection *connection, int socket, SslWrapper &&ssl, TcpOutBuffer &&buffer, TcpHandler *handler) : TcpState(connection, handler), _ssl(std::move(ssl)), _socket(socket), _out(std::move(buffer)), _in(4096), - _state(_out ? state_sending : state_idle) + _write_want_flags(_out ? readable | writable : 0) { - // tell the handler to monitor the socket if there is an out - _handler->monitor(_connection, _socket, _state == state_sending ? readable | writable : readable); + proceed(); } - + /** * Destructor */ @@ -366,7 +343,7 @@ class SslConnected : public TcpState, private Watchable // close the socket close(); } - + /** * The filedescriptor of this connection * @return int @@ -390,22 +367,16 @@ class SslConnected : public TcpState, private Watchable { // the socket must be the one this connection writes to if (fd != _socket) return this; - - // if we were busy with a write operation, we have to repeat that - if (_state == state_sending) return write(monitor, flags & readable); - - // same is true for read operations, they should also be repeated - if (_state == state_receiving) return receive(monitor, flags & writable); - - // if the socket is readable, we are going to receive data - if (flags & readable) return receive(monitor, flags & writable); - - // socket is not readable (so it must be writable), do we have data to write? - if (_out) return write(monitor, false); - - // the only scenario in which we can end up here is the socket should be - // closed, but instead of moving to the shutdown-state right, we call proceed() - // because that function is a little more careful + + if (_out && (_write_want_flags & flags)) { + TcpState *new_state = write(monitor); + if (new_state != this) return new_state; + } + + if (_receive_want_flags & flags) { + TcpState *new_state = receive(monitor); + if (new_state != this) return new_state; + } return proceed(); } @@ -416,31 +387,25 @@ class SslConnected : public TcpState, private Watchable */ virtual TcpState *flush(const Monitor &monitor) override { - // we are not going to do this is object is busy reading - if (_state == state_receiving) return this; - // create an object to wait for the filedescriptor to becomes active Wait wait(_socket); - // we are going to check for errors after the openssl operations, so we make + // we are going to check for errors after the openssl operations, so we make // sure that the error queue is currently completely empty OpenSSL::ERR_clear_error(); - + // keep looping while we have an outgoing buffer while (_out) { - // move to the idle-state - _state = state_idle; - // try to send more data from the outgoing buffer auto result = _out.sendto(_ssl); - + // was this a success? if (result > 0) { // proceed to the next state auto *nextstate = proceed(); - + // leap out if we move to a different state if (nextstate != this) return nextstate; } @@ -450,11 +415,11 @@ class SslConnected : public TcpState, private Watchable auto error = OpenSSL::SSL_get_error(_ssl, result); // get the next state given the error - auto *nextstate = repeat(monitor, state_sending, error); - + auto *nextstate = repeat(monitor, _write_want_flags, error); + // leap out if we move to a different state if (nextstate != this) return nextstate; - + // check the type of error, and wait now switch (error) { case SSL_ERROR_WANT_READ: wait.readable(); break; @@ -462,7 +427,9 @@ class SslConnected : public TcpState, private Watchable } } } - + + _write_want_flags = 0; + // done return this; } @@ -477,12 +444,12 @@ class SslConnected : public TcpState, private Watchable // put the data in the outgoing buffer _out.add(buffer, size); - // if we're already busy with sending or receiving, we first have to wait - // for that operation to complete before we can move on - if (_state != state_idle) return; - - // let's wait until the socket becomes writable - _handler->monitor(_connection, _socket, readable | writable); + // Try to immediately send the data (asynchronously) to make sure whe know what + // event (readable/writable) we need to monitor for. + Monitor monitor(this); + write(monitor); + + proceed(); } /** @@ -494,11 +461,11 @@ class SslConnected : public TcpState, private Watchable { // remember that we have to reallocate (_in member can not be accessed because it is moved away) _reallocate = _connection->maxFrame(); - + // pass to base return TcpState::reportNegotiate(heartbeat); } - + /** * Report a connection error * @param error @@ -508,13 +475,13 @@ class SslConnected : public TcpState, private Watchable // we want to start the elegant ssl shutdown procedure, so we call reportClosed() here too, // because that function does exactly what we want to do here too reportClosed(); - + // if the user was already notified of an final state, we do not have to proceed if (_finalized) return; - + // remember that this is the final call to user space _finalized = true; - + // pass to handler _handler->onError(_connection, error); } @@ -526,14 +493,10 @@ class SslConnected : public TcpState, private Watchable { // remember that the object is going to be closed _closed = true; - - // if the previous operation is still in progress we can wait for that - if (_state != state_idle) return; - - // wait until the connection is writable so that we can close it then - _handler->monitor(_connection, _socket, readable | writable); + + proceed(); } -}; +}; /** * End of namespace From 3c777ddf1ab8d49e74ccb4eeefbf0ae218e8009c Mon Sep 17 00:00:00 2001 From: tilsche Date: Tue, 5 Jun 2018 13:51:56 +0200 Subject: [PATCH 4/6] Can't process in certain situation because we need to take into account the return value --- src/linux_tcp/sslconnected.h | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index 14a09d26..b8b6ca0d 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -159,8 +159,7 @@ class SslConnected : public TcpState, private Watchable if (_write_want_flags | _receive_want_flags) { - // this is the only place where we setup the monitor - // setup the monitor when our async operations didn't immedeately complete + // setup the monitor when our async operations didn't immediately complete _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); } return this; @@ -449,7 +448,11 @@ class SslConnected : public TcpState, private Watchable Monitor monitor(this); write(monitor); - proceed(); + if (_write_want_flags | _receive_want_flags) + { + // setup the monitor when our async operations didn't immediately complete + _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); + } } /** @@ -494,7 +497,11 @@ class SslConnected : public TcpState, private Watchable // remember that the object is going to be closed _closed = true; - proceed(); + if (_write_want_flags | _receive_want_flags) + { + // setup the monitor when our async operations didn't immediately complete + _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); + } } }; From 930aa1c643c7b4ad8e776ceeb1a22dc3c7f7f58b Mon Sep 17 00:00:00 2001 From: tilsche Date: Tue, 5 Jun 2018 13:58:18 +0200 Subject: [PATCH 5/6] Avoid setting up redundant read/write handlers when process() already called monitor->events --- include/amqpcpp/libboostasio.h | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/include/amqpcpp/libboostasio.h b/include/amqpcpp/libboostasio.h index c1492f5d..d8074f46 100644 --- a/include/amqpcpp/libboostasio.h +++ b/include/amqpcpp/libboostasio.h @@ -192,11 +192,15 @@ class LibBoostAsioHandler : public virtual TcpHandler { connection->process(fd, AMQP::readable); - _read_pending = true; - - _socket.async_read_some( - boost::asio::null_buffers(), - get_read_handler(connection, fd)); + // Avoid setting up too many read handlers if process already setup a read handler + if (!_read_pending) + { + _read_pending = true; + + _socket.async_read_some( + boost::asio::null_buffers(), + get_read_handler(connection, fd)); + } } } @@ -226,11 +230,15 @@ class LibBoostAsioHandler : public virtual TcpHandler { connection->process(fd, AMQP::writable); - _write_pending = true; + // Avoid setting up too many write handlers if process() already setup a write handler + if (!_write_pending) + { + _write_pending = true; - _socket.async_write_some( - boost::asio::null_buffers(), - get_write_handler(connection, fd)); + _socket.async_write_some( + boost::asio::null_buffers(), + get_write_handler(connection, fd)); + } } } From 02e3961ca3966ce9217b064073e9ac65c8910907 Mon Sep 17 00:00:00 2001 From: Mario Bielert Date: Wed, 6 Jun 2018 15:00:30 +0200 Subject: [PATCH 6/6] Fixes calling of proceed() without checking the result. Also refactors redundant code into a helper function --- src/linux_tcp/sslconnected.h | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/linux_tcp/sslconnected.h b/src/linux_tcp/sslconnected.h index b8b6ca0d..1efde17a 100644 --- a/src/linux_tcp/sslconnected.h +++ b/src/linux_tcp/sslconnected.h @@ -138,6 +138,14 @@ class SslConnected : public TcpState, private Watchable return monitor.valid() ? new TcpClosed(this) : nullptr; } + void setup_monitor() + { + if (_write_want_flags | _receive_want_flags) + { + _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); + } + } + /** * Proceed with the next operation after the previous operation was * a success, possibly changing the filedescriptor-monitor @@ -157,11 +165,9 @@ class SslConnected : public TcpState, private Watchable return nextstate; } - if (_write_want_flags | _receive_want_flags) - { - // setup the monitor when our async operations didn't immediately complete - _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); - } + // setup the monitor when our async operations didn't immediately complete + setup_monitor(); + return this; } @@ -331,7 +337,7 @@ class SslConnected : public TcpState, private Watchable _in(4096), _write_want_flags(_out ? readable | writable : 0) { - proceed(); + setup_monitor(); } /** @@ -448,11 +454,8 @@ class SslConnected : public TcpState, private Watchable Monitor monitor(this); write(monitor); - if (_write_want_flags | _receive_want_flags) - { - // setup the monitor when our async operations didn't immediately complete - _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); - } + // setup the monitor when our async operations didn't immediately complete + setup_monitor(); } /** @@ -497,11 +500,8 @@ class SslConnected : public TcpState, private Watchable // remember that the object is going to be closed _closed = true; - if (_write_want_flags | _receive_want_flags) - { - // setup the monitor when our async operations didn't immediately complete - _handler->monitor(_connection, _socket, _write_want_flags | _receive_want_flags); - } + // setup the monitor when our async operations didn't immediately complete + setup_monitor(); } };