Skip to content

Commit

Permalink
Attempted a channel recover when server side closes it. It almost wor…
Browse files Browse the repository at this point in the history
…ks. It actually kills the whole socket, which then reconnects.
  • Loading branch information
bjsvedin committed Oct 9, 2023
1 parent 2bd8420 commit f3511ac
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
package com.lightningkite.ktordb.live

import com.lightningkite.khrysalis.SharedCode
import com.lightningkite.khrysalis.SwiftReturnType
import com.lightningkite.ktordb.*
import com.lightningkite.ktordb.HasId
import com.lightningkite.ktordb.ListChange
import com.lightningkite.ktordb.Query
import com.lightningkite.rx.okhttp.HttpClient
import io.reactivex.rxjava3.core.Observable
import java.util.*
import java.util.concurrent.TimeUnit

class LiveObserveModelApi<Model : HasId<UUID>>(
val openSocket: (query: Query<Model>) -> Observable<List<Model>>
Expand All @@ -25,11 +28,7 @@ class LiveObserveModelApi<Model : HasId<UUID>>(
if (token != null) "$multiplexUrl?jwt=$token" else multiplexUrl,
path
)
.switchMap {
it.send(query)
it.messages.onErrorResumeNext { Observable.never() }
}
.toListObservable(query.orderBy.comparator ?: compareBy { it._id })
.filter(query)
}
)
}
Expand Down Expand Up @@ -59,4 +58,11 @@ fun <T : HasId<UUID>> Observable<ListChange<T>>.toListObservable(ordering: Compa
} ?: it.old?.let { localList.removeAll { o -> it._id == o._id } }
localList
}
}
}

fun <T : HasId<UUID>> Observable<WebSocketIsh<ListChange<T>, Query<T>>>.filter(query: Query<T>): Observable<List<T>> =
this
.doOnNext { it.send(query) }
.switchMap { it.messages }
.retryWhen @SwiftReturnType("Observable<Error>") { it.delay(5000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) }
.toListObservable(query.orderBy.comparator ?: compareBy { it._id })
95 changes: 56 additions & 39 deletions client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import kotlinx.serialization.serializer
import java.util.*
import java.util.concurrent.TimeUnit

var sharedSocketShouldBeActive: Observable<Boolean> = Observable.just(false)

var _overrideWebSocketProvider: ((url: String) -> Observable<WebSocketInterface>)? = null
var sharedSocketShouldBeActive: Observable<Boolean> = Observable.just(true)
private var retryTime = 1000L
private var lastRetry = 0L

var _overrideWebSocketProvider: ((url: String) -> Observable<WebSocketInterface>)? = null
private val sharedSocketCache = HashMap<String, Observable<WebSocketInterface>>()
fun sharedSocket(url: String): Observable<WebSocketInterface> {
return sharedSocketCache.getOrPut(url) {
Expand All @@ -30,24 +30,24 @@ fun sharedSocket(url: String): Observable<WebSocketInterface> {
(_overrideWebSocketProvider?.invoke(url) ?: HttpClient.webSocket(url))
.switchMap {
lastRetry = System.currentTimeMillis()
// println("Connection to $shortUrl established, starting pings")
// println("Connection to $shortUrl established, starting pings")
// Only have this observable until it fails

val pingMessages: Observable<WebSocketInterface> =
Observable.interval(5000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
Observable.interval(30_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
.map { _ ->
// println("Sending ping to $url")
it.write.onNext(WebSocketFrame(text = ""))
// println("Sending ping to $url")
it.write.onNext(WebSocketFrame(text = " "))
}.switchMap { Observable.never() }

val timeoutAfterSeconds: Observable<WebSocketInterface> = it.read
.doOnNext {
// println("Got message from $shortUrl: ${it}")
// println("Got message from $shortUrl: ${it}")
if (System.currentTimeMillis() > lastRetry + 60_000L) {
retryTime = 1000L
}
}
.timeout(10_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
.timeout(40_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
.switchMap { Observable.never() }

Observable.merge(
Expand Down Expand Up @@ -82,57 +82,74 @@ class WebSocketIsh<IN : IsCodableAndHashable, OUT : IsCodableAndHashable>(
inline fun <reified IN : IsCodableAndHashableNotNull, reified OUT : IsCodableAndHashable> multiplexedSocket(
url: String,
path: String,
noinline onSetup: (WebSocketIsh<IN, OUT>) -> Unit = {}
): Observable<WebSocketIsh<IN, OUT>> = multiplexedSocket(url, path, serializer<IN>(), serializer<OUT>(), onSetup)
): Observable<WebSocketIsh<IN, OUT>> = multiplexedSocket(url, path, serializer<IN>(), serializer<OUT>())

@JsName("multiplexedSocket")
fun <IN : IsCodableAndHashableNotNull, OUT : IsCodableAndHashable> multiplexedSocket(
url: String,
path: String,
inType: KSerializer<IN>,
outType: KSerializer<OUT>,
onSetup: (WebSocketIsh<IN, OUT>) -> Unit = {}
): Observable<WebSocketIsh<IN, OUT>> {
val shortUrl = url.substringBefore('?')
val channel = UUID.randomUUID().toString()
var lastSocket: WebSocketInterface? = null
return sharedSocket(url)
.map {
.switchMapSingle {
// println("Setting up channel on socket to $shortUrl with $path")
lastSocket = it
it.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
start = true
).toJsonString()
)
)
val part = MultiplexedWebsocketPart(
messages = it.read.mapNotNull {
val text = it.text ?: return@mapNotNull null
if (text == "") return@mapNotNull null
val message: MultiplexMessage = text.fromJsonString() ?: return@mapNotNull null
if (message.channel == channel) message.data else null
},
send = { message ->

val multiplexedIn = it.read.mapNotNull {
val text = it.text ?: return@mapNotNull null
if (text.isBlank()) return@mapNotNull null
text.fromJsonString<MultiplexMessage>()
}

multiplexedIn
.filter { it.channel == channel && it.start }
.firstOrError()
.map { _ ->
println("Connected to channel $channel")
WebSocketIsh<IN, OUT>(
messages = multiplexedIn.mapNotNull {
if (it.channel == channel)
if (it.end) {
println("Socket Closed by Server")
throw Exception("Channel Closed By Server")
}else
it.data?.fromJsonString(inType)
else null
},
send = { message: OUT ->
println("Sending $message to $it")
it.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
data = message.toJsonString(outType)
).toJsonString()
)
)
}
)
}
.retryWhen @SwiftReturnType("Observable<Error>") {
val temp = retryTime
retryTime = temp * 2L
it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
}
.doOnSubscribe { _ ->
it.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
data = message
path = path,
start = true
).toJsonString()
)
)
}
)
val typedPart = WebSocketIsh<IN, OUT>(
messages = part.messages.mapNotNull { it.fromJsonString(inType) },
send = { m -> part.send(m.toJsonString(outType)) }
)
onSetup(typedPart)
typedPart

}
.doOnDispose {
// println("Disconnecting channel on socket to $shortUrl with $path")
Expand All @@ -146,4 +163,4 @@ fun <IN : IsCodableAndHashableNotNull, OUT : IsCodableAndHashable> multiplexedSo
)
)
}
}
}

0 comments on commit f3511ac

Please sign in to comment.