Skip to content

Commit

Permalink
feat(listeners): adding a new Listeners API
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Feb 13, 2024
1 parent 649a9e9 commit d0ec8a3
Show file tree
Hide file tree
Showing 27 changed files with 2,609 additions and 316 deletions.
206 changes: 148 additions & 58 deletions PubNub.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

83 changes: 33 additions & 50 deletions Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,24 @@ struct SubscribeInput: Equatable {
private let channelEntries: [String: PubNubChannel]
private let groupEntries: [String: PubNubChannel]

typealias InsertingResult = (
newInput: SubscribeInput,
insertedChannels: [PubNubChannel],
insertedGroups: [PubNubChannel]
)
typealias RemovingResult = (
newInput: SubscribeInput,
removedChannels: [PubNubChannel],
removedGroups: [PubNubChannel]
)

init(channels: [PubNubChannel] = [], groups: [PubNubChannel] = []) {
self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) }
self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) }
self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in
_ = r.insert(channel)
}
self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in
_ = r.insert(channel)
}
}

private init(channels: [String: PubNubChannel], groups: [String: PubNubChannel]) {
Expand Down Expand Up @@ -89,52 +104,42 @@ struct SubscribeInput: Equatable {
func adding(
channels: [PubNubChannel],
and groups: [PubNubChannel]
) -> (
newInput: SubscribeInput,
insertedChannels: [PubNubChannel],
insertedGroups: [PubNubChannel]
) {
) -> SubscribeInput.InsertingResult {
// Gets a copy of current channels and channel groups
var currentChannels = channelEntries
var currentGroups = groupEntries

let insertedChannels = channels.filter { currentChannels.insert($0) }
let insertedGroups = groups.filter { currentGroups.insert($0) }

return (
return InsertingResult(
newInput: SubscribeInput(channels: currentChannels, groups: currentGroups),
insertedChannels: insertedChannels,
insertedGroups: insertedGroups
)
}

func removing(
channels: [String],
and groups: [String]
) -> (
newInput: SubscribeInput,
removedChannels: [PubNubChannel],
removedGroups: [PubNubChannel]
) {
channels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
presenceGroupsOnly: [PubNubChannel]
) -> SubscribeInput.RemovingResult {
// Gets a copy of current channels and channel groups
var currentChannels = channelEntries
var currentGroups = groupEntries

let removedChannels = channels.compactMap {
if $0.isPresenceChannelName {
return currentChannels.unsubscribePresence($0.trimmingPresenceChannelSuffix)
} else {
return currentChannels.removeValue(forKey: $0)
}
currentChannels.removeValue(forKey: $0.id)
} + presenceChannelsOnly.compactMap {
currentChannels.unsubscribePresence($0.id)
}

let removedGroups = groups.compactMap {
if $0.isPresenceChannelName {
return currentGroups.unsubscribePresence($0.trimmingPresenceChannelSuffix)
} else {
return currentGroups.removeValue(forKey: $0)
}
currentGroups.removeValue(forKey: $0.id)
} + presenceGroupsOnly.compactMap {
currentGroups.unsubscribePresence($0.id)
}

return (
return RemovingResult(
newInput: SubscribeInput(channels: currentChannels, groups: currentGroups),
removedChannels: removedChannels,
removedGroups: removedGroups
Expand All @@ -158,28 +163,6 @@ extension Dictionary where Key == String, Value == PubNubChannel {
self[channel.id] = channel
return true
}

func difference(_ dict: [Key:Value]) -> [Key: Value] {
let entriesInSelfAndNotInDict = filter {
dict[$0.0] != self[$0.0]
}
return entriesInSelfAndNotInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in
var res = res
res[entry.0] = entry.1
return res
}
}

func intersection(_ dict: [Key:Value]) -> [Key: Value] {
let entriesInSelfAndInDict = filter {
dict[$0.0] == self[$0.0]
}
return entriesInSelfAndInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in
var res = res
res[entry.0] = entry.1
return res
}
}

// Updates current Dictionary with the new channel value unsubscribed from Presence.
// Returns the updated value if the corresponding entry matching the passed `id:` was found, otherwise `nil`
Expand Down
93 changes: 93 additions & 0 deletions Sources/PubNub/Events/New/Entities/EntityCreator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// PubNub+Subscribable.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

/// Protocol for types capable of creating references for entities to which the user can subscribe,
/// receiving real-time updates.
public protocol EntityCreator {
/// Creates a new channel entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the channel.
/// - Returns: A `ChannelRepresentation` object representing the channel.
func channel(_ name: String) -> ChannelRepresentation

/// Creates a new channel group entity the user can subscribe to.
///
/// - Parameters:
/// - name: The unique identifier for the channel group.
/// - Returns: A `ChannelGroupRepresentation` object representing the channel group.
func channelGroup(_ name: String) -> ChannelGroupRepresentation

/// Creates user metadata entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the user metadata.
/// - Returns: A `UserMetadataRepresentation` object representing the user metadata.
func userMetadata(_ name: String) -> UserMetadataRepresentation

/// Creates channel metadata entity the user can subscribe to.
///
/// This method does not create any entity, either locally or remotely; it merely provides
/// a reference to a channel that can be subscribed to and unsubscribed from
///
/// - Parameters:
/// - name: The unique identifier for the channel metadata.
/// - Returns: A `ChannelMetadataRepresentation` object representing the channel metadata.
func channelMetadata(_ name: String) -> ChannelMetadataRepresentation
}

public extension EntityCreator {
/// Creates a `SubscriptionSet` object from the collection of `Subscribable` entites.
///
/// Use this function to set up and manage subscriptions for a collection of `Subscribable` entities.
///
/// - Parameters:
/// - queue: The dispatch queue on which the subscription events should be handled
/// - entities: A collection of `Subscribable` entities to subscribe to
/// - options: Additional options for configuring the subscription
/// - Returns: A `SubscriptionSet` instance for managing the specified entities.
func subscription(
queue: DispatchQueue = .main,
entities: any Collection<Subscribable>,
options: SubscriptionOptions = SubscriptionOptions.empty()
) -> SubscriptionSet {
SubscriptionSet(
queue: queue,
entities: entities,
options: options
)
}
}

// This internal protocol is designed for types capable of receiving an intent
// to Subscribe or Unsubscribe and invoking the PubNub service with computed channels
// and channel groups.
protocol SubscribeReceiver: AnyObject {
func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter)

func internalSubscribe(
with channels: [Subscription],
and groups: [Subscription],
at timetoken: Timetoken?
)
func internalUnsubscribe(
from channels: [Subscription],
and groups: [Subscription],
presenceOnly: Bool
)
}
47 changes: 47 additions & 0 deletions Sources/PubNub/Events/New/Entities/EntitySubscribable.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// EntitySubscribable.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

// MARK: - PubNubChannelRepresentation

/// Represents a channel that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
}

// MARK: - PubNubChannelGroupRepresentation

/// Represents a channel group that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelGroupRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channelGroup, receiver: receiver)
}
}

