From d7859c5d0f3612b86cb97742cb1d7a078a97ddf3 Mon Sep 17 00:00:00 2001 From: Brady Svedin Date: Mon, 9 Oct 2023 13:52:20 -0600 Subject: [PATCH] Added clean channel handling for multiplex sockets. When the server ends a channel, the client will re-establish that channel if the observable is still subscribed. --- .../lightningkite/lightningdb/live/sockets.kt | 102 +++++++++++------- 1 file changed, 66 insertions(+), 36 deletions(-) diff --git a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt index c058df6b..bae33f37 100644 --- a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt +++ b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt @@ -5,16 +5,12 @@ import com.lightningkite.khrysalis.* import com.lightningkite.lightningdb.MultiplexMessage import com.lightningkite.rx.mapNotNull import com.lightningkite.rx.okhttp.* -import io.reactivex.rxjava3.core.Completable import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.subjects.PublishSubject import kotlinx.serialization.KSerializer -import kotlinx.serialization.decodeFromString import kotlinx.serialization.serializer -import java.lang.IllegalStateException import java.util.* import java.util.concurrent.TimeUnit -import kotlin.collections.HashMap -import java.util.* var sharedSocketShouldBeActive: Observable = Observable.just(true) @@ -100,6 +96,7 @@ fun multiplexedSock send = { m -> it.send(m.toJsonString(outType)) } ) } + fun multiplexedSocketRaw( url: String, path: String, @@ -107,33 +104,61 @@ fun multiplexedSocketRaw( ): Observable> { val shortUrl = url.substringBefore('?') val channel = UUID.randomUUID().toString() - var lastSocket: WebSocketInterface? = null return sharedSocket(url) - .switchMapSingle { - println("Setting up socket to $shortUrl with $path") - lastSocket = it - val multiplexedIn = it.read.mapNotNull { + .switchMap { sharedSocket -> +// println("Setting up channel $channel to $shortUrl with $path") + val multiplexedIn = sharedSocket.read.mapNotNull { val text = it.text ?: return@mapNotNull null if (text.isBlank()) return@mapNotNull null text.fromJsonString() - } + }.filter { it.channel == channel } + var current = PublishSubject.create() multiplexedIn - .filter { it.channel == channel && it.start } - .firstOrError() - .map { _ -> - println("Connected to channel $channel") - WebSocketIsh( - messages = multiplexedIn.mapNotNull { - if(it.channel == channel) it.data else null - }, - send = { message -> - println("Sending $message to $it") - it.write.onNext(WebSocketFrame(text = MultiplexMessage(channel = channel, data = message).toJsonString())) + .mapNotNull { message -> + when { + message.start -> { +// println("Channel ${message.channel} established with $sharedSocket") + WebSocketIsh( + messages = current, + send = { message -> +// println("Sending $message to $channel") + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + data = message + ).toJsonString() + ) + ) + } + ) } - ) + message.data != null -> { +// println("Got ${message.data} to ${message.channel}") + current.onNext(message.data) + null + } + message.end -> { +// println("Channel ${message.channel} terminated") + current = PublishSubject.create() + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + path = path, + queryParams = queryParams, + start = true + ).toJsonString() + ) + ) + null + } + else -> null + } } .doOnSubscribe { _ -> - it.write.onNext( +// println("Sending onSubscribe Startup Message") + sharedSocket.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, @@ -144,17 +169,22 @@ fun multiplexedSocketRaw( ) ) } - } - .doOnDispose { - println("Disconnecting channel on socket to $shortUrl with $path") - lastSocket?.write?.onNext( - WebSocketFrame( - text = MultiplexMessage( - channel = channel, - path = path, - end = true - ).toJsonString() - ) - ) + .doOnDispose { +// println("Disconnecting channel on socket to $shortUrl with $path") + sharedSocket?.write?.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + path = path, + end = true + ).toJsonString() + ) + ) + } + .retryWhen @SwiftReturnType("Observable") { + val temp = retryTime + retryTime = temp * 2L + it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + } } }