Skip to content

Commit

Permalink
WebsocketDatabaseChangeSubscriptionCleanup fix for a bad condition re…
Browse files Browse the repository at this point in the history
…sulting in ALL change subs being closed every 5 minutes.
  • Loading branch information
bjsvedin committed Oct 9, 2023
1 parent 8cffdd5 commit 2d0e0b2
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import com.lightningkite.lightningserver.typed.ApiWebsocket
import com.lightningkite.lightningserver.typed.typedWebsocket
import com.lightningkite.lightningserver.websocket.WebSocketIdentifier
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import kotlinx.serialization.UseContextualSerialization
import java.time.Instant
import java.time.Duration
import java.time.Instant
import kotlin.reflect.KProperty1

@LightningServerDsl
Expand Down Expand Up @@ -76,7 +77,7 @@ fun <USER, T : HasId<ID>, ID : Comparable<ID>> ServerPath.restApiWebsocket(
condition = condition { it._id eq event.id },
modification = modification {
it.condition assign c
if(key != null)
if (key != null)
it.relevant assign q.condition.relevantHashCodesForKey(key)
else
it.relevant assign null
Expand All @@ -93,7 +94,7 @@ fun <USER, T : HasId<ID>, ID : Comparable<ID>> ServerPath.restApiWebsocket(
CollectionChanges.serializer(info.serialization.serializer)
) { changes: CollectionChanges<T> ->
val jobs = ArrayList<Job>()
val targets = if(key != null) {
val targets = if (key != null) {
val relevantValues = changes.changes.asSequence().flatMap { listOfNotNull(it.old, it.new) }
.map { key.get(it).hashCode() }
.toSet()
Expand Down Expand Up @@ -148,11 +149,11 @@ fun <USER, T : HasId<ID>, ID : Comparable<ID>> ServerPath.restApiWebsocket(
}
}

class RestApiWebsocketHelper private constructor(val database: ()->Database) {
class RestApiWebsocketHelper private constructor(val database: () -> Database) {

companion object {
private val existing = HashMap<()->Database, RestApiWebsocketHelper>()
operator fun get(database: ()->Database) = existing.getOrPut(database) { RestApiWebsocketHelper(database) }
private val existing = HashMap<() -> Database, RestApiWebsocketHelper>()
operator fun get(database: () -> Database) = existing.getOrPut(database) { RestApiWebsocketHelper(database) }
}

fun subscriptionDb() = database().collection<__WebSocketDatabaseChangeSubscription>()
Expand All @@ -161,14 +162,14 @@ class RestApiWebsocketHelper private constructor(val database: ()->Database) {
val now = Instant.now()
val db =
subscriptionDb().deleteMany(condition {
it.condition eq ""
it.establishedAt lt now.minus(Duration.ofMinutes(5))
(it.condition eq "") and
(it.establishedAt lt now.minus(Duration.ofMinutes(5)))
} or condition {
it.establishedAt lt now.minus(Duration.ofHours(1))
})

for (changeSub in db) {
try{
try {
changeSub._id.close()
} catch (e: Exception) {
// We don't really care. We just want to shut down as many of these as we can.
Expand All @@ -193,27 +194,32 @@ data class __WebSocketDatabaseChangeSubscription(
) : HasId<WebSocketIdentifier>


fun <T, V> Condition<T>.relevantHashCodesForKey(key: KProperty1<T, V>): Set<Int>? = when(this) {
fun <T, V> Condition<T>.relevantHashCodesForKey(key: KProperty1<T, V>): Set<Int>? = when (this) {
is Condition.And<T> -> conditions
.asSequence()
.mapNotNull { it.relevantHashCodesForKey(key) }
.reduceOrNull { a, b -> a.intersect(b) }

is Condition.Or<T> -> conditions
.asSequence()
.map { it.relevantHashCodesForKey(key) }
.reduceOrNull { a, b -> if(a == null || b == null) null else a.union(b) }
is Condition.OnField<*, *> -> if(this.key == key) condition.relevantHashCodes() else null
.reduceOrNull { a, b -> if (a == null || b == null) null else a.union(b) }

is Condition.OnField<*, *> -> if (this.key == key) condition.relevantHashCodes() else null
else -> null
}
fun <T> Condition<T>.relevantHashCodes(): Set<Int>? = when(this) {

fun <T> Condition<T>.relevantHashCodes(): Set<Int>? = when (this) {
is Condition.And<T> -> conditions
.asSequence()
.mapNotNull { it.relevantHashCodes() }
.reduceOrNull { a, b -> a.intersect(b) }

is Condition.Or<T> -> conditions
.asSequence()
.map { it.relevantHashCodes() }
.reduceOrNull { a, b -> if(a == null || b == null) null else a.union(b) }
.reduceOrNull { a, b -> if (a == null || b == null) null else a.union(b) }

is Condition.Equal -> setOf(value.hashCode())
is Condition.Inside -> values.map { it.hashCode() }.toSet()
else -> null
Expand Down

0 comments on commit 2d0e0b2

Please sign in to comment.