Skip to content

Commit

Permalink
Added clean channel handling for multiplex sockets. When the server e…
Browse files Browse the repository at this point in the history
…nds a channel, the client will re-establish that channel if the observable is still subscribed.
  • Loading branch information
bjsvedin committed Oct 9, 2023
1 parent 2d0e0b2 commit d7859c5
Showing 1 changed file with 66 additions and 36 deletions.
102 changes: 66 additions & 36 deletions client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ import com.lightningkite.khrysalis.*
import com.lightningkite.lightningdb.MultiplexMessage
import com.lightningkite.rx.mapNotNull
import com.lightningkite.rx.okhttp.*
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.serialization.KSerializer
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.serializer
import java.lang.IllegalStateException
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.collections.HashMap
import java.util.*


var sharedSocketShouldBeActive: Observable<Boolean> = Observable.just(true)
Expand Down Expand Up @@ -100,40 +96,69 @@ fun <IN: IsCodableAndHashableNotNull, OUT: IsCodableAndHashable> multiplexedSock
send = { m -> it.send(m.toJsonString(outType)) }
)
}

fun multiplexedSocketRaw(
url: String,
path: String,
queryParams: Map<String, List<String>> = mapOf()
): Observable<WebSocketIsh<String, String>> {
val shortUrl = url.substringBefore('?')
val channel = UUID.randomUUID().toString()
var lastSocket: WebSocketInterface? = null
return sharedSocket(url)
.switchMapSingle {
println("Setting up socket to $shortUrl with $path")
lastSocket = it
val multiplexedIn = it.read.mapNotNull {
.switchMap { sharedSocket ->
// println("Setting up channel $channel to $shortUrl with $path")
val multiplexedIn = sharedSocket.read.mapNotNull {
val text = it.text ?: return@mapNotNull null
if (text.isBlank()) return@mapNotNull null
text.fromJsonString<MultiplexMessage>()
}
}.filter { it.channel == channel }
var current = PublishSubject.create<String>()
multiplexedIn
.filter { it.channel == channel && it.start }
.firstOrError()
.map { _ ->
println("Connected to channel $channel")
WebSocketIsh<String, String>(
messages = multiplexedIn.mapNotNull {
if(it.channel == channel) it.data else null
},
send = { message ->
println("Sending $message to $it")
it.write.onNext(WebSocketFrame(text = MultiplexMessage(channel = channel, data = message).toJsonString()))
.mapNotNull { message ->
when {
message.start -> {
// println("Channel ${message.channel} established with $sharedSocket")
WebSocketIsh<String, String>(
messages = current,
send = { message ->
// println("Sending $message to $channel")
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
data = message
).toJsonString()
)
)
}
)
}
)
message.data != null -> {
// println("Got ${message.data} to ${message.channel}")
current.onNext(message.data)
null
}
message.end -> {
// println("Channel ${message.channel} terminated")
current = PublishSubject.create()
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
queryParams = queryParams,
start = true
).toJsonString()
)
)
null
}
else -> null
}
}
.doOnSubscribe { _ ->
it.write.onNext(
// println("Sending onSubscribe Startup Message")
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
Expand All @@ -144,17 +169,22 @@ fun multiplexedSocketRaw(
)
)
}
}
.doOnDispose {
println("Disconnecting channel on socket to $shortUrl with $path")
lastSocket?.write?.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
end = true
).toJsonString()
)
)
.doOnDispose {
// println("Disconnecting channel on socket to $shortUrl with $path")
sharedSocket?.write?.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
path = path,
end = true
).toJsonString()
)
)
}
.retryWhen @SwiftReturnType("Observable<Error>") {
val temp = retryTime
retryTime = temp * 2L
it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!)
}
}
}

0 comments on commit d7859c5

Please sign in to comment.