From 128b274f088212ba8f0149f0fe807b3f243aa420 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Wed, 13 Dec 2023 12:36:40 -0500 Subject: [PATCH] Fix stall when checking acks after checking window size. Fixes #1583 --- CHANGELOG.md | 1 + router/xgress/link_send_buffer.go | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 524d38d78..b9fb57480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ ziti fabric raft remove-member ctrl3 * [Issue #462](https://github.com/openziti/sdk-golang/issues/462) - Allow refreshing a single service * github.com/openziti/ziti: [v0.31.2 -> v0.31.3](https://github.com/openziti/ziti/compare/v0.31.2...v0.31.3) + * [Issue #1583](https://github.com/openziti/ziti/issues/1583) - xgress: Potential data stall due when processing acks after checking window size * [Issue #1578](https://github.com/openziti/ziti/issues/1578) - Send BindSuccess notifications to SDK if supported * [Issue #1544](https://github.com/openziti/ziti/issues/1544) - Support transfer raft leadership via REST * [Issue #1543](https://github.com/openziti/ziti/issues/1543) - Support add/remove raft peer via REST diff --git a/router/xgress/link_send_buffer.go b/router/xgress/link_send_buffer.go index 9f7c751ab..4a807df4e 100644 --- a/router/xgress/link_send_buffer.go +++ b/router/xgress/link_send_buffer.go @@ -193,16 +193,9 @@ func (buffer *LinkSendBuffer) run() { defer retransmitTicker.Stop() for { - // don't block when we're closing, since the only thing that should still be coming in is end-of-circuit - // if we're blocked, but empty, let one payload in to reduce the chances of a stall - if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 { - buffered = nil - } else { - buffered = buffer.newlyBuffered - } - - // bias acks by allowing 10 acks to be processed for every payload in - for i := 0; i < 10; i++ { + // bias acks, process all pending, since that should not block + processingAcks := true + for processingAcks { select { case ack := <-buffer.newlyReceivedAcks: buffer.receiveAcknowledgement(ack) @@ -210,10 +203,18 @@ func (buffer *LinkSendBuffer) run() { buffer.close() return default: - i = 10 + processingAcks = false } } + // don't block when we're closing, since the only thing that should still be coming in is end-of-circuit + // if we're blocked, but empty, let one payload in to reduce the chances of a stall + if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 { + buffered = nil + } else { + buffered = buffer.newlyBuffered + } + select { case inspectEvent := <-buffer.inspectRequests: inspectEvent.handle(buffer)