diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b1eca08..5b4dd5f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,6 +17,8 @@ macro(IAC_BUILD target_name compile_defs) "local_endpoint.cpp" "local_node.cpp" "local_node_api.cpp" + "local_node_state_handling.cpp" + "local_node_package_handling.cpp" "local_transport_route.cpp" "logging.cpp" "network.cpp" diff --git a/src/local_node.cpp b/src/local_node.cpp index 4c00c0d..4982ef8 100644 --- a/src/local_node.cpp +++ b/src/local_node.cpp @@ -76,209 +76,6 @@ pair LocalNode::best_local_route(const unordered_mapstate() == LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY) - package.route()->state() = LocalTransportRoute::route_state::CONNECTED; - return handle_heartbeat(package); - } - - if (package.type() == reserved_package_types::DISCONNECT && package.route()->state() == LocalTransportRoute::route_state::CONNECTED) { - return handle_disconnect(package); - } - - if (!m_network.endpoint_registered(package.to())) { - iac_log(Logging::loglevels::error, "received package for unregistered endpoint\n"); - return false; - } - - const auto& ep = m_network.endpoint(package.to()); - - if (ep.local()) - return ((const LocalEndpoint&)ep).handle_package(package); - - const auto& available_routes = m_network.node(ep.node()).local_routes(); - if (available_routes.empty()) - return false; - - route = (LocalTransportRoute*)&m_network.route(best_local_route(available_routes).first); - } - - if (route->state() != LocalTransportRoute::route_state::INITIALIZED || route->state() != LocalTransportRoute::route_state::CLOSED) { - route->meta().last_package_out = timestamp(); - return package.send_over(route); - } else { - iac_log(Logging::loglevels::warning, "asked to send package with type %d to %d over route in state %d\n", package.type(), package.to(), route->state()); - } - - return false; -} - -bool LocalNode::handle_connect(Package& package) { - BufferReader reader{package.payload(), package.payload_size()}; - - // package.print(); - - auto relay = reader.boolean(); - - iac_log(Logging::loglevels::debug, "node %d received connect pkg; state: %d%s\n", id(), package.route()->state(), relay == 1 ? " [relay]" : ""); - - if (relay && package.route()->state() != LocalTransportRoute::route_state::CONNECTED) { - iac_log(Logging::loglevels::debug, "route is not connected, discarding relay package\n"); - return true; - } - - auto sender_id = reader.num(); - auto other_tr_id = reader.num(); - - package.route()->meta().timings.heartbeat_interval_ms = max_of(reader.num(), package.route()->meta().timings.heartbeat_interval_ms); - package.route()->meta().timings.assume_dead_after_ms = max_of(reader.num(), package.route()->meta().timings.assume_dead_after_ms); - - if (!m_network.node_registered(sender_id)) { - m_network.add_node(ManagedNetworkEntry::create_and_adopt(new Node(sender_id))); - } - - if (!relay) { - if (other_tr_id < package.route()->id()) { - iac_log(Logging::loglevels::debug, "changing route id\n"); - const auto old_id = package.route()->id(); - - auto erase_tr_links = [&](node_id_t node_id) { - if (node_id != unset_id) { - auto& node_entry = m_network.mutable_node(node_id); - - auto node_route_res = node_entry.routes().find(package.route()->id()); - if (node_route_res != node_entry.routes().end()) { - node_entry.remove_route(node_route_res); - } - - auto node_local_route_res = node_entry.local_routes().find(package.route()->id()); - if (node_local_route_res != node_entry.local_routes().end()) { - node_entry.remove_local_route(node_local_route_res); - } - } - }; - - auto insert_tr_links = [&](node_id_t node_id) { - if (node_id != unset_id) { - auto& node_entry = m_network.mutable_node(node_id); - - node_entry.add_route(other_tr_id); - node_entry.add_local_route(other_tr_id, 1); - } - }; - - auto temp = move(m_network.mutable_route_managed_entry(old_id)); - m_network.erase_route_managed_entry(old_id); - - erase_tr_links(package.route()->nodes().first); - erase_tr_links(package.route()->nodes().second); - - package.route()->set_id(other_tr_id); - m_network.add_route(move(temp)); - - insert_tr_links(package.route()->node1()); - insert_tr_links(package.route()->node2()); - } - - package.route()->set_node2(sender_id); - auto& sender = m_network.mutable_node(sender_id); - sender.add_route(package.route()->id()); - sender.add_local_route({package.route()->id(), 1}); - - iac_log(Logging::loglevels::connect, "connecting %d to %d\n", id(), sender_id); - - iac_log(Logging::loglevels::verbose, "connect with timing: %d %d\n", - package.route()->meta().timings.heartbeat_interval_ms, - package.route()->meta().timings.assume_dead_after_ms); - } else { - iac_log(Logging::loglevels::debug, "relay pkg\n"); - } - - const auto num_ep_data = reader.num(); - for (uint8_t i = 0; i < num_ep_data; ++i) { - const auto ep_id = reader.num(); - const auto* ep_name = reader.str(); - const auto ep_node = reader.num(); - - if (!m_network.endpoint_registered(ep_id)) { - auto* ep = new Endpoint(ep_id); - ep->set_name(ep_name); - ep->set_node(ep_node); - m_network.add_endpoint(ManagedNetworkEntry::create_and_adopt(ep)); - } - } - - const auto num_tr_data = reader.num(); - for (uint8_t i = 0; i < num_tr_data; ++i) { - const auto tr_id = reader.num(); - const auto node1_id = reader.num(); - const auto node2_id = reader.num(); - - if (node1_id != unset_id && node2_id != unset_id && !m_network.route_registered(tr_id)) { - auto* tr = new TransportRoute(tr_id, {node1_id, node2_id}); - m_network.add_route(ManagedNetworkEntry::create_and_adopt(tr)); - } - } - - const auto num_ltr_data = reader.num(); - for (uint8_t i = 0; i < num_ltr_data; ++i) { - const auto reachable_node = reader.num(); - const auto num_hops = reader.num(); - - auto& reachable_node_entry = m_network.mutable_node(reachable_node); - auto res = reachable_node_entry.local_routes().find(package.route()->id()); - - if (res == reachable_node_entry.local_routes().end() || res->second > num_hops + 1) { - reachable_node_entry.add_local_route(package.route()->id(), num_hops + 1); - } - } - - return send_heartbeat(package.route()); -} - -bool LocalNode::handle_disconnect(const Package& package) { - return handle_disconnect(package.route()); -} - -bool LocalNode::handle_disconnect(LocalTransportRoute* route) { - // save what the route was connected to and reset it to unset value - auto was_connected_to = m_network.route(route->id()).node2(); - - if (was_connected_to != unset_id) { - iac_log(Logging::loglevels::connect, "disconnecting %d(self) from %d\n", id(), was_connected_to); - - // the node the route was connected to - auto& concerning_node = m_network.mutable_node(was_connected_to); - - // remove the disconnected route - concerning_node.remove_route(route->id()); - concerning_node.remove_local_route(route->id()); - route->set_node2(unset_id); - - // if no connections lead to this node anymore, delete it - if (concerning_node.local_routes().empty()) { - if (!m_network.remove_node(was_connected_to)) IAC_ASSERT_NOT_REACHED(); - } - } - - return true; -} - -bool LocalNode::handle_heartbeat(const Package& package) { - iac_log(Logging::loglevels::debug, "heartbeat from: %d node: %d (self) %d %d\n", package.from(), - id(), - timestamp() - package.route()->meta().last_package_in, - timestamp() - package.route()->meta().last_package_out); - return true; -} - bool LocalNode::update() { if (m_network.endpoint_mapping().empty()) { IAC_HANDLE_EXCEPTION(NoRegisteredEndpointsException, "updating node with no endpoints"); @@ -295,35 +92,7 @@ bool LocalNode::update() { it++; // iterator might get invalidated in handle_connect_package, so we have to increment now; - // iac_printf("route %p; mode %d; last in: %d; last out: %d\n", route.first, - // route.first->m_state, timestamp() - route.first->m_last_package_in, - // timestamp() - route.first->m_last_package_out); - - iac_log(Logging::loglevels::verbose, "state of route %d @ node %d is %d\n", route->id(), id(), route->state()); - - switch (route->state()) { - case LocalTransportRoute::route_state::INITIALIZED: - case LocalTransportRoute::route_state::CLOSED: - if (!open_route(route)) break; - - case LocalTransportRoute::route_state::OPEN: - route->meta().last_package_in = timestamp(); - - case LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY: - if (!send_handshake(route)) break; - - case LocalTransportRoute::route_state::CONNECTED: - if (timestamp() - route->meta().last_package_in > route->meta().timings.assume_dead_after_ms) { - if (!close_route(route)) return false; - } - - if (timestamp() - route->meta().last_package_out > route->meta().timings.heartbeat_interval_ms) { - if (!send_heartbeat(route)) return false; - } - - if (!read_from(route)) return false; - break; - } + if (!state_handling(route)) return false; } if (m_network.is_modified()) { @@ -332,7 +101,7 @@ bool LocalNode::update() { if (!route_entry.second->local()) continue; auto* route = (LocalTransportRoute*)route_entry.second.element_ptr(); if (route->state() != LocalTransportRoute::route_state::CONNECTED) continue; - send_connect_package(route, true); + send_network_update(route); } } @@ -343,8 +112,8 @@ bool LocalNode::read_from(LocalTransportRoute* route) { for (size_t i = 0; i < s_num_package_reads_from_route_per_update && route->connection().available() > 0; i++) { Package package; if (package.read_from(route)) { - route->meta().last_package_in = timestamp(); if (!handle_package(package)) return false; + route->meta().last_package_in = timestamp::now(); } else { break; } @@ -352,105 +121,31 @@ bool LocalNode::read_from(LocalTransportRoute* route) { return true; } -bool LocalNode::open_route(LocalTransportRoute* route) { - if (route->state() != LocalTransportRoute::route_state::OPEN && route->open()) { - route->state() = LocalTransportRoute::route_state::OPEN; - iac_log(Logging::loglevels::network, "opened route %d [%s]\n", route->id(), route->typestring().c_str()); - return true; - } - - iac_log(Logging::loglevels::warning, "error opening route %d [%s]\n", route->id(), route->typestring().c_str()); - return false; -} - -bool LocalNode::close_route(LocalTransportRoute* route) { - if (route->state() != LocalTransportRoute::route_state::CLOSED && route->close()) { - if (!handle_disconnect(route)) { - iac_log(Logging::loglevels::warning, "error disconnecting route %d [%s] \n", route->id(), route->typestring().c_str()); - return false; - } - route->state() = LocalTransportRoute::route_state::CLOSED; - iac_log(Logging::loglevels::network, "closed route %d [%s]\n", route->id(), route->typestring().c_str()); - return true; - } - - iac_log(Logging::loglevels::warning, "error closing route %d [%s] \n", route->id(), route->typestring().c_str()); - return false; -} - -bool LocalNode::send_handshake(LocalTransportRoute* route) { - if (send_connect_package(route, false)) { - if (route->state() == LocalTransportRoute::route_state::OPEN) - route->state() = LocalTransportRoute::route_state::WAITING_FOR_HANDSHAKE_REPLY; - } else { - iac_log(Logging::loglevels::warning, "error sending connect package to route %d [%s] \n", route->id(), route->typestring().c_str()); +bool LocalNode::send_package(const Package& package) { + if (!m_network.endpoint_registered(package.to())) { + iac_log_from_node(Logging::loglevels::error, "asked to send package for unregistered endpoint %d, dropping package\n", package.to()); return false; } - return true; -} + const auto& ep = m_network.endpoint(package.to()); -bool LocalNode::send_heartbeat(LocalTransportRoute* route) { - Package package{reserved_endpoint_addresses::IAC, - reserved_endpoint_addresses::IAC, - reserved_package_types::HEARTBEAT, nullptr, 0}; + const auto& available_routes = m_network.node(ep.node()).local_routes(); + if (available_routes.empty()) + return false; - // iac_log(Logging::loglevels::debug,"sending heartbeat package\n"); - // package.print(); + auto route = (LocalTransportRoute*)&m_network.route(best_local_route(available_routes).first); - return handle_package(package, route); + return send_package(package, route); } -bool LocalNode::send_connect_package(LocalTransportRoute* route, bool relay) { - BufferWriter writer; - - writer.boolean(relay); - - writer.num(id()); - writer.num(route->id()); - - writer.num(route->meta().timings.heartbeat_interval_ms); - writer.num(route->meta().timings.assume_dead_after_ms); - - writer.num(m_network.endpoint_mapping().size()); - for (const auto& ep_entry : m_network.endpoint_mapping()) { - writer.num(ep_entry.second.element().id()); - writer.str(ep_entry.second.element().name()); - writer.num(ep_entry.second->m_node); - } - - writer.num(m_network.route_mapping().size() - 1); - for (const auto& tr_entry : m_network.route_mapping()) { - if (tr_entry.second.element_ptr() == route) continue; - - writer.num(tr_entry.second.element().id()); - writer.num(tr_entry.second->node1()); - writer.num(tr_entry.second->node2()); - } - - writer.num(m_network.node_mapping().size() - 1); - for (const auto& node_entry : m_network.node_mapping()) { - if (node_entry.second.element_ptr() == this) continue; - - if (node_entry.second->local_routes().empty()) { - IAC_HANDLE_FATAL_EXCEPTION(NonExistingException, "no local route leading to node, invalid state suspected"); - return false; - } - - auto best_route = best_local_route(node_entry.second->local_routes()); - - writer.num(node_entry.first); - writer.num(best_route.second); +bool LocalNode::send_package(const Package& package, LocalTransportRoute* route) { + if (route->state() == LocalTransportRoute::route_state::INITIALIZED || route->state() == LocalTransportRoute::route_state::CLOSED) { + iac_log_from_node(Logging::loglevels::warning, "asked to send package with type %d to %d over route in state %d, dropping package\n", package.type(), package.to(), route->state()); + return false; } - Package package{reserved_endpoint_addresses::IAC, - reserved_endpoint_addresses::IAC, - reserved_package_types::CONNECT, writer.buffer(), writer.size()}; - - iac_log(Logging::loglevels::network, "sending %s package from %d to %d over %d\n", relay ? "relay" : "connect", id(), m_network.route(route->id()).node2(), route->id()); - // package.print(); - - return handle_package(package, route); + route->meta().last_package_out = timestamp::now(); + return package.send_over(route); } } // namespace iac \ No newline at end of file diff --git a/src/local_node.hpp b/src/local_node.hpp index 71ca490..5cbb39f 100644 --- a/src/local_node.hpp +++ b/src/local_node.hpp @@ -20,14 +20,19 @@ #include "std_provider/utility.hpp" #include "std_provider/vector.hpp" -#ifndef ARDUINO -# include -#endif - namespace iac { IAC_MAKE_EXCEPTION(OutOfTrIdException); +#define IAC_LOG_PACKAGE_SEND(level, type) \ + iac_log_from_node(level, "sending " type " package to %d over %d\n", m_network.route(route->id()).node2(), route->id()); + +#define IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, info, ...) \ + iac_log_from_node(level, "received " type " package over %d " info "\n", package.route()->id(), __VA_ARGS__); + +#define IAC_LOG_PACKAGE_RECEIVE(level, type) \ + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(level, type, "", 0); + class LocalNode : public Node { public: LocalNode(route_timings_t route_timings = {}); @@ -70,7 +75,7 @@ class LocalNode : public Node { private: static constexpr uint16_t s_min_heartbeat_interval_ms = 100; - static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 2; + static constexpr uint16_t s_min_assume_dead_time = s_min_heartbeat_interval_ms * 3; static constexpr uint8_t s_num_package_reads_from_route_per_update = 5; route_timings_t m_default_route_timings; @@ -79,42 +84,31 @@ class LocalNode : public Node { unordered_set m_used_tr_ids; - bool handle_package(Package& package, LocalTransportRoute* route = nullptr); + bool send_package(const Package& package); + bool send_package(const Package& package, LocalTransportRoute* route); + bool handle_package(const Package& package); - bool handle_connect(Package& package); - bool handle_disconnect(const Package& package); - bool handle_disconnect(LocalTransportRoute* route); + bool handle_connect(const Package& package); bool handle_heartbeat(const Package& package); + bool handle_ack(const Package& package); + bool handle_network_update(const Package& package); bool read_from(LocalTransportRoute* route); + + bool state_handling(LocalTransportRoute* route); + bool open_route(LocalTransportRoute* route); bool close_route(LocalTransportRoute* route); - bool send_handshake(LocalTransportRoute* route); - bool send_heartbeat(LocalTransportRoute* route); - bool send_connect_package(LocalTransportRoute* route, bool relay); + bool send_connect(LocalTransportRoute* route); + bool send_heartbeat(LocalTransportRoute* route); + bool send_ack(LocalTransportRoute* route); + bool send_network_update(LocalTransportRoute* route); uint8_t get_tr_id(); bool pop_tr_id(uint8_t id); static pair best_local_route(const unordered_map& local_routes); - static LocalTransportRoute::timestamp_t timestamp() { -#ifdef ARDUINO - return millis(); -#else - return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); -#endif - } - - template - void delete_mapping(T mapping, LocalNode* obj, bool (LocalNode::*remove_func)(U)) { - for (auto it = mapping.begin(); it != mapping.end();) { - U id = it->first; - it++; - - (obj->*remove_func)(id); - } - }; }; } // namespace iac \ No newline at end of file diff --git a/src/local_node_api.cpp b/src/local_node_api.cpp index 6b23320..9ff4868 100644 --- a/src/local_node_api.cpp +++ b/src/local_node_api.cpp @@ -6,17 +6,17 @@ bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const uint return send(from.id(), to, type, buffer, buffer_length, buffer_management); } -bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) { - Package package{from, to, type, buffer, buffer_length, buffer_management}; - return handle_package(package); +bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { + return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); } bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); } -bool LocalNode::send(Endpoint& from, ep_id_t to, package_type_t type, const BufferWriter& buffer, Package::buffer_management_t buffer_management) { - return send(from, to, type, buffer.buffer(), buffer.size(), buffer_management); +bool LocalNode::send(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, Package::buffer_management_t buffer_management) { + Package package{from, to, type, buffer, buffer_length, buffer_management}; + return send_package(package); } bool LocalNode::endpoint_connected(ep_id_t address) const { @@ -35,11 +35,11 @@ bool LocalNode::endpoints_connected(const vector& addresses) const { bool LocalNode::all_routes_connected() const { for (const auto& route : m_network.route_mapping()) { if (!route.second.element().local()) continue; - const auto& ltr = (const LocalTransportRoute&) route.second.element(); + const auto& ltr = (const LocalTransportRoute&)route.second.element(); if (ltr.state() != LocalTransportRoute::route_state::CONNECTED) return false; } return true; } -} \ No newline at end of file +} // namespace iac \ No newline at end of file diff --git a/src/local_node_package_handling.cpp b/src/local_node_package_handling.cpp new file mode 100644 index 0000000..144d8bd --- /dev/null +++ b/src/local_node_package_handling.cpp @@ -0,0 +1,191 @@ +#include "local_node.hpp" + +namespace iac { + +bool LocalNode::handle_package(const Package& package) { + if (package.route()->state() == LocalTransportRoute::route_state::INITIALIZED || package.route()->state() == LocalTransportRoute::route_state::CLOSED) { + iac_log_from_node(Logging::loglevels::warning, "received package on closed route\n"); + return false; + } + + if (package.to() == reserved_endpoint_addresses::IAC) { + if (package.type() == reserved_package_types::CONNECT && package.route()->state() == LocalTransportRoute::route_state::WAIT_CONNECT) { + if (handle_connect(package)) { + package.route()->state() = LocalTransportRoute::route_state::SEND_ACK; + return true; + } + return false; + } + + if (package.type() == reserved_package_types::ACK && package.route()->state() == LocalTransportRoute::route_state::WAIT_ACK) { + if (handle_ack(package)) { + package.route()->state() = LocalTransportRoute::route_state::CONNECTED; + m_network.set_modified(); // force a send of network_update + return true; + } + return false; + } + + if (package.route()->state() == LocalTransportRoute::route_state::CONNECTED) { + if (package.type() == reserved_package_types::NETWORK_UPDATE) { + return handle_network_update(package); + } + + if (package.type() == reserved_package_types::HEARTBEAT) { + return handle_heartbeat(package); + } + } + + iac_log_from_node(Logging::loglevels::warning, "dropping package for IAC; route_id: %d; type: %d\n", package.route()->id(), package.type()); + return false; + } + if (!m_network.endpoint_registered(package.to())) { + iac_log_from_node(Logging::loglevels::error, "received package for unregistered endpoint %d, dropping package\n", package.to()); + return false; + } + + const auto& ep = m_network.endpoint(package.to()); + + if (ep.local()) + return ((const LocalEndpoint&)ep).handle_package(package); + + return send_package(package); +} + +bool LocalNode::handle_connect(const Package& package) { + BufferReader reader{package.payload(), package.payload_size()}; + + auto sender_id = reader.num(); + auto other_tr_id = reader.num(); + + package.route()->meta().timings.heartbeat_interval_ms = max_of(reader.num(), package.route()->meta().timings.heartbeat_interval_ms); + package.route()->meta().timings.assume_dead_after_ms = max_of(reader.num(), package.route()->meta().timings.assume_dead_after_ms); + + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::network, "connect", "from %d", sender_id); + + if (!m_network.node_registered(sender_id)) { + m_network.add_node(ManagedNetworkEntry::create_and_adopt(new Node(sender_id))); + + if (other_tr_id < package.route()->id()) { + iac_log_from_node(Logging::loglevels::debug, "changing route id\n"); + + const auto old_id = package.route()->id(); + + auto erase_tr_links = [&](node_id_t node_id) { + if (node_id != unset_id) { + auto& node_entry = m_network.mutable_node(node_id); + + auto node_route_res = node_entry.routes().find(package.route()->id()); + if (node_route_res != node_entry.routes().end()) { + node_entry.remove_route(node_route_res); + } + + auto node_local_route_res = node_entry.local_routes().find(package.route()->id()); + if (node_local_route_res != node_entry.local_routes().end()) { + node_entry.remove_local_route(node_local_route_res); + } + } + }; + + auto insert_tr_links = [&](node_id_t node_id) { + if (node_id != unset_id) { + auto& node_entry = m_network.mutable_node(node_id); + + node_entry.add_route(other_tr_id); + node_entry.add_local_route(other_tr_id, 1); + } + }; + + auto temp = iac::move(m_network.mutable_route_managed_entry(old_id)); + m_network.erase_route_managed_entry(old_id); + + erase_tr_links(package.route()->nodes().first); + erase_tr_links(package.route()->nodes().second); + + package.route()->set_id(other_tr_id); + m_network.add_route(iac::move(temp)); + + insert_tr_links(package.route()->node1()); + insert_tr_links(package.route()->node2()); + } + + package.route()->set_node2(sender_id); + auto& sender = m_network.mutable_node(sender_id); + sender.add_route(package.route()->id()); + sender.add_local_route({package.route()->id(), 1}); + + iac_log_from_node(Logging::loglevels::connect, "connecting %d to %d\n", id(), sender_id); + + iac_log_from_node(Logging::loglevels::verbose, "connect with timing: %d %d\n", package.route()->meta().timings.heartbeat_interval_ms, package.route()->meta().timings.assume_dead_after_ms); + } + + return true; +} + +bool LocalNode::handle_heartbeat(const Package& package) { + auto now = timestamp::now(); + + IAC_LOG_PACKAGE_RECEIVE_WITH_INFO(Logging::loglevels::verbose, "heartbeat", "with timing: last_out: %d; last_in:%d", + now - package.route()->meta().last_package_in, + now - package.route()->meta().last_package_out); + return true; +} + +bool LocalNode::handle_ack(const Package& package) { + IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "ack"); + return true; +} + +bool LocalNode::handle_network_update(const Package& package) { + BufferReader reader{package.payload(), package.payload_size()}; + + IAC_LOG_PACKAGE_RECEIVE(Logging::loglevels::network, "network_update"); + + if (package.route()->state() != LocalTransportRoute::route_state::CONNECTED) { + iac_log_from_node(Logging::loglevels::debug, "route is not connected, discarding relay package\n"); + return true; + } + + const auto num_ep_data = reader.num(); + for (uint8_t i = 0; i < num_ep_data; ++i) { + const auto ep_id = reader.num(); + const auto* ep_name = reader.str(); + const auto ep_node = reader.num(); + + if (!m_network.endpoint_registered(ep_id)) { + auto* ep = new Endpoint(ep_id); + ep->set_name(ep_name); + ep->set_node(ep_node); + m_network.add_endpoint(ManagedNetworkEntry::create_and_adopt(ep)); + } + } + + const auto num_tr_data = reader.num(); + for (uint8_t i = 0; i < num_tr_data; ++i) { + const auto tr_id = reader.num(); + const auto node1_id = reader.num(); + const auto node2_id = reader.num(); + + if (node1_id != unset_id && node2_id != unset_id && !m_network.route_registered(tr_id)) { + auto* tr = new TransportRoute(tr_id, {node1_id, node2_id}); + m_network.add_route(ManagedNetworkEntry::create_and_adopt(tr)); + } + } + + const auto num_ltr_data = reader.num(); + for (uint8_t i = 0; i < num_ltr_data; ++i) { + const auto reachable_node = reader.num(); + const auto num_hops = reader.num(); + + auto& reachable_node_entry = m_network.node(reachable_node); + auto res = reachable_node_entry.local_routes().find(package.route()->id()); + + if (res == reachable_node_entry.local_routes().end() || res->second > num_hops + 1) { + m_network.mutable_node(reachable_node).add_local_route(package.route()->id(), num_hops + 1); + } + } + + return send_heartbeat(package.route()); +} + +} // namespace iac diff --git a/src/local_node_state_handling.cpp b/src/local_node_state_handling.cpp new file mode 100644 index 0000000..461ee07 --- /dev/null +++ b/src/local_node_state_handling.cpp @@ -0,0 +1,203 @@ +#include "local_node.hpp" + +namespace iac { + +/* + +On init: +-----------------+ +-----------------+ + | INITIALIZED | | INITIALIZED | + +-----------------+ +-----------------+ + | | + V V + +-----------------+ --------------> +-----------------+ + +> | SEND_CONNECT | msg::connect | SEND_CONNECT | <+ + | +-----------------+ <-------------- +-----------------+ | + until | | | | until +msg::connect | V V | msg::connect + received | +-----------------+ +-----------------+ | received + +- | WAIT_CONNECT | | WAIT_CONNECT | -+ + +-----------------+ +-----------------+ + | | + V V + +-----------------+ --------------> +-----------------+ + | SEND_ACK | msg::ack | SEND_ACK | + +-----------------+ <-------------- +-----------------+ + | | + V V + +-----------------+ +-----------------+ + | WAIT_ACK | | WAIT_ACK | + +-----------------+ +-----------------+ + | | + V V + +-----------------+ +-----------------+ + | CONNECTED | | CONNECTED | + +-----------------+ +-----------------+ + +*/ + +bool LocalNode::state_handling(LocalTransportRoute* route) { + iac_log_from_node(Logging::loglevels::verbose, "state of route %d @ node %d is %d\n", route->id(), id(), route->state()); + auto now = timestamp::now(); + + if (route->state() != LocalTransportRoute::route_state::CLOSED && + route->state() != LocalTransportRoute::route_state::INITIALIZED && + route->meta().last_package_in.is_more_than_n_in_past(now, route->meta().timings.assume_dead_after_ms)) { + if (!close_route(route)) return false; + route->state() = LocalTransportRoute::route_state::CLOSED; + } + + switch (route->state()) { + case LocalTransportRoute::route_state::INITIALIZED: + case LocalTransportRoute::route_state::CLOSED: + if (!open_route(route)) return false; + route->state() = LocalTransportRoute::route_state::SEND_CONNECT; + // intentional fall-through + + case LocalTransportRoute::route_state::SEND_CONNECT: + if (!send_connect(route)) return false; + route->state() = LocalTransportRoute::route_state::WAIT_CONNECT; + // intentional fall-through + + case LocalTransportRoute::route_state::WAIT_CONNECT: + // NOTE: This loop will be broken when a CONNECT package from this route arrives + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) + route->state() = LocalTransportRoute::route_state::SEND_CONNECT; + break; + + case LocalTransportRoute::route_state::SEND_ACK: + + if (!send_ack(route)) return false; + route->state() = LocalTransportRoute::route_state::WAIT_ACK; + // intentional fall-through + + case LocalTransportRoute::route_state::WAIT_ACK: + // NOTE: This loop will be broken when a ACK package from this route arrives + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) + route->state() = LocalTransportRoute::route_state::SEND_ACK; + break; + + case LocalTransportRoute::route_state::CONNECTED: + + if (route->meta().last_package_out.is_more_than_n_in_past(now, route->meta().timings.heartbeat_interval_ms)) { + if (!send_heartbeat(route)) return false; + } + + break; + } + + // NOTE: route should always be open at this point + if (!read_from(route)) return false; + + return true; +} + +bool LocalNode::open_route(LocalTransportRoute* route) { + if (route->connection().open()) { + auto now = timestamp::now(); + route->meta().last_package_in = now; + route->meta().last_package_out = now; + iac_log_from_node(Logging::loglevels::network, "opened route %d [%s]\n", route->id(), route->typestring().c_str()); + return true; + } + + iac_log_from_node(Logging::loglevels::verbose, "error opening route %d [%s]\n", route->id(), route->typestring().c_str()); + return false; +} + +bool LocalNode::close_route(LocalTransportRoute* route) { + if (route->connection().close()) { + if (!(m_network.disconnect_route(route->id()) && route->reset())) { + iac_log_from_node(Logging::loglevels::warning, "error disconnecting route %d [%s] \n", route->id(), route->typestring().c_str()); + return false; + } + + iac_log_from_node(Logging::loglevels::network, "closed route %d [%s]\n", route->id(), route->typestring().c_str()); + return true; + } + + iac_log_from_node(Logging::loglevels::warning, "error closing route %d [%s] \n", route->id(), route->typestring().c_str()); + return false; +} + +bool LocalNode::send_network_update(LocalTransportRoute* route) { + BufferWriter writer; + + writer.num(m_network.endpoint_mapping().size()); + for (const auto& ep_entry : m_network.endpoint_mapping()) { + writer.num(ep_entry.second.element().id()); + writer.str(ep_entry.second.element().name()); + writer.num(ep_entry.second->m_node); + } + + writer.num(m_network.route_mapping().size() - 1); + for (const auto& tr_entry : m_network.route_mapping()) { + if (tr_entry.second.element_ptr() == route) continue; + + writer.num(tr_entry.second.element().id()); + writer.num(tr_entry.second->node1()); + writer.num(tr_entry.second->node2()); + } + + writer.num(m_network.node_mapping().size() - 1); + for (const auto& node_entry : m_network.node_mapping()) { + if (node_entry.second.element_ptr() == this) continue; + + if (node_entry.second->local_routes().empty()) { + IAC_HANDLE_FATAL_EXCEPTION(NonExistingException, "no local route leading to node, invalid state suspected"); + return false; + } + + auto best_route = best_local_route(node_entry.second->local_routes()); + + writer.num(node_entry.first); + writer.num(best_route.second); + } + + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::NETWORK_UPDATE, writer.buffer(), writer.size()}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "network_update"); + + return send_package(package, route); +} + +bool LocalNode::send_heartbeat(LocalTransportRoute* route) { + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::HEARTBEAT, nullptr, 0}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::verbose, "heartbeat"); + + return send_package(package, route); +} + +bool LocalNode::send_ack(LocalTransportRoute* route) { + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::ACK, nullptr, 0}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "ack"); + + return send_package(package, route); +} + +bool LocalNode::send_connect(LocalTransportRoute* route) { + BufferWriter writer; + + writer.num(id()); + writer.num(route->id()); + + writer.num(route->meta().timings.heartbeat_interval_ms); + writer.num(route->meta().timings.assume_dead_after_ms); + + Package package{reserved_endpoint_addresses::IAC, + reserved_endpoint_addresses::IAC, + reserved_package_types::CONNECT, writer.buffer(), writer.size()}; + + IAC_LOG_PACKAGE_SEND(Logging::loglevels::network, "connect"); + + return send_package(package, route); +} + +} // namespace iac \ No newline at end of file diff --git a/src/logging.hpp b/src/logging.hpp index adb459d..a5a25b5 100644 --- a/src/logging.hpp +++ b/src/logging.hpp @@ -34,20 +34,23 @@ class Logging { } #ifndef IAC_DISABLE_PRINTING -# define iac_log(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, level, __VA_ARGS__); +# define iac_log(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, nullptr, 0, level, __VA_ARGS__); +# define iac_log_from_node(level, ...) Logging::log(__FILE__, __LINE__, __FUNCTION__, "[node::%d] \t", id(), level, __VA_ARGS__); - static constexpr void log_head([[maybe_unused]] const char* file, [[maybe_unused]] unsigned line, [[maybe_unused]] const char* function, loglevel_t level) { + static constexpr void log_head([[maybe_unused]] const char* file, [[maybe_unused]] unsigned line, [[maybe_unused]] const char* function, loglevel_t level, const char* id_str, int id) { # ifdef IAC_LOG_WITH_LINE_NUMBERS iac_printf(s_log_head_fmt_with_line_info, s_level_colors[(int)level], s_level_names[(int)level], file, line, function); # else iac_printf(s_log_head_fmt, s_level_colors[(int)level], s_level_names[(int)level]); # endif + if (id_str) + iac_printf(id_str, id); } template - static constexpr void log(const char* file, unsigned line, const char* function, loglevel_t level, Args... args) { + static constexpr void log(const char* file, unsigned line, const char* function, const char* id_str, int id, loglevel_t level, Args... args) { if (s_loglevel >= level) { - log_head(file, line, function, level); + log_head(file, line, function, level, id_str, id); iac_printf(args...); } } diff --git a/src/network.cpp b/src/network.cpp index d92435a..3e4a14a 100644 --- a/src/network.cpp +++ b/src/network.cpp @@ -10,20 +10,25 @@ bool Network::add_route(ManagedNetworkEntry&& route) { return false; } + auto add_if_not_unset = [&](node_id_t node_id) { + if (node_id != unset_id && !node_registered(node_id)) + add_node(ManagedNetworkEntry::create_and_adopt(new Node(node_id))); + }; + + auto link_if_not_unset = [&](tr_id_t tr_id, node_id_t node_id) { + if (node_id != unset_id) mutable_node(node_id).add_route(tr_id); + }; + auto tr_id = route.element().id(); auto nodes = route.element().nodes(); - m_tr_mapping[tr_id] = iac::move(route); - auto add_if_not_unset = [&](tr_id_t tr_id, node_id_t node_id) { - if (node_id != unset_id) { - if (!node_registered(node_id)) - add_node(ManagedNetworkEntry::create_and_adopt(new Node(node_id))); - mutable_node(node_id).add_route(tr_id); - } - }; + add_if_not_unset(nodes.first); + add_if_not_unset(nodes.second); + + m_tr_mapping[tr_id] = iac::move(route); - add_if_not_unset(tr_id, nodes.first); - add_if_not_unset(tr_id, nodes.second); + link_if_not_unset(tr_id, nodes.first); + link_if_not_unset(tr_id, nodes.second); set_modified(); diff --git a/src/network.hpp b/src/network.hpp index a257d67..d91ecc2 100644 --- a/src/network.hpp +++ b/src/network.hpp @@ -105,6 +105,10 @@ class Network { bool add_node(ManagedNetworkEntry&& node); bool remove_node(node_id_t node_id); + bool disconnect_route(tr_id_t route_id) { + return true; + }; + bool validate_network() const; bool is_modified() const { diff --git a/src/network_types.hpp b/src/network_types.hpp index 1df2049..8dc3e04 100644 --- a/src/network_types.hpp +++ b/src/network_types.hpp @@ -10,6 +10,10 @@ #include "std_provider/unordered_set.hpp" #include "std_provider/utility.hpp" +#ifndef ARDUINO +# include +#endif + namespace iac { typedef uint8_t node_id_t; @@ -23,8 +27,9 @@ typedef uint8_t start_byte_t; enum reserved_package_types { CONNECT = numeric_limits::max(), - DISCONNECT = numeric_limits::max() - 1, - HEARTBEAT = numeric_limits::max() - 2, + ACK = numeric_limits::max() - 1, + NETWORK_UPDATE = numeric_limits::max() - 2, + HEARTBEAT = numeric_limits::max() - 3, }; enum reserved_endpoint_addresses { @@ -38,6 +43,35 @@ typedef struct route_timings { uint16_t assume_dead_after_ms = 0; } route_timings_t; +struct timestamp { + timestamp() = default; + + timestamp(size_t ts) + : ts(ts){}; + + static timestamp now() { +#ifdef ARDUINO + return millis(); +#else + return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); +#endif + } + + bool is_more_than_n_in_past(const timestamp& now, const size_t n) { + return timestamp(ts + n) < now; + } + + bool operator<(const timestamp& rhs) { + return ts < rhs.ts; + } + + size_t operator-(const timestamp& rhs) { + return ts - rhs.ts; + } + + size_t ts; +}; + IAC_MAKE_EXCEPTION(EmptyNetworkEntryDereferenceException); IAC_MAKE_EXCEPTION(CopyingNonEmptyNetworkEntryException); IAC_MAKE_EXCEPTION(BindingToNonEmptyNetworkEntryException); diff --git a/src/package.cpp b/src/package.cpp index 1f93888..1b70d4a 100644 --- a/src/package.cpp +++ b/src/package.cpp @@ -7,7 +7,8 @@ constexpr size_t Package::s_info_header_size; constexpr size_t Package::s_max_payload_size; constexpr start_byte_t Package::s_startbyte; -Package::Package(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, buffer_management_t buffer_type) : m_from(from), m_to(to), m_type(type), m_payload((uint8_t*)buffer), m_buffer_type(buffer_type) { +Package::Package(ep_id_t from, ep_id_t to, package_type_t type, const uint8_t* buffer, size_t buffer_length, buffer_management_t buffer_type) + : m_from(from), m_to(to), m_type(type), m_payload((uint8_t*)buffer), m_buffer_type(buffer_type) { if (buffer_length > s_max_payload_size) { IAC_HANDLE_EXCEPTION(InvalidPackageException, "payload to big"); @@ -73,7 +74,7 @@ bool Package::read_from(LocalTransportRoute* route) { #ifdef ARDUINO uint8_t dummy = 0; - route->write(&dummy, 1); + route->connection().write(&dummy, 1); #endif if (route->meta().wait_for_available_size > 0 && route->connection().available() < route->meta().wait_for_available_size) diff --git a/tests/test_network_visualization.hpp b/tests/test_network_visualization.hpp index e63c12f..b8f3bb2 100644 --- a/tests/test_network_visualization.hpp +++ b/tests/test_network_visualization.hpp @@ -30,6 +30,22 @@ class TestNetworkVisualization { TEST_UTILS_CONNECT_NODES_WITH_LOOPBACK(tr3, node2, node4); TEST_UTILS_CONNECT_NODES_WITH_LOOPBACK(tr4, node3, node4); + iac::Visualization viz{"127.0.0.1", 3000}; + viz.add_network("node1", node1.network()); + viz.add_network("node2", node2.network()); + viz.add_network("node3", node3.network()); + viz.add_network("node4", node4.network()); + + bool run = true; + auto t = std::thread(run_viz, &run, &viz); + + printf("Visualization up on http://127.0.0.1:3000\npress any key to continue..."); + + char c; + std::cin >> c; + + printf("...continuing\n"); + TestLogging::test_printf("node1 network rep on startup %s", node1.network().network_representation().c_str()); TestUtilities::update_til_connected([] {}, node1, node2, node3, node4); @@ -46,18 +62,6 @@ class TestNetworkVisualization { TestLogging::test_printf("node4 %s", node4.network().network_representation(false).c_str()); } - iac::Visualization viz{"127.0.0.1", 3000}; - viz.add_network("node1", node1.network()); - viz.add_network("node2", node2.network()); - viz.add_network("node3", node3.network()); - viz.add_network("node4", node4.network()); - - bool run = true; - auto t = std::thread(run_viz, &run, &viz); - - printf("Visualization up on http://127.0.0.1:3000\npress any key to continue..."); - - char c; std::cin >> c; printf("...continuing\n"); diff --git a/tests/test_send_receive.hpp b/tests/test_send_receive.hpp index 86d48a6..3d65556 100644 --- a/tests/test_send_receive.hpp +++ b/tests/test_send_receive.hpp @@ -21,15 +21,18 @@ class TestSendReceive { TestUtilities::update_til_connected([] {}, node1, node2); + // NOTE: network_update should arrive on next update + TestUtilities::update_all_nodes(node1, node2); + if (!node1.send(ep1, ep2.id(), 0, nullptr, 0)) { - return {"failed to send pkg to ep2\n"}; + return {"failed to send pkg to ep2"}; } while (rec_pkg_count < 1) TestUtilities::update_all_nodes(node1, node2); if (!node2.send(ep2, ep1.id(), 0, nullptr, 0)) { - return {"failed to send pkg to ep1\n"}; + return {"failed to send pkg to ep1"}; } while (rec_pkg_count < 2) diff --git a/tests/testing.cpp b/tests/testing.cpp index 7814864..e9318c3 100644 --- a/tests/testing.cpp +++ b/tests/testing.cpp @@ -12,12 +12,6 @@ int main(int argc, char* argv[]) { iac::Logging::set_loglevel(iac::Logging::loglevels::debug); - TestLogging::start_suite("communication"); - - TestLogging::run("disconnect-reconnect", TestDisconnectReconnect::run); - TestLogging::run("send-receive", TestSendReceive::run); - TestLogging::run("network-building", TestNetworkBuilding::run); - #ifndef IAC_DISABLE_VISUALIZATION if (argc >= 2 && strncmp(argv[1], "-h", 2) == 0) { TestLogging::start_suite("visualization"); @@ -25,5 +19,11 @@ int main(int argc, char* argv[]) { } #endif + TestLogging::start_suite("communication"); + + TestLogging::run("disconnect-reconnect", TestDisconnectReconnect::run); + TestLogging::run("send-receive", TestSendReceive::run); + TestLogging::run("network-building", TestNetworkBuilding::run); + return TestLogging::results(); } \ No newline at end of file