Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a new Listeners API #153

Merged
merged 12 commits into from
Feb 21, 2024
206 changes: 148 additions & 58 deletions PubNub.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

83 changes: 33 additions & 50 deletions Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,24 @@ struct SubscribeInput: Equatable {
private let channelEntries: [String: PubNubChannel]
private let groupEntries: [String: PubNubChannel]

typealias InsertingResult = (
newInput: SubscribeInput,
insertedChannels: [PubNubChannel],
insertedGroups: [PubNubChannel]
)
typealias RemovingResult = (
newInput: SubscribeInput,
removedChannels: [PubNubChannel],
removedGroups: [PubNubChannel]
)

init(channels: [PubNubChannel] = [], groups: [PubNubChannel] = []) {
self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) }
self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) }
self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in
_ = r.insert(channel)
}
self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in
_ = r.insert(channel)
}
}

private init(channels: [String: PubNubChannel], groups: [String: PubNubChannel]) {
Expand Down Expand Up @@ -89,52 +104,42 @@ struct SubscribeInput: Equatable {
func adding(
channels: [PubNubChannel],
and groups: [PubNubChannel]
) -> (
newInput: SubscribeInput,
insertedChannels: [PubNubChannel],
insertedGroups: [PubNubChannel]
) {
) -> SubscribeInput.InsertingResult {
// Gets a copy of current channels and channel groups
var currentChannels = channelEntries
var currentGroups = groupEntries

let insertedChannels = channels.filter { currentChannels.insert($0) }
let insertedGroups = groups.filter { currentGroups.insert($0) }

return (
return InsertingResult(
newInput: SubscribeInput(channels: currentChannels, groups: currentGroups),
insertedChannels: insertedChannels,
insertedGroups: insertedGroups
)
}

func removing(
channels: [String],
and groups: [String]
) -> (
newInput: SubscribeInput,
removedChannels: [PubNubChannel],
removedGroups: [PubNubChannel]
) {
channels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
presenceGroupsOnly: [PubNubChannel]
) -> SubscribeInput.RemovingResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you subscribe solely on presence channels with the latest setup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preserved existing behaviors in SDK:

  1. You cannot subscribe to Presence channel without main channel
  2. You can unsubscribe from Presence channel only and still keep main channel

The helper method you highlighted is responsible for returning the new channels and groups due to unsubscribing. I can change its signature to be more meaningful:

func removing(
  mainChannels: [PubNubChannel]
  presenceOnlyChannels: [PubNubChannel]
  mainGroups: [PubNubChannel],
  presenceOnlyGroups: [PubNubChannel]
)

// Gets a copy of current channels and channel groups
var currentChannels = channelEntries
var currentGroups = groupEntries

let removedChannels = channels.compactMap {
if $0.isPresenceChannelName {
return currentChannels.unsubscribePresence($0.trimmingPresenceChannelSuffix)
} else {
return currentChannels.removeValue(forKey: $0)
}
currentChannels.removeValue(forKey: $0.id)
} + presenceChannelsOnly.compactMap {
currentChannels.unsubscribePresence($0.id)
}

let removedGroups = groups.compactMap {
if $0.isPresenceChannelName {
return currentGroups.unsubscribePresence($0.trimmingPresenceChannelSuffix)
} else {
return currentGroups.removeValue(forKey: $0)
}
currentGroups.removeValue(forKey: $0.id)
} + presenceGroupsOnly.compactMap {
currentGroups.unsubscribePresence($0.id)
}

return (
return RemovingResult(
newInput: SubscribeInput(channels: currentChannels, groups: currentGroups),
removedChannels: removedChannels,
removedGroups: removedGroups
Expand All @@ -158,28 +163,6 @@ extension Dictionary where Key == String, Value == PubNubChannel {
self[channel.id] = channel
return true
}

func difference(_ dict: [Key:Value]) -> [Key: Value] {
let entriesInSelfAndNotInDict = filter {
dict[$0.0] != self[$0.0]
}
return entriesInSelfAndNotInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in
var res = res
res[entry.0] = entry.1
return res
}
}

func intersection(_ dict: [Key:Value]) -> [Key: Value] {
let entriesInSelfAndInDict = filter {
dict[$0.0] == self[$0.0]
}
return entriesInSelfAndInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in
var res = res
res[entry.0] = entry.1
return res
}
}

// Updates current Dictionary with the new channel value unsubscribed from Presence.
// Returns the updated value if the corresponding entry matching the passed `id:` was found, otherwise `nil`
Expand Down
93 changes: 93 additions & 0 deletions Sources/PubNub/Events/New/Entities/EntityCreator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// PubNub+Subscribable.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

/// Protocol for types capable of creating references for entities to which the user can subscribe,
/// receiving real-time updates.
public protocol EntityCreator {
/// Creates a new channel entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the channel.
/// - Returns: A `ChannelRepresentation` object representing the channel.
func channel(_ name: String) -> ChannelRepresentation

/// Creates a new channel group entity the user can subscribe to.
///
/// - Parameters:
/// - name: The unique identifier for the channel group.
/// - Returns: A `ChannelGroupRepresentation` object representing the channel group.
func channelGroup(_ name: String) -> ChannelGroupRepresentation

/// Creates user metadata entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the user metadata.
/// - Returns: A `UserMetadataRepresentation` object representing the user metadata.
func userMetadata(_ name: String) -> UserMetadataRepresentation

/// Creates channel metadata entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the channel metadata.
/// - Returns: A `ChannelMetadataRepresentation` object representing the channel metadata.
func channelMetadata(_ name: String) -> ChannelMetadataRepresentation
}

public extension EntityCreator {
/// Creates a `SubscriptionSet` object from the collection of `Subscribable` entites.
///
/// Use this function to set up and manage subscriptions for a collection of `Subscribable` entities.
///
/// - Parameters:
/// - queue: The dispatch queue on which the subscription events should be handled
/// - entities: A collection of `Subscribable` entities to subscribe to
/// - options: Additional options for configuring the subscription
/// - Returns: A `SubscriptionSet` instance for managing the specified entities.
func subscription(
queue: DispatchQueue = .main,
entities: any Collection<Subscribable>,
options: SubscriptionOptions = SubscriptionOptions.empty()
) -> SubscriptionSet {
SubscriptionSet(
queue: queue,
entities: entities,
options: options
)
}
}

// This internal protocol is designed for types capable of receiving an intent
// to Subscribe or Unsubscribe and invoking the PubNub service with computed channels
// and channel groups.
protocol SubscribeReceiver: AnyObject {
func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter)

func internalSubscribe(
with channels: [Subscription],
and groups: [Subscription],
at timetoken: Timetoken?
)
func internalUnsubscribe(
from channels: [Subscription],
and groups: [Subscription],
presenceOnly: Bool
)
}
47 changes: 47 additions & 0 deletions Sources/PubNub/Events/New/Entities/EntitySubscribable.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// EntitySubscribable.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

// MARK: - PubNubChannelRepresentation

/// Represents a channel that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
}

// MARK: - PubNubChannelGroupRepresentation

/// Represents a channel group that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelGroupRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channelGroup, receiver: receiver)
}
}

