diff --git a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift index c00360e1..f83a37fa 100644 --- a/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift +++ b/Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift @@ -18,34 +18,47 @@ final class BoundedBufferStorage: Sendable where Base: Send func next() async -> Result? { return await withTaskCancellationHandler { - let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in + let action: BoundedBufferStateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in let action = stateMachine.next() switch action { case .startTask(let base): self.startTask(stateMachine: &stateMachine, base: base) - return (true, nil) + return nil + case .suspend: - return (true, nil) - case .returnResult(let producerContinuation, let result): - producerContinuation?.resume() - return (false, result) + return action + case .returnResult: + return action } } - if !shouldSuspend { - return result + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .suspend: + break + + case .returnResult(let producerContinuation, let result): + producerContinuation?.resume() + return result + + case .none: + break } return await withUnsafeContinuation { (continuation: UnsafeContinuation?, Never>) in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.nextSuspended(continuation: continuation) - switch action { - case .none: - break - case .returnResult(let producerContinuation, let result): - producerContinuation?.resume() - continuation.resume(returning: result) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.nextSuspended(continuation: continuation) + } + switch action { + case .none: + break + case .returnResult(let producerContinuation, let result): + producerContinuation?.resume() + continuation.resume(returning: result) } } } onCancel: { @@ -68,15 +81,15 @@ final class BoundedBufferStorage: Sendable where Base: Send if shouldSuspend { await withUnsafeContinuation { (continuation: UnsafeContinuation) in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.producerSuspended(continuation: continuation) - - switch action { - case .none: - break - case .resumeProducer: - continuation.resume() - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.producerSuspended(continuation: continuation) + } + + switch action { + case .none: + break + case .resumeProducer: + continuation.resume() } } } @@ -86,35 +99,35 @@ final class BoundedBufferStorage: Sendable where Base: Send break loop } - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.elementProduced(element: element) - switch action { - case .none: - break - case .resumeConsumer(let continuation, let result): - continuation.resume(returning: result) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced(element: element) } - } - - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.finish(error: nil) switch action { case .none: break - case .resumeConsumer(let continuation): - continuation?.resume(returning: nil) + case .resumeConsumer(let continuation, let result): + continuation.resume(returning: result) } } + + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.finish(error: nil) + } + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: nil) + } } catch { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.finish(error: error) - switch action { - case .none: - break - case .resumeConsumer(let continuation): - continuation?.resume(returning: .failure(error)) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.finish(error: error) + } + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: .failure(error)) } } } @@ -123,16 +136,16 @@ final class BoundedBufferStorage: Sendable where Base: Send } func interrupted() { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.interrupted() - switch action { - case .none: - break - case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation): - task.cancel() - producerContinuation?.resume() - consumerContinuation?.resume(returning: nil) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.interrupted() + } + switch action { + case .none: + break + case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation): + task.cancel() + producerContinuation?.resume() + consumerContinuation?.resume(returning: nil) } } diff --git a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift index 59b02810..b63b261f 100644 --- a/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift +++ b/Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift @@ -19,32 +19,41 @@ final class UnboundedBufferStorage: Sendable where Base: Se func next() async -> Result? { return await withTaskCancellationHandler { - let (shouldSuspend, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in + let action: UnboundedBufferStateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in let action = stateMachine.next() switch action { case .startTask(let base): self.startTask(stateMachine: &stateMachine, base: base) - return (true, nil) + return nil case .suspend: - return (true, nil) - case .returnResult(let result): - return (false, result) + return action + case .returnResult: + return action } } - if !shouldSuspend { - return result + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + case .suspend: + break + case .returnResult(let result): + return result + case .none: + break } return await withUnsafeContinuation { (continuation: UnsafeContinuation?, Never>) in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.nextSuspended(continuation: continuation) - switch action { - case .none: - break - case .resumeConsumer(let result): - continuation.resume(returning: result) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.nextSuspended(continuation: continuation) + } + switch action { + case .none: + break + case .resumeConsumer(let result): + continuation.resume(returning: result) } } } onCancel: { @@ -59,35 +68,35 @@ final class UnboundedBufferStorage: Sendable where Base: Se let task = Task { do { for try await element in base { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.elementProduced(element: element) - switch action { - case .none: - break - case .resumeConsumer(let continuation, let result): - continuation.resume(returning: result) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced(element: element) } - } - - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.finish(error: nil) switch action { case .none: break - case .resumeConsumer(let continuation): - continuation?.resume(returning: nil) + case .resumeConsumer(let continuation, let result): + continuation.resume(returning: result) } } + + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.finish(error: nil) + } + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: nil) + } } catch { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.finish(error: error) - switch action { - case .none: - break - case .resumeConsumer(let continuation): - continuation?.resume(returning: .failure(error)) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.finish(error: error) + } + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: .failure(error)) } } } @@ -96,15 +105,15 @@ final class UnboundedBufferStorage: Sendable where Base: Se } func interrupted() { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.interrupted() - switch action { - case .none: - break - case .resumeConsumer(let task, let continuation): - task.cancel() - continuation?.resume(returning: nil) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.interrupted() + } + switch action { + case .none: + break + case .resumeConsumer(let task, let continuation): + task.cancel() + continuation?.resume(returning: nil) } } diff --git a/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift b/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift index 12b5ba72..0fb67818 100644 --- a/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift +++ b/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift @@ -25,21 +25,17 @@ struct ChannelStorage: Sendable { func send(element: Element) async { // check if a suspension is needed - let shouldExit = self.stateMachine.withCriticalRegion { stateMachine -> Bool in - let action = stateMachine.send() - - switch action { - case .suspend: - // the element has not been delivered because no consumer available, we must suspend - return false - case .resumeConsumer(let continuation): - continuation?.resume(returning: element) - return true - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.send() } - if shouldExit { - return + switch action { + case .suspend: + break + + case .resumeConsumer(let continuation): + continuation?.resume(returning: element) + return } let producerID = self.generateId() @@ -47,103 +43,100 @@ struct ChannelStorage: Sendable { await withTaskCancellationHandler { // a suspension is needed await withUnsafeContinuation { (continuation: UnsafeContinuation) in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.sendSuspended(continuation: continuation, element: element, producerID: producerID) - - switch action { - case .none: - break - case .resumeProducer: - continuation.resume() - case .resumeProducerAndConsumer(let consumerContinuation): - continuation.resume() - consumerContinuation?.resume(returning: element) - } + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.sendSuspended(continuation: continuation, element: element, producerID: producerID) } - } - } onCancel: { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.sendCancelled(producerID: producerID) switch action { case .none: break - case .resumeProducer(let continuation): - continuation?.resume() + case .resumeProducer: + continuation.resume() + case .resumeProducerAndConsumer(let consumerContinuation): + continuation.resume() + consumerContinuation?.resume(returning: element) } } - } - } - - func finish(error: Failure? = nil) { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.finish(error: error) + } onCancel: { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.sendCancelled(producerID: producerID) + } switch action { case .none: break - case .resumeProducersAndConsumers(let producerContinuations, let consumerContinuations): - producerContinuations.forEach { $0?.resume() } - if let error { - consumerContinuations.forEach { $0?.resume(throwing: error) } - } else { - consumerContinuations.forEach { $0?.resume(returning: nil) } - } + case .resumeProducer(let continuation): + continuation?.resume() } } } - func next() async throws -> Element? { - let (shouldExit, result) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in - let action = stateMachine.next() + func finish(error: Failure? = nil) { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.finish(error: error) + } - switch action { - case .suspend: - return (false, nil) - case .resumeProducer(let producerContinuation, let result): - producerContinuation?.resume() - return (true, result) - } + switch action { + case .none: + break + case .resumeProducersAndConsumers(let producerContinuations, let consumerContinuations): + producerContinuations.forEach { $0?.resume() } + if let error { + consumerContinuations.forEach { $0?.resume(throwing: error) } + } else { + consumerContinuations.forEach { $0?.resume(returning: nil) } + } } + } - if shouldExit { - return try result?._rethrowGet() + func next() async throws -> Element? { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.next() + } + + switch action { + case .suspend: + break + + case .resumeProducer(let producerContinuation, let result): + producerContinuation?.resume() + return try result._rethrowGet() } let consumerID = self.generateId() return try await withTaskCancellationHandler { try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.nextSuspended( + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.nextSuspended( continuation: continuation, consumerID: consumerID ) - - switch action { - case .none: - break - case .resumeConsumer(let element): - continuation.resume(returning: element) - case .resumeConsumerWithError(let error): - continuation.resume(throwing: error) - case .resumeProducerAndConsumer(let producerContinuation, let element): - producerContinuation?.resume() - continuation.resume(returning: element) - } } - } - } onCancel: { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.nextCancelled(consumerID: consumerID) switch action { case .none: break - case .resumeConsumer(let continuation): - continuation?.resume(returning: nil) + case .resumeConsumer(let element): + continuation.resume(returning: element) + case .resumeConsumerWithError(let error): + continuation.resume(throwing: error) + case .resumeProducerAndConsumer(let producerContinuation, let element): + producerContinuation?.resume() + continuation.resume(returning: element) } } + } onCancel: { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.nextCancelled(consumerID: consumerID) + } + + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: nil) + } } } } diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift index d3b67404..0d97adea 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift @@ -48,7 +48,7 @@ final class CombineLatestStorage< func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element?)? { try await withTaskCancellationHandler { let result = await withUnsafeContinuation { continuation in - self.stateMachine.withCriticalRegion { stateMachine in + let action: StateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in let action = stateMachine.next(for: continuation) switch action { case .startTask(let base1, let base2, let base3): @@ -60,45 +60,65 @@ final class CombineLatestStorage< base3: base3, downstreamContinuation: continuation ) + return nil - case .resumeContinuation(let downstreamContinuation, let result): - downstreamContinuation.resume(returning: result) + case .resumeContinuation: + return action - case .resumeUpstreamContinuations(let upstreamContinuations): - // bases can be iterated over for 1 iteration so their next value can be retrieved - upstreamContinuations.forEach { $0.resume() } + case .resumeUpstreamContinuations: + return action - case .resumeDownstreamContinuationWithNil(let continuation): - // the async sequence is already finished, immediately resuming - continuation.resume(returning: .success(nil)) + case .resumeDownstreamContinuationWithNil: + return action } } + + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .resumeUpstreamContinuations(let upstreamContinuations): + // bases can be iterated over for 1 iteration so their next value can be retrieved + upstreamContinuations.forEach { $0.resume() } + + case .resumeDownstreamContinuationWithNil(let continuation): + // the async sequence is already finished, immediately resuming + continuation.resume(returning: .success(nil)) + + case .none: + break + } } return try result._rethrowGet() } onCancel: { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.cancelled() + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.cancelled() + } - switch action { - case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( - let downstreamContinuation, - let task, - let upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() - downstreamContinuation.resume(returning: .success(nil)) + downstreamContinuation.resume(returning: .success(nil)) - case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() - case .none: - break - } + case .none: + break } } } @@ -124,33 +144,33 @@ final class CombineLatestStorage< // element from upstream. This continuation is only resumed // if the downstream consumer called `next` to signal his demand. try await withUnsafeThrowingContinuation { continuation in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.childTaskSuspended(baseIndex: 0, continuation: continuation) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.childTaskSuspended(baseIndex: 0, continuation: continuation) + } - switch action { - case .resumeContinuation(let upstreamContinuation): - upstreamContinuation.resume() + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() - case .resumeContinuationWithError(let upstreamContinuation, let error): - upstreamContinuation.resume(throwing: error) + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) - case .none: - break - } + case .none: + break } } if let element1 = try await base1Iterator.next() { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.elementProduced((element1, nil, nil)) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced((element1, nil, nil)) + } - switch action { - case .resumeContinuation(let downstreamContinuation, let result): - downstreamContinuation.resume(returning: result) + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) - case .none: - break - } + case .none: + break } } else { let action = self.stateMachine.withCriticalRegion { stateMachine in @@ -191,33 +211,33 @@ final class CombineLatestStorage< // element from upstream. This continuation is only resumed // if the downstream consumer called `next` to signal his demand. try await withUnsafeThrowingContinuation { continuation in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.childTaskSuspended(baseIndex: 1, continuation: continuation) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.childTaskSuspended(baseIndex: 1, continuation: continuation) + } - switch action { - case .resumeContinuation(let upstreamContinuation): - upstreamContinuation.resume() + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() - case .resumeContinuationWithError(let upstreamContinuation, let error): - upstreamContinuation.resume(throwing: error) + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) - case .none: - break - } + case .none: + break } } if let element2 = try await base1Iterator.next() { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.elementProduced((nil, element2, nil)) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced((nil, element2, nil)) + } - switch action { - case .resumeContinuation(let downstreamContinuation, let result): - downstreamContinuation.resume(returning: result) + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) - case .none: - break - } + case .none: + break } } else { let action = self.stateMachine.withCriticalRegion { stateMachine in @@ -259,33 +279,33 @@ final class CombineLatestStorage< // element from upstream. This continuation is only resumed // if the downstream consumer called `next` to signal his demand. try await withUnsafeThrowingContinuation { continuation in - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.childTaskSuspended(baseIndex: 2, continuation: continuation) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.childTaskSuspended(baseIndex: 2, continuation: continuation) + } - switch action { - case .resumeContinuation(let upstreamContinuation): - upstreamContinuation.resume() + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() - case .resumeContinuationWithError(let upstreamContinuation, let error): - upstreamContinuation.resume(throwing: error) + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) - case .none: - break - } + case .none: + break } } if let element3 = try await base1Iterator.next() { - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.elementProduced((nil, nil, element3)) + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.elementProduced((nil, nil, element3)) + } - switch action { - case .resumeContinuation(let downstreamContinuation, let result): - downstreamContinuation.resume(returning: result) + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) - case .none: - break - } + case .none: + break } } else { let action = self.stateMachine.withCriticalRegion { stateMachine in @@ -323,28 +343,29 @@ final class CombineLatestStorage< do { try await group.next() } catch { - // One of the upstream sequences threw an error - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.upstreamThrew(error) - switch action { - case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( - let downstreamContinuation, - let error, - let task, - let upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - downstreamContinuation.resume(returning: .failure(error)) - case .none: - break - } - } + // One of the upstream sequences threw an error + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamThrew(error) + } + + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + downstreamContinuation.resume(returning: .failure(error)) + case .none: + break + } - group.cancelAll() + group.cancelAll() } } } diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index 1c223143..1839e334 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -55,36 +55,59 @@ final class DebounceStorage: Sendable // We always suspend since we can never return an element right away let result: Result = await withUnsafeContinuation { continuation in - self.stateMachine.withCriticalRegion { - let action = $0.next(for: continuation) - - switch action { - case .startTask(let base): - self.startTask( - stateMachine: &$0, - base: base, - downstreamContinuation: continuation - ) - - case .resumeUpstreamContinuation(let upstreamContinuation): - // This is signalling the upstream task that is consuming the upstream - // sequence to signal demand. - upstreamContinuation?.resume(returning: ()) - - case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): - // This is signalling the upstream task that is consuming the upstream - // sequence to signal demand and start the clock task. - upstreamContinuation?.resume(returning: ()) - clockContinuation?.resume(returning: deadline) - - case .resumeDownstreamContinuationWithNil(let continuation): - continuation.resume(returning: .success(nil)) - - case .resumeDownstreamContinuationWithError(let continuation, let error): - continuation.resume(returning: .failure(error)) - } + let action: DebounceStateMachine.NextAction? = self.stateMachine.withCriticalRegion { + let action = $0.next(for: continuation) + + switch action { + case .startTask(let base): + self.startTask( + stateMachine: &$0, + base: base, + downstreamContinuation: continuation + ) + return nil + + case .resumeUpstreamContinuation: + return action + + case .resumeUpstreamAndClockContinuation: + return action + + case .resumeDownstreamContinuationWithNil: + return action + + case .resumeDownstreamContinuationWithError: + return action } - } + } + + switch action { + case .startTask: + // We are handling the startTask in the lock already because we want to avoid + // other inputs interleaving while starting the task + fatalError("Internal inconsistency") + + case .resumeUpstreamContinuation(let upstreamContinuation): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand. + upstreamContinuation?.resume(returning: ()) + + case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand and start the clock task. + upstreamContinuation?.resume(returning: ()) + clockContinuation?.resume(returning: deadline) + + case .resumeDownstreamContinuationWithNil(let continuation): + continuation.resume(returning: .success(nil)) + + case .resumeDownstreamContinuationWithError(let continuation, let error): + continuation.resume(returning: .failure(error)) + + case .none: + break + } + } return try result._rethrowGet() } onCancel: { @@ -258,37 +281,38 @@ final class DebounceStorage: Sendable do { try await group.next() } catch { - // One of the upstream sequences threw an error - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.upstreamThrew(error) - switch action { - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( - let downstreamContinuation, - let error, - let task, - let upstreamContinuation, - let clockContinuation - ): - upstreamContinuation?.resume(throwing: CancellationError()) - clockContinuation?.resume(throwing: CancellationError()) - - task.cancel() - - downstreamContinuation.resume(returning: .failure(error)) - - case .cancelTaskAndClockContinuation( - let task, - let clockContinuation - ): - clockContinuation?.resume(throwing: CancellationError()) - task.cancel() - case .none: - break - } + // One of the upstream sequences threw an error + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamThrew(error) } - group.cancelAll() + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(returning: .failure(error)) + + case .cancelTaskAndClockContinuation( + let task, + let clockContinuation + ): + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + case .none: + break + } } + + group.cancelAll() } } } diff --git a/Sources/AsyncAlgorithms/Zip/ZipStorage.swift b/Sources/AsyncAlgorithms/Zip/ZipStorage.swift index 7d971a78..93a3466c 100644 --- a/Sources/AsyncAlgorithms/Zip/ZipStorage.swift +++ b/Sources/AsyncAlgorithms/Zip/ZipStorage.swift @@ -39,7 +39,7 @@ final class ZipStorage (Base1.Element, Base2.Element, Base3.Element?)? { try await withTaskCancellationHandler { let result = await withUnsafeContinuation { continuation in - self.stateMachine.withCriticalRegion { stateMachine in + let action: StateMachine.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in let action = stateMachine.next(for: continuation) switch action { case .startTask(let base1, let base2, let base3): @@ -51,42 +51,59 @@ final class ZipStorage