Skip to content

Commit

Permalink
Merge branch 'main' into fix_crash_on_disable_enable
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoby authored Jun 20, 2022
2 parents bacbb54 + 80ee30b commit 9e1a869
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 66 deletions.
14 changes: 3 additions & 11 deletions includes/ziti/zitilib.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,6 @@ int Ziti_connect_addr(ziti_socket_t socket, const char *host, unsigned int port)
ZITI_FUNC
int Ziti_bind(ziti_socket_t socket, ziti_context ztx, const char *service);

/**
* @brief Bind socket to a Ziti service with the given intercept address
* @param socket socket handle created with [Ziti_socket()]
* @param host target hostname
* @param port target port
* @return
*/
ZITI_FUNC
int Ziti_bind_addr(ziti_socket_t socket, const char *host, unsigned int port);

/**
* @brief marks the [socket] as a socket able to accept incoming connections
* @param socket a file descriptor created with [Ziti_socket()] and bound to a service with [Ziti_bind] or [Ziti_bind_addr]
Expand All @@ -135,10 +125,12 @@ int Ziti_listen(ziti_socket_t socket, int backlog);
* - not marked as non-blocking: blocks until a connection request is present.
*
* @param socket socket created with [Ziti_socket()], bound to a service with [Ziti_bind()] or [Ziti_bind_addr()], and is listening after [Ziti_listen()]
* @param caller buffer to store caller ID (dialing identity name)
* @param caller_len length of the [caller] buffer
* @return on success returns a file descriptor for the accepted connection. on error -1 is returned, use [Ziti_last_error()] to get actual error code.
*/
ZITI_FUNC
ziti_socket_t Ziti_accept(ziti_socket_t socket);
ziti_socket_t Ziti_accept(ziti_socket_t socket, char *caller, int caller_len);

