From 2d89f07156a72d50c02b068e78abec69dc1064ca Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 23 Feb 2024 09:09:46 -0800 Subject: [PATCH] Add multi-core concurrent packet processing --- osdep/BSDEthernetTap.cpp | 112 +++++++++++------ osdep/BSDEthernetTap.hpp | 4 + osdep/EthernetTap.cpp | 15 +-- osdep/EthernetTap.hpp | 1 + osdep/LinuxEthernetTap.cpp | 233 ++++++++++++++++++++--------------- osdep/LinuxEthernetTap.hpp | 8 +- osdep/MacEthernetTap.cpp | 22 +++- osdep/MacEthernetTap.hpp | 4 + osdep/MacEthernetTapAgent.c | 2 +- osdep/MacKextEthernetTap.cpp | 6 +- osdep/MacKextEthernetTap.hpp | 4 + service/OneService.cpp | 163 ++++++++++++++++++++---- 12 files changed, 392 insertions(+), 182 deletions(-) diff --git a/osdep/BSDEthernetTap.cpp b/osdep/BSDEthernetTap.cpp index b2e1a8760d..9d378b7701 100644 --- a/osdep/BSDEthernetTap.cpp +++ b/osdep/BSDEthernetTap.cpp @@ -39,7 +39,9 @@ #include #include #include +#include +#include #include #include #include @@ -53,6 +55,7 @@ #include "BSDEthernetTap.hpp" #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv" +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -61,6 +64,7 @@ namespace ZeroTier { BSDEthernetTap::BSDEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,6 +73,7 @@ BSDEthernetTap::BSDEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int), void *arg) : _handler(handler), + _concurrency(concurrency), _arg(arg), _nwid(nwid), _mtu(mtu), @@ -195,11 +200,9 @@ BSDEthernetTap::BSDEthernetTap( BSDEthernetTap::~BSDEthernetTap() { ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit - Thread::join(_thread); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); - long cpid = (long)vfork(); if (cpid == 0) { #ifdef ZT_TRACE @@ -211,6 +214,10 @@ BSDEthernetTap::~BSDEthernetTap() int exitcode = -1; ::waitpid(cpid,&exitcode,0); } + Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void BSDEthernetTap::setEnabled(bool en) @@ -418,53 +425,84 @@ void BSDEthernetTap::setMtu(unsigned int mtu) void BSDEthernetTap::threadMain() throw() { - fd_set readfds,nullfds; - MAC to,from; - int n,nfds,r; - char getBuf[ZT_MAX_MTU + 64]; + bool _enablePinning = false; + char* envvar = std::getenv("ZT_CPU_PINNING"); + if (envvar) { + int tmp = atoi(envvar); + if (tmp > 0) { + _enablePinning = true; + } + } // Wait for a moment after startup -- wait for Network to finish // constructing itself. Thread::sleep(500); - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning] { + + if (_enablePinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + //int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + uint8_t b[ZT_TAP_BUF_SIZE]; + MAC to, from; + fd_set readfds, nullfds; + int n, nfds, r; - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread - break; + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + + r = 0; - if (FD_ISSET(_fd,&readfds)) { - n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r); - if (n < 0) { - if ((errno != EINTR)&&(errno != ETIMEDOUT)) + for(;;) { + FD_SET(_shutdownSignalPipe[0],&readfds); + FD_SET(_fd,&readfds); + select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + + if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread break; - } else { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - to.setTo(getBuf,6); - from.setTo(getBuf + 6,6); - unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]); - _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14); - } - r = 0; + if (FD_ISSET(_fd,&readfds)) { + n = (int)::read(_fd,b + r,sizeof(b) - r); + if (n < 0) { + if ((errno != EINTR)&&(errno != ETIMEDOUT)) + break; + } else { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + to.setTo(b,6); + from.setTo(b + 6,6); + unsigned int etherType = ntohs(((const uint16_t *)b)[6]); + _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14); + } + + r = 0; + } + } } } - } + })); } } diff --git a/osdep/BSDEthernetTap.hpp b/osdep/BSDEthernetTap.hpp index fc4e4908e9..ecf6caf9bb 100644 --- a/osdep/BSDEthernetTap.hpp +++ b/osdep/BSDEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MulticastGroup.hpp" @@ -34,6 +35,7 @@ class BSDEthernetTap : public EthernetTap public: BSDEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -62,6 +64,7 @@ class BSDEthernetTap : public EthernetTap private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; + unsigned int _concurrency; uint64_t _nwid; Thread _thread; std::string _dev; @@ -73,6 +76,7 @@ class BSDEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 445a5fe438..d6c7bd7b22 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -58,6 +58,7 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -83,16 +84,16 @@ std::shared_ptr EthernetTap::newInstance( // The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions // (Sierra and earlier) must use the a kernel extension. if (strtol(osrelease,(char **)0,10) < 17) { - return std::shared_ptr(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); } else { - return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } } #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ @@ -126,19 +127,19 @@ std::shared_ptr EthernetTap::newInstance( _comInit = true; } } - return std::shared_ptr(new WindowsEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new WindowsEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __WINDOWS__ #ifdef __FreeBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __FreeBSD__ #ifdef __NetBSD__ - return std::shared_ptr(new NetBSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new NetBSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __NetBSD__ #ifdef __OpenBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __OpenBSD__ #endif // ZT_SDK? diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index 893e70c340..c5e82470c7 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -33,6 +33,7 @@ class EthernetTap static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index a888db9d90..4919dbd69b 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -60,7 +60,7 @@ #define IFNAMSIZ 16 #endif -#define ZT_TAP_BUF_SIZE 16384 +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC namespace ZeroTier { // determine if we're running a really old linux kernel. -// Kernels in the 2.6.x series don't behave the same when bringing up +// Kernels in the 2.6.x series don't behave the same when bringing up // the tap devices. // // Returns true if the kernel major version is < 3 @@ -111,6 +111,7 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -127,6 +128,7 @@ LinuxEthernetTap::LinuxEthernetTap( _fd(0), _enabled(true), _run(true), + _concurrency(concurrency), _lastIfAddrsUpdate(0) { static std::mutex s_tapCreateLock; @@ -220,135 +222,164 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - _tapReaderThread = std::thread([this]{ - uint8_t b[ZT_TAP_BUF_SIZE]; - fd_set readfds,nullfds; - int n,nfds,r; - std::vector buffers; - struct ifreq ifr; + bool _enablePinning = false; + char* envvar = std::getenv("ZT_CPU_PINNING"); + if (envvar) { + int tmp = atoi(envvar); + if (tmp > 0) { + _enablePinning = true; + } + } - memset(&ifr,0,sizeof(ifr)); - strcpy(ifr.ifr_name,_dev.c_str()); + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning] { + + if (_enablePinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - const int sock = socket(AF_INET,SOCK_DGRAM,0); - if (sock <= 0) - return; + uint8_t b[ZT_TAP_BUF_SIZE]; + fd_set readfds, nullfds; + int n, nfds, r; + if (i == 0) { + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + strcpy(ifr.ifr_name, _dev.c_str()); + + const int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock <= 0) + return; + + if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + usleep(100000); - usleep(100000); + if (isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } - if (isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); - return; - } + usleep(100000); + } - usleep(100000); - } - - - ifr.ifr_flags |= IFF_MULTICAST; - ifr.ifr_flags |= IFF_UP; - if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + ifr.ifr_flags |= IFF_MULTICAST; + ifr.ifr_flags |= IFF_UP; + if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - usleep(100000); + usleep(100000); + + if (! isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + } + + fcntl(_fd, F_SETFL, O_NONBLOCK); - if (!isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; } - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + if (! _run) { return; } - } - fcntl(_fd,F_SETFL,O_NONBLOCK); - - ::close(sock); - - if (!_run) - return; - - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; - - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) - break; - - if (FD_ISSET(_fd,&readfds)) { - for(;;) { // read until there are no more packets, then return to outer select() loop - n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r); - if (n > 0) { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - //_tapq.post(std::pair(buf,r)); - //buf = nullptr; - MAC to(b, 6),from(b + 6, 6); - unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); - _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); - } + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1; + + r = 0; + for (;;) { + FD_SET(_shutdownSignalPipe[0], &readfds); + FD_SET(_fd, &readfds); + select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0); + if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) { + break; + } + if (FD_ISSET(_fd, &readfds)) { + for (;;) { + // read until there are no more packets, then return to outer select() loop + n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r); + if (n > 0) { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + MAC to(b, 6), from(b + 6, 6); + unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]); + _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14)); + } + + r = 0; + } + } + else { r = 0; + break; } - } else { - r = 0; - break; } } } - } - }); + })); + } } LinuxEthernetTap::~LinuxEthernetTap() { _run = false; (void)::write(_shutdownSignalPipe[1],"\0",1); - _tapReaderThread.join(); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); + for (std::thread &t : _rxThreads) { + t.join(); + } } void LinuxEthernetTap::setEnabled(bool en) diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 424a6d37e1..b694b277c7 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -26,6 +26,7 @@ #include #include "../node/MulticastGroup.hpp" #include "EthernetTap.hpp" +#include "BlockingQueue.hpp" namespace ZeroTier { @@ -34,6 +35,7 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, + unsigned int _concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -57,9 +59,6 @@ class LinuxEthernetTap : public EthernetTap virtual void setMtu(unsigned int mtu); virtual void setDns(const char *domain, const std::vector &servers) {} - - - private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; @@ -69,13 +68,14 @@ class LinuxEthernetTap : public EthernetTap std::string _dev; std::vector _multicastGroups; unsigned int _mtu; + unsigned int _concurrency; int _fd; int _shutdownSignalPipe[2]; std::atomic_bool _enabled; std::atomic_bool _run; - std::thread _tapReaderThread; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTap.cpp b/osdep/MacEthernetTap.cpp index 37f27f87a2..8d15b3b208 100644 --- a/osdep/MacEthernetTap.cpp +++ b/osdep/MacEthernetTap.cpp @@ -69,6 +69,7 @@ static bool fethMaxMtuAdjusted = false; MacEthernetTap::MacEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -77,6 +78,7 @@ MacEthernetTap::MacEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *data,unsigned int len), void *arg) : _handler(handler), + _concurrency(concurrency), _arg(arg), _nwid(nwid), _homePath(homePath), @@ -286,6 +288,9 @@ MacEthernetTap::~MacEthernetTap() } Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void MacEthernetTap::setEnabled(bool en) { _enabled = en; } @@ -474,17 +479,25 @@ void MacEthernetTap::setMtu(unsigned int mtu) void MacEthernetTap::threadMain() throw() { + Thread::sleep(250); + + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i] { + + fprintf(stderr, "starting thread %d\n", i); + char agentReadBuf[ZT_MACETHERNETTAP_AGENT_READ_BUF_SIZE]; char agentStderrBuf[256]; fd_set readfds,nullfds; MAC to,from; - Thread::sleep(250); - const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1; long agentReadPtr = 0; - fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); - fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); + + if (i == 0) { + fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK); + fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK); + } FD_ZERO(&readfds); FD_ZERO(&nullfds); @@ -533,6 +546,7 @@ void MacEthernetTap::threadMain() */ } } + }));} ::close(_agentStdin); ::close(_agentStdout); diff --git a/osdep/MacEthernetTap.hpp b/osdep/MacEthernetTap.hpp index 8ba378022b..0bb78a79f1 100644 --- a/osdep/MacEthernetTap.hpp +++ b/osdep/MacEthernetTap.hpp @@ -28,6 +28,7 @@ #include #include #include +#include namespace ZeroTier { @@ -36,6 +37,7 @@ class MacEthernetTap : public EthernetTap public: MacEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -67,6 +69,7 @@ class MacEthernetTap : public EthernetTap uint64_t _nwid; Thread _thread; std::string _homePath; + unsigned int _concurrency; std::string _dev; std::vector _multicastGroups; Mutex _putLock; @@ -79,6 +82,7 @@ class MacEthernetTap : public EthernetTap volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index ca75ed0542..6a0dbaf851 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -32,7 +32,7 @@ * All this stuff is basically undocumented. A lot of tracing through * the Darwin/XNU kernel source was required to figure out how to make * this actually work. - * + * * We hope to develop a DriverKit-based driver in the near-mid future to * replace this weird hack, but it works for now through Big Sur in our * testing. diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index fce0c121de..7ac0dac25b 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -306,6 +306,7 @@ static Mutex globalTapCreateLock; MacKextEthernetTap::MacKextEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -317,6 +318,7 @@ MacKextEthernetTap::MacKextEthernetTap( _arg(arg), _nwid(nwid), _homePath(homePath), + _concurrency(concurrency), _mtu(mtu), _metric(metric), _fd(0), @@ -447,7 +449,9 @@ MacKextEthernetTap::~MacKextEthernetTap() ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit Thread::join(_thread); - + for (std::thread &t : _rxThreads) { + t.join(); + } ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 4c61c28435..6d85398744 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MAC.hpp" @@ -36,6 +37,7 @@ class MacKextEthernetTap : public EthernetTap public: MacKextEthernetTap( const char *homePath, + unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -70,11 +72,13 @@ class MacKextEthernetTap : public EthernetTap std::string _homePath; std::string _dev; std::vector _multicastGroups; + unsigned int _concurrency; unsigned int _mtu; unsigned int _metric; int _fd; int _shutdownSignalPipe[2]; volatile bool _enabled; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/service/OneService.cpp b/service/OneService.cpp index 8a0aee2945..320e48158f 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -26,6 +25,11 @@ #include #include +#ifdef __FreeBSD__ +#include +#include +#endif + #include "../version.h" #include "../include/ZeroTierOne.h" @@ -758,7 +762,7 @@ struct TcpConnection Mutex writeq_m; }; -struct OneServiceIncomingPacket +struct PacketRecord { uint64_t now; int64_t sock; @@ -785,14 +789,20 @@ class OneServiceImpl : public OneService SoftwareUpdater *_updater; bool _updateAutoApply; - httplib::Server _controlPlane; + httplib::Server _controlPlane; httplib::Server _controlPlaneV6; - std::thread _serverThread; + std::thread _serverThread; std::thread _serverThreadV6; bool _serverThreadRunning; bool _serverThreadRunningV6; - bool _allowTcpFallbackRelay; + unsigned int _rxThreadCount; + BlockingQueue _rxPacketQueue; + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m,_rxPacketThreads_m; + + bool _allowTcpFallbackRelay; bool _forceTcpRelay; bool _allowSecondaryPort; @@ -842,8 +852,6 @@ class OneServiceImpl : public OneService // Deadline for the next background task service function volatile int64_t _nextBackgroundTaskDeadline; - - std::map _nets; Mutex _nets_m; @@ -890,9 +898,9 @@ class OneServiceImpl : public OneService ,_node((Node *)0) ,_updater((SoftwareUpdater *)0) ,_updateAutoApply(false) - ,_controlPlane() + ,_controlPlane() ,_controlPlaneV6() - ,_serverThread() + ,_serverThread() ,_serverThreadV6() ,_serverThreadRunning(false) ,_serverThreadRunningV6(false) @@ -926,9 +934,79 @@ class OneServiceImpl : public OneService _ports[1] = 0; _ports[2] = 0; - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); - prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); - prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CPU_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _rxThreadCount = tmp; + } + else { + _rxThreadCount = std::thread::hardware_concurrency(); + } + } + else { + _rxThreadCount = std::thread::hardware_concurrency(); + } + for (unsigned int i = 0; i < _rxThreadCount; ++i) { + _rxPacketThreads.push_back(std::thread([this, i]() { + +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + int pinCore = i % _rxThreadCount; + fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); +#endif +#ifdef __LINUX__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#elif __FreeBSD__ + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); +#endif +#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ + if (rc != 0) + { + fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } +#endif + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueue.get(packet)) { + break; + } + if (! packet) { + break; + } + const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + } + })); + } + + prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); + prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); + prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -940,20 +1018,34 @@ class OneServiceImpl : public OneService #ifdef __WINDOWS__ WinFWHelper::removeICMPRules(); #endif + + _rxPacketQueue.stop(); + _rxPacketThreads_m.lock(); + for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) { + t->join(); + } + _rxPacketThreads_m.unlock(); _binder.closeAll(_phy); #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif - _controlPlane.stop(); + _controlPlane.stop(); if (_serverThreadRunning) { - _serverThread.join(); + _serverThread.join(); } _controlPlaneV6.stop(); if (_serverThreadRunningV6) { _serverThreadV6.join(); } + _rxPacketVector_m.lock(); + while (!_rxPacketVector.empty()) { + delete _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; @@ -1270,6 +1362,9 @@ class OneServiceImpl : public OneService const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500; clockShouldBe = now + (int64_t)delay; _phy.poll(delay); + + + } } catch (std::exception &e) { Mutex::Lock _l(_termReason_m); @@ -2756,25 +2851,37 @@ class OneServiceImpl : public OneService // Handlers for Node and Phy<> callbacks // ========================================================================= - inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) + + + + inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len) { if (_forceTcpRelay) { return; } - Metrics::udp_recv += len; + Metrics::udp_recv += len; const uint64_t now = OSUtils::now(); - if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; - } - const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); } + + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->sock = reinterpret_cast(sock); + packet->now = now; + memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); + packet->size = (unsigned int)len; + memcpy(packet->data, data, len); + _rxPacketQueue.postLimit(packet, 256 * _rxThreadCount); } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) @@ -2996,6 +3103,7 @@ class OneServiceImpl : public OneService n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), + _rxThreadCount, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC, @@ -3509,8 +3617,9 @@ class OneServiceImpl : public OneService inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { NetworkState *n = reinterpret_cast(*nuptr); - if ((!n)||(!n->tap())) + if ((!n)||(!n->tap())) { return; + } n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len); }