Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Dec 1, 2023
1 parent b4f324e commit a15a14a
Show file tree
Hide file tree
Showing 22 changed files with 226 additions and 207 deletions.
18 changes: 9 additions & 9 deletions Sources/PubNub/EventEngine/Core/Dispatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@ struct DispatcherListener<Event> {

// MARK: - Dispatcher

protocol Dispatcher<Invocation, Event, Input> {
protocol Dispatcher<Invocation, Event, Dependencies> {
associatedtype Invocation: AnyEffectInvocation
associatedtype Event
associatedtype Input
associatedtype Dependencies

func dispatch(
invocations: [EffectInvocation<Invocation>],
with customInput: EventEngineCustomInput<Input>,
with dependencies: EventEngineDependencies<Dependencies>,
notify listener: DispatcherListener<Event>
)
}

// MARK: - EffectDispatcher

class EffectDispatcher<Invocation: AnyEffectInvocation, Event, Input>: Dispatcher {
private let factory: any EffectHandlerFactory<Invocation, Event, Input>
class EffectDispatcher<Invocation: AnyEffectInvocation, Event, Dependencies>: Dispatcher {
private let factory: any EffectHandlerFactory<Invocation, Event, Dependencies>
private let effectsCache = EffectsCache<Event>()

init(factory: some EffectHandlerFactory<Invocation, Event, Input>) {
init(factory: some EffectHandlerFactory<Invocation, Event, Dependencies>) {
self.factory = factory
}

Expand All @@ -63,20 +63,20 @@ class EffectDispatcher<Invocation: AnyEffectInvocation, Event, Input>: Dispatche

func dispatch(
invocations: [EffectInvocation<Invocation>],
with customInput: EventEngineCustomInput<Input>,
with dependencies: EventEngineDependencies<Dependencies>,
notify listener: DispatcherListener<Event>
) {
invocations.forEach {
switch $0 {
case .managed(let invocation):
executeEffect(
effect: factory.effect(for: invocation, with: customInput),
effect: factory.effect(for: invocation, with: dependencies),
storageId: invocation.id,
notify: listener
)
case .regular(let invocation):
executeEffect(
effect: factory.effect(for: invocation, with: customInput),
effect: factory.effect(for: invocation, with: dependencies),
storageId: UUID().uuidString,
notify: listener
)
Expand Down
10 changes: 5 additions & 5 deletions Sources/PubNub/EventEngine/Core/EffectHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import Foundation

// MARK: - EffectHandlerFactory

protocol EffectHandlerFactory<EffectInvocation, Event, Input> {
associatedtype EffectInvocation
protocol EffectHandlerFactory<Invocation, Event, Dependencies> {
associatedtype Invocation
associatedtype Event
associatedtype Input
associatedtype Dependencies

func effect(
for invocation: EffectInvocation,
with customInput: EventEngineCustomInput<Input>
for invocation: Invocation,
with dependencies: EventEngineDependencies<Dependencies>
) -> any EffectHandler<Event>
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/PubNub/EventEngine/Core/EventEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,30 @@

import Foundation

struct EventEngineCustomInput<Value> {
let value: Value
struct EventEngineDependencies<Dependencies> {
let value: Dependencies
}

class EventEngine<State, Event, Invocation: AnyEffectInvocation, Input> {
private let transition: any TransitionProtocol<State, Event, Invocation>
private let dispatcher: any Dispatcher<Invocation, Event, Input>
private(set) var state: State

var customInput: EventEngineCustomInput<Input>
var dependencies: EventEngineDependencies<Input>
var onStateUpdated: ((State) -> Void)?

init(
state: State,
transition: some TransitionProtocol<State, Event, Invocation>,
onStateUpdated: ((State) -> Void)? = nil,
dispatcher: some Dispatcher<Invocation, Event, Input>,
customInput: EventEngineCustomInput<Input>
dependencies: EventEngineDependencies<Input>
) {
self.state = state
self.onStateUpdated = onStateUpdated
self.transition = transition
self.dispatcher = dispatcher
self.customInput = customInput
self.dependencies = dependencies
}

func send(event: Event) {
Expand Down Expand Up @@ -81,7 +81,7 @@ class EventEngine<State, Event, Invocation: AnyEffectInvocation, Input> {
)
dispatcher.dispatch(
invocations: invocations,
with: customInput,
with: dependencies,
notify: listener
)
}
Expand Down
21 changes: 13 additions & 8 deletions Sources/PubNub/EventEngine/Core/EventEngineFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,38 @@

import Foundation

typealias SubscribeEngine = EventEngine<(any SubscribeState), Subscribe.Event, Subscribe.Invocation, Subscribe.EngineInput>
typealias PresenceEngine = EventEngine<(any PresenceState), Presence.Event, Presence.Invocation, Presence.EngineInput>
typealias SubscribeEngine = EventEngine<(any SubscribeState), Subscribe.Event, Subscribe.Invocation, Subscribe.Dependencies>
typealias PresenceEngine = EventEngine<(any PresenceState), Presence.Event, Presence.Invocation, Presence.Dependencies>

typealias SubscribeTransitions = TransitionProtocol<(any SubscribeState), Subscribe.Event, Subscribe.Invocation>
typealias PresenceTransitions = TransitionProtocol<(any PresenceState), Presence.Event, Presence.Invocation>
typealias SubscribeDispatcher = Dispatcher<Subscribe.Invocation, Subscribe.Event, Subscribe.Dependencies>
typealias PresenceDispatcher = Dispatcher<Presence.Invocation, Presence.Event, Presence.Dependencies>

class EventEngineFactory {
func subscribeEngine(
with configuration: PubNubConfiguration,
dispatcher: some Dispatcher<Subscribe.Invocation, Subscribe.Event, Subscribe.EngineInput>,
transition: some TransitionProtocol<any SubscribeState, Subscribe.Event, Subscribe.Invocation>
dispatcher: some SubscribeDispatcher = EffectDispatcher(factory: SubscribeEffectFactory.defaultFactory()),
transition: some SubscribeTransitions = SubscribeTransition()
) -> SubscribeEngine {
EventEngine(
state: Subscribe.UnsubscribedState(),
transition: transition,
dispatcher: dispatcher,
customInput: EventEngineCustomInput(value: Subscribe.EngineInput(configuration: configuration))
dependencies: EventEngineDependencies(value: Subscribe.Dependencies(configuration: configuration))
)
}

func presenceEngine(
with configuration: PubNubConfiguration,
dispatcher: some Dispatcher<Presence.Invocation, Presence.Event, Presence.EngineInput>,
transition: some TransitionProtocol<any PresenceState, Presence.Event, Presence.Invocation>
dispatcher: some PresenceDispatcher = EffectDispatcher(factory: PresenceEffectFactory.defaultFactory()),
transition: some PresenceTransitions
) -> PresenceEngine {
EventEngine(
state: Presence.HeartbeatInactive(),
transition: transition,
dispatcher: dispatcher,
customInput: EventEngineCustomInput(value: Presence.EngineInput(configuration: configuration))
dependencies: EventEngineDependencies(value: Presence.Dependencies(configuration: configuration))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,27 @@ class PresenceEffectFactory: EffectHandlerFactory {
self.sessionResponseQueue = sessionResponseQueue
}

static func defaultFactory() -> PresenceEffectFactory {
PresenceEffectFactory(
session: HTTPSession(
configuration: .pubnub,
sessionQueue: DispatchQueue(label: "Presence Response Queue"),
sessionStream: SessionListener()
)
)
}

func effect(
for invocation: Presence.Invocation,
with customInput: EventEngineCustomInput<Presence.EngineInput>
with dependencies: EventEngineDependencies<Presence.Dependencies>
) -> any EffectHandler<Presence.Event> {
switch invocation {
case .heartbeat(let channels, let groups):
return HeartbeatEffect(
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
)
Expand All @@ -56,26 +66,26 @@ class PresenceEffectFactory: EffectHandlerFactory {
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
),
retryAttempt: retryAttempt,
reason: reason,
configuration: customInput.value.configuration
configuration: dependencies.value.configuration
)
case .leave(let channels, let groups):
return LeaveEffect(
request: PresenceLeaveRequest(
channels: channels,
groups: groups,
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
)
)
case .wait:
return WaitEffect(configuration: customInput.value.configuration)
return WaitEffect(configuration: dependencies.value.configuration)
}
}
}
2 changes: 1 addition & 1 deletion Sources/PubNub/EventEngine/Presence/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ extension Presence {
}

extension Presence {
struct EngineInput {
struct Dependencies {
let configuration: PubNubConfiguration
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,23 @@ class SubscribeEffectFactory: EffectHandlerFactory {
self.messageCache = messageCache
}

static func defaultFactory() -> SubscribeEffectFactory {
SubscribeEffectFactory(session: HTTPSession(
configuration: URLSessionConfiguration.subscription,
sessionQueue: DispatchQueue(label: "Subscribe Response Queue"),
sessionStream: SessionListener()
))
}

func effect(
for invocation: Subscribe.Invocation,
with customInput: EventEngineCustomInput<Subscribe.EngineInput>
with dependencies: EventEngineDependencies<Subscribe.Dependencies>
) -> any EffectHandler<Subscribe.Event> {
switch invocation {
case .handshakeRequest(let channels, let groups):
return HandshakeEffect(
request: SubscribeRequest(
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
timetoken: 0,
Expand All @@ -61,7 +69,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
case .handshakeReconnect(let channels, let groups, let retryAttempt, let reason):
return HandshakeReconnectEffect(
request: SubscribeRequest(
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
timetoken: 0,
Expand All @@ -74,7 +82,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
case .receiveMessages(let channels, let groups, let cursor):
return ReceivingEffect(
request: SubscribeRequest(
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
timetoken: cursor.timetoken,
Expand All @@ -86,7 +94,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
case .receiveReconnect(let channels, let groups, let cursor, let retryAttempt, let reason):
return ReceiveReconnectEffect(
request: SubscribeRequest(
configuration: customInput.value.configuration,
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
timetoken: cursor.timetoken,
Expand All @@ -101,13 +109,13 @@ class SubscribeEffectFactory: EffectHandlerFactory {
return EmitMessagesEffect(
messages: messages,
cursor: cursor,
listeners: customInput.value.listeners,
listeners: dependencies.value.listeners,
messageCache: messageCache
)
case .emitStatus(let statusChange):
return EmitStatusEffect(
statusChange: statusChange,
listeners: customInput.value.listeners
listeners: dependencies.value.listeners
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/EventEngine/Subscribe/Subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ extension Subscribe {
}

extension Subscribe {
struct EngineInput {
struct Dependencies {
let configuration: PubNubConfiguration
let listeners: [BaseSubscriptionListener]

Expand Down
56 changes: 35 additions & 21 deletions Sources/PubNub/PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ public class PubNub {
/// - configuration: The default configurations that will be used
/// - session: Session used for performing request/response REST calls
/// - subscribeSession: The network session used for Subscription only
public init(
/// - fileSession: The network session used for File uploading/downloading only
public convenience init(
configuration: PubNubConfiguration,
session: SessionReplaceable? = nil,
subscribeSession: SessionReplaceable? = nil,
fileSession: URLSessionReplaceable? = nil,
subscriptionSession: SubscriptionSession? = nil
fileSession: URLSessionReplaceable? = nil
) {
instanceID = UUID()
self.configuration = configuration

let instanceID = UUID()

// Default operators based on config
var operators = [RequestOperator]()
if let retryOperator = configuration.automaticRetry {
Expand All @@ -88,26 +87,41 @@ public class PubNub {
.defaultRequestOperator?
.merge(requestOperator: MultiplexRequestOperator(operators: operators))
}

// Immutable session
self.networkSession = networkSession


let fileSession = fileSession ?? URLSession(
configuration: .pubnubBackground,
delegate: FileSessionManager(),
delegateQueue: .main
)

// Set initial session also based on configuration
subscription = subscriptionSession ?? SubscribeSessionFactory.shared.getSession(
let subscriptionSession = SubscribeSessionFactory.shared.getSession(
from: configuration,
with: subscribeSession,
presenceSession: session
)

if let fileSession = fileSession {
fileURLSession = fileSession
} else {
fileURLSession = URLSession(
configuration: .pubnubBackground,
delegate: FileSessionManager(),
delegateQueue: .main
)
}

self.init(
instanceID: instanceID,
configuration: configuration,
session: networkSession,
fileSession: fileSession,
subscriptionSession: subscriptionSession
)
}

init(
instanceID: UUID = UUID(),
configuration: PubNubConfiguration,
session: SessionReplaceable,
fileSession: URLSessionReplaceable,
subscriptionSession: SubscriptionSession
) {
self.instanceID = instanceID
self.configuration = configuration
self.subscription = subscriptionSession
self.networkSession = session
self.fileURLSession = fileSession
}

func route<Decoder>(
Expand Down
Loading

0 comments on commit a15a14a

Please sign in to comment.