diff --git a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt index c261662c..050fdc22 100644 --- a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt +++ b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt @@ -6,16 +6,12 @@ import com.lightningkite.lightningdb.MultiplexMessage import com.lightningkite.rx.mapNotNull import com.lightningkite.rx.okhttp.* import com.lightningkite.uuid -import io.reactivex.rxjava3.core.Completable import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.subjects.PublishSubject import kotlinx.serialization.KSerializer -import kotlinx.serialization.decodeFromString import kotlinx.serialization.serializer -import java.lang.IllegalStateException import java.util.* import java.util.concurrent.TimeUnit -import kotlin.collections.HashMap -import java.util.* var sharedSocketShouldBeActive: Observable = Observable.just(true) @@ -101,6 +97,7 @@ fun multiplexedSock send = { m -> it.send(m.toJsonString(outType)) } ) } + fun multiplexedSocketRaw( url: String, path: String, @@ -108,33 +105,61 @@ fun multiplexedSocketRaw( ): Observable> { val shortUrl = url.substringBefore('?') val channel = uuid().toString() - var lastSocket: WebSocketInterface? = null return sharedSocket(url) - .switchMapSingle { - println("Setting up socket to $shortUrl with $path") - lastSocket = it - val multiplexedIn = it.read.mapNotNull { + .switchMap { sharedSocket -> +// println("Setting up channel $channel to $shortUrl with $path") + val multiplexedIn = sharedSocket.read.mapNotNull { val text = it.text ?: return@mapNotNull null if (text.isBlank()) return@mapNotNull null text.fromJsonString() - } + }.filter { it.channel == channel } + var current = PublishSubject.create() multiplexedIn - .filter { it.channel == channel && it.start } - .firstOrError() - .map { _ -> - println("Connected to channel $channel") - WebSocketIsh( - messages = multiplexedIn.mapNotNull { - if(it.channel == channel) it.data else null - }, - send = { message -> - println("Sending $message to $it") - it.write.onNext(WebSocketFrame(text = MultiplexMessage(channel = channel, data = message).toJsonString())) + .mapNotNull { message -> + when { + message.start -> { +// println("Channel ${message.channel} established with $sharedSocket") + WebSocketIsh( + messages = current, + send = { message -> +// println("Sending $message to $channel") + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + data = message + ).toJsonString() + ) + ) + } + ) } - ) + message.data != null -> { +// println("Got ${message.data} to ${message.channel}") + current.onNext(message.data) + null + } + message.end -> { +// println("Channel ${message.channel} terminated") + current = PublishSubject.create() + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + path = path, + queryParams = queryParams, + start = true + ).toJsonString() + ) + ) + null + } + else -> null + } } .doOnSubscribe { _ -> - it.write.onNext( +// println("Sending onSubscribe Startup Message") + sharedSocket.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, @@ -145,17 +170,22 @@ fun multiplexedSocketRaw( ) ) } - } - .doOnDispose { - println("Disconnecting channel on socket to $shortUrl with $path") - lastSocket?.write?.onNext( - WebSocketFrame( - text = MultiplexMessage( - channel = channel, - path = path, - end = true - ).toJsonString() - ) - ) + .doOnDispose { +// println("Disconnecting channel on socket to $shortUrl with $path") + sharedSocket?.write?.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + path = path, + end = true + ).toJsonString() + ) + ) + } + .retryWhen @SwiftReturnType("Observable") { + val temp = retryTime + retryTime = temp * 2L + it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + } } }