Skip to content

Commit

Permalink
Merge pull request #31 from lightningkite/swift-fixes
Browse files Browse the repository at this point in the history
some small updates with kotlin client code so that it converts to swi…
  • Loading branch information
shanelk authored Dec 6, 2023
2 parents 697d762 + 5234bcf commit 1ed265f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class LiveObserveModelApi<Model : HasId<UUID>>(
}
}

fun <T : HasId<ID>, ID:Comparable<ID>> Observable<ListChange<T>>.toListObservable(ordering: Comparator<T>): Observable<List<T>> {
fun <T : HasId<ID>, ID: Comparable<ID>> Observable<ListChange<T>>.toListObservable(ordering: Comparator<T>): Observable<List<T>> {
val localList = ArrayList<T>()
return map {
it.wholeList?.let { localList.clear(); localList.addAll(it.sortedWith(ordering)) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fun multiplexedSocketRaw(
if (text.isBlank()) return@mapNotNull null
text.fromJsonString<MultiplexMessage>()
}.filter { it.channel == channel }
var current = PublishSubject.create<String>()
var current: PublishSubject<String> = PublishSubject.create()
multiplexedIn
.mapNotNull { message ->
when {
Expand All @@ -135,7 +135,7 @@ fun multiplexedSocketRaw(
}
message.data != null -> {
// println("Got ${message.data} to ${message.channel}")
current.onNext(message.data)
current.onNext(message.data!!)
null
}
message.end -> {
Expand Down Expand Up @@ -171,7 +171,7 @@ fun multiplexedSocketRaw(
}
.doOnDispose {
// println("Disconnecting channel on socket to $shortUrl with $path")
sharedSocket?.write?.onNext(
sharedSocket.write.onNext(
WebSocketFrame(
text = MultiplexMessage(
channel = channel,
Expand Down
50 changes: 30 additions & 20 deletions ios/LightningServer/Classes/client/live/sockets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,44 @@ public func multiplexedSocket<IN : Codable & Hashable, OUT : Codable & Hashable>
return multiplexedSocketRaw(url: url, path: path, queryParams: queryParams)
.map { (it) -> WebSocketIsh<IN, OUT> in WebSocketIsh(messages: it.messages.compactMap({ (it) -> IN? in it.fromJsonString(serializer: IN.self) }), send: { (m) -> Void in it.send(m.toJsonString()) }) };
}

public func multiplexedSocketRaw(url: String, path: String, queryParams: Dictionary<String, Array<String>> = dictionaryOf()) -> Observable<WebSocketIsh<String, String>> {
let shortUrl = url.substringBefore(delimiter: "?")
let channel = String(kotlin: UUID.randomUUID())
var lastSocket: WebSocketInterface? = nil
return sharedSocket(url: url)
.flatMap({ (it) -> Single<WebSocketIsh<String, String>> in
print("Setting up socket to \(String(kotlin: shortUrl)) with \(String(kotlin: path))")
lastSocket = it
let multiplexedIn = it.read.compactMap({ (it) -> MultiplexMessage? in
.switchMap { (sharedSocket) -> Observable<WebSocketIsh<String, String>> in
// println("Setting up channel $channel to $shortUrl with $path")
let multiplexedIn = sharedSocket.read.compactMap({ (it) -> MultiplexMessage? in
guard let text = it.text else { return nil }
if text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { return nil }
return (text.fromJsonString() as MultiplexMessage?)
})
}).filter { (it) -> Bool in it.channel == channel }
var current: PublishSubject<String> = PublishSubject()
return multiplexedIn
.filter { (it) -> Bool in it.channel == channel && it.start }
.firstOrError()
.map { (_) -> WebSocketIsh<String, String> in
print("Connected to channel \(String(kotlin: channel))")
return (WebSocketIsh(messages: multiplexedIn.compactMap({ (it) -> String? in it.channel == channel ? it.data : nil }) as Observable<String>, send: { (message) -> Void in
print("Sending \(String(kotlin: message)) to \(it)")
it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message).toJsonString()))
} as (String) -> Void) as WebSocketIsh<String, String>)
}
.doOnSubscribe { (_) -> Void in it.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString())) }
})
.doOnDispose { () -> Void in
print("Disconnecting channel on socket to \(String(kotlin: shortUrl)) with \(String(kotlin: path))")
lastSocket?.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString()))
.compactMap({ (message) -> WebSocketIsh<String, String>? in run { () -> WebSocketIsh<String, String>? in
if message.start {
// println("Channel ${message.channel} established with $sharedSocket")
return (WebSocketIsh(messages: current as Observable<String>, send: { (message) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, data: message).toJsonString())) } as (String) -> Void) as WebSocketIsh<String, String>)
} else if message.data != nil {
// println("Got ${message.data} to ${message.channel}")
current.onNext(message.data!)
return nil
} else if message.end {
// println("Channel ${message.channel} terminated")
current = PublishSubject()
sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString()))
return nil
} else {
return nil
}
} })
.doOnSubscribe { (_) -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, queryParams: queryParams, start: true).toJsonString())) }
.doOnDispose { () -> Void in sharedSocket.write.onNext(WebSocketFrame(text: MultiplexMessage(channel: channel, path: path, end: true).toJsonString())) }
.retry(when: { (it) -> Observable<Error> in
let temp = retryTime
retryTime = temp * 2
return it.delay(.milliseconds(temp), scheduler: MainScheduler.instance)
})
}
}

0 comments on commit 1ed265f

Please sign in to comment.