Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added synchronized access to listeners and global subscriptions #189

Merged
merged 10 commits into from
Sep 13, 2024
30 changes: 19 additions & 11 deletions Sources/PubNub/Helpers/WeakBox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,43 +29,51 @@ final class WeakBox<Element>: Hashable where Element: AnyObject, Element: Hashab
}

struct WeakSet<Element> where Element: AnyObject, Element: Hashable {
private var elements: Set<WeakBox<Element>> = []
private var elements: Atomic<Set<WeakBox<Element>>> = Atomic([])

init(_ elements: [Element]) {
elements.forEach { self.elements.update(with: WeakBox($0)) }
self.elements.lockedWrite { [elements] currentValue in
elements.forEach { element in
currentValue.update(with: WeakBox(element))
}
}
}

// NSSet Operations
var allObjects: [Element] {
return elements.compactMap { $0.underlying }
return elements.lockedRead { $0.compactMap { $0.underlying } }
}

var count: Int {
return self.elements.count
elements.lockedRead { $0.count }
}

mutating func update(_ element: Element) {
elements.update(with: WeakBox(element))
elements.lockedWrite { [element] in
$0.update(with: WeakBox(element))
}
}

mutating func remove(_ element: Element) {
elements.remove(WeakBox(element))
elements.lockedWrite { [element] in
$0.remove(WeakBox(element))
}
}

mutating func removeAll() {
elements.removeAll()
elements.lockedWrite { $0 = Set<WeakBox<Element>>() }
}
}

extension WeakSet: Collection {
var startIndex: Set<WeakBox<Element>>.Index { return elements.startIndex }
var endIndex: Set<WeakBox<Element>>.Index { return elements.endIndex }
var startIndex: Set<WeakBox<Element>>.Index { return elements.lockedRead { $0.startIndex } }
var endIndex: Set<WeakBox<Element>>.Index { return elements.lockedRead { $0.endIndex } }

subscript(position: Set<WeakBox<Element>>.Index) -> Element? {
return elements[position].underlying
elements.lockedRead { $0[position].underlying }
}

func index(after index: Set<WeakBox<Element>>.Index) -> Set<WeakBox<Element>>.Index {
return elements.index(after: index)
elements.lockedRead { $0.index(after: index) }
}
}
51 changes: 37 additions & 14 deletions Sources/PubNub/Subscription/SubscriptionSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
return statusListener
}()

private var globalChannelSubscriptions: [String: Subscription] = [:]
private var globalGroupSubscriptions: [String: Subscription] = [:]
private var globalChannelSubscriptions: Atomic<[String: Subscription]> = Atomic([:])
private var globalGroupSubscriptions: Atomic<[String: Subscription]> = Atomic([:])
private let strategy: any SubscriptionSessionStrategy

init(
Expand Down Expand Up @@ -125,16 +125,29 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
and: channelGroupSubscriptions,
at: cursor?.timetoken
)
for subscription in channelSubscriptions {
subscription.subscriptionNames.compactMap { $0 }.forEach {
globalChannelSubscriptions[$0] = subscription

let channelSubsToMerge = channelSubscriptions.reduce(
into: [String: Subscription]()
) { accumulatedValue, subscription in
subscription.subscriptionNames.forEach {
accumulatedValue[$0] = subscription
}
}
for subscription in channelGroupSubscriptions {
subscription.subscriptionNames.compactMap { $0 }.forEach {
globalGroupSubscriptions[$0] = subscription

let channelGroupSubsToMerge = channelGroupSubscriptions.reduce(
into: [String: Subscription]()
) { accumulatedValue, subscription in
subscription.subscriptionNames.forEach {
accumulatedValue[$0] = subscription
}
}

globalChannelSubscriptions.lockedWrite {
$0.merge(channelSubsToMerge) { _, new in new }
}
globalGroupSubscriptions.lockedWrite {
$0.merge(channelGroupSubsToMerge) { _, new in new }
}
}

// MARK: - Reconnect
Expand Down Expand Up @@ -163,15 +176,25 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}
internalUnsubscribe(
from: globalChannelSubscriptions.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
and: globalGroupSubscriptions.compactMap { groupNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
from: globalChannelSubscriptions.lockedRead { $0.compactMap {
channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil
} },
and: globalGroupSubscriptions.lockedRead { $0.compactMap {
groupNamesToUnsubscribe.contains($0.key) ? $0.value : nil
} },
presenceOnly: presenceOnly
)
channelNamesToUnsubscribe.forEach {
globalChannelSubscriptions.removeValue(forKey: $0)

globalChannelSubscriptions.lockedWrite { currentContainer in
channelNamesToUnsubscribe.forEach {
currentContainer.removeValue(forKey: $0)
}
}
groupNamesToUnsubscribe.forEach {
globalGroupSubscriptions.removeValue(forKey: $0)

globalGroupSubscriptions.lockedWrite { currentContainer in
groupNamesToUnsubscribe.forEach {
currentContainer.removeValue(forKey: $0)
}
}
}

Expand Down
Loading