Skip to content

Commit

Permalink
Merge pull request #1584 from openziti/fix-xgress-data-stall
Browse files Browse the repository at this point in the history
Fix stall when checking acks after checking window size. Fixes #1583
  • Loading branch information
plorenz authored Dec 13, 2023
2 parents 19b5c0e + 128b274 commit beb8daa
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ziti fabric raft remove-member ctrl3
* [Issue #462](https://github.com/openziti/sdk-golang/issues/462) - Allow refreshing a single service

* github.com/openziti/ziti: [v0.31.2 -> v0.31.3](https://github.com/openziti/ziti/compare/v0.31.2...v0.31.3)
* [Issue #1583](https://github.com/openziti/ziti/issues/1583) - xgress: Potential data stall due when processing acks after checking window size
* [Issue #1578](https://github.com/openziti/ziti/issues/1578) - Send BindSuccess notifications to SDK if supported
* [Issue #1544](https://github.com/openziti/ziti/issues/1544) - Support transfer raft leadership via REST
* [Issue #1543](https://github.com/openziti/ziti/issues/1543) - Support add/remove raft peer via REST
Expand Down
23 changes: 12 additions & 11 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,27 +193,28 @@ func (buffer *LinkSendBuffer) run() {
defer retransmitTicker.Stop()

for {
// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
// if we're blocked, but empty, let one payload in to reduce the chances of a stall
if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 {
buffered = nil
} else {
buffered = buffer.newlyBuffered
}

// bias acks by allowing 10 acks to be processed for every payload in
for i := 0; i < 10; i++ {
// bias acks, process all pending, since that should not block
processingAcks := true
for processingAcks {
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
i = 10
processingAcks = false
}
}

// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
// if we're blocked, but empty, let one payload in to reduce the chances of a stall
if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 {
buffered = nil
} else {
buffered = buffer.newlyBuffered
}

select {
case inspectEvent := <-buffer.inspectRequests:
inspectEvent.handle(buffer)
Expand Down

0 comments on commit beb8daa

Please sign in to comment.