Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeroTier on multiple threads #2233

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a6b764b
move zeroidc to rustybits folder
glimberg Aug 3, 2023
4674bb5
add smee client
glimberg Aug 4, 2023
f9af9a1
rusftormat zeroidc
glimberg Aug 4, 2023
36be14d
fix zeroidc include path
glimberg Aug 4, 2023
d2aeff6
another mac fix
glimberg Aug 4, 2023
d71d051
instantiate smee client
glimberg Aug 4, 2023
5e89d5a
fix a typo
glimberg Aug 4, 2023
581489f
this should theoretically send the new member notification
glimberg Aug 4, 2023
a9c8307
should only run this if smee is configured
glimberg Aug 4, 2023
98360d9
attempt at fixing the windows build
glimberg Aug 4, 2023
04a3206
fix rust cache in github actions
glimberg Aug 4, 2023
0dc92c2
update openidconnect rust lib
glimberg Aug 7, 2023
88b9626
look up hook URLs dynamically
glimberg Aug 7, 2023
802c990
limit to hooks with hook_type = NETWORK_JOIN enabled
glimberg Aug 8, 2023
714ef59
Merge branch 'dev' into temporal
glimberg Aug 15, 2023
5becb41
pass hook_id instead of url
glimberg Aug 15, 2023
69c590f
Merge branch 'dev' into temporal
glimberg Aug 28, 2023
d865c42
get connected to temporal
glimberg Aug 29, 2023
60fb8c9
fully wire up temporal. add startup script test to ensure temporal i…
glimberg Aug 29, 2023
d322f33
simplify hook firing
glimberg Aug 30, 2023
125257f
Merge branch 'dev' into temporal
glimberg Sep 6, 2023
e3d1565
Merge branch 'dev' into temporal
glimberg Sep 8, 2023
f03aae7
Fix test that was always true
bostick Sep 14, 2023
b7fb4ee
Merge pull request #2127 from zerotier/brenton/fix-always-true
joseph-henry Sep 14, 2023
9ae8b0b
Merge pull request #2128 from zerotier/1.12.2
adamierymenko Sep 14, 2023
e7ed1e4
Merge branch 'dev' into temporal
glimberg Oct 18, 2023
c89683f
update rust dependencies
glimberg Oct 26, 2023
f89cde8
Merge pull request #2163 from zerotier:temporal
glimberg Oct 30, 2023
85cab3d
remove some debug logging
glimberg Oct 30, 2023
0088cef
Merge pull request #2164 from zerotier/smee
glimberg Oct 30, 2023
2fd50b1
check hooks are enabled before firing
glimberg Oct 31, 2023
8b5ac93
Update SECURITY.md
glimberg Nov 1, 2023
1bd2fec
Merge pull request #2166 from zerotier/glimberg-patch-1
glimberg Nov 1, 2023
a477688
Remove extra '/'
bostick Nov 3, 2023
2e882b4
Merge branch 'dev' into network-check
glimberg Nov 16, 2023
d37dce5
don't require temporal for central controller startup
glimberg Nov 17, 2023
663ed73
Merge pull request #2165 from zerotier/network-check
glimberg Nov 20, 2023
2d89f07
Add multi-core concurrent packet processing
joseph-henry Feb 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/

- name: make
run: make
Expand Down Expand Up @@ -54,10 +54,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/

- name: make
run: make
Expand Down Expand Up @@ -86,10 +86,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/

- name: setup msbuild
uses: microsoft/[email protected]
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,4 @@ __pycache__
*_source.tar.bz2
snap/.snapcraft
tcp-proxy/tcp-proxy
rustybits/target
6 changes: 3 additions & 3 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ The following versions of ZeroTier One receive security updates

| Version | Supported |
| ------- | ------------------ |
| 1.10.x | :white_check_mark: |
| 1.8.x | :white_check_mark: |
| < 1.8.0 | :x: |
| 1.12.x | :white_check_mark: |
| 1.10.x | :white_check_mark: |
| < 1.10.0 | :x: |

## Reporting a Vulnerability

Expand Down
160 changes: 127 additions & 33 deletions controller/PostgreSQL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "../version.h"
#include "Redis.hpp"

#include <smeeclient.h>

#include <libpq-fe.h>
#include <sstream>
#include <iomanip>
Expand Down Expand Up @@ -159,6 +161,8 @@ using Attrs = std::vector<std::pair<std::string, std::string>>;
using Item = std::pair<std::string, Attrs>;
using ItemStream = std::vector<Item>;



PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
: DB()
, _pool()
Expand All @@ -173,6 +177,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
, _redis(NULL)
, _cluster(NULL)
, _redisMemberStatus(false)
, _smee(NULL)
{
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
Expand Down Expand Up @@ -248,10 +253,17 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
_commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
}
_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);

configureSmee();
}

PostgreSQL::~PostgreSQL()
{
if (_smee != NULL) {
smeeclient::smee_client_delete(_smee);
_smee = NULL;
}

_run = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));