// MARK: - PubNubUserMetadataRepresentation

/// Represents user metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class UserMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
jguz-pubnub marked this conversation as resolved.
Show resolved Hide resolved
}

// MARK: - PubNubChannelMetadataRepresentation

/// Represents channel metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
jguz-pubnub marked this conversation as resolved.
Show resolved Hide resolved
}
129 changes: 129 additions & 0 deletions Sources/PubNub/Events/New/EventEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// EventEmitter.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

// MARK: - StatusEmitter

/// A protocol for types that emit PubNub status events from the Subscribe loop.
public protocol StatusEmitter: AnyObject {
/// A closure to be called when the connection status changes.
var onConnectionStateChange: ((ConnectionStatus) -> Void)? { get set }
/// A closure to be called when a subscription error occurs.
var onSubscribeError: ((PubNubError) -> Void)? { get set }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this one here for some form of backward compatibility? If there were an error, then depending on from the current state, we would receive different connection statuses: connection error or unexpected disconnect.

Copy link
Contributor Author

@jguz-pubnub jguz-pubnub Feb 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, that's because of the old implementation. You're right, it should be enough to read an underlying error from ConnectionStatus enum cases like connectionError and disconnectedUnexpectedly and getting rid of onSubscribeError closure you pointed out. So what I would suggest is improving existing ConnectionStatus like this:

enum ConnectionStatus {
  case connectionError(PubNubError)
  case disconnectedUnexpectedly(PubNubError)
}

The reason I haven't done it yes is that there's disconnectedUnexpectedly case introduced a long time ago before EE. Now it's time to change it even if it might be a breaking change. I will run it by the team.

}

