Skip to content

Commit

Permalink
Large update to client multiplexedSocket. It now recovers from a serv…
Browse files Browse the repository at this point in the history
…er side channel closing, also lengthened the ping time. Converted for web and ios.
  • Loading branch information
bjsvedin committed Oct 15, 2023
1 parent f3511ac commit 6d1e0af
Show file tree
Hide file tree
Showing 19 changed files with 355 additions and 220 deletions.
2 changes: 1 addition & 1 deletion KtorBatteries.podspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Pod::Spec.new do |s|
s.name = 'KtorBatteries'
s.version = '0.1.0'
s.version = '0.1.1'
s.summary = 'KtorBatteries'

s.description = <<-DESC
Expand Down
3 changes: 2 additions & 1 deletion client/src/main/equivalents/swift.fqnames
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ com.lightningkite.ktordb.live
com.lightningkite.ktordb.live.LiveFullReadModelApi
com.lightningkite.ktordb.live.LiveObserveModelApi
com.lightningkite.ktordb.live.toListObservable
com.lightningkite.ktordb.live.filter
com.lightningkite.ktordb.live.LiveReadModelApi
com.lightningkite.ktordb.live.LiveWriteModelApi
com.lightningkite.ktordb.live.sharedSocketShouldBeActive
com.lightningkite.ktordb.live._overrideWebSocketProvider
com.lightningkite.ktordb.live.retryTime
com.lightningkite.ktordb.live.lastRetry
com.lightningkite.ktordb.live._overrideWebSocketProvider
com.lightningkite.ktordb.live.sharedSocketCache
com.lightningkite.ktordb.live.sharedSocket
com.lightningkite.ktordb.live.MultiplexedWebsocketPart
Expand Down
1 change: 1 addition & 0 deletions client/src/main/equivalents/ts.fqnames
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ com.lightningkite.ktordb.anyClear>com.lightningkite.ktordb.PropChaincom.lightnin
com.lightningkite.ktordb.CollectionChanges
com.lightningkite.ktordb.aggregator>com.lightningkite.ktordb.Aggregate
com.lightningkite.ktordb.live.LiveReadModelApi
com.lightningkite.ktordb.live.filter
com.lightningkite.ktordb.ListChange
com.lightningkite.ktordb.notIn>com.lightningkite.ktordb.PropChaincom.lightningkite.ktordb.notIn.K, com.lightningkite.ktordb.notIn.T
com.lightningkite.ktordb.UUIDFor
Expand Down
108 changes: 62 additions & 46 deletions client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.lightningkite.ktordb.MultiplexMessage
import com.lightningkite.rx.mapNotNull
import com.lightningkite.rx.okhttp.*
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.serialization.KSerializer
import kotlinx.serialization.serializer
import java.util.*
Expand Down Expand Up @@ -93,53 +94,64 @@ fun <IN : IsCodableAndHashableNotNull, OUT : IsCodableAndHashable> multiplexedSo
): Observable<WebSocketIsh<IN, OUT>> {
val shortUrl = url.substringBefore('?')
val channel = UUID.randomUUID().toString()
var lastSocket: WebSocketInterface? = null
return sharedSocket(url)
.switchMapSingle {
// println("Setting up channel on socket to $shortUrl with $path")
lastSocket = it

val multiplexedIn = it.read.mapNotNull {
.switchMap { sharedSocket ->
// println("Setting up channel $channel to $shortUrl with $path")
val multiplexedIn = sharedSocket.read.mapNotNull {
val text = it.text ?: return@mapNotNull null
if (text.isBlank()) return@mapNotNull null
text.fromJsonString<MultiplexMessage>()
}

.filter { it.channel == channel }
var current = PublishSubject.create<IN>()
multiplexedIn
.filter { it.channel == channel && it.start }
.firstOrError()
.map { _ ->
println("Connected to channel $channel")
WebSocketIsh<IN, OUT>(
messages = multiplexedIn.mapNotNull {
if (it.channel == channel)
if (it.end) {
println("Socket Closed by Server")
throw Exception("Channel Closed By Server")
}else
it.data?.fromJsonString(inType)
else null
},
send = { message: OUT ->
println("Sending $message to $it")
it.write.onNext(
.mapNotNull { message ->
when {
message.start -> {
// println("Channel ${message.channel} established with $sharedSocket")
WebSocketIsh<IN, OUT>(
messages = current,
send = { message ->
// println("Sending $message to $channel")
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
data = message.toJsonString(outType)
).toJsonString()
)
)
}
)
}

message.data != null -> {
// println("Got ${message.data} to ${message.channel}")
current.onNext(message.data?.fromJsonString(inType))
null
}

message.end -> {
// println("Channel ${message.channel} terminated")
current = PublishSubject.create()
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
data = message.toJsonString(outType)
path = path,
start = true
).toJsonString()
)
)
null
}
)
}
.retryWhen @SwiftReturnType("Observable<Error>") {
val temp = retryTime
retryTime = temp * 2L
it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)

else -> null
}
}
.doOnSubscribe { _ ->
it.write.onNext(
// println("Sending onSubscribe Startup Message")
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
Expand All @@ -149,18 +161,22 @@ fun <IN : IsCodableAndHashableNotNull, OUT : IsCodableAndHashable> multiplexedSo
)
)
}

}
.doOnDispose {
// println("Disconnecting channel on socket to $shortUrl with $path")
lastSocket?.write?.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
end = true
).toJsonString()
)
)
.doOnDispose {
// println("Disconnecting channel on socket to $shortUrl with $path")
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
end = true
).toJsonString()
)
)
}
.retryWhen @SwiftReturnType("Observable<Error>") {
val temp = retryTime
retryTime = temp * 2L
it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
}
}
}
}
16 changes: 10 additions & 6 deletions ios/KtorBatteries/Classes/client/live/LiveObserveModelApi.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
// Generated by Khrysalis - this file will be overwritten.
import KhrysalisRuntime
import RxSwift
import RxSwiftPlus
import Foundation

