Skip to content

Commit

Permalink
Merge pull request #488 from openziti/ziti-conn-udp-bridge
Browse files Browse the repository at this point in the history
implement ziti bridge for UDP handles
  • Loading branch information
ekoby authored Jan 30, 2023
2 parents 560ee8e + 5636957 commit 9777209
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 47 deletions.
15 changes: 12 additions & 3 deletions includes/ziti/ziti.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,12 +825,21 @@ extern int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_w
* [on_close] is called after the bridge is terminated and ziti_connection was closed.
*
* @param conn
* @param stream
* @param handle IO handle, must be a stream (UV_TCP, UV_PIPE, UV_TTY) or a UV_UDP handle
* @param on_close
* @return
* @return 0 on success, error code on failure
*/
ZITI_FUNC
extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close);

/**
* set idle timeout on bridged connection.
* @param conn ziti_connection previously bridged wtih [ziti_conn_bridge] or [ziti_conn_bridge_fds]
* @param millis timeout after which bridge will be closed absent any traffic
* @return 0 on success, error code on failure
*/
ZITI_FUNC
extern int ziti_conn_bridge(ziti_connection conn, uv_stream_t *stream, uv_close_cb on_close);
extern int ziti_conn_bridge_idle_timeout(ziti_connection conn, unsigned long millis);