// MARK: - EventEmitter

/// A protocol for types that emit PubNub events.
///
/// Utilize closures to receive notifications when specific types of PubNub events occur.
public protocol EventEmitter: AnyObject {
/// An underlying queue to dispatch events
var queue: DispatchQueue { get }
/// A unique emitter's identifier
var uuid: UUID { get }
/// Receiver for a single event
var onEvent: ((PubNubEvent) -> Void)? { get set }
/// Receiver for multiple events. This will also emit individual events to `onEvent:`
var onEvents: (([PubNubEvent]) -> Void)? { get set }
/// Receiver for Message events
var onMessage: ((PubNubMessage) -> Void)? { get set }
/// Receiver for Signal events
var onSignal: ((PubNubMessage) -> Void)? { get set }
/// Receiver for Presence events
var onPresence: ((PubNubPresenceChange) -> Void)? { get set }
/// Receiver for Message Action events
var onMessageAction: ((PubNubMessageActionEvent) -> Void)? { get set }
/// Receiver for File Upload events
var onFileEvent: ((PubNubFileEvent) -> Void)? { get set }
/// Receiver for App Context events
var onAppContext: ((PubNubAppContextEvent) -> Void)? { get set }
}

/// A protocol representing a type that can be utilized to dispose of a conforming object.
public protocol SubscriptionDisposable {
/// Determines whether current emitter is disposed
var isDisposed: Bool { get }
/// Stops listening to incoming events and disposes current emitter
func dispose()
}

extension EventEmitter {
func emit(events: [PubNubEvent]) {
queue.async { [weak self] in
if !events.isEmpty {
self?.onEvents?(events)
}
for event in events {
self?.onEvent?(event)
switch event {
case let .messageReceived(message):
self?.onMessage?(message)
case let .signalReceived(signal):
self?.onSignal?(signal)
case let .presenceChange(presence):
self?.onPresence?(presence)
case let .appContextEvent(appContextEvent):
self?.onAppContext?(appContextEvent)
case let .messageActionEvent(messageActionEvent):
self?.onMessageAction?(messageActionEvent)
case let .fileUploadEvent(fileEvent):
self?.onFileEvent?(fileEvent)
}
}
}
}
}

extension EventEmitter {
func clearCallbacks() {
onEvent = nil
onEvents = nil
onMessage = nil
onSignal = nil
onPresence = nil
onMessageAction = nil
onFileEvent = nil
onAppContext = nil
}
}

// `SubscribeMessagesReceiver` is an internal protocol defining a receiver for subscription messages.
// Types that conform to this protocol are responsible for handling and processing these payloads
// into concrete events for the user.
protocol SubscribeMessagesReceiver: AnyObject {
// A dictionary representing the names of the underlying subscriptions
var subscriptionTopology: [SubscribableType : [String]] { get }
// This method should return an array of `PubNubEvent` instances,
// representing the concrete events for the user.
@discardableResult func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent]
}

// An internal class that functions as a bridge between the legacy `BaseSubscriptionListener`
// and either `Subscription` or `SubscriptionSet`, forwarding the received payloads.
class BaseSubscriptionListenerAdapter: BaseSubscriptionListener {
private(set) weak var receiver: SubscribeMessagesReceiver?

init(receiver: SubscribeMessagesReceiver, uuid: UUID, queue: DispatchQueue) {
self.receiver = receiver
super.init(queue: queue, uuid: uuid)
}

override func emit(batch: [SubscribeMessagePayload]) {
if let receiver = receiver {
receiver.onPayloadsReceived(payloads: batch)
}
}

deinit {
cancel()
}
}
Loading
Loading