public final class LiveObserveModelApi<Model : HasId> : ObserveModelApi<Model> {
public var openSocket: (Query<Model>) -> Observable<Array<Model>>
public init(openSocket: @escaping (Query<Model>) -> Observable<Array<Model>>) {
self.openSocket = openSocket
self.alreadyOpen = ([:] as Dictionary<Query<Model>, Observable<Array<Model>>>)
self.alreadyOpen = Dictionary<Query<Model>, Observable<Array<Model>>>()
super.init()
//Necessary properties should be initialized now
}
Expand Down Expand Up @@ -44,17 +45,20 @@ public func xObservableToListObservable<T : HasId>(_ this: Observable<ListChange
return localList
})
}

public func xObservableFilter<T : HasId>(_ this: Observable<WebSocketIsh<ListChange<T>, Query<T>>>, _ query: Query<T>) -> Observable<Array<T>> {
return xObservableToListObservable(this
.doOnNext { (it) -> Void in it.send(query) }
.switchMap { (it) -> Observable<ListChange<T>> in it.messages }
.retry(when: { (it) -> Observable<Error> in it.delay(.milliseconds(5000), scheduler: MainScheduler.instance) }), ordering: getListComparator(query.orderBy) ?? compareBy(selector: { (it) in it._id }));
}
public final class LiveObserveModelApiCompanion {
public init() {
//Necessary properties should be initialized now
}
public static let INSTANCE = LiveObserveModelApiCompanion()

public func create<Model : HasId>(multiplexUrl: String, token: String?, headers: Dictionary<String, String>, path: String) -> LiveObserveModelApi<Model> {
return LiveObserveModelApi<Model>(openSocket: { (query) -> Observable<Array<Model>> in xObservableToListObservable((multiplexedSocket(url: token != nil ? "\(String(kotlin: multiplexUrl))?jwt=\(String(kotlin: token))" : multiplexUrl, path: path) as Observable<WebSocketIsh<ListChange<Model>, Query<Model>>>)
.switchMap { (it) -> Observable<ListChange<Model>> in
it.send(query)
return it.messages.catchError({ (it) -> Observable<ListChange<Model>> in Observable.never() })
}, ordering: getListComparator(query.orderBy) ?? compareBy(selector: { (it) in it._id })) });
return LiveObserveModelApi<Model>(openSocket: { (query) -> Observable<Array<Model>> in xObservableFilter((multiplexedSocket(url: token != nil ? "\(String(kotlin: multiplexUrl))?jwt=\(String(kotlin: token))" : multiplexUrl, path: path) as Observable<WebSocketIsh<ListChange<Model>, Query<Model>>>), query) });
}
}
Loading

0 comments on commit 6d1e0af

Please sign in to comment.