Skip to content

Commit

Permalink
Refactor connection handshake code
Browse files Browse the repository at this point in the history
The code handling the connection to other nodes is now split into more, smaller functions
to increase readability. API and internal code has also been seperated into respective files.
Connection closing is temporarily broken.
  • Loading branch information
Felix-Rm committed Jun 9, 2022
1 parent cc3f009 commit 59bfc7e
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 397 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ macro(IAC_BUILD target_name compile_defs)
"local_endpoint.cpp"
"local_node.cpp"
"local_node_api.cpp"
"local_node_state_handling.cpp"
"local_node_package_handling.cpp"
"local_transport_route.cpp"
"logging.cpp"
"network.cpp"
Expand Down
341 changes: 18 additions & 323 deletions src/local_node.cpp

Large diffs are not rendered by default.

52 changes: 23 additions & 29 deletions src/local_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
#include "std_provider/utility.hpp"
#include "std_provider/vector.hpp"

#ifndef ARDUINO
# include <chrono>
#endif

namespace iac {

IAC_MAKE_EXCEPTION(OutOfTrIdException);

#define IAC_LOG_PACKAGE_SEND(level, type) \
iac_log_from_node(level, "sending " type " package to %d over %d\n", m_network.route(route->id()).node2(), route->id());

#define IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, info, ...) \
iac_log_from_node(level, "received " type " package over %d " info "\n", package.route()->id(), __VA_ARGS__);

#define IAC_LOG_PACKAGE_RECEIVE(level, type) \
IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, "", 0);

class LocalNode : public Node {
public:
LocalNode(route_timings_t route_timings = {});
Expand Down Expand Up @@ -70,7 +75,7 @@ class LocalNode : public Node {

private:
static constexpr uint16_t s_min_heartbeat_interval_ms = 100;
static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 2;
static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 3;
static constexpr uint8_t s_num_package_reads_from_route_per_update = 5;

route_timings_t m_default_route_timings;
Expand All @@ -79,42 +84,31 @@ class LocalNode : public Node {

unordered_set<uint8_t> m_used_tr_ids;

bool handle_package(Package& package, LocalTransportRoute* route = nullptr);
bool send_package(const Package& package);
bool send_package(const Package& package, LocalTransportRoute* route);
bool handle_package(const Package& package);

bool handle_connect(Package& package);
bool handle_disconnect(const Package& package);
bool handle_disconnect(LocalTransportRoute* route);
bool handle_connect(const Package& package);
bool handle_heartbeat(const Package& package);
bool handle_ack(const Package& package);
bool handle_network_update(const Package& package);

bool read_from(LocalTransportRoute* route);

bool state_handling(LocalTransportRoute* route);

bool open_route(LocalTransportRoute* route);
bool close_route(LocalTransportRoute* route);
bool send_handshake(LocalTransportRoute* route);
bool send_heartbeat(LocalTransportRoute* route);

bool send_connect_package(LocalTransportRoute* route, bool relay);
bool send_connect(LocalTransportRoute* route);
bool send_heartbeat(LocalTransportRoute* route);
bool send_ack(LocalTransportRoute* route);
bool send_network_update(LocalTransportRoute* route);

uint8_t get_tr_id();
bool pop_tr_id(uint8_t id);

static pair<tr_id_t, uint8_t> best_local_route(const unordered_map<tr_id_t, uint8_t>& local_routes);
static LocalTransportRoute::timestamp_t timestamp() {
#ifdef ARDUINO
return millis();
#else
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
#endif
}

template <typename T, typename U>
void delete_mapping(T mapping, LocalNode* obj, bool (LocalNode::*remove_func)(U)) {
for (auto it = mapping.begin(); it != mapping.end();) {
U id = it->first;
it++;

(obj->*remove_func)(id);
}
};
};

} // namespace iac
14 changes: 7 additions & 7 deletions src/local_node_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const uint
return send(from.id(), to, type, buffer, buffer_length, buffer_management);
}

bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) {
Package package{from, to, type, buffer, buffer_length, buffer_management};
return handle_package(package);
bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) {
return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management);
}

bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) {
return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management);
}

bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) {
return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management);
bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) {
Package package{from, to, type, buffer, buffer_length, buffer_management};
return send_package(package);
}

bool LocalNode::endpoint_connected(ep_id_t address) const {
Expand All @@ -35,11 +35,11 @@ bool LocalNode::endpoints_connected(const vector<ep_id_t>& addresses) const {
bool LocalNode::all_routes_connected() const {
for (const auto& route : m_network.route_mapping()) {
if (!route.second.element().local()) continue;
const auto& ltr = (const LocalTransportRoute&) route.second.element();
const auto& ltr = (const LocalTransportRoute&)route.second.element();
if (ltr.state() != LocalTransportRoute::route_state::CONNECTED) return false;
}

return true;
}

}
} // namespace iac
191 changes: 191 additions & 0 deletions src/local_node_package_handling.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#include "local_node.hpp"

namespace iac {

bool LocalNode::handle_package(const Package& package) {
if (package.route()->state() == LocalTransportRoute::route_state::INITIALIZED || package.route()->state() == LocalTransportRoute::route_state::CLOSED) {
iac_log_from_node(Logging::loglevels::warning, "received package on closed route\n");
return false;
}

if (package.to() == reserved_endpoint_addresses::IAC) {
if (package.type() == reserved_package_types::CONNECT && package.route()->state() == LocalTransportRoute::route_state::WAIT_CONNECT) {
if (handle_connect(package)) {
package.route()->state() = LocalTransportRoute::route_state::SEND_ACK;
return true;
}
return false;
}

if (package.type() == reserved_package_types::ACK && package.route()->state() == LocalTransportRoute::route_state::WAIT_ACK) {
if (handle_ack(package)) {
package.route()->state() = LocalTransportRoute::route_state::CONNECTED;
m_network.set_modified(); // force a send of network_update
return true;
}
return false;
}

if (package.route()->state() == LocalTransportRoute::route_state::CONNECTED) {
if (package.type() == reserved_package_types::NETWORK_UPDATE) {
return handle_network_update(package);
}

if (package.type() == reserved_package_types::HEARTBEAT) {
return handle_heartbeat(package);
}
}

iac_log_from_node(Logging::loglevels::warning, "dropping package for IAC; route_id: %d; type: %d\n", package.route()->id(), package.type());
return false;
}
if (!m_network.endpoint_registered(package.to())) {
iac_log_from_node(Logging::loglevels::error, "received package for unregistered endpoint %d, dropping package\n", package.to());
return false;
}

const auto& ep = m_network.endpoint(package.to());

if (ep.local())
return ((const LocalEndpoint&)ep).handle_package(package);

return send_package(package);
}

bool LocalNode::handle_connect(const Package& package) {
BufferReader reader{package.payload(), package.payload_size()};

auto sender_id = reader.num<node_id_t>();
auto other_tr_id = reader.num<tr_id_t>();

package.route()->meta().timings.heartbeat_interval_ms = max_of(reader.num<uint16_t>(), package.route()->meta().timings.heartbeat_interval_ms);
package.route()->meta().timings.assume_dead_after_ms = max_of(reader.num<uint16_t>(), package.route()->meta().timings.assume_dead_after_ms);

IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::network, "connect", "from %d", sender_id);

if (!m_network.node_registered(sender_id)) {
m_network.add_node(ManagedNetworkEntry<Node>::create_and_adopt(new Node(sender_id)));

if (other_tr_id < package.route()->id()) {
iac_log_from_node(Logging::loglevels::debug, "changing route id\n");

const auto old_id = package.route()->id();

auto erase_tr_links = [&](node_id_t node_id) {
if (node_id != unset_id) {
auto& node_entry = m_network.mutable_node(node_id);

auto node_route_res = node_entry.routes().find(package.route()->id());
if (node_route_res != node_entry.routes().end()) {
node_entry.remove_route(node_route_res);
}

auto node_local_route_res = node_entry.local_routes().find(package.route()->id());
if (node_local_route_res != node_entry.local_routes().end()) {
node_entry.remove_local_route(node_local_route_res);
}
}
};

auto insert_tr_links = [&](node_id_t node_id) {
if (node_id != unset_id) {
auto& node_entry = m_network.mutable_node(node_id);

node_entry.add_route(other_tr_id);
node_entry.add_local_route(other_tr_id, 1);
}
};

auto temp = iac::move(m_network.mutable_route_managed_entry(old_id));
m_network.erase_route_managed_entry(old_id);

erase_tr_links(package.route()->nodes().first);
erase_tr_links(package.route()->nodes().second);

package.route()->set_id(other_tr_id);
m_network.add_route(iac::move(temp));

insert_tr_links(package.route()->node1());
insert_tr_links(package.route()->node2());
}

package.route()->set_node2(sender_id);
auto& sender = m_network.mutable_node(sender_id);
sender.add_route(package.route()->id());
sender.add_local_route({package.route()->id(), 1});

iac_log_from_node(Logging::loglevels::connect, "connecting %d to %d\n", id(), sender_id);

iac_log_from_node(Logging::loglevels::verbose, "connect with timing: %d %d\n", package.route()->meta().timings.heartbeat_interval_ms, package.route()->meta().timings.assume_dead_after_ms);
}

return true;
}

