From bf7dddbd848343131e223425d961297e7bb67bdf Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Thu, 29 Aug 2024 10:09:19 -0700 Subject: [PATCH] Fix issues that prevent user multipath preferences from being respected --- node/Bond.cpp | 41 ++++++++++++++++++++++++++--------------- node/Bond.hpp | 2 ++ node/IncomingPacket.cpp | 1 - service/OneService.cpp | 2 +- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/node/Bond.cpp b/node/Bond.cpp index f5124a6a2..2a061796d 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -373,6 +373,7 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) */ if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && _paths[_abPathIdx].p) { + //fprintf(stderr, "trying to send via (_abPathIdx=%d) %s\n", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str()); return _paths[_abPathIdx].p; } } @@ -1032,6 +1033,13 @@ void Bond::curateBond(int64_t now, bool rebuildBond) bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay; // How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known) bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD); + + // Allow active-backup to operate without the receipt of QoS records + // This may be expanded to the other modes as an option + if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { + acceptableQoSAge = true; + } + currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial); if (currEligibility) { @@ -1043,12 +1051,11 @@ void Bond::curateBond(int64_t now, bool rebuildBond) */ if (currEligibility != _paths[i].eligible) { if (currEligibility == 0) { - log("link %s is no longer eligible", pathToStr(_paths[i].p).c_str()); + log("link %s is no longer eligible (reason: allowed=%d, age=%d, ud=%d, qos=%d, trial=%d)", pathToStr(_paths[i].p).c_str(), _paths[i].allowed(), acceptableAge, satisfiedUpDelay, acceptableQoSAge, inTrial); } if (currEligibility == 1) { log("link %s is eligible", pathToStr(_paths[i].p).c_str()); } - debug("\t[%d] allowed=%d, age=%d, qa=%d, ud=%d, trial=%d", i, _paths[i].allowed(), acceptableAge, acceptableQoSAge, satisfiedUpDelay, inTrial); dumpPathStatus(now, i); if (currEligibility) { rebuildBond = true; @@ -1496,7 +1503,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) { int prevActiveBackupPathIdx = _abPathIdx; int nonPreferredPathIdx = ZT_MAX_PEER_NETWORK_PATHS; - bool bFoundPrimaryLink = false; + bool foundPathOnPrimaryLink = false; + bool foundPreferredPath = false; if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && ! _paths[_abPathIdx].p) { _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; @@ -1559,15 +1567,16 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) if (! _paths[i].preferred()) { // Found path on primary link, take note in case we don't find a preferred path nonPreferredPathIdx = i; - bFoundPrimaryLink = true; + foundPathOnPrimaryLink = true; } if (_paths[i].preferred()) { _abPathIdx = i; - bFoundPrimaryLink = true; + foundPathOnPrimaryLink = true; if (_paths[_abPathIdx].p) { SharedPtr abLink = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket()); if (abLink) { - log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str()); + log("found preferred primary link (_abPathIdx=%d), %s", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str()); + foundPreferredPath = true; } break; // Found preferred path on primary link } @@ -1575,8 +1584,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } } - if (bFoundPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) { - log("found non-preferred primary link"); + if (!foundPreferredPath && foundPathOnPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) { + log("found non-preferred primary link (_abPathIdx=%d)", _abPathIdx); _abPathIdx = nonPreferredPathIdx; } } @@ -1614,10 +1623,10 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } if (_paths[(*it)].p && ! _paths[(*it)].eligible) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[(*it)].p->localSocket()); - it = _abFailoverQueue.erase(it); if (link) { - log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); + log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[(*it)].p).c_str(), _abFailoverQueue.size()); } + it = _abFailoverQueue.erase(it); continue; } else { @@ -1684,7 +1693,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } if (! bFoundPathInQueue) { - _abFailoverQueue.push_front(i); + _abFailoverQueue.push_back(i); log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size()); addPathToBond(i, 0); } @@ -1734,13 +1743,14 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } if (! bFoundPathInQueue) { - _abFailoverQueue.push_front(i); + _abFailoverQueue.push_back(i); log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size()); addPathToBond(i, 0); } } } } + /* // Sort queue based on performance if (! _abFailoverQueue.empty()) { for (int i = 0; i < _abFailoverQueue.size(); i++) { @@ -1752,7 +1762,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } _abFailoverQueue[hole_position] = value_to_insert; } - } + }*/ /** * Short-circuit if we have no queued paths @@ -1902,7 +1912,7 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT * Policy defaults */ _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; - _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_OPTIMIZE; + _abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_ALWAYS; _rrPacketsSentOnCurrLink = 0; _rrIdx = 0; _packetsPerLink = 64; @@ -2021,7 +2031,8 @@ void Bond::dumpInfo(int64_t now, bool force) _lastSummaryDump = now; float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f); _overheadBytes = 0; - log("bond: bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)", + log("bond: ready=%d, bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)", + isReady(), _policy, _failoverInterval, _monitorInterval, diff --git a/node/Bond.hpp b/node/Bond.hpp index 408c1e125..d5d3f673e 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -1144,6 +1144,7 @@ class Bond { __attribute__((format(printf, 2, 3))) #endif { + //if (_peerId != 0x0 && _peerId != 0x0) { return; } #ifdef ZT_TRACE time_t rawtime; struct tm* timeinfo; @@ -1175,6 +1176,7 @@ class Bond { __attribute__((format(printf, 2, 3))) #endif { + //if (_peerId != 0x0 && _peerId != 0x0) { return; } #ifdef ZT_DEBUG time_t rawtime; struct tm* timeinfo; diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index d939fa0a1..dc268b1ab 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -334,7 +334,6 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr& peer) { Metrics::pkt_qos_in++; - SharedPtr bond = peer->bond(); if (! peer->rateGateQoS(RR->node->now(), _path)) { return true; } diff --git a/service/OneService.cpp b/service/OneService.cpp index 88a516ebd..4a4962b78 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -2510,7 +2510,7 @@ class OneServiceImpl : public OneService } _node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,mtu,capacity,enabled,linkMode,failoverToStr)); } - std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize")); + std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"always")); if (linkSelectMethodStr == "always") { newTemplateBond->setLinkSelectMethod(ZT_BOND_RESELECTION_POLICY_ALWAYS); }