From e9db3b149ee39945812ed75789e5d2fca611fd77 Mon Sep 17 00:00:00 2001 From: Eugene K Date: Tue, 5 Jul 2022 09:48:06 -0400 Subject: [PATCH 1/2] make sure queued write requests are processed on disconnect --- library/connect.c | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/library/connect.c b/library/connect.c index 7dd7a356..dc92c839 100644 --- a/library/connect.c +++ b/library/connect.c @@ -23,7 +23,7 @@ static const char *INVALID_SESSION = "Invalid Session"; static const int MAX_CONNECT_RETRY = 3; -#define CONN_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "conn[%u.%u] " fmt, conn->ziti_ctx->id, conn->conn_id, ##__VA_ARGS__) +#define CONN_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "conn[%u.%u/%s] " fmt, conn->ziti_ctx->id, conn->conn_id, conn_state_str[conn->state], ##__VA_ARGS__) #define conn_states(XX) \ XX(Initial)\ @@ -590,16 +590,6 @@ static void ziti_write_timeout(uv_timer_t *t) { static void ziti_write_req(struct ziti_write_req_s *req) { struct ziti_conn *conn = req->conn; - if (conn->state >= Timedout) { - CONN_LOG(WARN, "got write req in closed/disconnected sate"); - conn->write_reqs--; - - if (req->cb) { - req->cb(conn, ZITI_CONN_CLOSED, req->ctx); - } - free(req); - return; - } if (req->eof) { conn_set_state(conn, CloseWrite); send_fin_message(conn); @@ -767,20 +757,30 @@ static void flush_connection (ziti_connection conn) { static bool flush_to_service(ziti_connection conn) { + // still connecting if (conn->channel == NULL) { return false; } + if (conn->state < Connected || conn->state == Accepting) { return false; } - if (conn->state == Connected) { - int count = 0; - while (!TAILQ_EMPTY(&conn->wreqs)) { - struct ziti_write_req_s *req = TAILQ_FIRST(&conn->wreqs); - TAILQ_REMOVE(&conn->wreqs, req, _next); + int count = 0; + while (!TAILQ_EMPTY(&conn->wreqs)) { + struct ziti_write_req_s *req = TAILQ_FIRST(&conn->wreqs); + TAILQ_REMOVE(&conn->wreqs, req, _next); + if (conn->state == Connected) { conn->write_reqs++; ziti_write_req(req); count++; + } else { + CONN_LOG(DEBUG, "got write req in invalid state[%s]", conn_state_str[conn->state]); + conn->write_reqs--; + + if (req->cb) { + req->cb(conn, ZITI_INVALID_STATE, req->ctx); + } + free(req); } - CONN_LOG(TRACE, "flushed %d messages", count); } + CONN_LOG(TRACE, "flushed %d messages", count); return !TAILQ_EMPTY(&conn->wreqs); } From af7cd1ac902da7ad50eff169f975783b4aa5e825 Mon Sep 17 00:00:00 2001 From: Eugene K Date: Tue, 5 Jul 2022 09:48:45 -0400 Subject: [PATCH 2/2] close connection bridge if both sides sent EOF --- library/conn_bridge.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/library/conn_bridge.c b/library/conn_bridge.c index 7a1f16c1..86d1578e 100644 --- a/library/conn_bridge.c +++ b/library/conn_bridge.c @@ -29,6 +29,8 @@ struct fd_bridge_s { struct ziti_bridge_s { bool closed; + bool ziti_eof; + bool input_eof; ziti_connection conn; uv_stream_t *input; uv_stream_t *output; @@ -178,8 +180,14 @@ ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len) { uv_write(wr, br->output, &b, 1, on_output); return len; } else if (len == ZITI_EOF) { - NEWP(sr, uv_shutdown_t); - uv_shutdown(sr, br->output, on_shutdown); + br->ziti_eof = true; + if (br->input_eof) { + ZITI_LOG(VERBOSE, "both sides are EOF"); + close_bridge(br); + } else { + NEWP(sr, uv_shutdown_t); + uv_shutdown(sr, br->output, on_shutdown); + } } else { close_bridge(br); } @@ -208,7 +216,13 @@ void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b) { if (len == UV_ENOBUFS) { ZITI_LOG(TRACE, "stalled"); } else if (len == UV_EOF) { - ziti_close_write(br->conn); + br->input_eof = true; + if (br->ziti_eof) { + ZITI_LOG(VERBOSE, "both sides are EOF"); + close_bridge(br); + } else { + ziti_close_write(br->conn); + } } else if (len < 0) { ZITI_LOG(WARN, "err = %zd", len); close_bridge(br);