Skip to content

Commit

Permalink
wip: initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Nov 15, 2024
1 parent aa57569 commit 1888d19
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 4 deletions.
8 changes: 7 additions & 1 deletion src/api/qpid-proton/reactor/SendingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,13 +556,19 @@ int SendingClient::run(int argc, char **argv) const
);

handler.setMessage(msg);

int count = 1;
if (options.is_set("count")) {
count = static_cast<int> (options.get("count"));
}
handler.setCount(count);

int tx_size = 1;
if (options.is_set("tx-size")) {
tx_size = static_cast<int> (options.get("tx-size"));
}
handler.setBatchSize(tx_size);

try {
container(handler).run();

Expand Down
8 changes: 6 additions & 2 deletions src/api/qpid-proton/reactor/handler/CommonHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/reconnect_options.hpp>

#include <proton/transaction.hpp>

#include <proton/function.hpp>

Expand All @@ -35,6 +35,8 @@
#include "logger/LoggerWrapper.h"

using proton::messaging_handler;
using proton::transaction_handler;
using proton::transaction;
using proton::container;
using proton::void_function0;
using proton::duration;
Expand All @@ -58,7 +60,7 @@ using dtests::common::UriParser;
* An abstract proton message handler providing a common interface for other
* client handlers
*/
class CommonHandler : public messaging_handler {
class CommonHandler : public messaging_handler, transaction_handler {
public:
/**
* Constructor
Expand Down Expand Up @@ -121,6 +123,8 @@ class CommonHandler : public messaging_handler {

virtual void timerEvent() = 0;

transaction_handler th;

protected:


Expand Down
57 changes: 56 additions & 1 deletion src/api/qpid-proton/reactor/handler/SenderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ SenderHandler::SenderHandler(
duration_mode(duration_mode),
sent(0),
confirmedSent(0),
batch_size(1),
current_batch(0),
committed(0),
confirmed(0),
total(0),
m(),
timer_event(*this),
interval(duration::IMMEDIATE)
Expand Down Expand Up @@ -205,6 +210,11 @@ void SenderHandler::on_container_start(container &c)
work_q->schedule(duration::IMMEDIATE, make_work(&SenderHandler::checkIfCanSend, this));
}
#endif
// TODO Transactions
tx = NULL;
cont = &c;
// currently causes seqfault
// c.declare_transaction(conn, th);
}

void SenderHandler::on_sendable(sender &s)
Expand Down Expand Up @@ -258,6 +268,14 @@ void SenderHandler::on_tracker_reject(tracker &t)
void SenderHandler::on_connection_close(connection &c)
{
logger(debug) << "Closing connection";
// TODO debug remove
// logger(info) << "Transactions";
// logger(info) << "Transaction total: " << total;
// logger(info) << "Transaction sent: " << sent;
// logger(info) << "Transaction batch size: " << batch_size;
// logger(info) << "Transaction current batch: " << current_batch;
// logger(info) << "Transaction confirmed: " << confirmed;
// logger(info) << "Transaction committed: " << committed;
}

void SenderHandler::on_connection_error(connection &c)
Expand All @@ -269,7 +287,6 @@ void SenderHandler::on_connection_error(connection &c)
}
}


void SenderHandler::setCount(int count)
{
this->count = count;
Expand All @@ -280,6 +297,16 @@ int SenderHandler::getCount() const
return count;
}

void SenderHandler::setBatchSize(int batchSize)
{
this->batch_size = batchSize;
}

int SenderHandler::getBatchSize() const
{
return batch_size;
}

void SenderHandler::setMessage(message &msg)
{
this->m = msg;
Expand Down Expand Up @@ -361,6 +388,34 @@ void SenderHandler::send()
ready = false;
}

// TODO VIP TRANSACTIONS

void SenderHandler::on_transaction_declared(transaction &t) {
tx = &t;
send();
}

void SenderHandler::on_transaction_committed(transaction &t) {
committed += current_batch;
connection conn = sndr.connection();
if(committed == total) {
std::cout << "All messages committed";
conn.close();
}
else {
current_batch = 0;
cont->declare_transaction(conn, th);
}
}

void SenderHandler::on_sender_close(sender &s) {
current_batch = 0;
}

// override
// void SenderHandler::on_sendable(sender &s) override {}
// void SenderHandler::on_tracker_accept(tracker &t) override {}

} /* namespace reactor */
} /* namespace proton */
} /* namespace dtests */
31 changes: 31 additions & 0 deletions src/api/qpid-proton/reactor/handler/SenderHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <proton/connection_options.hpp>
#include <proton/sender_options.hpp>
#include <proton/thread_safe.hpp>
#include <proton/transaction.hpp>

#include "CommonHandler.h"
#include "Timer.h"
Expand All @@ -36,6 +37,9 @@ using proton::source_options;
using proton::transport;
using proton::tracker;
using proton::connection_options;
using proton::sender;
using proton::transaction;
using proton::transaction_handler;

namespace dtests {
namespace proton {
Expand Down Expand Up @@ -124,6 +128,11 @@ class SenderHandler : public CommonHandler {
void on_transport_error(transport &t);
void on_transport_close(transport &t);

// TODO TX Support
void on_sender_close(sender &s);
void on_transaction_declared(transaction &t);
void on_transaction_committed(transaction &t);

/**
* Sets the message count
* @param count the message count
Expand All @@ -136,6 +145,18 @@ class SenderHandler : public CommonHandler {
*/
int getCount() const;

/**
* Sets the transaction batch size
* @param batch_size the transaction batch size
*/
void setBatchSize(int batchSize);

/**
* Gets the transaction batch size
* @return the transaction batch size
*/
int getBatchSize() const;

/**
* Sets the message to send
* @param m the message to send
Expand All @@ -159,7 +180,17 @@ class SenderHandler : public CommonHandler {
string duration_mode;
int sent;
int confirmedSent;

// transactions
int batch_size = 0;
int current_batch = 0;
int committed = 0;
int confirmed = 0;
int total = 0;

sender sndr;
transaction *tx;
container *cont;

message m;

Expand Down
21 changes: 21 additions & 0 deletions src/common/options/modern/ModernOptionsParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ ModernOptionsParser::ModernOptionsParser()
.help("client reconnect TIMEOUT (default: -1)")
.metavar("TIMEOUT");

// transactions
add_option("--tx-size")
.dest("tx-size")
.help("transactional mode: batch message count size (default: 0)")
.metavar("TX_SIZE");

char const* const choices[] = { "commit", "rollback", "none" };
add_option("--tx-action")
.dest("tx-action")
.help("transactional action at the end of tx batch (default: commit)")
.type("choice")
.choices(std::begin(choices), std::end(choices))
.metavar("TX_ACTION");

add_option("--tx-endloop-action")
.dest("tx-endloop-action")
.help("transactional action after sending all messages in loop")
.type("choice")
.choices(std::begin(choices), std::end(choices))
.metavar("TX_ENDLOOP_ACTION");

/*********************** Reactive C++ API client extras ***********************/
add_option("--conn-reconnect-first")
.dest("conn-reconnect-first")
Expand Down

0 comments on commit 1888d19

Please sign in to comment.