diff --git a/src/api/qpid-proton/reactor/SendingClient.cpp b/src/api/qpid-proton/reactor/SendingClient.cpp index 8be674b..0ef2bd6 100644 --- a/src/api/qpid-proton/reactor/SendingClient.cpp +++ b/src/api/qpid-proton/reactor/SendingClient.cpp @@ -556,13 +556,19 @@ int SendingClient::run(int argc, char **argv) const ); handler.setMessage(msg); - + int count = 1; if (options.is_set("count")) { count = static_cast (options.get("count")); } handler.setCount(count); + int tx_size = 1; + if (options.is_set("tx-size")) { + tx_size = static_cast (options.get("tx-size")); + } + handler.setBatchSize(tx_size); + try { container(handler).run(); diff --git a/src/api/qpid-proton/reactor/handler/CommonHandler.h b/src/api/qpid-proton/reactor/handler/CommonHandler.h index 5919b97..b197074 100644 --- a/src/api/qpid-proton/reactor/handler/CommonHandler.h +++ b/src/api/qpid-proton/reactor/handler/CommonHandler.h @@ -19,7 +19,7 @@ #include #include #include - +#include #include @@ -35,6 +35,8 @@ #include "logger/LoggerWrapper.h" using proton::messaging_handler; +using proton::transaction_handler; +using proton::transaction; using proton::container; using proton::void_function0; using proton::duration; @@ -58,7 +60,7 @@ using dtests::common::UriParser; * An abstract proton message handler providing a common interface for other * client handlers */ -class CommonHandler : public messaging_handler { +class CommonHandler : public messaging_handler, transaction_handler { public: /** * Constructor @@ -121,6 +123,8 @@ class CommonHandler : public messaging_handler { virtual void timerEvent() = 0; + transaction_handler th; + protected: diff --git a/src/api/qpid-proton/reactor/handler/SenderHandler.cpp b/src/api/qpid-proton/reactor/handler/SenderHandler.cpp index 7f3b576..99d479e 100644 --- a/src/api/qpid-proton/reactor/handler/SenderHandler.cpp +++ b/src/api/qpid-proton/reactor/handler/SenderHandler.cpp @@ -88,6 +88,11 @@ SenderHandler::SenderHandler( duration_mode(duration_mode), sent(0), confirmedSent(0), + batch_size(1), + current_batch(0), + committed(0), + confirmed(0), + total(0), m(), timer_event(*this), interval(duration::IMMEDIATE) @@ -205,6 +210,11 @@ void SenderHandler::on_container_start(container &c) work_q->schedule(duration::IMMEDIATE, make_work(&SenderHandler::checkIfCanSend, this)); } #endif + // TODO Transactions + tx = NULL; + cont = &c; + // currently causes seqfault + // c.declare_transaction(conn, th); } void SenderHandler::on_sendable(sender &s) @@ -258,6 +268,14 @@ void SenderHandler::on_tracker_reject(tracker &t) void SenderHandler::on_connection_close(connection &c) { logger(debug) << "Closing connection"; + // TODO debug remove + // logger(info) << "Transactions"; + // logger(info) << "Transaction total: " << total; + // logger(info) << "Transaction sent: " << sent; + // logger(info) << "Transaction batch size: " << batch_size; + // logger(info) << "Transaction current batch: " << current_batch; + // logger(info) << "Transaction confirmed: " << confirmed; + // logger(info) << "Transaction committed: " << committed; } void SenderHandler::on_connection_error(connection &c) @@ -269,7 +287,6 @@ void SenderHandler::on_connection_error(connection &c) } } - void SenderHandler::setCount(int count) { this->count = count; @@ -280,6 +297,16 @@ int SenderHandler::getCount() const return count; } +void SenderHandler::setBatchSize(int batchSize) +{ + this->batch_size = batchSize; +} + +int SenderHandler::getBatchSize() const +{ + return batch_size; +} + void SenderHandler::setMessage(message &msg) { this->m = msg; @@ -361,6 +388,34 @@ void SenderHandler::send() ready = false; } +// TODO VIP TRANSACTIONS + +void SenderHandler::on_transaction_declared(transaction &t) { + tx = &t; + send(); +} + +void SenderHandler::on_transaction_committed(transaction &t) { + committed += current_batch; + connection conn = sndr.connection(); + if(committed == total) { + std::cout << "All messages committed"; + conn.close(); + } + else { + current_batch = 0; + cont->declare_transaction(conn, th); + } +} + +void SenderHandler::on_sender_close(sender &s) { + current_batch = 0; +} + +// override +// void SenderHandler::on_sendable(sender &s) override {} +// void SenderHandler::on_tracker_accept(tracker &t) override {} + } /* namespace reactor */ } /* namespace proton */ } /* namespace dtests */ diff --git a/src/api/qpid-proton/reactor/handler/SenderHandler.h b/src/api/qpid-proton/reactor/handler/SenderHandler.h index 0b81439..010d36c 100644 --- a/src/api/qpid-proton/reactor/handler/SenderHandler.h +++ b/src/api/qpid-proton/reactor/handler/SenderHandler.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "CommonHandler.h" #include "Timer.h" @@ -36,6 +37,9 @@ using proton::source_options; using proton::transport; using proton::tracker; using proton::connection_options; +using proton::sender; +using proton::transaction; +using proton::transaction_handler; namespace dtests { namespace proton { @@ -124,6 +128,11 @@ class SenderHandler : public CommonHandler { void on_transport_error(transport &t); void on_transport_close(transport &t); + // TODO TX Support + void on_sender_close(sender &s); + void on_transaction_declared(transaction &t); + void on_transaction_committed(transaction &t); + /** * Sets the message count * @param count the message count @@ -136,6 +145,18 @@ class SenderHandler : public CommonHandler { */ int getCount() const; + /** + * Sets the transaction batch size + * @param batch_size the transaction batch size + */ + void setBatchSize(int batchSize); + + /** + * Gets the transaction batch size + * @return the transaction batch size + */ + int getBatchSize() const; + /** * Sets the message to send * @param m the message to send @@ -159,7 +180,17 @@ class SenderHandler : public CommonHandler { string duration_mode; int sent; int confirmedSent; + + // transactions + int batch_size = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + int total = 0; + sender sndr; + transaction *tx; + container *cont; message m; diff --git a/src/common/options/modern/ModernOptionsParser.cpp b/src/common/options/modern/ModernOptionsParser.cpp index 8a95a01..03244de 100644 --- a/src/common/options/modern/ModernOptionsParser.cpp +++ b/src/common/options/modern/ModernOptionsParser.cpp @@ -126,6 +126,27 @@ ModernOptionsParser::ModernOptionsParser() .help("client reconnect TIMEOUT (default: -1)") .metavar("TIMEOUT"); + // transactions + add_option("--tx-size") + .dest("tx-size") + .help("transactional mode: batch message count size (default: 0)") + .metavar("TX_SIZE"); + + char const* const choices[] = { "commit", "rollback", "none" }; + add_option("--tx-action") + .dest("tx-action") + .help("transactional action at the end of tx batch (default: commit)") + .type("choice") + .choices(std::begin(choices), std::end(choices)) + .metavar("TX_ACTION"); + + add_option("--tx-endloop-action") + .dest("tx-endloop-action") + .help("transactional action after sending all messages in loop") + .type("choice") + .choices(std::begin(choices), std::end(choices)) + .metavar("TX_ENDLOOP_ACTION"); + /*********************** Reactive C++ API client extras ***********************/ add_option("--conn-reconnect-first") .dest("conn-reconnect-first")