From 59bfc7e859d4cc0b923c75dd34316ae2fafd9097 Mon Sep 17 00:00:00 2001 From: Felix Reissmann Date: Thu, 9 Jun 2022 15:11:25 +0200 Subject: [PATCH] Refactor connection handshake code The code handling the connection to other nodes is now split into more, smaller functions to increase readability. API and internal code has also been seperated into respective files. Connection closing is temporarily broken. --- src/CMakeLists.txt | 2 + src/local_node.cpp | 341 ++------------------------- src/local_node.hpp | 52 ++-- src/local_node_api.cpp | 14 +- src/local_node_package_handling.cpp | 191 +++++++++++++++ src/local_node_state_handling.cpp | 203 ++++++++++++++++ src/logging.hpp | 11 +- src/network.cpp | 25 +- src/network.hpp | 4 + src/network_types.hpp | 38 ++- src/package.cpp | 5 +- tests/test_network_visualization.hpp | 28 ++- tests/test_send_receive.hpp | 7 +- tests/testing.cpp | 12 +- 14 files changed, 536 insertions(+), 397 deletions(-) create mode 100644 src/local_node_package_handling.cpp create mode 100644 src/local_node_state_handling.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b1eca08..5b4dd5f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,6 +17,8 @@ macro(IAC_BUILD target_name compile_defs) "local_endpoint.cpp" "local_node.cpp" "local_node_api.cpp" + "local_node_state_handling.cpp" + "local_node_package_handling.cpp" "local_transport_route.cpp" "logging.cpp" "network.cpp" diff --git a/src/local_node.cpp b/src/local_node.cpp index 4c00c0d..4982ef8 100644 --- a/src/local_node.cpp +++ b/src/local_node.cpp @@ -76,209 +76,6 @@ pair LocalNode::best_local_route(const unordered_mapstate() == LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY) - package.route()->state() = LocalTransportRoute::route_state::CONNECTED; - return handle_heartbeat(package); - } - - if (package.type() == reserved_package_types::DISCONNECT && package.route()->state() == LocalTransportRoute::route_state::CONNECTED) { - return handle_disconnect(package); - } - - if (!m_network.endpoint_registered(package.to())) { - iac_log(Logging::loglevels::error, "received package for unregistered endpoint\n"); - return false; - } - - const auto& ep = m_network.endpoint(package.to()); - - if (ep.local()) - return ((const LocalEndpoint&)ep).handle_package(package); - - const auto& available_routes = m_network.node(ep.node()).local_routes(); - if (available_routes.empty()) - return false; - - route = (LocalTransportRoute*)&m_network.route(best_local_route(available_routes).first); - } - - if (route->state() != LocalTransportRoute::route_state::INITIALIZED || route->state() != LocalTransportRoute::route_state::CLOSED) { - route->meta().last_package_out = timestamp(); - return package.send_over(route); - } else { - iac_log(Logging::loglevels::warning, "asked to send package with type %d to %d over route in state %d\n", package.type(), package.to(), route->state()); - } - - return false; -} - -bool LocalNode::handle_connect(Package& package) { - BufferReader reader{package.payload(), package.payload_size()}; - - // package.print(); - - auto relay = reader.boolean(); - - iac_log(Logging::loglevels::debug, "node %d received connect pkg; state: %d%s\n", id(), package.route()->state(), relay == 1 ? " [relay]" : ""); - - if (relay && package.route()->state() != LocalTransportRoute::route_state::CONNECTED) { - iac_log(Logging::loglevels::debug, "route is not connected, discarding relay package\n"); - return true; - } - - auto sender_id = reader.num(); - auto other_tr_id = reader.num(); - - package.route()->meta().timings.heartbeat_interval_ms = max_of(reader.num(), package.route()->meta().timings.heartbeat_interval_ms); - package.route()->meta().timings.assume_dead_after_ms = max_of(reader.num(), package.route()->meta().timings.assume_dead_after_ms); - - if (!m_network.node_registered(sender_id)) { - m_network.add_node(ManagedNetworkEntry::create_and_adopt(new Node(sender_id))); - } - - if (!relay) { - if (other_tr_id < package.route()->id()) { - iac_log(Logging::loglevels::debug, "changing route id\n"); - const auto old_id = package.route()->id(); - - auto erase_tr_links = [&](node_id_t node_id) { - if (node_id != unset_id) { - auto& node_entry = m_network.mutable_node(node_id); - - auto node_route_res = node_entry.routes().find(package.route()->id()); - if (node_route_res != node_entry.routes().end()) { - node_entry.remove_route(node_route_res); - } - - auto node_local_route_res = node_entry.local_routes().find(package.route()->id()); - if (node_local_route_res != node_entry.local_routes().end()) { - node_entry.remove_local_route(node_local_route_res); - } - } - }; - - auto insert_tr_links = [&](node_id_t node_id) { - if (node_id != unset_id) { - auto& node_entry = m_network.mutable_node(node_id); - - node_entry.add_route(other_tr_id); - node_entry.add_local_route(other_tr_id, 1); - } - }; - - auto temp = move(m_network.mutable_route_managed_entry(old_id)); - m_network.erase_route_managed_entry(old_id); - - erase_tr_links(package.route()->nodes().first); - erase_tr_links(package.route()->nodes().second); - - package.route()->set_id(other_tr_id); - m_network.add_route(move(temp)); - - insert_tr_links(package.route()->node1()); - insert_tr_links(package.route()->node2()); - } - - package.route()->set_node2(sender_id); - auto& sender = m_network.mutable_node(sender_id); - sender.add_route(package.route()->id()); - sender.add_local_route({package.route()->id(), 1}); - - iac_log(Logging::loglevels::connect, "connecting %d to %d\n", id(), sender_id); - - iac_log(Logging::loglevels::verbose, "connect with timing: %d %d\n", - package.route()->meta().timings.heartbeat_interval_ms, - package.route()->meta().timings.assume_dead_after_ms); - } else { - iac_log(Logging::loglevels::debug, "relay pkg\n"); - } - - const auto num_ep_data = reader.num(); - for (uint8_t i = 0; i < num_ep_data; ++i) { - const auto ep_id = reader.num(); - const auto* ep_name = reader.str(); - const auto ep_node = reader.num(); - - if (!m_network.endpoint_registered(ep_id)) { - auto* ep = new Endpoint(ep_id); - ep->set_name(ep_name); - ep->set_node(ep_node); - m_network.add_endpoint(ManagedNetworkEntry::create_and_adopt(ep)); - } - } - - const auto num_tr_data = reader.num(); - for (uint8_t i = 0; i < num_tr_data; ++i) { - const auto tr_id = reader.num(); - const auto node1_id = reader.num(); - const auto node2_id = reader.num(); - - if (node1_id != unset_id && node2_id != unset_id && !m_network.route_registered(tr_id)) { - auto* tr = new TransportRoute(tr_id, {node1_id, node2_id}); - m_network.add_route(ManagedNetworkEntry::create_and_adopt(tr)); - } - } - - const auto num_ltr_data = reader.num(); - for (uint8_t i = 0; i < num_ltr_data; ++i) { - const auto reachable_node = reader.num(); - const auto num_hops = reader.num(); - - auto& reachable_node_entry = m_network.mutable_node(reachable_node); - auto res = reachable_node_entry.local_routes().find(package.route()->id()); - - if (res == reachable_node_entry.local_routes().end() || res->second > num_hops + 1) { - reachable_node_entry.add_local_route(package.route()->id(), num_hops + 1); - } - } - - return send_heartbeat(package.route()); -} - -bool LocalNode::handle_disconnect(const Package& package) { - return handle_disconnect(package.route()); -} - -bool LocalNode::handle_disconnect(LocalTransportRoute* route) { - // save what the route was connected to and reset it to unset value - auto was_connected_to = m_network.route(route->id()).node2(); - - if (was_connected_to != unset_id) { - iac_log(Logging::loglevels::connect, "disconnecting %d(self) from %d\n", id(), was_connected_to); - - // the node the route was connected to - auto& concerning_node = m_network.mutable_node(was_connected_to); - - // remove the disconnected route - concerning_node.remove_route(route->id()); - concerning_node.remove_local_route(route->id()); - route->set_node2(unset_id); - - // if no connections lead to this node anymore, delete it - if (concerning_node.local_routes().empty()) { - if (!m_network.remove_node(was_connected_to)) IAC_ASSERT_NOT_REACHED(); - } - } - - return true; -} - -bool LocalNode::handle_heartbeat(const Package& package) { - iac_log(Logging::loglevels::debug, "heartbeat from: %d node: %d (self) %d %d\n", package.from(), - id(), - timestamp() - package.route()->meta().last_package_in, - timestamp() - package.route()->meta().last_package_out); - return true; -} - bool LocalNode::update() { if (m_network.endpoint_mapping().empty()) { IAC_HANDLE_EXCEPTION(NoRegisteredEndpointsException, "updating node with no endpoints"); @@ -295,35 +92,7 @@ bool LocalNode::update() { it++; // iterator might get invalidated in handle_connect_package, so we have to increment now; - // iac_printf("route %p; mode %d; last in: %d; last out: %d\n", route.first, - // route.first->m_state, timestamp() - route.first->m_last_package_in, - // timestamp() - route.first->m_last_package_out); - - iac_log(Logging::loglevels::verbose, "state of route %d @ node %d is %d\n", route->id(), id(), route->state()); - - switch (route->state()) { - case LocalTransportRoute::route_state::INITIALIZED: - case LocalTransportRoute::route_state::CLOSED: - if (!open_route(route)) break; - - case LocalTransportRoute::route_state::OPEN: - route->meta().last_package_in = timestamp(); - - case LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY: - if (!send_handshake(route)) break; - - case LocalTransportRoute::route_state::CONNECTED: - if (timestamp() - route->meta().last_package_in > route->meta().timings.assume_dead_after_ms) { - if (!close_route(route)) return false; - } - - if (timestamp() - route->meta().last_package_out > route->meta().timings.heartbeat_interval_ms) { - if (!send_heartbeat(route)) return false; - } - - if (!read_from(route)) return false; - break; - } + if (!state_handling(route)) return false; } if (m_network.is_modified()) { @@ -332,7 +101,7 @@ bool LocalNode::update() { if (!route_entry.second->local()) continue; auto* route = (LocalTransportRoute*)route_entry.second.element_ptr(); if (route->state() != LocalTransportRoute::route_state::CONNECTED) continue; - send_connect_package(route, true); + send_network_update(route); } } @@ -343,8 +112,8 @@ bool LocalNode::read_from(LocalTransportRoute* route) { for (size_t i = 0; i < s_num_package_reads_from_route_per_update && route->connection().available() > 0; i++) { Package package; if (package.read_from(route)) { - route->meta().last_package_in = timestamp(); if (!handle_package(package)) return false; + route->meta().last_package_in = timestamp::now(); } else { break; } @@ -352,105 +121,31 @@ bool LocalNode::read_from(LocalTransportRoute* route) { return true; } -bool LocalNode::open_route(LocalTransportRoute* route) { - if (route->state() != LocalTransportRoute::route_state::OPEN && route->open()) { - route->state() = LocalTransportRoute::route_state::OPEN; - iac_log(Logging::loglevels::network, "opened route %d [%s]\n", route->id(), route->typestring().c_str()); - return true; - } - - iac_log(Logging::loglevels::warning, "error opening route %d [%s]\n", route->id(), route->typestring().c_str()); - return false; -} - -bool LocalNode::close_route(LocalTransportRoute* route) { - if (route->state() != LocalTransportRoute::route_state::CLOSED && route->close()) { - if (!handle_disconnect(route)) { - iac_log(Logging::loglevels::warning, "error disconnecting route %d [%s] \n", route->id(), route->typestring().c_str()); - return false; - } - route->state() = LocalTransportRoute::route_state::CLOSED; - iac_log(Logging::loglevels::network, "closed route %d [%s]\n", route->id(), route->typestring().c_str()); - return true; - } - - iac_log(Logging::loglevels::warning, "error closing route %d [%s] \n", route->id(), route->typestring().c_str()); - return false; -} - -bool LocalNode::send_handshake(LocalTransportRoute* route) { - if (send_connect_package(route, false)) { - if (route->state() == LocalTransportRoute::route_state::OPEN) - route->state() = LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY; - } else { - iac_log(Logging::loglevels::warning, "error sending connect package to route %d [%s] \n", route->id(), route->typestring().c_str()); +bool LocalNode::send_package(const Package& package) { + if (!m_network.endpoint_registered(package.to())) { + iac_log_from_node(Logging::loglevels::error, "asked to send package for unregistered endpoint %d, dropping package\n", package.to()); return false; } - return true; -} + const auto& ep = m_network.endpoint(package.to()); -bool LocalNode::send_heartbeat(LocalTransportRoute* route) { - Package package{reserved_endpoint_addresses::IAC, - reserved_endpoint_addresses::IAC, - reserved_package_types::HEARTBEAT, nullptr, 0}; + const auto& available_routes = m_network.node(ep.node()).local_routes(); + if (available_routes.empty()) + return false; - // iac_log(Logging::loglevels::debug,"sending heartbeat package\n"); - // package.print(); + auto route = (LocalTransportRoute*)&m_network.route(best_local_route(available_routes).first); - return handle_package(package, route); + return send_package(package, route); } -bool LocalNode::send_connect_package(LocalTransportRoute* route, bool relay) { - BufferWriter writer; - - writer.boolean(relay); - - writer.num(id()); - writer.num(route->id()); - - writer.num(route->meta().timings.heartbeat_interval_ms); - writer.num(route->meta().timings.assume_dead_after_ms); - - writer.num(m_network.endpoint_mapping().size()); - for (const auto& ep_entry : m_network.endpoint_mapping()) { - writer.num(ep_entry.second.element().id()); - writer.str(ep_entry.second.element().name()); - writer.num(ep_entry.second->m_node); - } - - writer.num(m_network.route_mapping().size() - 1); - for (const auto& tr_entry : m_network.route_mapping()) { - if (tr_entry.second.element_ptr() == route) continue; - - writer.num(tr_entry.second.element().id()); - writer.num(tr_entry.second->node1()); - writer.num(tr_entry.second->node2()); - } - - writer.num(m_network.node_mapping().size() - 1); - for (const auto& node_entry : m_network.node_mapping()) { - if (node_entry.second.element_ptr() == this) continue; - - if (node_entry.second->local_routes().empty()) { - IAC_HANDLE_FATAL_EXCEPTION(NonExistingException, "no local route leading to node, invalid state suspected"); - return false; - } - - auto best_route = best_local_route(node_entry.second->local_routes()); - - writer.num(node_entry.first); - writer.num(best_route.second); +bool LocalNode::send_package(const Package& package, LocalTransportRoute* route) { + if (route->state() == LocalTransportRoute::route_state::INITIALIZED || route->state() == LocalTransportRoute::route_state::CLOSED) { + iac_log_from_node(Logging::loglevels::warning, "asked to send package with type %d to %d over route in state %d, dropping package\n", package.type(), package.to(), route->state()); + return false; } - Package package{reserved_endpoint_addresses::IAC, - reserved_endpoint_addresses::IAC, - reserved_package_types::CONNECT, writer.buffer(), writer.size()}; - - iac_log(Logging::loglevels::network, "sending %s package from %d to %d over %d\n", relay ? "relay" : "connect", id(), m_network.route(route->id()).node2(), route->id()); - // package.print(); - - return handle_package(package, route); + route->meta().last_package_out = timestamp::now(); + return package.send_over(route); } } // namespace iac \ No newline at end of file diff --git a/src/local_node.hpp b/src/local_node.hpp index 71ca490..5cbb39f 100644 --- a/src/local_node.hpp +++ b/src/local_node.hpp @@ -20,14 +20,19 @@ #include "std_provider/utility.hpp" #include "std_provider/vector.hpp" -#ifndef ARDUINO -# include -#endif - namespace iac { IAC_MAKE_EXCEPTION(OutOfTrIdException); +#define IAC_LOG_PACKAGE_SEND(level, type) \ + iac_log_from_node(level, "sending " type " package to %d over %d\n", m_network.route(route->id()).node2(), route->id()); + +#define IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, info, ...) \ + iac_log_from_node(level, "received " type " package over %d " info "\n", package.route()->id(), __VA_ARGS__); + +#define IAC_LOG_PACKAGE_RECEIVE(level, type) \ + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, "", 0); + class LocalNode : public Node { public: LocalNode(route_timings_t route_timings = {}); @@ -70,7 +75,7 @@ class LocalNode : public Node { private: static constexpr uint16_t s_min_heartbeat_interval_ms = 100; - static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 2; + static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 3; static constexpr uint8_t s_num_package_reads_from_route_per_update = 5; route_timings_t m_default_route_timings; @@ -79,42 +84,31 @@ class LocalNode : public Node { unordered_set m_used_tr_ids; - bool handle_package(Package& package, LocalTransportRoute* route = nullptr); + bool send_package(const Package& package); + bool send_package(const Package& package, LocalTransportRoute* route); + bool handle_package(const Package& package); - bool handle_connect(Package& package); - bool handle_disconnect(const Package& package); - bool handle_disconnect(LocalTransportRoute* route); + bool handle_connect(const Package& package); bool handle_heartbeat(const Package& package); + bool handle_ack(const Package& package); + bool handle_network_update(const Package& package); bool read_from(LocalTransportRoute* route); + + bool state_handling(LocalTransportRoute* route); + bool open_route(LocalTransportRoute* route); bool close_route(LocalTransportRoute* route); - bool send_handshake(LocalTransportRoute* route); - bool send_heartbeat(LocalTransportRoute* route); - bool send_connect_package(LocalTransportRoute* route, bool relay); + bool send_connect(LocalTransportRoute* route); + bool send_heartbeat(LocalTransportRoute* route); + bool send_ack(LocalTransportRoute* route); + bool send_network_update(LocalTransportRoute* route); uint8_t get_tr_id(); bool pop_tr_id(uint8_t id); static pair best_local_route(const unordered_map& local_routes); - static LocalTransportRoute::timestamp_t timestamp() { -#ifdef ARDUINO - return millis(); -#else - return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); -#endif - } - - template - void delete_mapping(T mapping, LocalNode* obj, bool (LocalNode::*remove_func)(U)) { - for (auto it = mapping.begin(); it != mapping.end();) { - U id = it->first; - it++; - - (obj->*remove_func)(id); - } - }; }; } // namespace iac \ No newline at end of file diff --git a/src/local_node_api.cpp b/src/local_node_api.cpp index 6b23320..9ff4868 100644 --- a/src/local_node_api.cpp +++ b/src/local_node_api.cpp @@ -6,17 +6,17 @@ bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const uint return send(from.id(), to, type, buffer, buffer_length, buffer_management); } -bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) { - Package package{from, to, type, buffer, buffer_length, buffer_management}; - return handle_package(package); +bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { + return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); } bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); } -bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { - return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); +bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) { + Package package{from, to, type, buffer, buffer_length, buffer_management}; + return send_package(package); } bool LocalNode::endpoint_connected(ep_id_t address) const { @@ -35,11 +35,11 @@ bool LocalNode::endpoints_connected(const vector& addresses) const { bool LocalNode::all_routes_connected() const { for (const auto& route : m_network.route_mapping()) { if (!route.second.element().local()) continue; - const auto& ltr = (const LocalTransportRoute&) route.second.element(); + const auto& ltr = (const LocalTransportRoute&)route.second.element(); if (ltr.state() != LocalTransportRoute::route_state::CONNECTED) return false; } return true; } -} \ No newline at end of file +} // namespace iac \ No newline at end of file diff --git a/src/local_node_package_handling.cpp b/src/local_node_package_handling.cpp new file mode 100644 index 0000000..144d8bd --- /dev/null +++ b/src/local_node_package_handling.cpp @@ -0,0 +1,191 @@ +#include "local_node.hpp" + +namespace iac { + +bool LocalNode::handle_package(const Package& package) { + if (package.route()->state() == LocalTransportRoute::route_state::INITIALIZED || package.route()->state() == LocalTransportRoute::route_state::CLOSED) { + iac_log_from_node(Logging::loglevels::warning, "received package on closed route\n"); + return false; + } + + if (package.to() == reserved_endpoint_addresses::IAC) { + if (package.type() == reserved_package_types::CONNECT && package.route()->state() == LocalTransportRoute::route_state::WAIT_CONNECT) { + if (handle_connect(package)) { + package.route()->state() = LocalTransportRoute::route_state::SEND_ACK; + return true; + } + return false; + } + + if (package.type() == reserved_package_types::ACK && package.route()->state() == LocalTransportRoute::route_state::WAIT_ACK) { + if (handle_ack(package)) { + package.route()->state() = LocalTransportRoute::route_state::CONNECTED; + m_network.set_modified(); // force a send of network_update + return true; + } + return false; + } + + if (package.route()->state() == LocalTransportRoute::route_state::CONNECTED) { + if (package.type() == reserved_package_types::NETWORK_UPDATE) { + return handle_network_update(package); + } + + if (package.type() == reserved_package_types::HEARTBEAT) { + return handle_heartbeat(package); + } + } + + iac_log_from_node(Logging::loglevels::warning, "dropping package for IAC; route_id: %d; type: %d\n", package.route()->id(), package.type()); + return false; + } + if (!m_network.endpoint_registered(package.to())) { + iac_log_from_node(Logging::loglevels::error, "received package for unregistered endpoint %d, dropping package\n", package.to()); + return false; + } + + const auto& ep = m_network.endpoint(package.to()); + + if (ep.local()) + return ((const LocalEndpoint&)ep).handle_package(package); + + return send_package(package); +} + +bool LocalNode::handle_connect(const Package& package) { + BufferReader reader{package.payload(), package.payload_size()}; + + auto sender_id = reader.num(); + auto other_tr_id = reader.num(); + + package.route()->meta().timings.heartbeat_interval_ms = max_of(reader.num(), package.route()->meta().timings.heartbeat_interval_ms); + package.route()->meta().timings.assume_dead_after_ms = max_of(reader.num(), package.route()->meta().timings.assume_dead_after_ms); + + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::network, "connect", "from %d", sender_id); + + if (!m_network.node_registered(sender_id)) { + m_network.add_node(ManagedNetworkEntry::create_and_adopt(new Node(sender_id))); + + if (other_tr_id < package.route()->id()) { + iac_log_from_node(Logging::loglevels::debug, "changing route id\n"); + + const auto old_id = package.route()->id(); + + auto erase_tr_links = [&](node_id_t node_id) { + if (node_id != unset_id) { + auto& node_entry = m_network.mutable_node(node_id); + + auto node_route_res = node_entry.routes().find(package.route()->id()); + if (node_route_res != node_entry.routes().end()) { + node_entry.remove_route(node_route_res); + } + + auto node_local_route_res = node_entry.local_routes().find(package.route()->id()); + if (node_local_route_res != node_entry.local_routes().end()) { + node_entry.remove_local_route(node_local_route_res); + } + } + }; + + auto insert_tr_links = [&](node_id_t node_id) { + if (node_id != unset_id) { + auto& node_entry = m_network.mutable_node(node_id); + + node_entry.add_route(other_tr_id); + node_entry.add_local_route(other_tr_id, 1); + } + }; + + auto temp = iac::move(m_network.mutable_route_managed_entry(old_id)); + m_network.erase_route_managed_entry(old_id); + + erase_tr_links(package.route()->nodes().first); + erase_tr_links(package.route()->nodes().second); + + package.route()->set_id(other_tr_id); + m_network.add_route(iac::move(temp)); + + insert_tr_links(package.route()->node1()); + insert_tr_links(package.route()->node2()); + } + + package.route()->set_node2(sender_id); + auto& sender = m_network.mutable_node(sender_id); + sender.add_route(package.route()->id()); + sender.add_local_route({package.route()->id(), 1}); + + iac_log_from_node(Logging::loglevels::connect, "connecting %d to %d\n", id(), sender_id); + + iac_log_from_node(Logging::loglevels::verbose, "connect with timing: %d %d\n", package.route()->meta().timings.heartbeat_interval_ms, package.route()->meta().timings.assume_dead_after_ms); + } + + return true; +} + +bool LocalNode::handle_heartbeat(const Package& package) { + auto now = timestamp::now(); + + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::verbose, "heartbeat", "with timing: last_out: %d; last_in:%d", + now - package.route()->meta().last_package_in, + now - package.route()->meta().last_package_out); + return true; +} + +bool LocalNode::handle_ack(const Package& package) { + IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "ack"); + return true; +} + +bool LocalNode::handle_network_update(const Package& package) { + BufferReader reader{package.payload(), package.payload_size()}; + + IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "network_update"); + + if (package.route()->state() != LocalTransportRoute::route_state::CONNECTED) { + iac_log_from_node(Logging::loglevels::debug, "route is not connected, discarding relay package\n"); + return true; + } + + const auto num_ep_data = reader.num(); + for (uint8_t i = 0; i < num_ep_data; ++i) { + const auto ep_id = reader.num(); + const auto* ep_name = reader.str(); + const auto ep_node = reader.num(); + + if (!m_network.endpoint_registered(ep_id)) { + auto* ep = new Endpoint(ep_id); + ep->set_name(ep_name); + ep->set_node(ep_node); + m_network.add_endpoint(ManagedNetworkEntry::create_and_adopt(ep)); + } + } + + const auto num_tr_data = reader.num(); + for (uint8_t i = 0; i < num_tr_data; ++i) { + const auto tr_id = reader.num(); + const auto node1_id = reader.num(); + const auto node2_id = reader.num(); + + if (node1_id != unset_id && node2_id != unset_id && !m_network.route_registered(tr_id)) { + auto* tr = new TransportRoute(tr_id, {node1_id, node2_id}); + m_network.add_route(ManagedNetworkEntry::create_and_adopt(tr)); + } + } + + const auto num_ltr_data = reader.num(); + for (uint8_t i = 0; i < num_ltr_data; ++i) { + const auto reachable_node = reader.num(); + const auto num_hops = reader.num(); + + auto& reachable_node_entry = m_network.node(reachable_node); + auto res = reachable_node_entry.local_routes().find(package.route()->id()); + + if (res == reachable_node_entry.local_routes().end() || res->second > num_hops + 1) { + m_network.mutable_node(reachable_node).add_local_route(package.route()->id(), num_hops + 1); + } + } + + return send_heartbeat(package.route()); +} + +} // namespace iac diff --git a/src/local_node_state_handling.cpp b/src/local_node_state_handling.cpp new file mode 100644 index 0000000..461ee07 --- /dev/null +++ b/src/local_node_state_handling.cpp @@ -0,0 +1,203 @@ +#include "local_node.hpp" + +namespace iac { + +/* + +On init: +-----------------+ +-----------------+ + | INITIALIZED | | INITIALIZED | + +-----------------+ +-----------------+ + | | + V V + +-----------------+ --------------> +-----------------+ + +> | SEND_CONNECT | msg::connect | SEND_CONNECT | <+ + | +-----------------+ <-------------- +-----------------+ | + until | | | | until +msg::connect | V V | msg::connect + received | +-----------------+ +-----------------+ | received + +- | WAIT_CONNECT | | WAIT_CONNECT | -+ + +-----------------+ +-----------------+ + | | + V V + +-----------------+ --------------> +-----------------+ + | SEND_ACK | msg::ack | SEND_ACK | + +-----------------+ <-------------- +-----------------+ + | | + V V + +-----------------+ +-----------------+ + | WAIT_ACK | | WAIT_ACK | + +-----------------+ +-----------------+ + | | + V V + +-----------------+ +-----------------+ + | CONNECTED | | CONNECTED | + +-----------------+ +-----------------+ + +*/ + +bool LocalNode::state_handling(LocalTransportRoute* route) { + iac_log_from_node(Logging::loglevels::verbose, "state of route %d @ node %d is %d\n", route->id(), id(), route->state()); + auto now = timestamp::now(); + + if (route->state() != LocalTransportRoute::route_state::CLOSED && + route->state() != LocalTransportRoute::route_state::INITIALIZED && + route->meta().last_package_in.is_more_than_n_in_past(now, route->meta().timings.assume_dead_after_ms)) { + if (!close_route(route)) return false; + route->state() = LocalTransportRoute::route_state::CLOSED; + } + + switch (route->state()) { + case LocalTransportRoute::route_state::INITIALIZED: + case LocalTransportRoute::route_state::CLOSED: + if (!open_route(route)) return false; + route->state() = LocalTransportRoute::route_state::SEND_CONNECT; + // intentional fall-through + + case LocalTransportRoute::route_state::SEND_CONNECT: + if (!send_connect(route)) return false; + route->state() = LocalTransportRoute::route_state::WAIT_CONNECT; + // intentional fall-through + + case LocalTransportRoute::route_state::WAIT_CONNECT: + // NOTE: This loop will be broken when a CONNECT package from this route arrives + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) + route->state() = LocalTransportRoute::route_state::SEND_CONNECT; + break; + + case LocalTransportRoute::route_state::SEND_ACK: + + if (!send_ack(route)) return false; + route->state() = LocalTransportRoute::route_state::WAIT_ACK; + // intentional fall-through + + case LocalTransportRoute::route_state::WAIT_ACK: + // NOTE: This loop will be broken when a ACK package from this route arrives + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) + route->state() = LocalTransportRoute::route_state::SEND_ACK; + break; + + case LocalTransportRoute::route_state::CONNECTED: + + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) { + if (!send_heartbeat(route)) return false; + } + + break; + } + + // NOTE: route should always be open at this point + if (!read_from(route)) return false; + + return true; +} + +bool LocalNode::open_route(LocalTransportRoute* route) { + if (route->connection().open()) { + auto now = timestamp::now(); + route->meta().last_package_in = now; + route->meta().last_package_out = now; + iac_log_from_node(Logging::loglevels::network, "opened route %d [%s]\n", route->id(), route->typestring().c_str()); + return true; + } + + iac_log_from_node(Logging::loglevels::verbose, "error opening route %d [%s]\n", route->id(), route->typestring().c_str()); + return false; +} + +bool LocalNode::close_route(LocalTransportRoute* route) { + if (route->connection().close()) { + if (!(m_network.disconnect_route(route->id()) && route->reset())) { + iac_log_from_node(Logging::loglevels::warning, "error disconnecting route %d [%s] \n", route->id(), route->typestring().c_str()); + return false; + } + + iac_log_from_node(Logging::loglevels::network, "closed route %d [%s]\n", route->id(), route->typestring().c_str()); + return true; + } + + iac_log_from_node(Logging::loglevels::warning, "error closing route %d [%s] \n", route->id(), route->typestring().c_str()); + return false; +} + +bool LocalNode::send_network_update(LocalTransportRoute* route) { + BufferWriter writer; + + writer.num(m_network.endpoint_mapping().size()); + for (const auto& ep_entry : m_network.endpoint_mapping()) { + writer.num(ep_entry.second.element().id()); + writer.str(ep_entry.second.element().name()); + writer.num(ep_entry.second->m_node); + } + + writer.num(m_network.route_mapping().size() - 1); + for (const auto& tr_entry : m_network.route_mapping()) { + if (tr_entry.second.element_ptr() == route) continue; + + writer.num(tr_entry.second.element().id()); + writer.num(tr_entry.second->node1()); + writer.num(tr_entry.second->node2()); + } + + writer.num(m_network.node_mapping().size() - 1); + for (const auto& node_entry : m_network.node_mapping()) { + if (node_entry.second.element_ptr() == this) continue; + + if (node_entry.second->local_routes().empty()) { + IAC_HANDLE_FATAL_EXCEPTION(NonExistingException, "no local route leading to node, invalid state suspected"); + return false; + } + + auto best_route = best_local_route(node_entry.second->local_routes()); + + writer.num(node_entry.first); + writer.num(best_route.second); + } + + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::NETWORK_UPDATE, writer.buffer(), writer.size()}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "network_update"); + + return send_package(package, route); +} + +bool LocalNode::send_heartbeat(LocalTransportRoute* route) { + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::HEARTBEAT, nullptr, 0}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::verbose, "heartbeat"); + + return send_package(package, route); +} + +bool LocalNode::send_ack(LocalTransportRoute* route) { + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::ACK, nullptr, 0}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "ack"); + + return send_package(package, route); +} + +bool LocalNode::send_connect(LocalTransportRoute* route) { + BufferWriter writer; + + writer.num(id()); + writer.num(route->id()); + + writer.num(route->meta().timings.heartbeat_interval_ms); + writer.num(route->meta().timings.assume_dead_after_ms); + + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::CONNECT, writer.buffer(), writer.size()}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "connect"); + + return send_package(package, route); +} + +} // namespace iac \ No newline at end of file diff --git a/src/logging.hpp b/src/logging.hpp index adb459d..a5a25b5 100644 --- a/src/logging.hpp +++ b/src/logging.hpp @@ -34,20 +34,23 @@ class Logging { } #ifndef IAC_DISABLE_PRINTING -# define iac_log(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, level, __VA_ARGS__); +# define iac_log(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, nullptr, 0, level, __VA_ARGS__); +# define iac_log_from_node(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, "[node::%d] \t", id(), level, __VA_ARGS__); - static constexpr void log_head([[maybe_unused]] const char* file, [[maybe_unused]] unsigned line, [[maybe_unused]] const char* function, loglevel_t level) { + static constexpr void log_head([[maybe_unused]] const char* file, [[maybe_unused]] unsigned line, [[maybe_unused]] const char* function, loglevel_t level, const char* id_str, int id) { # ifdef IAC_LOG_WITH_LINE_NUMBERS iac_printf(s_log_head_fmt_with_line_info, s_level_colors[(int)level], s_level_names[(int)level], file, line, function); # else iac_printf(s_log_head_fmt, s_level_colors[(int)level], s_level_names[(int)level]); # endif + if (id_str) + iac_printf(id_str, id); } template - static constexpr void log(const char* file, unsigned line, const char* function, loglevel_t level, Args... args) { + static constexpr void log(const char* file, unsigned line, const char* function, const char* id_str, int id, loglevel_t level, Args... args) { if (s_loglevel >= level) { - log_head(file, line, function, level); + log_head(file, line, function, level, id_str, id); iac_printf(args...); } } diff --git a/src/network.cpp b/src/network.cpp index d92435a..3e4a14a 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,20 +10,25 @@ bool Network::add_route(ManagedNetworkEntry&& route) { return false; } + auto add_if_not_unset = [&](node_id_t node_id) { + if (node_id != unset_id && !node_registered(node_id)) + add_node(ManagedNetworkEntry::create_and_adopt(new Node(node_id))); + }; + + auto link_if_not_unset = [&](tr_id_t tr_id, node_id_t node_id) { + if (node_id != unset_id) mutable_node(node_id).add_route(tr_id); + }; + auto tr_id = route.element().id(); auto nodes = route.element().nodes(); - m_tr_mapping[tr_id] = iac::move(route); - auto add_if_not_unset = [&](tr_id_t tr_id, node_id_t node_id) { - if (node_id != unset_id) { - if (!node_registered(node_id)) - add_node(ManagedNetworkEntry::create_and_adopt(new Node(node_id))); - mutable_node(node_id).add_route(tr_id); - } - }; + add_if_not_unset(nodes.first); + add_if_not_unset(nodes.second); + + m_tr_mapping[tr_id] = iac::move(route); - add_if_not_unset(tr_id, nodes.first); - add_if_not_unset(tr_id, nodes.second); + link_if_not_unset(tr_id, nodes.first); + link_if_not_unset(tr_id, nodes.second); set_modified(); diff --git a/src/network.hpp b/src/network.hpp index a257d67..d91ecc2 100644 --- a/src/network.hpp +++ b/src/network.hpp @@ -105,6 +105,10 @@ class Network { bool add_node(ManagedNetworkEntry&& node); bool remove_node(node_id_t node_id); + bool disconnect_route(tr_id_t route_id) { + return true; + }; + bool validate_network() const; bool is_modified() const { diff --git a/src/network_types.hpp b/src/network_types.hpp index 1df2049..8dc3e04 100644 --- a/src/network_types.hpp +++ b/src/network_types.hpp @@ -10,6 +10,10 @@ #include "std_provider/unordered_set.hpp" #include "std_provider/utility.hpp" +#ifndef ARDUINO +# include +#endif + namespace iac { typedef uint8_t node_id_t; @@ -23,8 +27,9 @@ typedef uint8_t start_byte_t; enum reserved_package_types { CONNECT = numeric_limits::max(), - DISCONNECT = numeric_limits::max() - 1, - HEARTBEAT = numeric_limits::max() - 2, + ACK = numeric_limits::max() - 1, + NETWORK_UPDATE = numeric_limits::max() - 2, + HEARTBEAT = numeric_limits::max() - 3, }; enum reserved_endpoint_addresses { @@ -38,6 +43,35 @@ typedef struct route_timings { uint16_t assume_dead_after_ms = 0; } route_timings_t; +struct timestamp { + timestamp() = default; + + timestamp(size_t ts) + : ts(ts){}; + + static timestamp now() { +#ifdef ARDUINO + return millis(); +#else + return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); +#endif + } + + bool is_more_than_n_in_past(const timestamp& now, const size_t n) { + return timestamp(ts + n) < now; + } + + bool operator<(const timestamp& rhs) { + return ts < rhs.ts; + } + + size_t operator-(const timestamp& rhs) { + return ts - rhs.ts; + } + + size_t ts; +}; + IAC_MAKE_EXCEPTION(EmptyNetworkEntryDereferenceException); IAC_MAKE_EXCEPTION(CopyingNonEmptyNetworkEntryException); IAC_MAKE_EXCEPTION(BindingToNonEmptyNetworkEntryException); diff --git a/src/package.cpp b/src/package.cpp index 1f93888..1b70d4a 100644 --- a/src/package.cpp +++ b/src/package.cpp @@ -7,7 +7,8 @@ constexpr size_t Package::s_info_header_size; constexpr size_t Package::s_max_payload_size; constexpr start_byte_t Package::s_startbyte; -Package::Package(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, buffer_management_t buffer_type) : m_from(from), m_to(to), m_type(type), m_payload((uint8_t*)buffer), m_buffer_type(buffer_type) { +Package::Package(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, buffer_management_t buffer_type) + : m_from(from), m_to(to), m_type(type), m_payload((uint8_t*)buffer), m_buffer_type(buffer_type) { if (buffer_length > s_max_payload_size) { IAC_HANDLE_EXCEPTION(InvalidPackageException, "payload to big"); @@ -73,7 +74,7 @@ bool Package::read_from(LocalTransportRoute* route) { #ifdef ARDUINO uint8_t dummy = 0; - route->write(&dummy, 1); + route->connection().write(&dummy, 1); #endif if (route->meta().wait_for_available_size > 0 && route->connection().available() < route->meta().wait_for_available_size) diff --git a/tests/test_network_visualization.hpp b/tests/test_network_visualization.hpp index e63c12f..b8f3bb2 100644 --- a/tests/test_network_visualization.hpp +++ b/tests/test_network_visualization.hpp @@ -30,6 +30,22 @@ class TestNetworkVisualization { TEST_UTILS_CONNECT_NODES_WITH_LOOPBACK(tr3, node2, node4); TEST_UTILS_CONNECT_NODES_WITH_LOOPBACK(tr4, node3, node4); + iac::Visualization viz{"127.0.0.1", 3000}; + viz.add_network("node1", node1.network()); + viz.add_network("node2", node2.network()); + viz.add_network("node3", node3.network()); + viz.add_network("node4", node4.network()); + + bool run = true; + auto t = std::thread(run_viz, &run, &viz); + + printf("Visualization up on http://127.0.0.1:3000\npress any key to continue..."); + + char c; + std::cin >> c; + + printf("...continuing\n"); + TestLogging::test_printf("node1 network rep on startup %s", node1.network().network_representation().c_str()); TestUtilities::update_til_connected([] {}, node1, node2, node3, node4); @@ -46,18 +62,6 @@ class TestNetworkVisualization { TestLogging::test_printf("node4 %s", node4.network().network_representation(false).c_str()); } - iac::Visualization viz{"127.0.0.1", 3000}; - viz.add_network("node1", node1.network()); - viz.add_network("node2", node2.network()); - viz.add_network("node3", node3.network()); - viz.add_network("node4", node4.network()); - - bool run = true; - auto t = std::thread(run_viz, &run, &viz); - - printf("Visualization up on http://127.0.0.1:3000\npress any key to continue..."); - - char c; std::cin >> c; printf("...continuing\n"); diff --git a/tests/test_send_receive.hpp b/tests/test_send_receive.hpp index 86d48a6..3d65556 100644 --- a/tests/test_send_receive.hpp +++ b/tests/test_send_receive.hpp @@ -21,15 +21,18 @@ class TestSendReceive { TestUtilities::update_til_connected([] {}, node1, node2); + // NOTE: network_update should arrive on next update + TestUtilities::update_all_nodes(node1, node2); + if (!node1.send(ep1, ep2.id(), 0, nullptr, 0)) { - return {"failed to send pkg to ep2\n"}; + return {"failed to send pkg to ep2"}; } while (rec_pkg_count < 1) TestUtilities::update_all_nodes(node1, node2); if (!node2.send(ep2, ep1.id(), 0, nullptr, 0)) { - return {"failed to send pkg to ep1\n"}; + return {"failed to send pkg to ep1"}; } while (rec_pkg_count < 2) diff --git a/tests/testing.cpp b/tests/testing.cpp index 7814864..e9318c3 100644 --- a/tests/testing.cpp +++ b/tests/testing.cpp @@ -12,12 +12,6 @@ int main(int argc, char* argv[]) { iac::Logging::set_loglevel(iac::Logging::loglevels::debug); - TestLogging::start_suite("communication"); - - TestLogging::run("disconnect-reconnect", TestDisconnectReconnect::run); - TestLogging::run("send-receive", TestSendReceive::run); - TestLogging::run("network-building", TestNetworkBuilding::run); - #ifndef IAC_DISABLE_VISUALIZATION if (argc >= 2 && strncmp(argv[1], "-h", 2) == 0) { TestLogging::start_suite("visualization"); @@ -25,5 +19,11 @@ int main(int argc, char* argv[]) { } #endif + TestLogging::start_suite("communication"); + + TestLogging::run("disconnect-reconnect", TestDisconnectReconnect::run); + TestLogging::run("send-receive", TestSendReceive::run); + TestLogging::run("network-building", TestNetworkBuilding::run); + return TestLogging::results(); } \ No newline at end of file