/**
* @brief Bridge [ziti_connection] to given IO file descriptors.
Expand Down
161 changes: 141 additions & 20 deletions library/conn_bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,60 @@ struct ziti_bridge_s {
bool ziti_eof;
bool input_eof;
ziti_connection conn;
uv_stream_t *input;
uv_stream_t *output;
uv_handle_t *input;
uv_handle_t *output;
uv_close_cb close_cb;
void *data;
struct fd_bridge_s *fdbr;
pool_t *input_pool;
bool input_throttle;
unsigned long idle_timeout;
uv_timer_t *idle_timer;
};

static ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len);

static void bridge_alloc(uv_handle_t *h, size_t req, uv_buf_t *b);
static void close_bridge(struct ziti_bridge_s *br);

static void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b);
static void on_udp_input(uv_udp_t *udp, ssize_t len, const uv_buf_t *b, const struct sockaddr *addr, unsigned int flags);

extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close) {
if (handle == NULL) return UV_EINVAL;

if ( !(handle->type == UV_TCP || handle->type == UV_NAMED_PIPE ||
handle->type == UV_TTY || handle->type == UV_UDP )) {
return UV_EINVAL;
}

if (handle->type == UV_UDP) {
struct sockaddr_storage peer;
int len = sizeof(peer);
int rc = uv_udp_getpeername((const uv_udp_t *) handle, (struct sockaddr *) &peer, &len);
if (rc != 0) {
ZITI_LOG(ERROR, "cannot bridge unconnected socket: %d/%s", rc, uv_strerror(rc));
return UV_EINVAL;
}
}

extern int ziti_conn_bridge(ziti_connection conn, uv_stream_t *stream, uv_close_cb on_close) {
NEWP(br, struct ziti_bridge_s);
br->conn = conn;
br->input = stream;
br->output = stream;
br->input = handle;
br->output = handle;
br->close_cb = on_close;
br->data = uv_handle_get_data((const uv_handle_t *) stream);
br->data = uv_handle_get_data(handle);
br->input_pool = pool_new(BRIDGE_MSG_SIZE, BRIDGE_POOL_SIZE, NULL);

uv_handle_set_data((uv_handle_t *) stream, br);
uv_handle_set_data(handle, br);
ziti_conn_set_data(conn, br);

ziti_conn_set_data_cb(conn, on_ziti_data);
uv_read_start(br->input, bridge_alloc, on_input);
if (br->input->type == UV_UDP) {
uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input);
} else {
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);
}

return ZITI_OK;
}
Expand Down Expand Up @@ -98,12 +122,29 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd
fdbr->close_cb = close_cb;
fdbr->ctx = ctx;

uv_handle_t *sock = NULL;
if (input == output) {
uv_tcp_t *sock = calloc(1, sizeof(uv_tcp_t));
uv_tcp_init(l, sock);
uv_tcp_open(sock, input);
sock->data = fdbr;
return ziti_conn_bridge(conn, (uv_stream_t *) sock, on_sock_close);
int type;
socklen_t len = sizeof(type);
if (getsockopt(input, SOL_SOCKET, SO_TYPE, &type, &len) == 0) {
if (type == SOCK_STREAM) {
sock = calloc(1, sizeof(uv_tcp_t));
uv_tcp_init(l, (uv_tcp_t *) sock);
uv_tcp_open((uv_tcp_t *) sock, input);
} else if (type == SOCK_DGRAM) {
sock = calloc(1, sizeof(uv_udp_t));
uv_udp_init(l, (uv_udp_t *) sock);
uv_udp_open((uv_udp_t *) sock, input);
}
}
if (sock) {
sock->data = fdbr;
} else {
ZITI_LOG(ERROR, "unsupported fd type");
return UV_EINVAL;
}

return ziti_conn_bridge(conn, sock, on_sock_close);
}

NEWP(br, struct ziti_bridge_s);
Expand All @@ -124,15 +165,41 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd
br->data = br;
br->fdbr = fdbr;

uv_handle_set_data((uv_handle_t *) br->input, br);
uv_handle_set_data(br->input, br);
ziti_conn_set_data(conn, br);

ziti_conn_set_data_cb(conn, on_ziti_data);
uv_read_start(br->input, bridge_alloc, on_input);
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);

return ZITI_OK;
}

static void on_bridge_idle(uv_timer_t *t) {
struct ziti_bridge_s *br = t->data;
ZITI_LOG(DEBUG, "closing bridge conn[%d.%d] due to idle timeout", br->conn->ziti_ctx->id, br->conn->conn_id);
close_bridge(br);
}

int ziti_conn_bridge_idle_timeout(ziti_connection conn, unsigned long millis) {
struct ziti_bridge_s *br = ziti_conn_data(conn);
if (millis == 0) {
br->idle_timeout = 0;
if (br->idle_timer) {
uv_close((uv_handle_t *) br->idle_timer, (uv_close_cb) free);
br->idle_timer = NULL;
}
} else {
br->idle_timeout = millis;
if (br->idle_timer == NULL) {
br->idle_timer = calloc(1, sizeof(*br->idle_timer));
br->idle_timer->data = br;
uv_timer_init(br->input->loop, br->idle_timer);
}
uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0);
}
return 0;
}

static void on_ziti_close(ziti_connection conn) {
struct ziti_bridge_s *br = ziti_conn_data(conn);
pool_destroy(br->input_pool);
Expand All @@ -150,6 +217,11 @@ static void close_bridge(struct ziti_bridge_s *br) {
br->input = NULL;
}

if (br->idle_timer) {
uv_close((uv_handle_t *) br->idle_timer, (uv_close_cb) free);
br->idle_timer = NULL;
}

ziti_close(br->conn, on_ziti_close);
}

Expand All @@ -163,6 +235,16 @@ static void on_output(uv_write_t *wr, int status) {
free(wr);
}

static void on_udp_send(uv_udp_send_t *sr, int status) {
if (status != 0) {
struct ziti_bridge_s *br = sr->handle->data;
ZITI_LOG(WARN, "udp_send failed: %d(%s)", status, uv_strerror(status));
close_bridge(br);
}
free(sr->data);
free(sr);
}

static void on_shutdown(uv_shutdown_t *sr, int status) {
if (status != 0) {
ZITI_LOG(WARN, "shutdown failed: %d(%s)", status, uv_strerror(status));
Expand All @@ -173,21 +255,32 @@ static void on_shutdown(uv_shutdown_t *sr, int status) {

ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len) {
struct ziti_bridge_s *br = ziti_conn_data(conn);

if (br->idle_timer) { // reset idle timer
uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0);
}

if (len > 0) {
NEWP(wr, uv_write_t);
uv_buf_t b = uv_buf_init(malloc(len), len);
memcpy(b.base, data, len);
wr->data = b.base;
uv_write(wr, br->output, &b, 1, on_output);
if (br->output->type == UV_UDP) {
NEWP(sr, uv_udp_send_t);
sr->data = b.base;
uv_udp_send(sr, (uv_udp_t *) br->output, &b, 1, NULL, on_udp_send);
} else {
NEWP(wr, uv_write_t);
wr->data = b.base;
uv_write(wr, (uv_stream_t *) br->output, &b, 1, on_output);
}
return len;
} else if (len == ZITI_EOF) {
br->ziti_eof = true;
if (br->input_eof) {
if (br->input_eof || br->input->type == UV_UDP) {
ZITI_LOG(VERBOSE, "both sides are EOF");
close_bridge(br);
} else {
NEWP(sr, uv_shutdown_t);
uv_shutdown(sr, br->output, on_shutdown);
uv_shutdown(sr, (uv_stream_t *) br->output, on_shutdown);
}
} else {
close_bridge(br);
Expand All @@ -214,8 +307,36 @@ static void on_ziti_write(ziti_connection conn, ssize_t status, void *ctx) {
}
}

void on_udp_input(uv_udp_t *udp, ssize_t len, const uv_buf_t *b, const struct sockaddr *addr, unsigned int flags) {
struct ziti_bridge_s *br = udp->data;

if (br->idle_timer) { // reset idle timer
uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0);
}

if (len > 0) {
ziti_write(br->conn, b->base, len, on_ziti_write, b->base);
} else {
pool_return_obj(b->base);
if (len == UV_ENOBUFS) {
if (!br->input_throttle) {
ZITI_LOG(TRACE, "stalled ziti_conn[%d.%d]", br->conn->ziti_ctx->id, br->conn->conn_id);
br->input_throttle = true;
}
} else if (len < 0) {
ZITI_LOG(WARN, "err = %zd/%s", len, uv_strerror(len));
close_bridge(br);
}
}
}

void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b) {
struct ziti_bridge_s *br = s->data;

if (br->idle_timer) { // reset idle timer
uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0);
}

if (len > 0) {
ziti_write(br->conn, b->base, len, on_ziti_write, b->base);
} else {
Expand Down
Loading

0 comments on commit 9777209

Please sign in to comment.