Skip to content

Commit

Permalink
Slow fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kabiroberai committed Oct 16, 2024
1 parent d30efd3 commit ce6b0e7
Showing 1 changed file with 29 additions and 34 deletions.
63 changes: 29 additions & 34 deletions Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ final class CurrentValueRelay<Output>: Publisher {
typealias Failure = Never

private var currentValue: Output
private let lock: os_unfair_lock_t
private let lock: NSRecursiveLock
private var subscriptions = ContiguousArray<Subscription>()

var value: Output {
Expand All @@ -15,13 +15,7 @@ final class CurrentValueRelay<Output>: Publisher {

init(_ value: Output) {
self.currentValue = value
self.lock = os_unfair_lock_t.allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
}

deinit {
self.lock.deinitialize(count: 1)
self.lock.deallocate()
self.lock = NSRecursiveLock()
}

func receive(subscriber: some Subscriber<Output, Never>) {
Expand All @@ -36,7 +30,7 @@ final class CurrentValueRelay<Output>: Publisher {
self.lock.sync {
self.currentValue = value
}
for subscription in self.lock.sync({ self.subscriptions }) {
for subscription in self.lock.sync(work: { self.subscriptions }) {
subscription.receive(value)
}
}
Expand All @@ -52,27 +46,28 @@ final class CurrentValueRelay<Output>: Publisher {

extension CurrentValueRelay {
fileprivate final class Subscription: Combine.Subscription, Equatable {
private var demand = Subscribers.Demand.none
private var downstream: (any Subscriber<Output, Never>)?
private let lock: os_unfair_lock_t
private var _demand = Subscribers.Demand.none

private var _downstream: (any Subscriber<Output, Never>)?
var downstream: (any Subscriber<Output, Never>)? {
var downstream: (any Subscriber<Output, Never>)?
self.lock.sync { downstream = _downstream }
return downstream
}

private let lock: NSRecursiveLock
private var receivedLastValue = false
private var upstream: CurrentValueRelay?

init(upstream: CurrentValueRelay, downstream: any Subscriber<Output, Never>) {
self.upstream = upstream
self.downstream = downstream
self.lock = os_unfair_lock_t.allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
}

deinit {
self.lock.deinitialize(count: 1)
self.lock.deallocate()
self._downstream = downstream
self.lock = upstream.lock
}

func cancel() {
self.lock.sync {
self.downstream = nil
self._downstream = nil
self.upstream?.remove(self)
self.upstream = nil
}
Expand All @@ -81,24 +76,24 @@ extension CurrentValueRelay {
func receive(_ value: Output) {
guard let downstream else { return }

switch self.demand {
self.lock.lock()
switch self._demand {
case .unlimited:
self.lock.unlock()
// NB: Adding to unlimited demand has no effect and can be ignored.
_ = downstream.receive(value)

case .none:
self.lock.sync {
self.receivedLastValue = false
}
self.receivedLastValue = false
self.lock.unlock()

default:
self.lock.sync {
self.receivedLastValue = true
self.demand -= 1
}
self.receivedLastValue = true
self._demand -= 1
self.lock.unlock()
let moreDemand = downstream.receive(value)
self.lock.sync {
self.demand += moreDemand
self._demand += moreDemand
}
}
}
Expand All @@ -109,7 +104,7 @@ extension CurrentValueRelay {
guard let downstream else { return }

self.lock.lock()
self.demand += demand
self._demand += demand

guard
!self.receivedLastValue,
Expand All @@ -121,18 +116,18 @@ extension CurrentValueRelay {

self.receivedLastValue = true

switch self.demand {
switch self._demand {
case .unlimited:
self.lock.unlock()
// NB: Adding to unlimited demand has no effect and can be ignored.
_ = downstream.receive(value)

default:
self.demand -= 1
self._demand -= 1
self.lock.unlock()
let moreDemand = downstream.receive(value)
self.lock.lock()
self.demand += moreDemand
self._demand += moreDemand
self.lock.unlock()
}
}
Expand Down

0 comments on commit ce6b0e7

Please sign in to comment.