Skip to content

Commit

Permalink
Add multi-core concurrent packet processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Henry committed Feb 23, 2024
1 parent 663ed73 commit 4b9406a
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 182 deletions.
112 changes: 75 additions & 37 deletions osdep/BSDEthernetTap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include <net/if_dl.h>
#include <net/if_media.h>
#include <net/route.h>
#include <pthread_np.h>

#include <sched.h>
#include <string>
#include <map>
#include <set>
Expand All @@ -53,6 +55,7 @@
#include "BSDEthernetTap.hpp"

#define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv"
#define ZT_TAP_BUF_SIZE (1024 * 16)

// ff:ff:ff:ff:ff:ff with no ADI
static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
Expand All @@ -61,6 +64,7 @@ namespace ZeroTier {

BSDEthernetTap::BSDEthernetTap(
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand All @@ -69,6 +73,7 @@ BSDEthernetTap::BSDEthernetTap(
void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
void *arg) :
_handler(handler),
_concurrency(concurrency),
_arg(arg),
_nwid(nwid),
_mtu(mtu),
Expand Down Expand Up @@ -195,11 +200,9 @@ BSDEthernetTap::BSDEthernetTap(
BSDEthernetTap::~BSDEthernetTap()
{
::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
Thread::join(_thread);
::close(_fd);
::close(_shutdownSignalPipe[0]);
::close(_shutdownSignalPipe[1]);

long cpid = (long)vfork();
if (cpid == 0) {
#ifdef ZT_TRACE
Expand All @@ -211,6 +214,10 @@ BSDEthernetTap::~BSDEthernetTap()
int exitcode = -1;
::waitpid(cpid,&exitcode,0);
}
Thread::join(_thread);
for (std::thread &t : _rxThreads) {
t.join();
}
}

void BSDEthernetTap::setEnabled(bool en)
Expand Down Expand Up @@ -418,53 +425,84 @@ void BSDEthernetTap::setMtu(unsigned int mtu)
void BSDEthernetTap::threadMain()
throw()
{
fd_set readfds,nullfds;
MAC to,from;
int n,nfds,r;
char getBuf[ZT_MAX_MTU + 64];
bool _enablePinning = false;
char* envvar = std::getenv("ZT_CPU_PINNING");
if (envvar) {
int tmp = atoi(envvar);
if (tmp > 0) {
_enablePinning = true;
}
}

// Wait for a moment after startup -- wait for Network to finish
// constructing itself.
Thread::sleep(500);

FD_ZERO(&readfds);
FD_ZERO(&nullfds);
nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i, _enablePinning] {

if (_enablePinning) {
int pinCore = i % _concurrency;
fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
pthread_t self = pthread_self();
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
//int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset);
int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
if (rc != 0)
{
fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
exit(1);
}
}

r = 0;
for(;;) {
FD_SET(_shutdownSignalPipe[0],&readfds);
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
uint8_t b[ZT_TAP_BUF_SIZE];
MAC to, from;
fd_set readfds, nullfds;
int n, nfds, r;

if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
break;
FD_ZERO(&readfds);
FD_ZERO(&nullfds);
nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;

r = 0;

if (FD_ISSET(_fd,&readfds)) {
n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r);
if (n < 0) {
if ((errno != EINTR)&&(errno != ETIMEDOUT))
for(;;) {
FD_SET(_shutdownSignalPipe[0],&readfds);
FD_SET(_fd,&readfds);
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);

if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
break;
} else {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;

if (_enabled) {
to.setTo(getBuf,6);
from.setTo(getBuf + 6,6);
unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
}

r = 0;
if (FD_ISSET(_fd,&readfds)) {
n = (int)::read(_fd,b + r,sizeof(b) - r);
if (n < 0) {
if ((errno != EINTR)&&(errno != ETIMEDOUT))
break;
} else {
// Some tap drivers like to send the ethernet frame and the
// payload in two chunks, so handle that by accumulating
// data until we have at least a frame.
r += n;
if (r > 14) {
if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
r = _mtu + 14;

if (_enabled) {
to.setTo(b,6);
from.setTo(b + 6,6);
unsigned int etherType = ntohs(((const uint16_t *)b)[6]);
_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14);
}

r = 0;
}
}
}
}
}
}));
}
}

Expand Down
4 changes: 4 additions & 0 deletions osdep/BSDEthernetTap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include <stdexcept>
#include <thread>

#include "../node/Constants.hpp"
#include "../node/MulticastGroup.hpp"
Expand All @@ -34,6 +35,7 @@ class BSDEthernetTap : public EthernetTap
public:
BSDEthernetTap(
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand Down Expand Up @@ -62,6 +64,7 @@ class BSDEthernetTap : public EthernetTap
private:
void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
void *_arg;
unsigned int _concurrency;
uint64_t _nwid;
Thread _thread;
std::string _dev;
Expand All @@ -73,6 +76,7 @@ class BSDEthernetTap : public EthernetTap
volatile bool _enabled;
mutable std::vector<InetAddress> _ifaddrs;
mutable uint64_t _lastIfAddrsUpdate;
std::vector<std::thread> _rxThreads;
};

} // namespace ZeroTier
Expand Down
15 changes: 8 additions & 7 deletions osdep/EthernetTap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace ZeroTier {
std::shared_ptr<EthernetTap> EthernetTap::newInstance(
const char *tapDeviceType, // OS-specific, NULL for default
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand All @@ -83,16 +84,16 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
// The "feth" virtual Ethernet device type appeared in Darwin 17.x.x. Older versions
// (Sierra and earlier) must use the a kernel extension.
if (strtol(osrelease,(char **)0,10) < 17) {
return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new MacKextEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
} else {
return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
}
}
}
#endif // __APPLE__

#ifdef __LINUX__
return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __LINUX__

#ifdef __WINDOWS__
Expand Down Expand Up @@ -126,19 +127,19 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
_comInit = true;
}
}
return std::shared_ptr<EthernetTap>(new WindowsEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new WindowsEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __WINDOWS__

#ifdef __FreeBSD__
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __FreeBSD__

#ifdef __NetBSD__
return std::shared_ptr<EthernetTap>(new NetBSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new NetBSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __NetBSD__

#ifdef __OpenBSD__
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __OpenBSD__

#endif // ZT_SDK?
Expand Down
1 change: 1 addition & 0 deletions osdep/EthernetTap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class EthernetTap
static std::shared_ptr<EthernetTap> newInstance(
const char *tapDeviceType, // OS-specific, NULL for default
const char *homePath,
unsigned int concurrency,
const MAC &mac,
unsigned int mtu,
unsigned int metric,
Expand Down
Loading

0 comments on commit 4b9406a

Please sign in to comment.