Skip to content

Commit

Permalink
- improved CoScope performance
Browse files Browse the repository at this point in the history
- fixed bugs
  • Loading branch information
Alex Belozierov committed May 29, 2020
1 parent e0d1d60 commit c038275
Show file tree
Hide file tree
Showing 67 changed files with 855 additions and 254 deletions.
Binary file added ._SwiftCoroutine.xcodeproj
Binary file not shown.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ DispatchQueue.main.startCoroutine {
### Requirements

- iOS 10+ / macOS 10.12+ / Ubuntu
- Xcode 10.2+
- Swift 5+
- Xcode 10.4+
- Swift 5.2+

### Installation

Expand Down
26 changes: 23 additions & 3 deletions Sources/SwiftCoroutine/CoChannel/CoChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class CoChannel<Element> {
public let maxBufferSize: Int
private var receiveCallbacks = FifoQueue<ReceiveCallback>()
private var sendBlocks = FifoQueue<SendBlock>()
private var cancelBlocks = CallbackStack<Void>()
private var completeBlocks = CallbackStack<CoChannelError?>()
private var atomic = AtomicTuple()

/// Initializes a channel.
Expand Down Expand Up @@ -255,14 +255,34 @@ public final class CoChannel<Element> {
atomic.value.1 == 2
}

/// Adds an observer callback that is called when the `CoChannel` is canceled.
/// - Parameter callback: The callback that is called when the `CoChannel` is canceled.
public func whenCanceled(_ callback: @escaping () -> Void) {
whenFinished { if $0 == .canceled { callback() } }
}

// MARK: - complete

/// Adds an observer callback that is called when the `CoChannel` is completed (closed, canceled or deinited).
/// - Parameter callback: The callback that is called when the `CoChannel` is completed.
public func whenComplete(_ callback: @escaping () -> Void) {
if !cancelBlocks.append(callback) { callback() }
whenFinished { _ in callback() }
}

private func whenFinished(_ callback: @escaping (CoChannelError?) -> Void) {
if !completeBlocks.append(callback) { callback(channelError) }
}

private func finish() {
cancelBlocks.close()?.finish(with: ())
completeBlocks.close()?.finish(with: channelError)
}

private var channelError: CoChannelError? {
switch atomic.value.1 {
case 1: return .closed
case 2: return .canceled
default: return nil
}
}

deinit {
Expand Down
8 changes: 8 additions & 0 deletions Sources/SwiftCoroutine/CoChannel/CoChannel1+Receiver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ extension CoChannel {
/// Returns `true` if the channel is canceled.
public var isCanceled: Bool { true }

/// Adds an observer callback that is called when the `CoChannel` is canceled.
/// - Parameter callback: The callback that is called when the `CoChannel` is canceled.
public func whenCanceled(_ callback: @escaping () -> Void) {
callback()
}

// MARK: - complete

/// Adds an observer callback that is called when the `CoChannel` is completed (closed, canceled or deinited).
/// - Parameter callback: The callback that is called when the `CoChannel` is completed.
public func whenComplete(_ callback: @escaping () -> Void) {
Expand Down
8 changes: 8 additions & 0 deletions Sources/SwiftCoroutine/CoChannel/CoChannel2+Sender.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ extension CoChannel.Sender {
channel.isCanceled
}

/// Adds an observer callback that is called when the `CoChannel` is canceled.
/// - Parameter callback: The callback that is called when the `CoChannel` is canceled.
public func whenCanceled(_ callback: @escaping () -> Void) {
channel.whenCanceled(callback)
}

// MARK: - complete

/// Adds an observer callback that is called when the `CoChannel` is completed (closed, canceled or deinited).
/// - Parameter callback: The callback that is called when the `CoChannel` is completed.
@inlinable public func whenComplete(_ callback: @escaping () -> Void) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ internal final class CoChannelReceiver<T>: CoChannel<T>.Receiver {
channel.whenComplete(callback)
}

internal override func whenCanceled(_ callback: @escaping () -> Void) {
channel.whenCanceled(callback)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal protocol CoChannelReceiverProtocol {
func cancel()
var isCanceled: Bool { get }
func whenComplete(_ callback: @escaping () -> Void)
func whenCanceled(_ callback: @escaping () -> Void)
var maxBufferSize: Int { get }

}
Expand Down
110 changes: 26 additions & 84 deletions Sources/SwiftCoroutine/CoScope/CoScope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,121 +36,63 @@
///
public final class CoScope {

private typealias Block = () -> Void

private var items = [Int: Block]()
private var callbacks = [Block]()
private var state = Int.free
private var keyCounter = 0
internal typealias Completion = () -> Void
private var storage = Storage<Completion>()
private var callbacks = CallbackStack<Void>()

/// Initializes a scope.
public init() {}

/// Adds weak referance of `CoCancellable` to be canceled when the scope is being canceled or deinited.
/// - Parameter item: `CoCancellable` to add.
@inlinable public func add(_ item: CoCancellable) {
item.whenComplete(add { [weak item] in item?.cancel() })
}

@usableFromInline internal func add(_ cancel: @escaping () -> Void) -> () -> Void {
let key = atomicAdd(&keyCounter, value: 1)
addItem(for: key, block: cancel)
return { [weak self] in self?.removeItem(for: key) }
}

/// Returns `true` if the scope is empty (contains no `CoCancellable`).
public var isEmpty: Bool {
if isCanceled { return true }
var isEmpty = true
locked { isEmpty = items.isEmpty }
return isEmpty
}

// MARK: - items

private func addItem(for key: Int, block: @escaping Block) {
if !locked({ items[key] = block }) { block() }
public func add(_ item: CoCancellable) {
add { [weak item] in item?.cancel() }.map(item.whenComplete)
}

private func removeItem(for key: Int) {
locked { items[key] = nil }
internal func add(_ cancel: @escaping () -> Void) -> Completion? {
if isCanceled { cancel(); return nil }
let key = storage.append(cancel)
if isCanceled { storage.remove(key)?(); return nil }
return { [weak self] in self?.remove(key: key) }
}

// MARK: - lock

@discardableResult private func locked(_ block: Block) -> Bool {
while true {
switch state {
case .free:
if atomicCAS(&state, expected: .free, desired: .busy) {
block()
unlock()
return true
}
case .busy:
continue
default:
return false
}
}
private func remove(key: Storage<Completion>.Index) {
storage.remove(key)
}

private func unlock() {
while true {
switch state {
case .busy:
if atomicCAS(&state, expected: .busy, desired: .free) { return }
default:
return completeAll()
}
}
/// Returns `true` if the scope is empty (contains no `CoCancellable`).
public var isEmpty: Bool {
isCanceled || storage.isEmpty
}

// MARK: - cancel

/// Returns `true` if the scope is canceled.
public var isCanceled: Bool {
state == .canceled
callbacks.isClosed
}

/// Cancels the scope and all `CoCancellable` that it contains.
public func cancel() {
while true {
switch state {
case .free:
if !atomicCAS(&state, expected: .free, desired: .canceled) { continue }
return completeAll()
case .busy:
if atomicCAS(&state, expected: .busy, desired: .canceled) { return }
default:
return
}
}
if isCanceled { return }
completeAll()
storage.removeAll()
}

private func completeAll() {
callbacks.close()?.finish(with: ())
storage.forEach { $0() }
}

/// Adds an observer callback that is called when the `CoScope` is canceled or deinited.
/// - Parameter callback: The callback that is called when the scope is canceled or deinited.
public func whenComplete(_ callback: @escaping () -> Void) {
if !locked({ callbacks.append(callback) }) { callback() }
}

private func completeAll() {
items.values.forEach { $0() }
items.removeAll()
callbacks.forEach { $0() }
callbacks.removeAll()
if !callbacks.append(callback) { callback() }
}

deinit {
if !isCanceled { completeAll() }
storage.free()
}

}

fileprivate extension Int {

static let canceled = -1
static let free = 0
static let busy = 1

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ extension CoroutineScheduler {
public func startCoroutine(in scope: CoScope? = nil, task: @escaping () throws -> Void) {
guard let scope = scope else { return _startCoroutine { try? task() } }
_startCoroutine {
let coroutine = try? Coroutine.current()
let finish = scope.add { [weak coroutine] in coroutine?.cancel() }
if !scope.isCanceled { try? task() }
finish()
guard let coroutine = try? Coroutine.current(),
let completion = scope.add(coroutine.cancel) else { return }
try? task()
completion()
}
}

Expand Down Expand Up @@ -113,8 +113,9 @@ extension CoroutineScheduler {
@inlinable public func coroutineFuture<T>(_ task: @escaping () throws -> T) -> CoFuture<T> {
let promise = CoPromise<T>()
_startCoroutine {
let current = try? Coroutine.current()
promise.whenCanceled { current?.cancel() }
if let coroutine = try? Coroutine.current() {
promise.whenCanceled(coroutine.cancel)
}
if promise.isCanceled { return }
promise.complete(with: Result(catching: task))
}
Expand Down Expand Up @@ -165,8 +166,9 @@ extension CoroutineScheduler {
@inlinable public func actor<T>(of type: T.Type = T.self, maxBufferSize: Int = .max, body: @escaping (CoChannel<T>.Receiver) throws -> Void) -> CoChannel<T>.Sender {
let (receiver, sender) = CoChannel<T>(maxBufferSize: maxBufferSize).pair
_startCoroutine {
let coroutine = try? Coroutine.current()
receiver.whenComplete { coroutine?.cancel() }
if let coroutine = try? Coroutine.current() {
receiver.whenCanceled { [weak coroutine] in coroutine?.cancel() }
}
if receiver.isCanceled { return }
try? body(receiver)
}
Expand Down
Binary file not shown.
Loading

0 comments on commit c038275

Please sign in to comment.