From 3b685edb1af9d818bfdc792711e6f6135c4b7ea5 Mon Sep 17 00:00:00 2001 From: Javad Saberlatibari <76922325+javadsaberlatibari@users.noreply.github.com> Date: Sat, 24 Feb 2024 21:20:48 -0800 Subject: [PATCH] Add files via upload --- .../benchmark/twopset-benchmarkcpp | 126 ++++++++++++++ wellcoordination/benchmark/twopset-crdt.hpp | 156 ++++++++++++++++++ wellcoordination/benchmark/twopset.hpp | 102 ++++++++++++ 3 files changed, 384 insertions(+) create mode 100644 wellcoordination/benchmark/twopset-benchmarkcpp create mode 100644 wellcoordination/benchmark/twopset-crdt.hpp create mode 100644 wellcoordination/benchmark/twopset.hpp diff --git a/wellcoordination/benchmark/twopset-benchmarkcpp b/wellcoordination/benchmark/twopset-benchmarkcpp new file mode 100644 index 0000000..b7b1bdf --- /dev/null +++ b/wellcoordination/benchmark/twopset-benchmarkcpp @@ -0,0 +1,126 @@ +#include +#include +#include +#include +#include + +#include "twopset.hpp" + +int main(int argc, char* argv[]) { + std::string loc = + "/users/jsaber/binHamband/workload/"; + + int nr_procs = static_cast(std::atoi(argv[1])); + int num_ops = static_cast(std::atoi(argv[2])); + double write_percentage = static_cast(std::atoi(argv[3])); + + loc += std::to_string(nr_procs) + "-" + std::to_string(num_ops) + "-" + + std::to_string(static_cast(write_percentage)); + loc += "/twopset/"; + + std::ofstream* outfile = new std::ofstream[nr_procs]; + std::vector* calls = new std::vector[nr_procs]; + for (int i = 0; i < nr_procs; i++) { + remove((loc + std::to_string(i + 1) + ".txt").c_str()); + outfile[i].open(loc + std::to_string(i + 1) + ".txt", std::ios_base::app); + calls[i] = std::vector(); + } + TWOPSet* test = new TWOPSet(); + + write_percentage /= 100; + int num_replicas = nr_procs; + double total_writes = num_ops * write_percentage; + int queries = num_ops - total_writes; + + int num_nonconflicting_write_methods = 2; + int num_read_methods = 1; + + std::cout << "ops: " << num_ops << std::endl; + std::cout << "write_perc: " << write_percentage << std::endl; + std::cout << "writes: " << total_writes << std::endl; + std::cout << "reads: " << queries << std::endl; + + int expected_calls_per_update_method = total_writes /num_nonconflicting_write_methods; + std::cout << "expected #calls per update method " + << expected_calls_per_update_method << std::endl; + + int expected_calls_per_update_method_per_node = expected_calls_per_update_method / nr_procs; + + std::cout << "expected #calls per nonconflicting writes in nodes " + << expected_calls_per_update_method_per_node << std::endl; + + int write_calls = + expected_calls_per_update_method * num_nonconflicting_write_methods; + + int write_calls_issued = 0; + int reads_issues = 0; + + // first allocating writes operations to the nodes + for (int i = 1; i <= num_replicas; i++) { + // non-conflicting write method + for (int count = 0; + count < expected_calls_per_update_method_per_node; count++) { + std::string element = std::to_string(std::rand() % 1000000); + std::string callStr; + callStr = "0 " + element; + calls[i - 1].push_back(callStr); + for(int x = 0; x < static_cast(1/write_percentage)+15; x++) + { + if(calls[i - 1].size() == num_ops/nr_procs) + break; + calls[i - 1].push_back(std::string("2")); + reads_issues++; + } + callStr = "1 " + element; + calls[i - 1].push_back(callStr); + write_calls_issued += 2; + } + + } + + + std::cout << "write calls issued: " << write_calls_issued << std::endl; + std::cout << "read calls issued: " << reads_issues << std::endl; + + // allocate reads to the nodes + int q = num_ops - write_calls_issued; + int read_calls = q - reads_issues; + std::cout << "extra read calls needed: " << read_calls << std::endl; + + int index = 0; + + std::cout << "after adding writes to nodes" << std::endl; + for (int i = 0; i < nr_procs; i++) + std::cout << i + 1 << " size: " << calls[i].size() << std::endl; + + + if (read_calls != 0) { + for (int i = 0; i < nr_procs; i++){ + for (int j = 0; j < read_calls / nr_procs; j++) + { + calls[i].push_back(std::string("2")); + } + } + } + + + std::cout << "after adding reads to all" << std::endl; + for (int i = 0; i < nr_procs; i++) + std::cout << i + 1 << " size: " << calls[i].size() << std::endl; + + + for (int i = 0; i < nr_procs; i++) { + calls[i].insert(calls[i].begin(), + std::string("#" + std::to_string(write_calls_issued))); + // std::random_shuffle(calls[i].begin() + 1, calls[i].end()); + } + + for (int i = 0; i < nr_procs; i++) { + for (int x = 0; x < calls[i].size(); x++) + outfile[i] << calls[i][x] << std::endl; + outfile[i].close(); + } + + std::cout << "expected calls to receive: " << write_calls_issued << std::endl; + return 0; +} diff --git a/wellcoordination/benchmark/twopset-crdt.hpp b/wellcoordination/benchmark/twopset-crdt.hpp new file mode 100644 index 0000000..ac435e8 --- /dev/null +++ b/wellcoordination/benchmark/twopset-crdt.hpp @@ -0,0 +1,156 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../src/replicated_object_crdt.hpp" + + +typedef unsigned char uint8_t; + +class TWOPSet : public ReplicatedObject +{ +private: + +public: + + enum MethodType{ + ADD = 0, + REMOVE = 1, + QUERY = 2 + }; + std::recursive_mutex m; + std::set setsourceadd; + std::set setremoteadd; + std::set setsourceremove; + std::set setremoteremove; + + + + TWOPSet() { + read_methods.push_back(static_cast(MethodType::QUERY)); + + update_methods.push_back(static_cast(MethodType::ADD)); + update_methods.push_back(static_cast(MethodType::REMOVE)); + + method_args.insert(std::make_pair(static_cast(MethodType::ADD), 1)); + method_args.insert(std::make_pair(static_cast(MethodType::REMOVE), 1)); + method_args.insert(std::make_pair(static_cast(MethodType::QUERY), 0)); + } + + TWOPSet(TWOPSet &obj) : ReplicatedObject(obj) + { + //state + setsource = obj.setsource; + setsource = obj.setremote; + } + + virtual void toString() + { + std::cout << "#elementssource: " << (setsourceadd.size()-setsourceremove.size()) << std::endl; + std::cout << "#elementsremote: " << (setremoteadd.size()-setremotearemove.size()) << std::endl; + } + + + // 0 + std::string add(std::string a) + { + return ""; + } + std::string remove(std::string a) + { + return ""; + } + std::string addDownstream(std::string a, bool b) + { + //std::scoped_lock lock(m); + if (b==false) + setsourceadd.insert(a); + else + setremoteadd.insert(a); + return ""; + } + std::string removeDownstream(std::string a, bool b) + { + //std::scoped_lock lock(m); + if (b==false) + setsourceremove.insert(a); + else + setremoteremove.insert(a); + return ""; + } + // 1 + TWOPSet query() { return *this; } + + + virtual std::string execute(MethodCall call) + { + switch (static_cast(call.method_type)) + { + case MethodType::ADD: + return add(call.arg); + break; + case MethodType::REMOVE: + return remove(call.arg); + break; + case MethodType::QUERY: + return ""; + break; + default: + std::cout << "wrong method name" << std::endl; + break; + } + return ""; + } + + + virtual ReplicatedObject* executeDownstream(MethodCall call, bool b) + { + switch (static_cast(call.method_type)) + { + case MethodType::ADD: + { + size_t index = call.arg.find_first_of('-'); + if(index != std::string::npos){ + std::string arg = call.arg.substr(0, index); + addDownstream((arg), b); + } + else + addDownstream((call.arg), b); + + break; + } + case MethodType::REMOVE: + { + size_t index = call.arg.find_first_of('-'); + if(index != std::string::npos){ + std::string arg = call.arg.substr(0, index); + removeDownstream((arg), b); + } + else + removeDownstream((call.arg), b); + + break; + } + case MethodType::QUERY: + return this; + break; + default: + std::cout << "wrong method name" << std::endl; + break; + } + return this; + } + + + + virtual bool isPermissible(MethodCall call) + { + return true; + } +}; \ No newline at end of file diff --git a/wellcoordination/benchmark/twopset.hpp b/wellcoordination/benchmark/twopset.hpp new file mode 100644 index 0000000..f675061 --- /dev/null +++ b/wellcoordination/benchmark/twopset.hpp @@ -0,0 +1,102 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../src/replicated_object.hpp" + + +typedef unsigned char uint8_t; + +class TWOPSet : public ReplicatedObject +{ +private: + +public: + + enum MethodType{ + ADD = 0, + REMOVE = 1, + QUERY = 2 + }; + std::atomic addlock, removelock; + std::set addset; + std::set removeset; + + + + TWOPSet() { + read_methods.push_back(static_cast(MethodType::QUERY)); + + update_methods.push_back(static_cast(MethodType::ADD)); + update_methods.push_back(static_cast(MethodType::REMOVE)); + + method_args.insert(std::make_pair(static_cast(MethodType::ADD), 1)); + method_args.insert(std::make_pair(static_cast(MethodType::REMOVE), 1)); + method_args.insert(std::make_pair(static_cast(MethodType::QUERY), 0)); + } + + TWOPSet(TWOPSet &obj) : ReplicatedObject(obj) + { + //state + addset = obj.addset; + removeset = obj.removeset; + } + + virtual void toString() + { + std::cout << "#elements: " << (addset.size()-removeset.size()) << std::endl; + } + + + // 0 + void add(std::string a) + { + while(addlock.load()); + addlock.store(true); + addset.insert(a); + addlock.store(false); + } + void remove(std::string a) + { + while(removelock.load()); + removelock.store(true); + removeset.insert(a); + removelock.store(false); + } + // 1 + TWOPSet query() { return *this; } + + + virtual ReplicatedObject* execute(MethodCall call) + { + switch (static_cast(call.method_type)) + { + case MethodType::ADD: + add(call.arg); + break; + case MethodType::REMOVE: + remove(call.arg); + break; + case MethodType::QUERY: + return this; + break; + default: + std::cout << "wrong method name" << std::endl; + break; + } + return this; + } + + + + virtual bool isPermissible(MethodCall call) + { + return true; + } +}; \ No newline at end of file