Expand All @@ -265,6 +277,31 @@ PostgreSQL::~PostgreSQL()
_onlineNotificationThread.join();
}

void PostgreSQL::configureSmee()
{
const char *TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
const char *TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
const char *TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
const char *TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
const char *SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";

const char *scheme = getenv(TEMPORAL_SCHEME);
if (scheme == NULL) {
scheme = "http";
}
const char *host = getenv(TEMPORAL_HOST);
const char *port = getenv(TEMPORAL_PORT);
const char *ns = getenv(TEMPORAL_NAMESPACE);
const char *task_queue = getenv(SMEE_TASK_QUEUE);

if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
fprintf(stderr, "creating smee client\n");
std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue);
} else {
fprintf(stderr, "Smee client not configured\n");
}
}

bool PostgreSQL::waitForReady()
{
Expand Down Expand Up @@ -1306,40 +1343,72 @@ void PostgreSQL::commitThread()
continue;
}

pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]);

pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
int membercount = mrow[0].as<int>();

bool isNewMember = false;
if (membercount == 0) {
// new member
isNewMember = true;
pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17)",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]);
} else {
// existing member
pqxx::result res = w.exec_params0(
"UPDATE ztc_member "
"SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, "
"last_authorized_time = TO_TIMESTAMP($7::double precision/1000), "
"last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), "
"no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
"revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
"WHERE id = $1 AND network_id = $2",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]
);
}

res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
memberId, networkId);
if (!isNewMember) {
pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
memberId, networkId);
}

std::vector<std::string> assignments;
bool ipAssignError = false;
Expand All @@ -1350,7 +1419,7 @@ void PostgreSQL::commitThread()
continue;
}

res = w.exec_params0(
pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
memberId, networkId, addr);

Expand All @@ -1366,6 +1435,24 @@ void PostgreSQL::commitThread()

w.commit();

if (_smee != NULL && isNewMember) {
pqxx::row row = w.exec_params1(
"SELECT "
" count(h.hook_id) "
"FROM "
" ztc_hook h "
" INNER JOIN ztc_org o ON o.org_id = h.org_id "
" INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
" WHERE "
"n.id = $1 ",
networkId
);
int64_t hookCount = row[0].as<int64_t>();
if (hookCount > 0) {
notifyNewMember(networkId, memberId);
}
}

const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
if (nwidInt && memberidInt) {
Expand Down Expand Up @@ -1609,6 +1696,13 @@ void PostgreSQL::commitThread()
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
}

void PostgreSQL::notifyNewMember(const std::string &networkID, const std::string &memberID) {
smeeclient::smee_client_notify_network_joined(
_smee,
networkID.c_str(),
memberID.c_str());
}

void PostgreSQL::onlineNotificationThread()
{
waitForReady();
Expand Down
9 changes: 9 additions & 0 deletions controller/PostgreSQL.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ extern "C" {
typedef struct pg_conn PGconn;
}

namespace smeeclient {
struct SmeeClient;
}

namespace ZeroTier {

struct RedisConfig;
Expand Down Expand Up @@ -144,6 +148,9 @@ class PostgreSQL : public DB
uint64_t _doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline);

void configureSmee();
void notifyNewMember(const std::string &networkID, const std::string &memberID);

enum OverrideMode {
ALLOW_PGBOUNCER_OVERRIDE = 0,
NO_OVERRIDE = 1
Expand Down Expand Up @@ -178,6 +185,8 @@ class PostgreSQL : public DB
std::shared_ptr<sw::redis::Redis> _redis;
std::shared_ptr<sw::redis::RedisCluster> _cluster;
bool _redisMemberStatus;

smeeclient::SmeeClient *_smee;
};

} // namespace ZeroTier
Expand Down
3 changes: 2 additions & 1 deletion ext/central-controller-docker/Dockerfile.builder
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN apt -y install \
postgresql-client-common \
curl \
google-perftools \
libgoogle-perftools-dev
libgoogle-perftools-dev \
protobuf-compiler

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
3 changes: 2 additions & 1 deletion ext/central-controller-docker/Dockerfile.run_base
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM ubuntu:jammy
RUN apt update && apt upgrade -y

RUN apt -y install \
netcat \
postgresql-client \
postgresql-client-common \
libjemalloc2 \
Expand All @@ -11,4 +12,4 @@ RUN apt -y install \
binutils \
linux-tools-gke \
perf-tools-unstable \
google-perftools
google-perftools
9 changes: 9 additions & 0 deletions ext/central-controller-docker/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ else
done
fi

if [ -n "$ZT_TEMPORAL_HOST" ] && [ -n "$ZT_TEMPORAL_PORT" ]; then
echo "waiting for temporal..."
while ! nc -z ${ZT_TEMPORAL_HOST} ${ZT_TEMPORAL_PORT}; do
echo "waiting...";
sleep 1;
done
echo "Temporal is up"
fi

export GLIBCXX_FORCE_NEW=1
export GLIBCPP_FORCE_NEW=1
export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so.2"
Expand Down
Loading
Loading