From 6d1e0afb3967a3e3ec3c740a20df40976b54fe7b Mon Sep 17 00:00:00 2001 From: Brady Svedin Date: Sat, 14 Oct 2023 20:16:23 -0600 Subject: [PATCH] Large update to client multiplexedSocket. It now recovers from a server side channel closing, also lengthened the ping time. Converted for web and ios. --- KtorBatteries.podspec | 2 +- client/src/main/equivalents/swift.fqnames | 3 +- client/src/main/equivalents/ts.fqnames | 1 + .../com/lightningkite/ktordb/live/sockets.kt | 108 +++++++------ .../client/live/LiveObserveModelApi.swift | 16 +- .../Classes/client/live/sockets.swift | 140 ++++++++++++---- .../Classes/client/mock/MockTable.swift | 12 +- .../client/mock/MockWriteModelApi.swift | 2 +- web/package-lock.json | 28 ++-- web/package.json | 4 +- web/src/live/LiveCompleteModelApi.ts | 2 +- web/src/live/LiveFullReadModelApi.ts | 2 +- web/src/live/LiveObserveModelApi.ts | 62 +++---- web/src/live/LiveReadModelApi.ts | 6 +- web/src/live/LiveWriteModelApi.ts | 6 +- web/src/live/sockets.ts | 153 ++++++++++++------ web/src/mock/MockObserveModelApi.ts | 2 +- web/src/mock/MockReadModelApi.ts | 12 +- web/src/mock/MockWriteModelApi.ts | 14 +- 19 files changed, 355 insertions(+), 220 deletions(-) diff --git a/KtorBatteries.podspec b/KtorBatteries.podspec index 2ced3e8..a917d7b 100644 --- a/KtorBatteries.podspec +++ b/KtorBatteries.podspec @@ -1,6 +1,6 @@ Pod::Spec.new do |s| s.name = 'KtorBatteries' - s.version = '0.1.0' + s.version = '0.1.1' s.summary = 'KtorBatteries' s.description = <<-DESC diff --git a/client/src/main/equivalents/swift.fqnames b/client/src/main/equivalents/swift.fqnames index 3eaae39..b4bf2ad 100644 --- a/client/src/main/equivalents/swift.fqnames +++ b/client/src/main/equivalents/swift.fqnames @@ -11,12 +11,13 @@ com.lightningkite.ktordb.live com.lightningkite.ktordb.live.LiveFullReadModelApi com.lightningkite.ktordb.live.LiveObserveModelApi com.lightningkite.ktordb.live.toListObservable +com.lightningkite.ktordb.live.filter com.lightningkite.ktordb.live.LiveReadModelApi com.lightningkite.ktordb.live.LiveWriteModelApi com.lightningkite.ktordb.live.sharedSocketShouldBeActive -com.lightningkite.ktordb.live._overrideWebSocketProvider com.lightningkite.ktordb.live.retryTime com.lightningkite.ktordb.live.lastRetry +com.lightningkite.ktordb.live._overrideWebSocketProvider com.lightningkite.ktordb.live.sharedSocketCache com.lightningkite.ktordb.live.sharedSocket com.lightningkite.ktordb.live.MultiplexedWebsocketPart diff --git a/client/src/main/equivalents/ts.fqnames b/client/src/main/equivalents/ts.fqnames index cdec108..9a045ba 100644 --- a/client/src/main/equivalents/ts.fqnames +++ b/client/src/main/equivalents/ts.fqnames @@ -95,6 +95,7 @@ com.lightningkite.ktordb.anyClear>com.lightningkite.ktordb.PropChaincom.lightnin com.lightningkite.ktordb.CollectionChanges com.lightningkite.ktordb.aggregator>com.lightningkite.ktordb.Aggregate com.lightningkite.ktordb.live.LiveReadModelApi +com.lightningkite.ktordb.live.filter com.lightningkite.ktordb.ListChange com.lightningkite.ktordb.notIn>com.lightningkite.ktordb.PropChaincom.lightningkite.ktordb.notIn.K, com.lightningkite.ktordb.notIn.T com.lightningkite.ktordb.UUIDFor diff --git a/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt b/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt index ebd0e0d..e734324 100644 --- a/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt +++ b/client/src/main/kotlin/com/lightningkite/ktordb/live/sockets.kt @@ -7,6 +7,7 @@ import com.lightningkite.ktordb.MultiplexMessage import com.lightningkite.rx.mapNotNull import com.lightningkite.rx.okhttp.* import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.subjects.PublishSubject import kotlinx.serialization.KSerializer import kotlinx.serialization.serializer import java.util.* @@ -93,53 +94,64 @@ fun multiplexedSo ): Observable> { val shortUrl = url.substringBefore('?') val channel = UUID.randomUUID().toString() - var lastSocket: WebSocketInterface? = null return sharedSocket(url) - .switchMapSingle { -// println("Setting up channel on socket to $shortUrl with $path") - lastSocket = it - - val multiplexedIn = it.read.mapNotNull { + .switchMap { sharedSocket -> +// println("Setting up channel $channel to $shortUrl with $path") + val multiplexedIn = sharedSocket.read.mapNotNull { val text = it.text ?: return@mapNotNull null if (text.isBlank()) return@mapNotNull null text.fromJsonString() } - + .filter { it.channel == channel } + var current = PublishSubject.create() multiplexedIn - .filter { it.channel == channel && it.start } - .firstOrError() - .map { _ -> - println("Connected to channel $channel") - WebSocketIsh( - messages = multiplexedIn.mapNotNull { - if (it.channel == channel) - if (it.end) { - println("Socket Closed by Server") - throw Exception("Channel Closed By Server") - }else - it.data?.fromJsonString(inType) - else null - }, - send = { message: OUT -> - println("Sending $message to $it") - it.write.onNext( + .mapNotNull { message -> + when { + message.start -> { +// println("Channel ${message.channel} established with $sharedSocket") + WebSocketIsh( + messages = current, + send = { message -> +// println("Sending $message to $channel") + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + data = message.toJsonString(outType) + ).toJsonString() + ) + ) + } + ) + } + + message.data != null -> { +// println("Got ${message.data} to ${message.channel}") + current.onNext(message.data?.fromJsonString(inType)) + null + } + + message.end -> { +// println("Channel ${message.channel} terminated") + current = PublishSubject.create() + sharedSocket.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, - data = message.toJsonString(outType) + path = path, + start = true ).toJsonString() ) ) + null } - ) - } - .retryWhen @SwiftReturnType("Observable") { - val temp = retryTime - retryTime = temp * 2L - it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + + else -> null + } } .doOnSubscribe { _ -> - it.write.onNext( +// println("Sending onSubscribe Startup Message") + sharedSocket.write.onNext( WebSocketFrame( text = MultiplexMessage( channel = channel, @@ -149,18 +161,22 @@ fun multiplexedSo ) ) } - - } - .doOnDispose { -// println("Disconnecting channel on socket to $shortUrl with $path") - lastSocket?.write?.onNext( - WebSocketFrame( - text = MultiplexMessage( - channel = channel, - path = path, - end = true - ).toJsonString() - ) - ) + .doOnDispose { +// println("Disconnecting channel on socket to $shortUrl with $path") + sharedSocket.write.onNext( + WebSocketFrame( + text = MultiplexMessage( + channel = channel, + path = path, + end = true + ).toJsonString() + ) + ) + } + .retryWhen @SwiftReturnType("Observable") { + val temp = retryTime + retryTime = temp * 2L + it.delay(temp, TimeUnit.MILLISECONDS, HttpClient.responseScheduler!!) + } } -} \ No newline at end of file +} diff --git a/ios/KtorBatteries/Classes/client/live/LiveObserveModelApi.swift b/ios/KtorBatteries/Classes/client/live/LiveObserveModelApi.swift index 2d8f713..f6e94dd 100644 --- a/ios/KtorBatteries/Classes/client/live/LiveObserveModelApi.swift +++ b/ios/KtorBatteries/Classes/client/live/LiveObserveModelApi.swift @@ -2,13 +2,14 @@ // Generated by Khrysalis - this file will be overwritten. import KhrysalisRuntime import RxSwift +import RxSwiftPlus import Foundation public final class LiveObserveModelApi : ObserveModelApi { public var openSocket: (Query) -> Observable> public init(openSocket: @escaping (Query) -> Observable>) { self.openSocket = openSocket - self.alreadyOpen = ([:] as Dictionary, Observable>>) + self.alreadyOpen = Dictionary, Observable>>() super.init() //Necessary properties should be initialized now } @@ -44,6 +45,13 @@ public func xObservableToListObservable(_ this: Observable(_ this: Observable, Query>>, _ query: Query) -> Observable> { + return xObservableToListObservable(this + .doOnNext { (it) -> Void in it.send(query) } + .switchMap { (it) -> Observable> in it.messages } + .retry(when: { (it) -> Observable in it.delay(.milliseconds(5000), scheduler: MainScheduler.instance) }), ordering: getListComparator(query.orderBy) ?? compareBy(selector: { (it) in it._id })); +} public final class LiveObserveModelApiCompanion { public init() { //Necessary properties should be initialized now @@ -51,10 +59,6 @@ public final class LiveObserveModelApiCompanion { public static let INSTANCE = LiveObserveModelApiCompanion() public func create(multiplexUrl: String, token: String?, headers: Dictionary, path: String) -> LiveObserveModelApi { - return LiveObserveModelApi(openSocket: { (query) -> Observable> in xObservableToListObservable((multiplexedSocket(url: token != nil ? "\(String(kotlin: multiplexUrl))?jwt=\(String(kotlin: token))" : multiplexUrl, path: path) as Observable, Query>>) - .switchMap { (it) -> Observable> in - it.send(query) - return it.messages.catchError({ (it) -> Observable> in Observable.never() }) - }, ordering: getListComparator(query.orderBy) ?? compareBy(selector: { (it) in it._id })) }); + return LiveObserveModelApi(openSocket: { (query) -> Observable> in xObservableFilter((multiplexedSocket(url: token != nil ? "\(String(kotlin: multiplexUrl))?jwt=\(String(kotlin: token))" : multiplexUrl, path: path) as Observable, Query>>), query) }); } } diff --git a/ios/KtorBatteries/Classes/client/live/sockets.swift b/ios/KtorBatteries/Classes/client/live/sockets.swift index 956027d..3477251 100644 --- a/ios/KtorBatteries/Classes/client/live/sockets.swift +++ b/ios/KtorBatteries/Classes/client/live/sockets.swift @@ -5,11 +5,11 @@ import RxSwift import RxSwiftPlus import Foundation -public var sharedSocketShouldBeActive: Observable = Observable.just(false) - -public var _overrideWebSocketProvider: ((String) -> Observable)? = nil +public var sharedSocketShouldBeActive: Observable = Observable.just(true) private var retryTime = 1000 private var lastRetry = 0 + +public var _overrideWebSocketProvider: ((String) -> Observable)? = nil private var sharedSocketCache = Dictionary>() public func sharedSocket(url: String) -> Observable { return sharedSocketCache.getOrPut(key: url) { () -> Observable in sharedSocketShouldBeActive @@ -22,22 +22,17 @@ public func sharedSocket(url: String) -> Observable { return (_overrideWebSocketProvider?(url) ?? HttpClient.INSTANCE.webSocket(url: url)) .switchMap { (it) -> Observable in lastRetry = Int(Date().timeIntervalSince1970 * 1000.0) -// print("Connection to \(String(kotlin: shortUrl)) established, starting pings") + // println("Connection to $shortUrl established, starting pings") // Only have this observable until it fails - let pingMessages: Observable = Observable.interval(RxTimeInterval.milliseconds(Int(5000)), scheduler: HttpClient.INSTANCE.responseScheduler!) - .map { (_) -> Void in -// print("Sending ping to \(String(kotlin: url))") - it.write.onNext(WebSocketFrame(text: "")) - }.switchMap { (it) -> Observable in Observable.never() } + let pingMessages: Observable = Observable.interval(RxTimeInterval.milliseconds(Int(30000)), scheduler: HttpClient.INSTANCE.responseScheduler!) + .map { (_) -> Void in it.write.onNext(WebSocketFrame(text: " ")) }.switchMap { (it) -> Observable in Observable.never() } let timeoutAfterSeconds: Observable = it.read - .doOnNext { (it) -> Void in -// print("Got message from \(String(kotlin: shortUrl)): \(it)") - if Int(Date().timeIntervalSince1970 * 1000.0) > lastRetry + 60000 { + .doOnNext { (it) -> Void in if Int(Date().timeIntervalSince1970 * 1000.0) > lastRetry + 60000 { retryTime = 1000 } } - .timeout(.milliseconds(10000), scheduler: MainScheduler.instance) + .timeout(.milliseconds(40000), scheduler: MainScheduler.instance) .switchMap { (it) -> Observable in Observable.never() } return Observable.merge(Observable.just(it), pingMessages, timeoutAfterSeconds) @@ -75,29 +70,110 @@ public final class WebSocketIsh(url: String, path: String, onSetup: @escaping (WebSocketIsh) -> Void = { (it) -> Void in }) -> Observable> { - return multiplexedSocket(url: url, path: path, inType: IN.self, outType: OUT.self, onSetup: onSetup); +public func multiplexedSocket(url: String, path: String) -> Observable> { + return multiplexedSocket(url: url, path: path, inType: IN.self, outType: OUT.self); } -public func multiplexedSocket(url: String, path: String, inType: IN.Type, outType: OUT.Type, onSetup: @escaping (WebSocketIsh) -> Void = { (it) -> Void in }) -> Observable> { +public func multiplexedSocket(url: String, path: String, inType: IN.Type, outType: OUT.Type) -> Observable> { let shortUrl = url.substringBefore(delimiter: "?") let channel = String(kotlin: UUID.randomUUID()) - var lastSocket: WebSocketInterface? = nil + + return sharedSocket(url: url) - .map { (it) -> WebSocketIsh in - // print("Setting up channel on socket to \(shortUrl) with \(path)") - lastSocket = it - it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, start: true).toJsonString())) - let part = MultiplexedWebsocketPart(messages: it.read.compactMap({ (it) -> String? in - guard let text = it.text else { return nil } - if text == "" { return nil } - guard let message: MultiplexMessage = text.fromJsonString() else { return nil } - return message.channel == channel ? message.data : nil - }), send: { (message) -> Void in it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message).toJsonString())) }) - let typedPart = (WebSocketIsh(messages: part.messages.compactMap({ (it) -> IN? in it.fromJsonString(serializer: IN.self) }) as Observable, send: { (m) -> Void in part.send(m.toJsonString()) } as (OUT) -> Void) as WebSocketIsh) - onSetup(typedPart) - return typedPart - } - .doOnDispose { () -> Void in lastSocket?.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) } + .switchMap { sharedSocket in + print("Setting up channel $channel to $shortUrl with $path") + let multiplexedIn: Observable = sharedSocket.read.compactMap({ (it) -> MultiplexMessage? in + guard let text = it.text else { return nil } + if text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { return nil } + guard let message: MultiplexMessage = text.fromJsonString() else { + return nil + } + return message + }) + .filter { (it) -> Bool in it.channel == channel } + + var current = PublishSubject() + return multiplexedIn + .compactMap { message in + if message.start { + print("Channel ${message.channel} established with $sharedSocket") + return (WebSocketIsh( + messages: current as Observable, + send: { (message) -> Void in + sharedSocket.write.onNext( + WebSocketFrame( + text: MultiplexMessage( + channel: channel, + data: message.toJsonString()).toJsonString() + ) + ) + } as (OUT) -> Void) as WebSocketIsh) + } else if message.data != nil { + print("Got ${message.data} to ${message.channel}") + guard let next:IN = message.data?.fromJsonString() else { + return nil + } + current.onNext(next) + + return nil + } else if message.end { + print("Channel ${message.channel} terminated") + current = PublishSubject() + sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, start: true).toJsonString())) + return nil + } else { + return nil + } + +// return (WebSocketIsh(messages: current as Observable, send: { (message) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message.toJsonString()).toJsonString())) } as (OUT) -> Void) as WebSocketIsh) + } + .doOnSubscribe { (_) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, start: true).toJsonString())) } + .doOnDispose { () -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) } + .retry(when: { (it) -> Observable in + let temp = retryTime + retryTime = temp * 2 + return it.delay(.milliseconds(temp), scheduler: MainScheduler.instance) + }) + + } + // + // return sharedSocket(url: url) + // .switchMap { (sharedSocket) -> Observable> in + // print("Setting up channel $channel to $shortUrl with $path") + // let multiplexedIn = sharedSocket.read.compactMap({ (it) -> MultiplexMessage? in + // guard let text = it.text else { return nil } + // if text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { return nil } + // return (text.fromJsonString() as MultiplexMessage?) + // }) + // .filter { (it) -> Bool in it.channel == channel } + // var current = PublishSubject() + // return multiplexedIn + // .compactMap({ (message) -> WebSocketIsh? in + // if message.start { + // print("Channel ${message.channel} established with $sharedSocket") + // return (WebSocketIsh(messages: current as Observable, send: { (message) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message.toJsonString()).toJsonString())) } as (OUT) -> Void) as WebSocketIsh) + // } else if message.data != nil { + // print("Got ${message.data} to ${message.channel}") + // if let data = message.date { + // current.onNext(data.fromJsonString(serializer: IN.self)) + // } + // return nil + // } else if message.end { + // print("Channel ${message.channel} terminated") + // current = PublishSubject() + // sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, start: true).toJsonString())) + // return nil + // } else { + // return nil + // } + // }) + // .doOnSubscribe { (_) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, start: true).toJsonString())) } + // .doOnDispose { () -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) } + // .retry(when: { (it) -> Observable in + // let temp = retryTime + // retryTime = temp * 2 + // return it.delay(.milliseconds(temp), scheduler: MainScheduler.instance) + // }) + // } } diff --git a/ios/KtorBatteries/Classes/client/mock/MockTable.swift b/ios/KtorBatteries/Classes/client/mock/MockTable.swift index cc57044..d79d5e2 100644 --- a/ios/KtorBatteries/Classes/client/mock/MockTable.swift +++ b/ios/KtorBatteries/Classes/client/mock/MockTable.swift @@ -28,17 +28,17 @@ public final class MockTable where Model.ID == UUID { } public func addItem(item: Model) -> Model { - var array58 = self.data - array58[item._id] = item - self.data = array58 + var array65 = self.data + array65[item._id] = item + self.data = array65 self.signals.onNext(SignalData(item: item, created: true, deleted: false)) return item } public func replaceItem(item: Model) -> Model { - var array62 = self.data - array62[item._id] = item - self.data = array62 + var array69 = self.data + array69[item._id] = item + self.data = array69 self.signals.onNext(SignalData(item: item, created: false, deleted: false)) return item } diff --git a/ios/KtorBatteries/Classes/client/mock/MockWriteModelApi.swift b/ios/KtorBatteries/Classes/client/mock/MockWriteModelApi.swift index 27f6687..dbb07fb 100644 --- a/ios/KtorBatteries/Classes/client/mock/MockWriteModelApi.swift +++ b/ios/KtorBatteries/Classes/client/mock/MockWriteModelApi.swift @@ -46,7 +46,7 @@ public final class MockWriteModelApi : WriteModelApi where .asList() .filter({ (it) -> Bool in modification.condition.invoke(on: it) }) .map({ (it) -> Model in self.table.replaceItem(item: modification.modification.invoke(on: it)) })) - .map { (it) -> Int in Int(it.count) }; + .map { (it) -> Int in it.count }; } override public func delete(id: UUIDFor) -> Single { diff --git a/web/package-lock.json b/web/package-lock.json index 8743065..f725882 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -1,15 +1,15 @@ { "name": "@lightningkite/ktor-batteries", - "version": "0.1.4", + "version": "0.1.5", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@lightningkite/ktor-batteries", - "version": "0.1.4", + "version": "0.1.5", "license": "MIT", "dependencies": { - "@js-joda/core": "^4.3.1", + "@js-joda/core": "^5.5.3", "@lightningkite/khrysalis-runtime": "^1.0.5", "@lightningkite/rxjs-plus": "1.0.0", "@types/mersenne-twister": "1.x", @@ -1627,9 +1627,9 @@ } }, "node_modules/@js-joda/core": { - "version": "4.3.1", - "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-4.3.1.tgz", - "integrity": "sha512-oeaetlodcqVsiZDxnEcqsbs+sXBkASxua0mXs5OXuPQXz3/wdPTMlxwfQ4z2HKcOik3S9voW3QJkp/KLWDhvRQ==" + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-5.6.1.tgz", + "integrity": "sha512-Xla/d7ZMMR6+zRd6lTio0wRZECfcfFJP7GGe9A9L4tDOlD5CX4YcZ4YZle9w58bBYzssojVapI84RraKWDQZRg==" }, "node_modules/@lightningkite/khrysalis-runtime": { "version": "1.0.5", @@ -1647,11 +1647,6 @@ "uuid": "9.x" } }, - "node_modules/@lightningkite/khrysalis-runtime/node_modules/@js-joda/core": { - "version": "5.5.3", - "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-5.5.3.tgz", - "integrity": "sha512-7dqNYwG8gCt4hfg5PKgM7xLEcgSBcx/UgC92OMnhMmvAnq11QzDFPrxUkNR/u5kn17WWLZ8beZ4A3Qrz4pZcmQ==" - }, "node_modules/@lightningkite/khrysalis-runtime/node_modules/@types/uuid": { "version": "9.0.4", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.4.tgz", @@ -8491,9 +8486,9 @@ } }, "@js-joda/core": { - "version": "4.3.1", - "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-4.3.1.tgz", - "integrity": "sha512-oeaetlodcqVsiZDxnEcqsbs+sXBkASxua0mXs5OXuPQXz3/wdPTMlxwfQ4z2HKcOik3S9voW3QJkp/KLWDhvRQ==" + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-5.6.1.tgz", + "integrity": "sha512-Xla/d7ZMMR6+zRd6lTio0wRZECfcfFJP7GGe9A9L4tDOlD5CX4YcZ4YZle9w58bBYzssojVapI84RraKWDQZRg==" }, "@lightningkite/khrysalis-runtime": { "version": "1.0.5", @@ -8511,11 +8506,6 @@ "uuid": "9.x" }, "dependencies": { - "@js-joda/core": { - "version": "5.5.3", - "resolved": "https://registry.npmjs.org/@js-joda/core/-/core-5.5.3.tgz", - "integrity": "sha512-7dqNYwG8gCt4hfg5PKgM7xLEcgSBcx/UgC92OMnhMmvAnq11QzDFPrxUkNR/u5kn17WWLZ8beZ4A3Qrz4pZcmQ==" - }, "@types/uuid": { "version": "9.0.4", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.4.tgz", diff --git a/web/package.json b/web/package.json index 7e5c3a0..1ecbf88 100644 --- a/web/package.json +++ b/web/package.json @@ -1,11 +1,11 @@ { "name": "@lightningkite/ktor-batteries", - "version": "0.1.4", + "version": "0.1.5", "description": "", "main": "dist/index.js", "types": "dist/index.d.ts", "dependencies": { - "@js-joda/core": "^4.3.1", + "@js-joda/core": "^5.5.3", "iter-tools-es": "^7.1.4", "rxjs": "7.4.0", "uuid": "8.x", diff --git a/web/src/live/LiveCompleteModelApi.ts b/web/src/live/LiveCompleteModelApi.ts index 0cb61d4..3bbd757 100644 --- a/web/src/live/LiveCompleteModelApi.ts +++ b/web/src/live/LiveCompleteModelApi.ts @@ -23,7 +23,7 @@ export namespace LiveCompleteModelApi { private constructor() { } public static INSTANCE = new Companion(); - + public create>(Model: Array, root: string, multiplexSocketUrl: string, path: string, token: (string | null), headers: Map = new Map([])): LiveCompleteModelApi { return new LiveCompleteModelApi(LiveReadModelApi.Companion.INSTANCE.create(Model, root, path, token, headers), LiveWriteModelApi.Companion.INSTANCE.create(Model, root, path, token, headers), LiveObserveModelApi.Companion.INSTANCE.create(Model, multiplexSocketUrl, token, headers, path)); } diff --git a/web/src/live/LiveFullReadModelApi.ts b/web/src/live/LiveFullReadModelApi.ts index c16b46c..237db19 100644 --- a/web/src/live/LiveFullReadModelApi.ts +++ b/web/src/live/LiveFullReadModelApi.ts @@ -20,7 +20,7 @@ export namespace LiveFullReadModelApi { private constructor() { } public static INSTANCE = new Companion(); - + public create>(Model: Array, root: string, multiplexSocketUrl: string, path: string, token: (string | null), headers: Map = new Map([])): LiveFullReadModelApi { return new LiveFullReadModelApi(LiveReadModelApi.Companion.INSTANCE.create(Model, root, path, token, headers), LiveObserveModelApi.Companion.INSTANCE.create(Model, multiplexSocketUrl, token, headers, path)); } diff --git a/web/src/live/LiveObserveModelApi.ts b/web/src/live/LiveObserveModelApi.ts index 2ae7574..9e22259 100644 --- a/web/src/live/LiveObserveModelApi.ts +++ b/web/src/live/LiveObserveModelApi.ts @@ -7,8 +7,8 @@ import { Query } from '../db/Query' import { xListComparatorGet } from '../db/SortPart' import { WebSocketIsh, multiplexedSocketReified } from './sockets' import { Comparable, Comparator, EqualOverrideMap, compareBy, listRemoveAll, runOrNull, safeEq, xMutableMapGetOrPut } from '@lightningkite/khrysalis-runtime' -import { NEVER, Observable } from 'rxjs' -import {catchError, map, publishReplay, refCount, switchMap, tap} from 'rxjs/operators' +import { Observable } from 'rxjs' +import { delay, map, publishReplay, refCount, retryWhen, switchMap, tap } from 'rxjs/operators' //! Declares com.lightningkite.ktordb.live.LiveObserveModelApi export class LiveObserveModelApi> extends ObserveModelApi { @@ -25,12 +25,10 @@ export class LiveObserveModelApi> extends ObserveMod public observe(query: Query): Observable> { //multiplexedSocket, Query>("$multiplexUrl?jwt=$token", path) return xMutableMapGetOrPut, Observable>>(this.alreadyOpen, query, (): Observable> => (this.openSocket(query) - .pipe(tap({ - unsubscribe: (): void => { - this.alreadyOpen.delete(query); - } - })) - .pipe(publishReplay(1)) + .pipe(tap({ unsubscribe: (): void => { + this.alreadyOpen.delete(query); + } })) + .pipe(publishReplay(1)) .pipe(refCount()))); } } @@ -40,19 +38,11 @@ export namespace LiveObserveModelApi { private constructor() { } public static INSTANCE = new Companion(); - + public create>(Model: Array, multiplexUrl: string, token: (string | null), headers: Map, path: string): LiveObserveModelApi { - return new LiveObserveModelApi((query: Query): Observable> => (xObservableToListObservable(multiplexedSocketReified, Query>([ListChange, Model], [Query, Model], ((): string => { - if (token !== null) { - return `${multiplexUrl}?jwt=${token}` - } else { - return multiplexUrl - } - })(), path, undefined) - .pipe(switchMap((it: WebSocketIsh, Query>): Observable> => { - it.send(query); - return it.messages.pipe(catchError((it: any): Observable> => (NEVER))); - })), xListComparatorGet(query.orderBy) ?? compareBy((it: Model): (Comparable<(any | null)> | null) => (it._id))))); + return new LiveObserveModelApi((query: Query): Observable> => (xObservableFilter(multiplexedSocketReified, Query>([ListChange, Model], [Query, Model], ((): string => { + if (token !== null) { return `${multiplexUrl}?jwt=${token}` } else { return multiplexUrl } + })(), path), query))); } } } @@ -61,22 +51,32 @@ export namespace LiveObserveModelApi { export function xObservableToListObservable>(this_: Observable>, ordering: Comparator): Observable> { const localList = ([] as Array); return this_.pipe(map((it: ListChange): Array => { - const it_11 = it.wholeList; - if (it_11 !== null) { - localList.length = 0; localList.push(...it_11.slice().sort(ordering)); + const it_7 = it.wholeList; + if (it_7 !== null) { + localList.length = 0; localList.push(...it_7.slice().sort(ordering)); } - const it_13 = it._new; - if (it_13 !== null) { - listRemoveAll(localList, (o: T): boolean => (safeEq(it_13._id, o._id))); - let index = localList.findIndex((inList: T): boolean => (ordering(it_13, inList) < 0)); + const it_9 = it._new; + if (it_9 !== null) { + listRemoveAll(localList, (o: T): boolean => (safeEq(it_9._id, o._id))); + let index = localList.findIndex((inList: T): boolean => (ordering(it_9, inList) < 0)); if (index === (-1)) { index = localList.length } - localList.splice(index, 0, it_13); + localList.splice(index, 0, it_9); } else { - const it_20 = it.old; - if (it_20 !== null) { - listRemoveAll(localList, (o: T): boolean => (safeEq(it_20._id, o._id))); + const it_16 = it.old; + if (it_16 !== null) { + listRemoveAll(localList, (o: T): boolean => (safeEq(it_16._id, o._id))); } } return localList; })); +} + +//! Declares com.lightningkite.ktordb.live.filter>io.reactivex.rxjava3.core.Observablecom.lightningkite.ktordb.live.WebSocketIshcom.lightningkite.ktordb.ListChangecom.lightningkite.ktordb.live.filter.T, com.lightningkite.ktordb.Querycom.lightningkite.ktordb.live.filter.T +export function xObservableFilter>(this_: Observable, Query>>, query: Query): Observable> { + return xObservableToListObservable(this_ + .pipe(tap((it: WebSocketIsh, Query>): void => { + it.send(query); + })) + .pipe(switchMap((it: WebSocketIsh, Query>): Observable> => (it.messages))) + .pipe(retryWhen( (it: Observable): Observable => (it.pipe(delay(5000))))), xListComparatorGet(query.orderBy) ?? compareBy((it: T): (Comparable<(any | null)> | null) => (it._id))); } \ No newline at end of file diff --git a/web/src/live/LiveReadModelApi.ts b/web/src/live/LiveReadModelApi.ts index a557b4d..096fed4 100644 --- a/web/src/live/LiveReadModelApi.ts +++ b/web/src/live/LiveReadModelApi.ts @@ -13,9 +13,7 @@ export class LiveReadModelApi> extends ReadModelApi< public constructor(public readonly url: string, token: (string | null), headers: Map = new Map([]), public readonly serializer: ReifiedType, public readonly querySerializer: ReifiedType) { super(); this.authHeaders = ((): (Map | null) => { - if (token === null || token === undefined) { - return null - } + if (token === null || token === undefined) { return null } return ((it: string): Map => (new Map([...headers, ...new Map([["Authorization", `Bearer ${it}`]])])))(token) })() ?? headers; } @@ -42,7 +40,7 @@ export namespace LiveReadModelApi { private constructor() { } public static INSTANCE = new Companion(); - + public create>(Model: Array, root: string, path: string, token: (string | null), headers: Map = new Map([])): LiveReadModelApi { return new LiveReadModelApi(`${root}${path}`, token, headers, Model, [Query, Model]); } diff --git a/web/src/live/LiveWriteModelApi.ts b/web/src/live/LiveWriteModelApi.ts index a5b9543..b723cfa 100644 --- a/web/src/live/LiveWriteModelApi.ts +++ b/web/src/live/LiveWriteModelApi.ts @@ -16,9 +16,7 @@ export class LiveWriteModelApi> extends WriteModelAp public constructor(public readonly url: string, token: (string | null), headers: Map, public readonly serializer: ReifiedType) { super(); this.authHeaders = ((): (Map | null) => { - if (token === null || token === undefined) { - return null - } + if (token === null || token === undefined) { return null } return ((it: string): Map => (new Map([...headers, ...new Map([["Authorization", `Bearer ${it}`]])])))(token) })() ?? headers; } @@ -72,7 +70,7 @@ export namespace LiveWriteModelApi { private constructor() { } public static INSTANCE = new Companion(); - + public create>(Model: Array, root: string, path: string, token: (string | null), headers: Map = new Map([])): LiveWriteModelApi { return new LiveWriteModelApi(`${root}${path}`, token, headers, Model); } diff --git a/web/src/live/sockets.ts b/web/src/live/sockets.ts index a1dec14..6abd190 100644 --- a/web/src/live/sockets.ts +++ b/web/src/live/sockets.ts @@ -1,18 +1,20 @@ // Package: com.lightningkite.ktordb.live // Generated by Khrysalis - this file will be overwritten. -import { MultiplexMessage } from '../db/MultiplexMessage' +import {MultiplexMessage} from '../db/MultiplexMessage' import { ReifiedType, runOrNull, safeEq, + xCharSequenceIsBlank, xMutableMapGetOrPut, xStringSubstringBefore } from '@lightningkite/khrysalis-runtime' -import { HttpClient, JSON2, WebSocketFrame, WebSocketInterface, isNonNull } from '@lightningkite/rxjs-plus' -import { NEVER, Observable, filter, interval, merge, of, map as rMap } from 'rxjs' +import {doOnSubscribe, HttpClient, isNonNull, JSON2, WebSocketFrame, WebSocketInterface} from '@lightningkite/rxjs-plus' +import {filter as rFilter, interval, map as rMap, merge, NEVER, Observable, of, Subject, SubscriptionLike} from 'rxjs' import { delay, distinctUntilChanged, + filter, map, publishReplay, refCount, @@ -21,20 +23,35 @@ import { tap, timeout } from 'rxjs/operators' -import { v4 as randomUuidV4 } from 'uuid' +import {v4 as randomUuidV4} from 'uuid' //! Declares com.lightningkite.ktordb.live.sharedSocketShouldBeActive -export let _sharedSocketShouldBeActive: Observable = of(false); -export function getSharedSocketShouldBeActive(): Observable { return _sharedSocketShouldBeActive; } -export function setSharedSocketShouldBeActive(value: Observable) { _sharedSocketShouldBeActive = value; } +export let _sharedSocketShouldBeActive: Observable = of(true); + +export function getSharedSocketShouldBeActive(): Observable { + return _sharedSocketShouldBeActive; +} + +export function setSharedSocketShouldBeActive(value: Observable) { + _sharedSocketShouldBeActive = value; +} -//! Declares com.lightningkite.ktordb.live._overrideWebSocketProvider -export let __overrideWebSocketProvider: (((url: string) => Observable) | null) = null; -export function get_overrideWebSocketProvider(): (((url: string) => Observable) | null) { return __overrideWebSocketProvider; } -export function set_overrideWebSocketProvider(value: (((url: string) => Observable) | null)) { __overrideWebSocketProvider = value; } let retryTime = 1000; let lastRetry = 0; + +//! Declares com.lightningkite.ktordb.live._overrideWebSocketProvider +export let __overrideWebSocketProvider: (((url: string) => Observable) | null) = null; + +export function get_overrideWebSocketProvider(): (((url: string) => Observable) | null) { + return __overrideWebSocketProvider; +} + +export function set_overrideWebSocketProvider(value: (((url: string) => Observable) | null)) { + __overrideWebSocketProvider = value; +} + const sharedSocketCache = new Map>(); + //! Declares com.lightningkite.ktordb.live.sharedSocket export function sharedSocket(url: string): Observable { return xMutableMapGetOrPut>(sharedSocketCache, url, (): Observable => (getSharedSocketShouldBeActive() @@ -49,23 +66,23 @@ export function sharedSocket(url: string): Observable { return (runOrNull(get_overrideWebSocketProvider(), _ => _(url)) ?? HttpClient.INSTANCE.webSocket(url)) .pipe(switchMap((it: WebSocketInterface): Observable => { lastRetry = Date.now(); - // println("Connection to $shortUrl established, starting pings") + // println("Connection to $shortUrl established, starting pings") // Only have this observable until it fails - const pingMessages: Observable = interval(5000) + const pingMessages: Observable = interval(30000) .pipe(map((_0: number): void => { - // println("Sending ping to $url") - return it.write.next({text: "", binary: null}); + // println("Sending ping to $url") + return it.write.next({text: " ", binary: null}); })).pipe(switchMap((it: void): Observable => (NEVER))); const timeoutAfterSeconds: Observable = it.read .pipe(tap((it: WebSocketFrame): void => { - // println("Got message from $shortUrl: ${it}") + // println("Got message from $shortUrl: ${it}") if (Date.now() > lastRetry + 60000) { retryTime = 1000; } })) - .pipe(timeout(10000)) + .pipe(timeout(40000)) .pipe(switchMap((it: WebSocketFrame): Observable => (NEVER))); return merge(of(it), pingMessages, timeoutAfterSeconds); @@ -95,6 +112,7 @@ export class MultiplexedWebsocketPart { public constructor(public readonly messages: Observable, public readonly send: ((a: string) => void)) { } } + //! Declares com.lightningkite.ktordb.live.WebSocketIsh export class WebSocketIsh { public constructor(public readonly messages: Observable, public readonly send: ((a: OUT) => void)) { @@ -102,43 +120,80 @@ export class WebSocketIsh { } //! Declares com.lightningkite.ktordb.live.multiplexedSocket -export function multiplexedSocketReified(IN: Array, OUT: Array, url: string, path: string, onSetup: ((a: WebSocketIsh) => void) = (it: WebSocketIsh): void => {}): Observable> { - return multiplexedSocket(url, path, IN, OUT, onSetup); +export function multiplexedSocketReified(IN: Array, OUT: Array, url: string, path: string): Observable> { + return multiplexedSocket(url, path, IN, OUT); } //! Declares com.lightningkite.ktordb.live.multiplexedSocket -export function multiplexedSocket(url: string, path: string, inType: ReifiedType, outType: ReifiedType, onSetup: ((a: WebSocketIsh) => void) = (it: WebSocketIsh): void => {}): Observable> { +export function multiplexedSocket(url: string, path: string, inType: ReifiedType, outType: ReifiedType): Observable> { const shortUrl = xStringSubstringBefore(url, '?', undefined); const channel = randomUuidV4(); - let lastSocket: (WebSocketInterface | null) = null; return sharedSocket(url) - .pipe(map((it: WebSocketInterface): WebSocketIsh => { - // println("Setting up channel on socket to $shortUrl with $path") - lastSocket = it; - it.write.next({ text: JSON.stringify(new MultiplexMessage(channel, path, true, undefined, undefined)), binary: null }); - const part = new MultiplexedWebsocketPart(it.read.pipe(rMap((it: WebSocketFrame): (string | null) => { - const text = it.text - if(text === null) { return null } - if (text === "") { return null } - const message: MultiplexMessage | null = JSON2.parse<(MultiplexMessage | null)>(text, [MultiplexMessage]) - if(message === null) { return null } - return message.channel === channel ? message.data : null - }), filter(isNonNull)), (message: string): void => { - it.write.next({ text: JSON.stringify(new MultiplexMessage(channel, undefined, undefined, undefined, message)), binary: null }); - }); - const typedPart = new WebSocketIsh(part.messages.pipe(rMap((it: string): (IN | null) => (JSON2.parse(it, inType))), filter(isNonNull)), (m: OUT): void => { - part.send(JSON.stringify(m)); - }); - onSetup(typedPart); - return typedPart; - })) - .pipe(tap({ - unsubscribe: (): void => { - // println("Disconnecting channel on socket to $shortUrl with $path") - const temp50 = (lastSocket?.write ?? null); - if (temp50 !== null && temp50 !== undefined) { - temp50.next({ text: JSON.stringify(new MultiplexMessage(channel, path, undefined, true, undefined)), binary: null }) - }; - } + .pipe(switchMap((sharedSocket: WebSocketInterface): Observable> => { + // console.log("Setting up channel $channel to $shortUrl with $path") + const multiplexedIn = sharedSocket.read.pipe(rMap((it: WebSocketFrame): (MultiplexMessage | null) => { + const text = it.text + if (text === null) { + return null + } + if (xCharSequenceIsBlank(text)) { + return null + } + return JSON2.parse(text, [MultiplexMessage]); + }), rFilter(isNonNull)) + .pipe(filter((it: MultiplexMessage): boolean => (it.channel === channel))); + let current = new Subject(); + return multiplexedIn + .pipe(rMap((message: MultiplexMessage): (WebSocketIsh | null) => (((): (WebSocketIsh | null) => { + if (message.start) { + // console.log("Channel ${message.channel} established with $sharedSocket") + return new WebSocketIsh(current, (message: OUT): void => { + // console.log("Sending $message to $channel") + sharedSocket.write.next({ + text: JSON.stringify(new MultiplexMessage(channel, undefined, undefined, undefined, JSON.stringify(message))), + binary: null + }); + }); + } else if (message.data !== null) { + // console.log("Got ${message.data} to ${message.channel}") + const temp53 = message.data; + if (temp53 === null || temp53 === undefined) { + return null + } + current.next(JSON2.parse(temp53, inType)) + return null; + } else if (message.end) { + // console.log("Channel ${message.channel} terminated") + current = new Subject(); + sharedSocket.write.next({ + text: JSON.stringify(new MultiplexMessage(channel, path, true, undefined, undefined)), + binary: null + }); + return null; + } else { + return null + } + })())), rFilter(isNonNull)) + .pipe(doOnSubscribe((_0: SubscriptionLike): void => { + // console.log("Sending onSubscribe Startup Message") + sharedSocket.write.next({ + text: JSON.stringify(new MultiplexMessage(channel, path, true, undefined, undefined)), + binary: null + }); + })) + .pipe(tap({ + unsubscribe: (): void => { + // console.log("Disconnecting channel on socket to $shortUrl with $path") + sharedSocket.write.next({ + text: JSON.stringify(new MultiplexMessage(channel, path, undefined, true, undefined)), + binary: null + }); + } + })) + .pipe(retryWhen((it: Observable): Observable => { + const temp = retryTime; + retryTime = temp * 2; + return it.pipe(delay(temp)); + })); })); } diff --git a/web/src/mock/MockObserveModelApi.ts b/web/src/mock/MockObserveModelApi.ts index 614418d..22d7c5d 100644 --- a/web/src/mock/MockObserveModelApi.ts +++ b/web/src/mock/MockObserveModelApi.ts @@ -4,7 +4,7 @@ import { ObserveModelApi } from '../ObserveModelApi' import { HasId } from '../db/HasId' import { Query } from '../db/Query' import { MockTable } from './MockTable' -import {Observable, startWith} from 'rxjs' +import { Observable, startWith } from 'rxjs' //! Declares com.lightningkite.ktordb.mock.MockObserveModelApi export class MockObserveModelApi> extends ObserveModelApi { diff --git a/web/src/mock/MockReadModelApi.ts b/web/src/mock/MockReadModelApi.ts index e965411..2efe212 100644 --- a/web/src/mock/MockReadModelApi.ts +++ b/web/src/mock/MockReadModelApi.ts @@ -19,19 +19,17 @@ export class MockReadModelApi> extends ReadModelApi< public list(query: Query): Observable> { return of(this.table - .asList() - .filter((item: Model): boolean => (query.condition.invoke(item))) - .slice().sort(xListComparatorGet(query.orderBy) ?? compareBy((it: Model): (Comparable<(any | null)> | null) => (it._id))) - .slice(query.skip) + .asList() + .filter((item: Model): boolean => (query.condition.invoke(item))) + .slice().sort(xListComparatorGet(query.orderBy) ?? compareBy((it: Model): (Comparable<(any | null)> | null) => (it._id))) + .slice(query.skip) .slice(0, query.limit)); } public get(id: UUIDFor): Observable { return ((): (Observable | null) => { const temp9 = this.table.getItem(id); - if (temp9 === null || temp9 === undefined) { - return null - } + if (temp9 === null || temp9 === undefined) { return null } return ((it: Model): Observable => (of(it)))(temp9) })() ?? throwError(new ItemNotFound(`404 item with key ${id} not found`)); } diff --git a/web/src/mock/MockWriteModelApi.ts b/web/src/mock/MockWriteModelApi.ts index 0f58508..30ac1c4 100644 --- a/web/src/mock/MockWriteModelApi.ts +++ b/web/src/mock/MockWriteModelApi.ts @@ -42,9 +42,7 @@ export class MockWriteModelApi> extends WriteModelAp public patch(id: UUIDFor, modification: Modification): Observable { return ((): (Observable | null) => { const temp7 = (this.table.data.get(id) ?? null); - if (temp7 === null || temp7 === undefined) { - return null - } + if (temp7 === null || temp7 === undefined) { return null } return ((item: Model): Observable => { const modified = modification.invoke(item); this.table.replaceItem(modified); @@ -55,8 +53,8 @@ export class MockWriteModelApi> extends WriteModelAp public patchBulk(modification: MassModification): Observable { return of(this.table - .asList() - .filter((it: Model): boolean => (modification.condition.invoke(it))) + .asList() + .filter((it: Model): boolean => (modification.condition.invoke(it))) .map((it: Model): Model => (this.table.replaceItem(modification.modification.invoke(it))))) .pipe(map((it: Array): number => (it.length))); } @@ -67,9 +65,9 @@ export class MockWriteModelApi> extends WriteModelAp public deleteBulk(condition: Condition): Observable { return of(this.table - .asList() - .filter((it: Model): boolean => (condition.invoke(it))) - .forEach((it: Model): void => { + .asList() + .filter((it: Model): boolean => (condition.invoke(it))) + .forEach((it: Model): void => { this.table.deleteItem(it); })); }