From 7a8eee250d678b99b6551043bccb84915e3cc1ec Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 3 Jul 2024 08:39:48 -0700 Subject: [PATCH] Fix build for macOS, tune to prevent packet re-ordering --- osdep/EthernetTap.cpp | 4 +- osdep/LinuxEthernetTap.cpp | 2 +- osdep/MacEthernetTap.cpp | 22 +---- osdep/MacEthernetTap.hpp | 4 - osdep/MacKextEthernetTap.cpp | 2 - osdep/MacKextEthernetTap.hpp | 2 - service/OneService.cpp | 162 ++++++++++++++++++++--------------- 7 files changed, 102 insertions(+), 96 deletions(-) diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index d6c7bd7b22..95ce54b0a4 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -84,9 +84,9 @@ std::shared_ptr EthernetTap::newInstance( // The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions // (Sierra and earlier) must use the a kernel extension. if (strtol(osrelease,(char **)0,10) < 17) { - return std::shared_ptr(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } else { - return std::shared_ptr(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } } diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 4919dbd69b..81ebedc11e 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -223,7 +223,7 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); bool _enablePinning = false; - char* envvar = std::getenv("ZT_CPU_PINNING"); + char* envvar = std::getenv("ZT_CORE_PINNING"); if (envvar) { int tmp = atoi(envvar); if (tmp > 0) { diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 8d15b3b208..37f27f87a2 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -69,7 +69,6 @@ static bool fethMaxMtuAdjusted = false; MacEthernetTap::MacEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -78,7 +77,6 @@ MacEthernetTap::MacEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len), void *arg) : _handler(handler), - _concurrency(concurrency), _arg(arg), _nwid(nwid), _homePath(homePath), @@ -288,9 +286,6 @@ MacEthernetTap::~MacEthernetTap() } Thread::join(_thread); - for (std::thread &t : _rxThreads) { - t.join(); - } } void MacEthernetTap::setEnabled(bool en) { _enabled = en; } @@ -479,25 +474,17 @@ void MacEthernetTap::setMtu(unsigned int mtu) void MacEthernetTap::threadMain() throw() { - Thread::sleep(250); - - for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i] { - - fprintf(stderr, "starting thread %d\n", i); - char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE]; char agentStderrBuf[256]; fd_set readfds,nullfds; MAC to,from; + Thread::sleep(250); + const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; long agentReadPtr = 0; - - if (i == 0) { - fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); - fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); - } + fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); + fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); FD_ZERO(&readfds); FD_ZERO(&nullfds); @@ -546,7 +533,6 @@ void MacEthernetTap::threadMain() */ } } - }));} ::close(_agentStdin); ::close(_agentStdout); diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 0bb78a79f1..8ba378022b 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -28,7 +28,6 @@ #include #include #include -#include namespace ZeroTier { @@ -37,7 +36,6 @@ class MacEthernetTap : public EthernetTap public: MacEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,7 +67,6 @@ class MacEthernetTap : public EthernetTap uint64_t _nwid; Thread _thread; std::string _homePath; - unsigned int _concurrency; std::string _dev; std::vector _multicastGroups; Mutex _putLock; @@ -82,7 +79,6 @@ class MacEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; - std::vector _rxThreads; }; diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index 7ac0dac25b..69e97050dc 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -306,7 +306,6 @@ static Mutex globalTapCreateLock; MacKextEthernetTap::MacKextEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -318,7 +317,6 @@ MacKextEthernetTap::MacKextEthernetTap( _arg(arg), _nwid(nwid), _homePath(homePath), - _concurrency(concurrency), _mtu(mtu), _metric(metric), _fd(0), diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 6d85398744..a0cc1b81ac 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -37,7 +37,6 @@ class MacKextEthernetTap : public EthernetTap public: MacKextEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -72,7 +71,6 @@ class MacKextEthernetTap : public EthernetTap std::string _homePath; std::string _dev; std::vector _multicastGroups; - unsigned int _concurrency; unsigned int _mtu; unsigned int _metric; int _fd; diff --git a/service/OneService.cpp b/service/OneService.cpp index 320e48158f..5a99ea35f5 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -801,6 +801,7 @@ class OneServiceImpl : public OneService std::vector _rxPacketVector; std::vector _rxPacketThreads; Mutex _rxPacketVector_m,_rxPacketThreads_m; + bool _enableMulticore; bool _allowTcpFallbackRelay; bool _forceTcpRelay; @@ -934,74 +935,87 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - bool _enablePinning = false; - char* pinningVar = std::getenv("ZT_CPU_PINNING"); - if (pinningVar) { - int tmp = atoi(pinningVar); + _enableMulticore = false; + char* multicoreVar = std::getenv("ZT_ENABLE_MULTICORE"); + if (multicoreVar) { + int tmp = atoi(multicoreVar); if (tmp > 0) { - _enablePinning = true; + _enableMulticore = true; } } - char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); - if (concurrencyVar) { - int tmp = atoi(concurrencyVar); - if (tmp > 0) { - _rxThreadCount = tmp; + if (_enableMulticore) { + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CORE_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + char* concurrencyVar = std::getenv("ZT_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _rxThreadCount = tmp; + } + else { + _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; + } } else { - _rxThreadCount = std::thread::hardware_concurrency(); + _rxThreadCount = std::thread::hardware_concurrency() >= 4 ? 2 : 1; } - } - else { - _rxThreadCount = std::thread::hardware_concurrency(); - } - for (unsigned int i = 0; i < _rxThreadCount; ++i) { - _rxPacketThreads.push_back(std::thread([this, i]() { + fprintf(stderr, "using %d rx threads\n", _rxThreadCount); + for (unsigned int i = 0; i < _rxThreadCount; ++i) { + _rxPacketThreads.push_back(std::thread([this, i, _enablePinning]() { + if (_enablePinning) { #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - int pinCore = i % _rxThreadCount; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); + int pinCore = i % _rxThreadCount; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); #endif #ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #endif #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ - if (rc != 0) - { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + if (rc != 0) { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); } +#endif } - })); + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } } prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); @@ -2865,25 +2879,39 @@ class OneServiceImpl : public OneService _lastDirectReceiveFromGlobal = now; } - PacketRecord* packet; - _rxPacketVector_m.lock(); - if (_rxPacketVector.empty()) { - packet = new PacketRecord; + if (_enableMulticore) { + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->sock = reinterpret_cast(sock); + packet->now = now; + memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); + packet->size = (unsigned int)len; + memcpy(packet->data, data, len); + _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } else { - packet = _rxPacketVector.back(); - _rxPacketVector.pop_back(); + const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + } } - _rxPacketVector_m.unlock(); - - packet->sock = reinterpret_cast(sock); - packet->now = now; - memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); - packet->size = (unsigned int)len; - memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } + inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { if (!success) { @@ -3103,7 +3131,7 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), - _rxThreadCount, + _enableMulticore ? _rxThreadCount : 1, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC,