From c7880b36ced3bb9d9318d66c8e2eef1a30996fe8 Mon Sep 17 00:00:00 2001 From: Matt <85322+mattmassicotte@users.noreply.github.com> Date: Wed, 6 Dec 2023 07:14:37 -0500 Subject: [PATCH] Add AsyncStream-based serial queue --- Sources/Queue/AsyncSerialQueue.swift | 70 ++++++++++++++++++++ Tests/QueueTests/AsyncSerialQueueTests.swift | 23 +++++++ 2 files changed, 93 insertions(+) create mode 100644 Sources/Queue/AsyncSerialQueue.swift create mode 100644 Tests/QueueTests/AsyncSerialQueueTests.swift diff --git a/Sources/Queue/AsyncSerialQueue.swift b/Sources/Queue/AsyncSerialQueue.swift new file mode 100644 index 0000000..9d7796c --- /dev/null +++ b/Sources/Queue/AsyncSerialQueue.swift @@ -0,0 +1,70 @@ +actor AsyncSerialQueue { + public actor QueueTask { + var cancelled = false + let operation: @Sendable () async throws -> Void + + init(operation: @escaping @Sendable () async throws -> Void) { + self.operation = operation + } + + public nonisolated func cancel() { + Task { await internalCancel() } + } + + private func internalCancel() { + cancelled = true + } + + func run() async throws { + if cancelled { + return + } + + try await operation() + } + } + + typealias Stream = AsyncStream + + private let continuation: Stream.Continuation + + public init() { + let (stream, continuation) = Stream.makeStream() + + self.continuation = continuation + + Task { + for await item in stream { + try? await item.run() + } + } + } + + deinit { + continuation.finish() + } + + /// Submit a throwing operation to the queue. + @discardableResult + public nonisolated func addOperation( + @_inheritActorContext operation: @escaping @Sendable () async throws -> Void + ) -> QueueTask { + let queueTask = QueueTask(operation: operation) + + continuation.yield(queueTask) + + return queueTask + } + + /// Submit an operation to the queue. + @discardableResult + public nonisolated func addOperation( + @_inheritActorContext operation: @escaping @Sendable () async -> Void + ) -> QueueTask { + let queueTask = QueueTask(operation: operation) + + continuation.yield(queueTask) + + return queueTask + } +} diff --git a/Tests/QueueTests/AsyncSerialQueueTests.swift b/Tests/QueueTests/AsyncSerialQueueTests.swift new file mode 100644 index 0000000..f177707 --- /dev/null +++ b/Tests/QueueTests/AsyncSerialQueueTests.swift @@ -0,0 +1,23 @@ +import XCTest +@testable import Queue + +final class AsyncSerialQueueTests: XCTestCase { + + func testEnqueuePerformance() { + let queue = AsyncSerialQueue() + + measure { + // techincally measuring the actor creation time too, but I don't think that is a big deal + let s = Counter() + + for i in 1 ... 1_000 { + queue.addOperation { [i] in + let result = await s.increment() + if i != result { + print(i, "does not match", result) + } + } + } + } + } +}