Skip to content

Commit

Permalink
Merge pull request #424 from openziti/ziti-conn-disconnect-fail-write…
Browse files Browse the repository at this point in the history
…-req

fix: write requests are not processed on disconnect
  • Loading branch information
ekoby authored Jul 5, 2022
2 parents 068b528 + af7cd1a commit bb20e9a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
20 changes: 17 additions & 3 deletions library/conn_bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct fd_bridge_s {

struct ziti_bridge_s {
bool closed;
bool ziti_eof;
bool input_eof;
ziti_connection conn;
uv_stream_t *input;
uv_stream_t *output;
Expand Down Expand Up @@ -178,8 +180,14 @@ ssize_t on_ziti_data(ziti_connection conn, const uint8_t *data, ssize_t len) {
uv_write(wr, br->output, &b, 1, on_output);
return len;
} else if (len == ZITI_EOF) {
NEWP(sr, uv_shutdown_t);
uv_shutdown(sr, br->output, on_shutdown);
br->ziti_eof = true;
if (br->input_eof) {
ZITI_LOG(VERBOSE, "both sides are EOF");
close_bridge(br);
} else {
NEWP(sr, uv_shutdown_t);
uv_shutdown(sr, br->output, on_shutdown);
}
} else {
close_bridge(br);
}
Expand Down Expand Up @@ -208,7 +216,13 @@ void on_input(uv_stream_t *s, ssize_t len, const uv_buf_t *b) {
if (len == UV_ENOBUFS) {
ZITI_LOG(TRACE, "stalled");
} else if (len == UV_EOF) {
ziti_close_write(br->conn);
br->input_eof = true;
if (br->ziti_eof) {
ZITI_LOG(VERBOSE, "both sides are EOF");
close_bridge(br);
} else {
ziti_close_write(br->conn);
}
} else if (len < 0) {
ZITI_LOG(WARN, "err = %zd", len);
close_bridge(br);
Expand Down
34 changes: 17 additions & 17 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
static const char *INVALID_SESSION = "Invalid Session";
static const int MAX_CONNECT_RETRY = 3;

#define CONN_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "conn[%u.%u] " fmt, conn->ziti_ctx->id, conn->conn_id, ##__VA_ARGS__)
#define CONN_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "conn[%u.%u/%s] " fmt, conn->ziti_ctx->id, conn->conn_id, conn_state_str[conn->state], ##__VA_ARGS__)

#define conn_states(XX) \
XX(Initial)\
Expand Down Expand Up @@ -590,16 +590,6 @@ static void ziti_write_timeout(uv_timer_t *t) {
static void ziti_write_req(struct ziti_write_req_s *req) {
struct ziti_conn *conn = req->conn;

if (conn->state >= Timedout) {
CONN_LOG(WARN, "got write req in closed/disconnected sate");
conn->write_reqs--;

if (req->cb) {
req->cb(conn, ZITI_CONN_CLOSED, req->ctx);
}
free(req);
return;
}
if (req->eof) {
conn_set_state(conn, CloseWrite);
send_fin_message(conn);
Expand Down Expand Up @@ -767,20 +757,30 @@ static void flush_connection (ziti_connection conn) {

static bool flush_to_service(ziti_connection conn) {

// still connecting
if (conn->channel == NULL) { return false; }
if (conn->state < Connected || conn->state == Accepting) { return false; }

if (conn->state == Connected) {
int count = 0;
while (!TAILQ_EMPTY(&conn->wreqs)) {
struct ziti_write_req_s *req = TAILQ_FIRST(&conn->wreqs);
TAILQ_REMOVE(&conn->wreqs, req, _next);
int count = 0;
while (!TAILQ_EMPTY(&conn->wreqs)) {
struct ziti_write_req_s *req = TAILQ_FIRST(&conn->wreqs);
TAILQ_REMOVE(&conn->wreqs, req, _next);

if (conn->state == Connected) {
conn->write_reqs++;
ziti_write_req(req);
count++;
} else {
CONN_LOG(DEBUG, "got write req in invalid state[%s]", conn_state_str[conn->state]);
conn->write_reqs--;

if (req->cb) {
req->cb(conn, ZITI_INVALID_STATE, req->ctx);
}
free(req);
}
CONN_LOG(TRACE, "flushed %d messages", count);
}
CONN_LOG(TRACE, "flushed %d messages", count);

return !TAILQ_EMPTY(&conn->wreqs);
}
Expand Down

0 comments on commit bb20e9a

Please sign in to comment.