// MARK: - PubNubUserMetadataRepresentation

/// Represents user metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class UserMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
}

// MARK: - PubNubChannelMetadataRepresentation

/// Represents channel metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
}
}
129 changes: 129 additions & 0 deletions Sources/PubNub/Events/New/EventEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// EventEmitter.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

// MARK: - StatusEmitter

/// A protocol for types that emit PubNub status events from the Subscribe loop.
public protocol StatusEmitter: AnyObject {
/// A closure to be called when the connection status changes.
var didReceiveConnectionStatusChange: ((ConnectionStatus) -> Void)? { get set }
/// A closure to be called when a subscription error occurs.
var didReceiveSubscribeError: ((PubNubError) -> Void)? { get set }
}

// MARK: - EventEmitter

/// A protocol for types that emit PubNub events.
///
/// Utilize closures to receive notifications when specific types of PubNub events occur.
public protocol EventEmitter: AnyObject {
/// An underlying queue to dispatch events
var queue: DispatchQueue { get }
/// A unique emitter's identifier
var uuid: UUID { get }
/// Receiver for a single event
var eventStream: ((PubNubEvent) -> Void)? { get set }
/// Receiver for multiple events. This will also emit individual events to `eventStream:`
var eventsStream: (([PubNubEvent]) -> Void)? { get set }
/// Receiver for Message events
var messagesStream: ((PubNubMessage) -> Void)? { get set }
/// Receiver for Signal events
var signalsStream: ((PubNubMessage) -> Void)? { get set }
/// Receiver for Presence events
var presenceStream: ((PubNubPresenceChange) -> Void)? { get set }
/// Receiver for Message Action events
var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? { get set }
/// Receiver for File Upload events
var filesStream: ((PubNubFileEvent) -> Void)? { get set }
/// Receiver for App Context events
var appContextStream: ((PubNubAppContextEvent) -> Void)? { get set }
}

/// A protocol representing a type that can be used to dispose of subscriptions.
public protocol SubscriptionDisposable {
/// Determines whether current emitter is disposed
var isDisposed: Bool { get }
/// Stops listening to incoming events and disposes current emitter
func dispose()
}

extension EventEmitter {
func emit(events: [PubNubEvent]) {
queue.async { [weak self] in
if !events.isEmpty {
self?.eventsStream?(events)
}
for event in events {
self?.eventStream?(event)
switch event {
case let .messageReceived(message):
self?.messagesStream?(message)
case let .signalReceived(signal):
self?.signalsStream?(signal)
case let .presenceChange(presence):
self?.presenceStream?(presence)
case let .appContextEvent(appContextEvent):
self?.appContextStream?(appContextEvent)
case let .messageActionEvent(messageActionEvent):
self?.messageActionsStream?(messageActionEvent)
case let .fileUploadEvent(fileEvent):
self?.filesStream?(fileEvent)
}
}
}
}
}

extension EventEmitter {
func clearCallbacks() {
eventStream = nil
eventsStream = nil
messagesStream = nil
signalsStream = nil
presenceStream = nil
messageActionsStream = nil
filesStream = nil
appContextStream = nil
}
}

// `SubscribeMessagesReceiver` is an internal protocol defining a receiver for subscription messages.
// Types that conform to this protocol are responsible for handling and processing these payloads
// into concrete events for the user.
protocol SubscribeMessagesReceiver: AnyObject {
// A dictionary representing the names of the underlying subscriptions
var subscriptionTopology: [SubscribableType : [String]] { get }
// This method should return an array of `PubNubEvent` instances,
// representing the concrete events for the user.
@discardableResult func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent]
}

// An internal class that functions as a bridge between the legacy `BaseSubscriptionListener`
// and either `Subscription` or `SubscriptionSet`, forwarding the received payloads.
class BaseSubscriptionListenerAdapter: BaseSubscriptionListener {
private(set) weak var receiver: SubscribeMessagesReceiver?

init(receiver: SubscribeMessagesReceiver, uuid: UUID, queue: DispatchQueue) {
self.receiver = receiver
super.init(queue: queue, uuid: uuid)
}

override func emit(batch: [SubscribeMessagePayload]) {
if let receiver = receiver {
receiver.onPayloadsReceived(payloads: batch)
}
}

deinit {
cancel()
}
}
Loading

0 comments on commit d0ec8a3

Please sign in to comment.