/**
* @brief Shutdown Ziti library.
Expand Down
10 changes: 5 additions & 5 deletions library/channel.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022. NetFoundry, Inc.
// Copyright (c) 2022. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -237,22 +237,22 @@ static void check_connecting_state(ziti_channel_t *ch) {
// verify channel state
bool reset = false;
if (!uv_is_active((const uv_handle_t *) &ch->timer)) {
CH_LOG(ERROR, "state check: timer not active!");
CH_LOG(DEBUG, "state check: timer not active!");
reset = true;
}

if (ch->timer->timer_cb != ch_connect_timeout) {
CH_LOG(ERROR, "state check: unexpected callback(%s)!", get_timeout_cb(ch));
CH_LOG(DEBUG, "state check: unexpected callback(%s)!", get_timeout_cb(ch));
reset = true;
}

if (ch->timer->timeout < uv_now(ch->loop)) {
CH_LOG(ERROR, "state check: timer is in the past!");
CH_LOG(DEBUG, "state check: timer is in the past!");
reset = true;
}

if (ch->timer->timeout - uv_now(ch->loop) > CONNECT_TIMEOUT) {
CH_LOG(ERROR, "state check: timer is too far into the future!");
CH_LOG(DEBUG, "state check: timer is too far into the future!");
reset = true;
}
}
Expand Down
153 changes: 110 additions & 43 deletions library/zitilib.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ typedef struct ztx_wrap {
} ztx_wrap_t;

struct backlog_entry_s {
struct ziti_sock_s *parent;
ziti_connection conn;
char *caller_id;
future_t *accept_f;
TAILQ_ENTRY(backlog_entry_s) _next;
};

Expand All @@ -148,6 +151,7 @@ typedef struct ziti_sock_s {
ziti_context ztx;
ziti_connection conn;

char *service;
bool server;
int pending;
int max_pending;
Expand Down Expand Up @@ -201,7 +205,6 @@ static void on_ctx_event(ziti_context ztx, const ziti_event_t *ev) {
}
} else if (ev->type == ZitiServiceEvent) {


for (int i = 0; ev->event.service.removed && ev->event.service.removed[i] != NULL; i++) {
ziti_intercept_cfg_v1 *intercept = model_map_remove(&wrap->intercepts, ev->event.service.removed[i]->name);
free_ziti_intercept_cfg_v1(intercept);
Expand Down Expand Up @@ -243,7 +246,7 @@ static const char *configs[] = {
};

static void load_ziti_ctx(void *arg, future_t *f, uv_loop_t *l) {

int rc = 0;
struct ztx_wrap *wrap = model_map_get(&ziti_contexts, arg);
if (wrap == NULL) {
wrap = calloc(1, sizeof(struct ztx_wrap));
Expand All @@ -256,10 +259,15 @@ static void load_ziti_ctx(void *arg, future_t *f, uv_loop_t *l) {
wrap->services_loaded = new_future();
TAILQ_INIT(&wrap->futures);

rc = ziti_init_opts(&wrap->opts, l);
if (rc != ZITI_OK) {
fail_future(f, rc);
ZITI_LOG(WARN, "identity file[%s] not found", (const char *) arg);
free(wrap);
return;
}
model_map_set(&ziti_contexts, arg, wrap);
TAILQ_INSERT_TAIL(&wrap->futures, f, _next);

ziti_init_opts(&wrap->opts, l);
} else if (wrap->ztx) {
complete_future(f, wrap->ztx);
} else {
Expand All @@ -268,12 +276,14 @@ static void load_ziti_ctx(void *arg, future_t *f, uv_loop_t *l) {
}

ziti_context Ziti_load_context(const char *identity) {
future_t *f = schedule_on_loop(load_ziti_ctx, (void*)identity, true);
future_t *f = schedule_on_loop(load_ziti_ctx, (void *) identity, true);
int err = await_future(f);
set_error(err);
ziti_context ztx = (ziti_context) f->result;
ztx_wrap_t *wrap = ziti_app_ctx(ztx);
await_future(wrap->services_loaded);
if (err == 0) {
ztx_wrap_t *wrap = ziti_app_ctx(ztx);
await_future(wrap->services_loaded);
}
destroy_future(f);
return ztx;
}
Expand All @@ -295,6 +305,16 @@ static const char * fmt_win32err(int err) {
}
#endif

#ifdef __MINGW32__
static const IN_ADDR in4addr_loopback;
static void init_in4addr_loopback() {
IN_ADDR *lo = (IN_ADDR *)&in4addr_loopback;
lo->S_un.S_addr = htonl(INADDR_LOOPBACK);
}
#else
#define init_in4addr_loopback() {}
#endif

static int make_socketpair(int type, ziti_socket_t *fd0, ziti_socket_t *fd1) {
int rc = 0;
#if _WIN32
Expand Down Expand Up @@ -403,13 +423,14 @@ static void on_bridge_close(void *ctx) {
#else
close(zs->ziti_fd);
#endif
free(zs->service);
free(zs);
}

static void on_ziti_connect(ziti_connection conn, int status) {
ziti_sock_t *zs = ziti_conn_data(conn);
if (status == ZITI_OK) {
ZITI_LOG(INFO, "bridge connected to ziti service");
ZITI_LOG(DEBUG, "bridge connected to ziti service[%s]", zs->service);
ziti_conn_bridge_fds(conn, (uv_os_fd_t) zs->ziti_fd, (uv_os_fd_t) zs->ziti_fd, on_bridge_close, zs);
complete_future(zs->f, conn);
} else {
Expand Down Expand Up @@ -491,6 +512,7 @@ static void do_ziti_connect(struct conn_req_s *req, future_t *f, uv_loop_t *l) {

const char *proto_str = proto == SOCK_DGRAM ? "udp" : "tcp";
if (req->ztx != NULL) {
zs->service = strdup(req->service);
ziti_conn_init(req->ztx, &zs->conn, zs);
char app_data[1024];
size_t len = snprintf(app_data, sizeof(app_data),
Expand Down Expand Up @@ -520,14 +542,14 @@ int Ziti_connect_addr(ziti_socket_t socket, const char *host, unsigned int port)

future_t *f = schedule_on_loop((loop_work_cb) do_ziti_connect, &req, true);
int err = await_future(f);
set_error(err);
destroy_future(f);
return err;
return err ? -1 : 0;
}

int Ziti_connect(ziti_socket_t socket, ziti_context ztx, const char *service) {

if (ztx == NULL) return -EINVAL;
if (service == NULL) return -EINVAL;
if (ztx == NULL) { return EINVAL; }
if (service == NULL) { return EINVAL; }

struct conn_req_s req = {
.fd = socket,
Expand All @@ -537,8 +559,9 @@ int Ziti_connect(ziti_socket_t socket, ziti_context ztx, const char *service) {

future_t *f = schedule_on_loop((loop_work_cb) do_ziti_connect, &req, true);
int err = await_future(f);
set_error(err);
destroy_future(f);
return err;
return err ? -1 : 0;
}

static bool is_blocking(ziti_socket_t s) {
Expand Down Expand Up @@ -569,20 +592,31 @@ static bool is_blocking(ziti_socket_t s) {
#endif
}

struct sock_info_s {
ziti_socket_t fd;
char *peer;
};

static void on_ziti_accept(ziti_connection client, int status) {
future_t *f = ziti_conn_data(client);
struct backlog_entry_s *pending = ziti_conn_data(client);
if (status != ZITI_OK) {
fail_future(f, status);
ZITI_LOG(WARN, "ziti_accept failed!");
// ziti accept failed, so just put the accept future back into accept_q
TAILQ_INSERT_HEAD(&pending->parent->accept_q, pending->accept_f, _next);

ziti_close(client, NULL);
free(pending->caller_id);
free(pending);
return;
}

ziti_socket_t fd, ziti_fd;
int rc = make_socketpair(SOCK_STREAM, &fd, &ziti_fd);
if (rc != 0) {
fail_future(f, rc);
fail_future(pending->accept_f, rc);
ziti_close(client, NULL);
free(pending->caller_id);
free(pending);
return;
}

Expand All @@ -592,41 +626,58 @@ static void on_ziti_accept(ziti_connection client, int status) {
ziti_conn_set_data(client, zs);
model_map_set_key(&ziti_sockets, &zs->fd, sizeof(zs->fd), zs);
ziti_conn_bridge_fds(client, (uv_os_fd_t) zs->ziti_fd, (uv_os_fd_t) zs->ziti_fd, on_bridge_close, zs);
complete_future(f, (void *) (uintptr_t) zs->fd);
NEWP(si, struct sock_info_s);
si->fd = zs->fd;
si->peer = pending->caller_id;

complete_future(pending->accept_f, si);
free(pending);
}

static void on_ziti_client(ziti_connection server, ziti_connection client, int status, ziti_client_ctx *clt_ctx) {
ziti_sock_t *server_sock = ziti_conn_data(server);
ZITI_LOG(INFO, "incoming client = %s", clt_ctx->caller_id);
ZITI_LOG(DEBUG, "incoming client[%s] for service[%s]", clt_ctx->caller_id, server_sock->service);

if (status != ZITI_OK) {
on_bridge_close(server_sock);
return;
}

char notify = 1;

NEWP(pending, struct backlog_entry_s);
pending->parent = server_sock;
pending->conn = client;
pending->caller_id = strdup(clt_ctx->caller_id);

if (!TAILQ_EMPTY(&server_sock->accept_q)) {
future_t *accept_f = TAILQ_FIRST(&server_sock->accept_q);
TAILQ_REMOVE(&server_sock->accept_q, accept_f, _next);

ziti_conn_set_data(client, accept_f);
ziti_accept(client, on_ziti_accept, NULL);
ziti_conn_set_data(client, pending);
// this should not happen but check anyway
if (ziti_accept(client, on_ziti_accept, NULL) != ZITI_OK) {
ZITI_LOG(WARN, "ziti_accept() failed unexpectedly");
ziti_close(client, NULL);
free(pending->caller_id);
free(pending);
return;
}
pending->accept_f = accept_f;
TAILQ_REMOVE(&server_sock->accept_q, accept_f, _next);
write(server_sock->ziti_fd, &notify, sizeof(notify));
return;
}

if (server_sock->pending < server_sock->max_pending) {
NEWP(pending, struct backlog_entry_s);
pending->conn = client;
TAILQ_INSERT_TAIL(&server_sock->backlog, pending, _next);
server_sock->pending++;
if (!is_blocking(server_sock->fd)) {
char b = 1;
#if _WIN32
send(server_sock->ziti_fd, &b, 1, 0);
send(server_sock->ziti_fd, &notify, sizeof(notify), 0);
#else
write(server_sock->ziti_fd, &b, 1);
write(server_sock->ziti_fd, &notify, sizeof(notify));
#endif
}
} else {
ZITI_LOG(DEBUG, "accept backlog is full, client[%s] rejected", clt_ctx->caller_id);
ziti_close(client, NULL);
}
}
Expand All @@ -653,7 +704,8 @@ static void do_ziti_bind(struct conn_req_s *req, future_t *f, uv_loop_t *l) {
fail_future(f, EALREADY);
} else {
if (req->ztx != NULL) {
ZITI_LOG(INFO, "requesting bind fd[%d] to service[%s]", zs->fd, req->service);
ZITI_LOG(DEBUG, "requesting bind fd[%d] to service[%s]", zs->fd, req->service);
zs->service = strdup(req->service);
ziti_conn_init(req->ztx, &zs->conn, zs);
ziti_listen(zs->conn, req->service, on_ziti_bind, on_ziti_client);
zs->f = f;
Expand Down Expand Up @@ -740,32 +792,46 @@ static void do_ziti_accept(void *r, future_t *f, uv_loop_t *l) {
return;
}

struct backlog_entry_s *pending = TAILQ_FIRST(&zs->backlog);
TAILQ_REMOVE(&zs->backlog, pending, _next);
while (!TAILQ_EMPTY(&zs->backlog)) {
struct backlog_entry_s *pending = TAILQ_FIRST(&zs->backlog);
ZITI_LOG(DEBUG, "pending connection[%s] for service[%s]", pending->caller_id, zs->service);
TAILQ_REMOVE(&zs->backlog, pending, _next);

ziti_connection conn = pending->conn;
free(pending);
ziti_connection conn = pending->conn;
pending->accept_f = f;
ziti_conn_set_data(conn, pending);
int rc = ziti_accept(conn, on_ziti_accept, NULL);

ziti_conn_set_data(conn, f);
ziti_accept(conn, on_ziti_accept, NULL);
if (rc == ZITI_OK) {
break;
}

ZITI_LOG(DEBUG, "failed to accept: client gone? [%d/%s]", rc, ziti_errorstr(rc));
ziti_close(conn, NULL);
free(pending->caller_id);
free(pending);
}
}

ziti_socket_t Ziti_accept(ziti_socket_t server) {
ziti_socket_t Ziti_accept(ziti_socket_t server, char *caller, int caller_len) {
future_t *f = schedule_on_loop(do_ziti_accept, (void *) (uintptr_t) server, true);

ziti_socket_t clt = -1;
int err = await_future(f);
if (!err) {
clt = (ziti_socket_t) (uintptr_t) f->result;
if (!is_blocking(server)) {
char b;
struct sock_info_s *si = f->result;
clt = si->fd;
if (caller != NULL) {
strncpy(caller, si->peer, caller_len);
}
free(si->peer);
free(si);
char b;
#if _WIN32
recv(server, &b, 1, 0);
recv(server, &b, 1, 0);
#else
read(server, &b, 1);
read(server, &b, 1);
#endif

}
}
set_error(err);
destroy_future(f);
Expand Down Expand Up @@ -821,6 +887,7 @@ void process_on_loop(uv_async_t *async) {
}

static void internal_init() {
init_in4addr_loopback();
uv_key_create(&err_key);
uv_mutex_init(&q_mut);
lib_loop = uv_loop_new();
Expand Down
Loading

0 comments on commit 9e1a869

Please sign in to comment.