Skip to content

Commit

Permalink
Merge pull request #2234 from zerotier/jh-zerotier-multithreaded
Browse files Browse the repository at this point in the history
Add multi-core concurrent packet processing
  • Loading branch information
adamierymenko authored Sep 11, 2024
2 parents 2522d29 + 6bc785e commit 4a485df
Show file tree
Hide file tree
Showing 19 changed files with 570 additions and 232 deletions.
120 changes: 60 additions & 60 deletions node/IncomingPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "Path.hpp"
#include "Bond.hpp"
#include "Metrics.hpp"
#include "PacketMultiplexer.hpp"

namespace ZeroTier {

Expand Down Expand Up @@ -792,66 +793,65 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
{
Metrics::pkt_frame_in++;
int32_t _flowId = ZT_QOS_NO_FLOW;
if (peer->flowHashingSupported()) {
if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;

if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
switch(proto) {
case 0x01: // ICMP
//flowId = 0x01;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (headerLen + 4)) {
unsigned int pos = headerLen + 0;
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
}

if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;

if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
switch(proto) {
case 0x01: // ICMP
//flowId = 0x01;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (headerLen + 4)) {
unsigned int pos = headerLen + 0;
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
}
}

if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
unsigned int pos;
unsigned int proto;
_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
switch(proto) {
case 0x3A: // ICMPv6
//flowId = 0x3A;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (pos + 4)) {
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
default:
break;
}
if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
uint16_t srcPort = 0;
uint16_t dstPort = 0;
unsigned int pos;
unsigned int proto;
_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
switch(proto) {
case 0x3A: // ICMPv6
//flowId = 0x3A;
break;
// All these start with 16-bit source and destination port in that order
case 0x06: // TCP
case 0x11: // UDP
case 0x84: // SCTP
case 0x88: // UDPLite
if (frameLen > (pos + 4)) {
srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
pos++;
dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
_flowId = dstPort ^ srcPort ^ proto;
}
break;
default:
break;
}
}
}
Expand All @@ -868,7 +868,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
}
}
} else {
Expand Down Expand Up @@ -941,7 +941,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
}
// fall through -- 2 means accept regardless of bridging checks or other restrictions
case 2:
RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
break;
}
}
Expand Down
19 changes: 17 additions & 2 deletions node/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "Network.hpp"
#include "Trace.hpp"
#include "Metrics.hpp"
#include "PacketMultiplexer.hpp"

// FIXME: remove this suppression and actually fix warnings
#ifdef __GNUC__
Expand Down Expand Up @@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0);
const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0);
const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0);
const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0);

m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bc));
m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms));
if (!m) {
throw std::bad_alloc();
}
Expand All @@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
RR->sa = new (m) SelfAwareness(RR);
m += sas;
RR->bc = new (m) Bond(RR);
m += bcs;
RR->pm = new (m) PacketMultiplexer(RR);
} catch ( ... ) {
if (RR->sa) {
RR->sa->~SelfAwareness();
Expand All @@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
if (RR->bc) {
RR->bc->~Bond();
}
if (RR->pm) {
RR->pm->~PacketMultiplexer();
}
::free(m);
throw;
}
Expand Down Expand Up @@ -191,6 +198,9 @@ Node::~Node()
if (RR->bc) {
RR->bc->~Bond();
}
if (RR->pm) {
RR->pm->~PacketMultiplexer();
}
::free(RR->rtmem);
}

Expand Down Expand Up @@ -230,6 +240,11 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
}
}

void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled)
{
RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled);
}

// Closure used to ping upstream and active/online peers
class _PingPeersThatNeedPing
{
Expand Down
5 changes: 4 additions & 1 deletion node/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ class Node : public NetworkController::Sender
return _lowBandwidthMode;
}

private:
void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled);


public:
RuntimeEnvironment _RR;
RuntimeEnvironment *RR;
void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P
Expand Down
122 changes: 122 additions & 0 deletions node/PacketMultiplexer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (c)2013-2021 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2026-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
/****/

#include "PacketMultiplexer.hpp"

#include "Node.hpp"
#include "RuntimeEnvironment.hpp"
#include "Constants.hpp"

#include <stdio.h>
#include <stdlib.h>

namespace ZeroTier {

PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
{
RR = renv;
};

void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
{
#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
return;
#endif

if (!_enabled) {
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
return;
}

PacketRecord* packet;
_rxPacketVector_m.lock();
if (_rxPacketVector.empty()) {
packet = new PacketRecord;
}
else {
packet = _rxPacketVector.back();
_rxPacketVector.pop_back();
}
_rxPacketVector_m.unlock();

packet->tPtr = tPtr;
packet->nwid = nwid;
packet->nuptr = nuptr;
packet->source = source.toInt();
packet->dest = dest.toInt();
packet->etherType = etherType;
packet->vlanId = vlanId;
packet->len = len;
packet->flowId = flowId;
memcpy(packet->data, data, len);

int bucket = flowId % _concurrency;
_rxPacketQueues[bucket]->postLimit(packet, 2048);
}

void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
{
#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
return;
#endif
_enabled = true;
_concurrency = concurrency;
bool _enablePinning = cpuPinningEnabled;

for (unsigned int i = 0; i < _concurrency; ++i) {
fprintf(stderr, "Reserved queue for thread %d\n", i);
_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
}

// Each thread picks from its own queue to feed into the core
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);

PacketRecord* packet = nullptr;
for (;;) {
if (! _rxPacketQueues[i]->get(packet)) {
break;
}
if (! packet) {
break;
}

// fprintf(stderr, "popped packet from queue %d\n", i);

MAC sourceMac = MAC(packet->source);
MAC destMac = MAC(packet->dest);

RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
{
Mutex::Lock l(_rxPacketVector_m);
_rxPacketVector.push_back(packet);
}
/*
if (ZT_ResultCode_isFatal(err)) {
char tmp[256];
OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
*/
}
}));
}
}

} // namespace ZeroTier
Loading

0 comments on commit 4a485df

Please sign in to comment.