Skip to content

Commit

Permalink
Merge pull request #2163 from zerotier:temporal
Browse files Browse the repository at this point in the history
Temporal integration with hosted controllers
  • Loading branch information
glimberg authored Oct 30, 2023
2 parents 9ae8b0b + c89683f commit f89cde8
Show file tree
Hide file tree
Showing 30 changed files with 4,335 additions and 1,993 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/
- name: make
run: make
Expand Down Expand Up @@ -54,10 +54,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/
- name: make
run: make
Expand Down Expand Up @@ -86,10 +86,10 @@ jobs:
uses: Swatinem/rust-cache@v2
continue-on-error: false
with:
key: ${{ runner.os }}-cargo-${{ hashFiles('zeroidc//Cargo.lock') }}
key: ${{ runner.os }}-cargo-${{ hashFiles('rustybits//Cargo.lock') }}
shared-key: ${{ runner.os }}-cargo-
workspaces: |
zeroidc/
rustybits/
- name: setup msbuild
uses: microsoft/[email protected]
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,4 @@ __pycache__
*_source.tar.bz2
snap/.snapcraft
tcp-proxy/tcp-proxy
rustybits/target
153 changes: 120 additions & 33 deletions controller/PostgreSQL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "../version.h"
#include "Redis.hpp"

#include <smeeclient.h>

#include <libpq-fe.h>
#include <sstream>
#include <iomanip>
Expand Down Expand Up @@ -159,6 +161,8 @@ using Attrs = std::vector<std::pair<std::string, std::string>>;
using Item = std::pair<std::string, Attrs>;
using ItemStream = std::vector<Item>;



PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
: DB()
, _pool()
Expand All @@ -173,6 +177,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
, _redis(NULL)
, _cluster(NULL)
, _redisMemberStatus(false)
, _smee(NULL)
{
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
Expand Down Expand Up @@ -248,10 +253,17 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
_commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
}
_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);

configureSmee();
}

PostgreSQL::~PostgreSQL()
{
if (_smee != NULL) {
smeeclient::smee_client_delete(_smee);
_smee = NULL;
}

_run = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));

Expand All @@ -265,6 +277,31 @@ PostgreSQL::~PostgreSQL()
_onlineNotificationThread.join();
}

void PostgreSQL::configureSmee()
{
const char *TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
const char *TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
const char *TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
const char *TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
const char *SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";

const char *scheme = getenv(TEMPORAL_SCHEME);
if (scheme == NULL) {
scheme = "http";
}
const char *host = getenv(TEMPORAL_HOST);
const char *port = getenv(TEMPORAL_PORT);
const char *ns = getenv(TEMPORAL_NAMESPACE);
const char *task_queue = getenv(SMEE_TASK_QUEUE);

if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
fprintf(stderr, "creating smee client\n");
std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue);
} else {
fprintf(stderr, "Smee client not configured\n");
}
}

bool PostgreSQL::waitForReady()
{
Expand Down Expand Up @@ -1306,40 +1343,72 @@ void PostgreSQL::commitThread()
continue;
}

pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]);

pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
int membercount = mrow[0].as<int>();

bool isNewMember = false;
if (membercount == 0) {
// new member
isNewMember = true;
pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17)",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]);
} else {
// existing member
pqxx::result res = w.exec_params0(
"UPDATE ztc_member "
"SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, "
"last_authorized_time = TO_TIMESTAMP($7::double precision/1000), "
"last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), "
"no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
"revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
"WHERE id = $1 AND network_id = $2",
memberId,
networkId,
(bool)config["activeBridge"],
(bool)config["authorized"],
OSUtils::jsonDump(config["capabilities"], -1),
OSUtils::jsonString(config["identity"], ""),
(uint64_t)config["lastAuthorizedTime"],
(uint64_t)config["lastDeauthorizedTime"],
(bool)config["noAutoAssignIps"],
(int)config["remoteTraceLevel"],
target,
(uint64_t)config["revision"],
OSUtils::jsonDump(config["tags"], -1),
(int)config["vMajor"],
(int)config["vMinor"],
(int)config["vRev"],
(int)config["vProto"]
);
}

res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
memberId, networkId);
if (!isNewMember) {
pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
memberId, networkId);
}

std::vector<std::string> assignments;
bool ipAssignError = false;
Expand All @@ -1350,7 +1419,7 @@ void PostgreSQL::commitThread()
continue;
}

res = w.exec_params0(
pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
memberId, networkId, addr);

Expand All @@ -1366,6 +1435,17 @@ void PostgreSQL::commitThread()

w.commit();

if (_smee != NULL && isNewMember) {
notifyNewMember(networkId, memberId);
} else {
if (_smee == NULL) {
fprintf(stderr, "smee is NULL\n");
}
if (!isNewMember) {
fprintf(stderr, "nt a new member\n");
}
}

const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
if (nwidInt && memberidInt) {
Expand Down Expand Up @@ -1609,6 +1689,13 @@ void PostgreSQL::commitThread()
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
}

void PostgreSQL::notifyNewMember(const std::string &networkID, const std::string &memberID) {
smeeclient::smee_client_notify_network_joined(
_smee,
networkID.c_str(),
memberID.c_str());
}

void PostgreSQL::onlineNotificationThread()
{
waitForReady();
Expand Down
9 changes: 9 additions & 0 deletions controller/PostgreSQL.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ extern "C" {
typedef struct pg_conn PGconn;
}

namespace smeeclient {
struct SmeeClient;
}

namespace ZeroTier {

struct RedisConfig;
Expand Down Expand Up @@ -144,6 +148,9 @@ class PostgreSQL : public DB
uint64_t _doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline);

void configureSmee();
void notifyNewMember(const std::string &networkID, const std::string &memberID);

enum OverrideMode {
ALLOW_PGBOUNCER_OVERRIDE = 0,
NO_OVERRIDE = 1
Expand Down Expand Up @@ -178,6 +185,8 @@ class PostgreSQL : public DB
std::shared_ptr<sw::redis::Redis> _redis;
std::shared_ptr<sw::redis::RedisCluster> _cluster;
bool _redisMemberStatus;

smeeclient::SmeeClient *_smee;
};

} // namespace ZeroTier
Expand Down
3 changes: 2 additions & 1 deletion ext/central-controller-docker/Dockerfile.builder
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN apt -y install \
postgresql-client-common \
curl \
google-perftools \
libgoogle-perftools-dev
libgoogle-perftools-dev \
protobuf-compiler

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
3 changes: 2 additions & 1 deletion ext/central-controller-docker/Dockerfile.run_base
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM ubuntu:jammy
RUN apt update && apt upgrade -y

RUN apt -y install \
netcat \
postgresql-client \
postgresql-client-common \
libjemalloc2 \
Expand All @@ -11,4 +12,4 @@ RUN apt -y install \
binutils \
linux-tools-gke \
perf-tools-unstable \
google-perftools
google-perftools
7 changes: 7 additions & 0 deletions ext/central-controller-docker/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ else
done
fi

echo "Waiting for temporal"
while ! nc -z ${ZT_TEMPORAL_HOST} ${ZT_TEMPORAL_PORT}; do
echo "waiting...";
sleep 1;
done
echo "Temporal is up"

export GLIBCXX_FORCE_NEW=1
export GLIBCPP_FORCE_NEW=1
export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so.2"
Expand Down
Loading

0 comments on commit f89cde8

Please sign in to comment.