Skip to content

Commit

Permalink
Fix potential deadlocks when resuming a continuation while holding a …
Browse files Browse the repository at this point in the history
…lock (#303)

* Fix potential deadlocks in zip

* Fix debounce

* Fix combineLatest

* Fix Channel

* Fix buffer
  • Loading branch information
FranzBusch authored Nov 17, 2023
1 parent 5bbdcc1 commit da4e36f
Show file tree
Hide file tree
Showing 6 changed files with 553 additions and 476 deletions.
129 changes: 71 additions & 58 deletions Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,47 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send

func next() async -> Result<Base.Element, Error>? {
return await withTaskCancellationHandler {
let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result<Base.Element, Error>?) in
let action: BoundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.next()
switch action {
case .startTask(let base):
self.startTask(stateMachine: &stateMachine, base: base)
return (true, nil)
return nil

case .suspend:
return (true, nil)
case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
return (false, result)
return action
case .returnResult:
return action
}
}

if !shouldSuspend {
return result
switch action {
case .startTask:
// We are handling the startTask in the lock already because we want to avoid
// other inputs interleaving while starting the task
fatalError("Internal inconsistency")

case .suspend:
break

case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
return result

case .none:
break
}

return await withUnsafeContinuation { (continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.nextSuspended(continuation: continuation)
switch action {
case .none:
break
case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
continuation.resume(returning: result)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(continuation: continuation)
}
switch action {
case .none:
break
case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
continuation.resume(returning: result)
}
}
} onCancel: {
Expand All @@ -68,15 +81,15 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send

if shouldSuspend {
await withUnsafeContinuation { (continuation: UnsafeContinuation<Void, Never>) in
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.producerSuspended(continuation: continuation)

switch action {
case .none:
break
case .resumeProducer:
continuation.resume()
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.producerSuspended(continuation: continuation)
}

switch action {
case .none:
break
case .resumeProducer:
continuation.resume()
}
}
}
Expand All @@ -86,35 +99,35 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
break loop
}

self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.elementProduced(element: element)
switch action {
case .none:
break
case .resumeConsumer(let continuation, let result):
continuation.resume(returning: result)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.elementProduced(element: element)
}
}

self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.finish(error: nil)
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
case .resumeConsumer(let continuation, let result):
continuation.resume(returning: result)
}
}

let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: nil)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
}
} catch {
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.finish(error: error)
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: error)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
}
}
}
Expand All @@ -123,16 +136,16 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
}

func interrupted() {
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.interrupted()
switch action {
case .none:
break
case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation):
task.cancel()
producerContinuation?.resume()
consumerContinuation?.resume(returning: nil)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.interrupted()
}
switch action {
case .none:
break
case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation):
task.cancel()
producerContinuation?.resume()
consumerContinuation?.resume(returning: nil)
}
}

Expand Down
101 changes: 55 additions & 46 deletions Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,41 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
func next() async -> Result<Base.Element, Error>? {
return await withTaskCancellationHandler {

let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result<Base.Element, Error>?) in
let action: UnboundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.next()
switch action {
case .startTask(let base):
self.startTask(stateMachine: &stateMachine, base: base)
return (true, nil)
return nil
case .suspend:
return (true, nil)
case .returnResult(let result):
return (false, result)
return action
case .returnResult:
return action
}
}

if !shouldSuspend {
return result
switch action {
case .startTask:
// We are handling the startTask in the lock already because we want to avoid
// other inputs interleaving while starting the task
fatalError("Internal inconsistency")
case .suspend:
break
case .returnResult(let result):
return result
case .none:
break
}

return await withUnsafeContinuation { (continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.nextSuspended(continuation: continuation)
switch action {
case .none:
break
case .resumeConsumer(let result):
continuation.resume(returning: result)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(continuation: continuation)
}
switch action {
case .none:
break
case .resumeConsumer(let result):
continuation.resume(returning: result)
}
}
} onCancel: {
Expand All @@ -59,35 +68,35 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
let task = Task {
do {
for try await element in base {
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.elementProduced(element: element)
switch action {
case .none:
break
case .resumeConsumer(let continuation, let result):
continuation.resume(returning: result)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.elementProduced(element: element)
}
}

self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.finish(error: nil)
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
case .resumeConsumer(let continuation, let result):
continuation.resume(returning: result)
}
}

let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: nil)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
}
} catch {
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.finish(error: error)
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: error)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
}
}
}
Expand All @@ -96,15 +105,15 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
}

func interrupted() {
self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.interrupted()
switch action {
case .none:
break
case .resumeConsumer(let task, let continuation):
task.cancel()
continuation?.resume(returning: nil)
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.interrupted()
}
switch action {
case .none:
break
case .resumeConsumer(let task, let continuation):
task.cancel()
continuation?.resume(returning: nil)
}
}

Expand Down
Loading

0 comments on commit da4e36f

Please sign in to comment.