From bbe9886401f9b13e840f51bc4ed3b6a59e285455 Mon Sep 17 00:00:00 2001 From: Shane Thompson Date: Wed, 6 Dec 2023 14:23:22 -0700 Subject: [PATCH 1/2] some small updates with kotlin client code so that it converts to swift correctly --- .../lightningdb/live/LiveObserveModelApi.kt | 4 +- .../lightningkite/lightningdb/live/sockets.kt | 6 +-- .../Classes/client/live/sockets.swift | 50 +++++++++++-------- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt b/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt index 0069e53a..e47eedb0 100644 --- a/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt +++ b/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt @@ -47,7 +47,7 @@ class LiveObserveModelApi>( } } -fun , ID:Comparable> Observable>.toListObservable(ordering: Comparator): Observable> { +fun > Observable>.toListObservable(ordering: Comparator): Observable> { val localList = ArrayList() return map { it.wholeList?.let { localList.clear(); localList.addAll(it.sortedWith(ordering)) } @@ -61,7 +61,7 @@ fun , ID:Comparable> Observable>.toListObservabl } } -fun , ID: Comparable> Observable, Query>>.filter(query: Query): Observable> = +fun > Observable, Query>>.filter(query: Query): Observable> = this .doOnNext { it.send(query) diff --git a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt index bae33f37..428aa4a7 100644 --- a/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt +++ b/client/src/main/kotlin/com/lightningkite/lightningdb/live/sockets.kt @@ -112,7 +112,7 @@ fun multiplexedSocketRaw( if (text.isBlank()) return@mapNotNull null text.fromJsonString() }.filter { it.channel == channel } - var current = PublishSubject.create() + var current: PublishSubject = PublishSubject.create() multiplexedIn .mapNotNull { message -> when { @@ -135,7 +135,7 @@ fun multiplexedSocketRaw( } message.data != null -> { // println("Got ${message.data} to ${message.channel}") - current.onNext(message.data) + current.onNext(message.data!!) null } message.end -> { @@ -171,7 +171,7 @@ fun multiplexedSocketRaw( } .doOnDispose { // println("Disconnecting channel on socket to $shortUrl with $path") - sharedSocket?.write?.onNext( + sharedSocket.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, diff --git a/ios/LightningServer/Classes/client/live/sockets.swift b/ios/LightningServer/Classes/client/live/sockets.swift index 4bfe7048..00ad3a7d 100644 --- a/ios/LightningServer/Classes/client/live/sockets.swift +++ b/ios/LightningServer/Classes/client/live/sockets.swift @@ -69,34 +69,44 @@ public func multiplexedSocket return multiplexedSocketRaw(url: url, path: path, queryParams: queryParams) .map { (it) -> WebSocketIsh in WebSocketIsh(messages: it.messages.compactMap({ (it) -> IN? in it.fromJsonString(serializer: IN.self) }), send: { (m) -> Void in it.send(m.toJsonString()) }) }; } + public func multiplexedSocketRaw(url: String, path: String, queryParams: Dictionary> = dictionaryOf()) -> Observable> { let shortUrl = url.substringBefore(delimiter: "?") let channel = String(kotlin: UUID.randomUUID()) - var lastSocket: WebSocketInterface? = nil return sharedSocket(url: url) - .flatMap({ (it) -> Single> in - print("Setting up socket to \(String(kotlin: shortUrl)) with \(String(kotlin: path))") - lastSocket = it - let multiplexedIn = it.read.compactMap({ (it) -> MultiplexMessage? in + .switchMap { (sharedSocket) -> Observable> in + // println("Setting up channel $channel to $shortUrl with $path") + let multiplexedIn = sharedSocket.read.compactMap({ (it) -> MultiplexMessage? in guard let text = it.text else { return nil } if text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { return nil } return (text.fromJsonString() as MultiplexMessage?) - }) + }).filter { (it) -> Bool in it.channel == channel } + var current: PublishSubject = PublishSubject() return multiplexedIn - .filter { (it) -> Bool in it.channel == channel && it.start } - .firstOrError() - .map { (_) -> WebSocketIsh in - print("Connected to channel \(String(kotlin: channel))") - return (WebSocketIsh(messages: multiplexedIn.compactMap({ (it) -> String? in it.channel == channel ? it.data : nil }) as Observable, send: { (message) -> Void in - print("Sending \(String(kotlin: message)) to \(it)") - it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message).toJsonString())) - } as (String) -> Void) as WebSocketIsh) - } - .doOnSubscribe { (_) -> Void in it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString())) } - }) - .doOnDispose { () -> Void in - print("Disconnecting channel on socket to \(String(kotlin: shortUrl)) with \(String(kotlin: path))") - lastSocket?.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) + .compactMap({ (message) -> WebSocketIsh? in run { () -> WebSocketIsh? in + if message.start { + // println("Channel ${message.channel} established with $sharedSocket") + return (WebSocketIsh(messages: current as Observable, send: { (message) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message).toJsonString())) } as (String) -> Void) as WebSocketIsh) + } else if message.data != nil { + // println("Got ${message.data} to ${message.channel}") + current.onNext(message.data!) + return nil + } else if message.end { + // println("Channel ${message.channel} terminated") + current = PublishSubject() + sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString())) + return nil + } else { + return nil + } + } }) + .doOnSubscribe { (_) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString())) } + .doOnDispose { () -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) } + .retry(when: { (it) -> Observable in + let temp = retryTime + retryTime = temp * 2 + return it.delay(.milliseconds(temp), scheduler: MainScheduler.instance) + }) } } From 5234bcf87ec6e2ec2892ee5c148d48c1cfa2f79d Mon Sep 17 00:00:00 2001 From: Shane Thompson Date: Wed, 6 Dec 2023 15:09:25 -0700 Subject: [PATCH 2/2] undid some LiveObservableModelApi changes to kotlin code. --- .../com/lightningkite/lightningdb/live/LiveObserveModelApi.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt b/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt index e47eedb0..c063cd75 100644 --- a/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt +++ b/client/src/main/kotlin/com/lightningkite/lightningdb/live/LiveObserveModelApi.kt @@ -47,7 +47,7 @@ class LiveObserveModelApi>( } } -fun > Observable>.toListObservable(ordering: Comparator): Observable> { +fun , ID: Comparable> Observable>.toListObservable(ordering: Comparator): Observable> { val localList = ArrayList() return map { it.wholeList?.let { localList.clear(); localList.addAll(it.sortedWith(ordering)) } @@ -61,7 +61,7 @@ fun > Observable>.toListObservable(ordering: Comparat } } -fun > Observable, Query>>.filter(query: Query): Observable> = +fun , ID: Comparable> Observable, Query>>.filter(query: Query): Observable> = this .doOnNext { it.send(query)