bool LocalNode::handle_heartbeat(const Package& package) {
auto now = timestamp::now();

IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::verbose, "heartbeat", "with timing: last_out: %d; last_in:%d",
now - package.route()->meta().last_package_in,
now - package.route()->meta().last_package_out);
return true;
}

bool LocalNode::handle_ack(const Package& package) {
IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "ack");
return true;
}

bool LocalNode::handle_network_update(const Package& package) {
BufferReader reader{package.payload(), package.payload_size()};

IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "network_update");

if (package.route()->state() != LocalTransportRoute::route_state::CONNECTED) {
iac_log_from_node(Logging::loglevels::debug, "route is not connected, discarding relay package\n");
return true;
}

const auto num_ep_data = reader.num<uint8_t>();
for (uint8_t i = 0; i < num_ep_data; ++i) {
const auto ep_id = reader.num<ep_id_t>();
const auto* ep_name = reader.str();
const auto ep_node = reader.num<node_id_t>();

if (!m_network.endpoint_registered(ep_id)) {
auto* ep = new Endpoint(ep_id);
ep->set_name(ep_name);
ep->set_node(ep_node);
m_network.add_endpoint(ManagedNetworkEntry<Endpoint>::create_and_adopt(ep));
}
}

const auto num_tr_data = reader.num<uint8_t>();
for (uint8_t i = 0; i < num_tr_data; ++i) {
const auto tr_id = reader.num<tr_id_t>();
const auto node1_id = reader.num<node_id_t>();
const auto node2_id = reader.num<node_id_t>();

if (node1_id != unset_id && node2_id != unset_id && !m_network.route_registered(tr_id)) {
auto* tr = new TransportRoute(tr_id, {node1_id, node2_id});
m_network.add_route(ManagedNetworkEntry<TransportRoute>::create_and_adopt(tr));
}
}

const auto num_ltr_data = reader.num<uint8_t>();
for (uint8_t i = 0; i < num_ltr_data; ++i) {
const auto reachable_node = reader.num<node_id_t>();
const auto num_hops = reader.num<uint8_t>();

auto& reachable_node_entry = m_network.node(reachable_node);
auto res = reachable_node_entry.local_routes().find(package.route()->id());

if (res == reachable_node_entry.local_routes().end() || res->second > num_hops + 1) {
m_network.mutable_node(reachable_node).add_local_route(package.route()->id(), num_hops + 1);
}
}

return send_heartbeat(package.route());
}

} // namespace iac
Loading

0 comments on commit 59bfc7e

Please sign in to comment.