diff --git a/client/src/main/kotlin/com/lightningkite/ktordb/live/LiveObserveModelApi.kt b/client/src/main/kotlin/com/lightningkite/ktordb/live/LiveObserveModelApi.kt index 300ffdc..232e2d5 100644 --- a/client/src/main/kotlin/com/lightningkite/ktordb/live/LiveObserveModelApi.kt +++ b/client/src/main/kotlin/com/lightningkite/ktordb/live/LiveObserveModelApi.kt @@ -2,12 +2,15 @@ package com.lightningkite.ktordb.live import com.lightningkite.khrysalis.SharedCode +import com.lightningkite.khrysalis.SwiftReturnType import com.lightningkite.ktordb.* import com.lightningkite.ktordb.HasId import com.lightningkite.ktordb.ListChange import com.lightningkite.ktordb.Query +import com.lightningkite.rx.okhttp.HttpClient import io.reactivex.rxjava3.core.Observable import java.util.* +import java.util.concurrent.TimeUnit class LiveObserveModelApi>( val openSocket: (query: Query) -> Observable> @@ -25,11 +28,7 @@ class LiveObserveModelApi>( if (token != null) "$multiplexUrl?jwt=$token" else multiplexUrl, path ) - .switchMap { - it.send(query) - it.messages.onErrorResumeNext { Observable.never() } - } - .toListObservable(query.orderBy.comparator ?: compareBy { it._id }) + .filter(query) } ) } @@ -59,4 +58,11 @@ fun > Observable>.toListObservable(ordering: Compa } ?: it.old?.let { localList.removeAll { o -> it._id == o._id } } localList } -} \ No newline at end of file +} + +fun > Observable, Query>>.filter(query: Query): Observable> = + this + .doOnNext { it.send(query) } + .switchMap { it.messages } + .retryWhen @SwiftReturnType("Observable") { it.delay(5000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) } + .toListObservable(query.orderBy.comparator ?: compareBy { it._id }) \ No newline at end of file diff --git a/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt b/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt index ffb6401..ebd0e0d 100644 --- a/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt +++ b/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt @@ -12,11 +12,11 @@ import kotlinx.serialization.serializer import java.util.* import java.util.concurrent.TimeUnit -var sharedSocketShouldBeActive: Observable = Observable.just(false) - -var _overrideWebSocketProvider: ((url: String) -> Observable)? = null +var sharedSocketShouldBeActive: Observable = Observable.just(true) private var retryTime = 1000L private var lastRetry = 0L + +var _overrideWebSocketProvider: ((url: String) -> Observable)? = null private val sharedSocketCache = HashMap>() fun sharedSocket(url: String): Observable { return sharedSocketCache.getOrPut(url) { @@ -30,24 +30,24 @@ fun sharedSocket(url: String): Observable { (_overrideWebSocketProvider?.invoke(url) ?: HttpClient.webSocket(url)) .switchMap { lastRetry = System.currentTimeMillis() -// println("Connection to $shortUrl established, starting pings") +// println("Connection to $shortUrl established, starting pings") // Only have this observable until it fails val pingMessages: Observable = - Observable.interval(5000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + Observable.interval(30_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) .map { _ -> -// println("Sending ping to $url") - it.write.onNext(WebSocketFrame(text = "")) +// println("Sending ping to $url") + it.write.onNext(WebSocketFrame(text = " ")) }.switchMap { Observable.never() } val timeoutAfterSeconds: Observable = it.read .doOnNext { -// println("Got message from $shortUrl: ${it}") +// println("Got message from $shortUrl: ${it}") if (System.currentTimeMillis() > lastRetry + 60_000L) { retryTime = 1000L } } - .timeout(10_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + .timeout(40_000L, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) .switchMap { Observable.never() } Observable.merge( @@ -82,8 +82,7 @@ class WebSocketIsh( inline fun multiplexedSocket( url: String, path: String, - noinline onSetup: (WebSocketIsh) -> Unit = {} -): Observable> = multiplexedSocket(url, path, serializer(), serializer(), onSetup) +): Observable> = multiplexedSocket(url, path, serializer(), serializer()) @JsName("multiplexedSocket") fun multiplexedSocket( @@ -91,48 +90,66 @@ fun multiplexedSo path: String, inType: KSerializer, outType: KSerializer, - onSetup: (WebSocketIsh) -> Unit = {} ): Observable> { val shortUrl = url.substringBefore('?') val channel = UUID.randomUUID().toString() var lastSocket: WebSocketInterface? = null return sharedSocket(url) - .map { + .switchMapSingle { // println("Setting up channel on socket to $shortUrl with $path") lastSocket = it - it.write.onNext( - WebSocketFrame( - text = MultiplexMessage( - channel = channel, - path = path, - start = true - ).toJsonString() - ) - ) - val part = MultiplexedWebsocketPart( - messages = it.read.mapNotNull { - val text = it.text ?: return@mapNotNull null - if (text == "") return@mapNotNull null - val message: MultiplexMessage = text.fromJsonString() ?: return@mapNotNull null - if (message.channel == channel) message.data else null - }, - send = { message -> + + val multiplexedIn = it.read.mapNotNull { + val text = it.text ?: return@mapNotNull null + if (text.isBlank()) return@mapNotNull null + text.fromJsonString() + } + + multiplexedIn + .filter { it.channel == channel && it.start } + .firstOrError() + .map { _ -> + println("Connected to channel $channel") + WebSocketIsh( + messages = multiplexedIn.mapNotNull { + if (it.channel == channel) + if (it.end) { + println("Socket Closed by Server") + throw Exception("Channel Closed By Server") + }else + it.data?.fromJsonString(inType) + else null + }, + send = { message: OUT -> + println("Sending $message to $it") + it.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + data = message.toJsonString(outType) + ).toJsonString() + ) + ) + } + ) + } + .retryWhen @SwiftReturnType("Observable") { + val temp = retryTime + retryTime = temp * 2L + it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + } + .doOnSubscribe { _ -> it.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, - data = message + path = path, + start = true ).toJsonString() ) ) } - ) - val typedPart = WebSocketIsh( - messages = part.messages.mapNotNull { it.fromJsonString(inType) }, - send = { m -> part.send(m.toJsonString(outType)) } - ) - onSetup(typedPart) - typedPart + } .doOnDispose { // println("Disconnecting channel on socket to $shortUrl with $path") @@ -146,4 +163,4 @@ fun multiplexedSo ) ) } -} +} \ No newline at end of file