diff --git a/includes/ziti/ziti.h b/includes/ziti/ziti.h index 1e8750c0..4e9280fd 100644 --- a/includes/ziti/ziti.h +++ b/includes/ziti/ziti.h @@ -825,12 +825,21 @@ extern int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_w * [on_close] is called after the bridge is terminated and ziti_connection was closed. * * @param conn - * @param stream + * @param handle IO handle, must be a stream (UV_TCP, UV_PIPE, UV_TTY) or a UV_UDP handle * @param on_close - * @return + * @return 0 on success, error code on failure + */ +ZITI_FUNC +extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close); + +/** + * set idle timeout on bridged connection. + * @param conn ziti_connection previously bridged wtih [ziti_conn_bridge] or [ziti_conn_bridge_fds] + * @param millis timeout after which bridge will be closed absent any traffic + * @return 0 on success, error code on failure */ ZITI_FUNC -extern int ziti_conn_bridge(ziti_connection conn, uv_stream_t *stream, uv_close_cb on_close); +extern int ziti_conn_bridge_idle_timeout(ziti_connection conn, unsigned long millis); /** * @brief Bridge [ziti_connection] to given IO file descriptors. diff --git a/library/conn_bridge.c b/library/conn_bridge.c index af80db57..5948c2ae 100644 --- a/library/conn_bridge.c +++ b/library/conn_bridge.c @@ -32,36 +32,60 @@ struct ziti_bridge_s { bool ziti_eof; bool input_eof; ziti_connection conn; - uv_stream_t *input; - uv_stream_t *output; + uv_handle_t *input; + uv_handle_t *output; uv_close_cb close_cb; void *data; struct fd_bridge_s *fdbr; pool_t *input_pool; bool input_throttle; + unsigned long idle_timeout; + uv_timer_t *idle_timer; }; static ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len); static void bridge_alloc(uv_handle_t *h, size_t req, uv_buf_t *b); +static void close_bridge(struct ziti_bridge_s *br); static void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b); +static void on_udp_input(uv_udp_t *udp, ssize_t len, const uv_buf_t *b, const struct sockaddr *addr, unsigned int flags); +extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close) { + if (handle == NULL) return UV_EINVAL; + + if ( !(handle->type == UV_TCP || handle->type == UV_NAMED_PIPE || + handle->type == UV_TTY || handle->type == UV_UDP )) { + return UV_EINVAL; + } + + if (handle->type == UV_UDP) { + struct sockaddr_storage peer; + int len = sizeof(peer); + int rc = uv_udp_getpeername((const uv_udp_t *) handle, (struct sockaddr *) &peer, &len); + if (rc != 0) { + ZITI_LOG(ERROR, "cannot bridge unconnected socket: %d/%s", rc, uv_strerror(rc)); + return UV_EINVAL; + } + } -extern int ziti_conn_bridge(ziti_connection conn, uv_stream_t *stream, uv_close_cb on_close) { NEWP(br, struct ziti_bridge_s); br->conn = conn; - br->input = stream; - br->output = stream; + br->input = handle; + br->output = handle; br->close_cb = on_close; - br->data = uv_handle_get_data((const uv_handle_t *) stream); + br->data = uv_handle_get_data(handle); br->input_pool = pool_new(BRIDGE_MSG_SIZE, BRIDGE_POOL_SIZE, NULL); - uv_handle_set_data((uv_handle_t *) stream, br); + uv_handle_set_data(handle, br); ziti_conn_set_data(conn, br); ziti_conn_set_data_cb(conn, on_ziti_data); - uv_read_start(br->input, bridge_alloc, on_input); + if (br->input->type == UV_UDP) { + uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input); + } else { + uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input); + } return ZITI_OK; } @@ -98,12 +122,29 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd fdbr->close_cb = close_cb; fdbr->ctx = ctx; + uv_handle_t *sock = NULL; if (input == output) { - uv_tcp_t *sock = calloc(1, sizeof(uv_tcp_t)); - uv_tcp_init(l, sock); - uv_tcp_open(sock, input); - sock->data = fdbr; - return ziti_conn_bridge(conn, (uv_stream_t *) sock, on_sock_close); + int type; + socklen_t len = sizeof(type); + if (getsockopt(input, SOL_SOCKET, SO_TYPE, &type, &len) == 0) { + if (type == SOCK_STREAM) { + sock = calloc(1, sizeof(uv_tcp_t)); + uv_tcp_init(l, (uv_tcp_t *) sock); + uv_tcp_open((uv_tcp_t *) sock, input); + } else if (type == SOCK_DGRAM) { + sock = calloc(1, sizeof(uv_udp_t)); + uv_udp_init(l, (uv_udp_t *) sock); + uv_udp_open((uv_udp_t *) sock, input); + } + } + if (sock) { + sock->data = fdbr; + } else { + ZITI_LOG(ERROR, "unsupported fd type"); + return UV_EINVAL; + } + + return ziti_conn_bridge(conn, sock, on_sock_close); } NEWP(br, struct ziti_bridge_s); @@ -124,15 +165,41 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd br->data = br; br->fdbr = fdbr; - uv_handle_set_data((uv_handle_t *) br->input, br); + uv_handle_set_data(br->input, br); ziti_conn_set_data(conn, br); ziti_conn_set_data_cb(conn, on_ziti_data); - uv_read_start(br->input, bridge_alloc, on_input); + uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input); return ZITI_OK; } +static void on_bridge_idle(uv_timer_t *t) { + struct ziti_bridge_s *br = t->data; + ZITI_LOG(DEBUG, "closing bridge conn[%d.%d] due to idle timeout", br->conn->ziti_ctx->id, br->conn->conn_id); + close_bridge(br); +} + +int ziti_conn_bridge_idle_timeout(ziti_connection conn, unsigned long millis) { + struct ziti_bridge_s *br = ziti_conn_data(conn); + if (millis == 0) { + br->idle_timeout = 0; + if (br->idle_timer) { + uv_close((uv_handle_t *) br->idle_timer, (uv_close_cb) free); + br->idle_timer = NULL; + } + } else { + br->idle_timeout = millis; + if (br->idle_timer == NULL) { + br->idle_timer = calloc(1, sizeof(*br->idle_timer)); + br->idle_timer->data = br; + uv_timer_init(br->input->loop, br->idle_timer); + } + uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0); + } + return 0; +} + static void on_ziti_close(ziti_connection conn) { struct ziti_bridge_s *br = ziti_conn_data(conn); pool_destroy(br->input_pool); @@ -150,6 +217,11 @@ static void close_bridge(struct ziti_bridge_s *br) { br->input = NULL; } + if (br->idle_timer) { + uv_close((uv_handle_t *) br->idle_timer, (uv_close_cb) free); + br->idle_timer = NULL; + } + ziti_close(br->conn, on_ziti_close); } @@ -163,6 +235,16 @@ static void on_output(uv_write_t *wr, int status) { free(wr); } +static void on_udp_send(uv_udp_send_t *sr, int status) { + if (status != 0) { + struct ziti_bridge_s *br = sr->handle->data; + ZITI_LOG(WARN, "udp_send failed: %d(%s)", status, uv_strerror(status)); + close_bridge(br); + } + free(sr->data); + free(sr); +} + static void on_shutdown(uv_shutdown_t *sr, int status) { if (status != 0) { ZITI_LOG(WARN, "shutdown failed: %d(%s)", status, uv_strerror(status)); @@ -173,21 +255,32 @@ static void on_shutdown(uv_shutdown_t *sr, int status) { ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len) { struct ziti_bridge_s *br = ziti_conn_data(conn); + + if (br->idle_timer) { // reset idle timer + uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0); + } + if (len > 0) { - NEWP(wr, uv_write_t); uv_buf_t b = uv_buf_init(malloc(len), len); memcpy(b.base, data, len); - wr->data = b.base; - uv_write(wr, br->output, &b, 1, on_output); + if (br->output->type == UV_UDP) { + NEWP(sr, uv_udp_send_t); + sr->data = b.base; + uv_udp_send(sr, (uv_udp_t *) br->output, &b, 1, NULL, on_udp_send); + } else { + NEWP(wr, uv_write_t); + wr->data = b.base; + uv_write(wr, (uv_stream_t *) br->output, &b, 1, on_output); + } return len; } else if (len == ZITI_EOF) { br->ziti_eof = true; - if (br->input_eof) { + if (br->input_eof || br->input->type == UV_UDP) { ZITI_LOG(VERBOSE, "both sides are EOF"); close_bridge(br); } else { NEWP(sr, uv_shutdown_t); - uv_shutdown(sr, br->output, on_shutdown); + uv_shutdown(sr, (uv_stream_t *) br->output, on_shutdown); } } else { close_bridge(br); @@ -214,8 +307,36 @@ static void on_ziti_write(ziti_connection conn, ssize_t status, void *ctx) { } } +void on_udp_input(uv_udp_t *udp, ssize_t len, const uv_buf_t *b, const struct sockaddr *addr, unsigned int flags) { + struct ziti_bridge_s *br = udp->data; + + if (br->idle_timer) { // reset idle timer + uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0); + } + + if (len > 0) { + ziti_write(br->conn, b->base, len, on_ziti_write, b->base); + } else { + pool_return_obj(b->base); + if (len == UV_ENOBUFS) { + if (!br->input_throttle) { + ZITI_LOG(TRACE, "stalled ziti_conn[%d.%d]", br->conn->ziti_ctx->id, br->conn->conn_id); + br->input_throttle = true; + } + } else if (len < 0) { + ZITI_LOG(WARN, "err = %zd/%s", len, uv_strerror(len)); + close_bridge(br); + } + } +} + void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b) { struct ziti_bridge_s *br = s->data; + + if (br->idle_timer) { // reset idle timer + uv_timer_start(br->idle_timer, on_bridge_idle, br->idle_timeout, 0); + } + if (len > 0) { ziti_write(br->conn, b->base, len, on_ziti_write, b->base); } else {