diff --git a/client/src/commonMain/kotlin/com/lightningkite/lightningserver/db/ModelCache.kt b/client/src/commonMain/kotlin/com/lightningkite/lightningserver/db/ModelCache.kt index 26508fe5..c730ca9c 100644 --- a/client/src/commonMain/kotlin/com/lightningkite/lightningserver/db/ModelCache.kt +++ b/client/src/commonMain/kotlin/com/lightningkite/lightningserver/db/ModelCache.kt @@ -22,6 +22,7 @@ class ModelCache, ID : Comparable>( inner class WritableModelImpl(val id: ID) : WritableModel { var live: Int = 0 var lastSet: Double = 0.0 + var invalid = false // The invalid logic and code isn't the cleanest, not entirely what I want, but it works. var ready: Boolean = false override val serializer: KSerializer @@ -36,6 +37,7 @@ class ModelCache, ID : Comparable>( set(value) { ready = true lastSet = clockMillis() + invalid = false field = value listeners.toList().forEach { it() } awaiting.toList().forEach { it.resume(value) } @@ -70,8 +72,7 @@ class ModelCache, ID : Comparable>( val result = skipCache.replace(id, value) this.value = result for (query in queries) { - query.value.onRemoved(id) - query.value.onAdded(result) + query.value.onNewValue(result) query.value.refreshIfNeeded() } } @@ -83,11 +84,11 @@ class ModelCache, ID : Comparable>( } catch (e: Exception) { null } // TODO: we can do better than this - val oldValue = value +// val oldValue = value value = result for (query in queries) { - oldValue?._id?.let { query.value.onRemoved(it) } - result?.let { query.value.onAdded(it) } +// oldValue?._id?.let { query.value.onRemoved(it) } + result?.let { query.value.onNewValue(it) } query.value.refreshIfNeeded() } return result @@ -99,7 +100,7 @@ class ModelCache, ID : Comparable>( } override suspend fun invalidate() { - lastSet = 0.0 + invalid = true } fun virtualDelete() { @@ -107,10 +108,8 @@ class ModelCache, ID : Comparable>( value = null oldValue?.let { value -> for (query in queries) { - if (query.key.condition(value)) { - query.value.onRemoved(id) - query.value.refreshIfNeeded() - } + query.value.onRemoved(id) + query.value.refreshIfNeeded() } } } @@ -122,15 +121,17 @@ class ModelCache, ID : Comparable>( var live: Int = 0 var lastSet: Double = 0.0 var lastInvalidateCheck = clockMillis() - val upToDate: Boolean get() { - val now = clockMillis() - val result = (ready && (live > 0 || (now - lastSet < cacheMs && lastInvalidateCheck > totalInvalidation))) - lastInvalidateCheck = now - return result - } + val upToDate: Boolean + get() { + val now = clockMillis() + val result = + (ready && (live > 0 || (now - lastSet < cacheMs && lastInvalidateCheck > totalInvalidation))) + lastInvalidateCheck = now + return result + } val comparator = query.orderBy.comparator ?: compareBy { it._id } - var complete: Boolean = false + var hasMorePages: Boolean = false var ready: Boolean = false val ids = ArrayList() var unreportedChanges = false @@ -154,15 +155,21 @@ class ModelCache, ID : Comparable>( } } - fun onAdded(item: T) { - if (!query.condition(item)) return - if (comparator == null) return - if (!complete) { + fun onNewValue(item: T) { + if (!query.condition(item)) { + unreportedChanges = unreportedChanges || ids.remove(item._id) + return + } + if (hasMorePages) { val lastItem = cache[ids.lastOrNull() ?: return]?.value ?: return if (comparator.compare(item, lastItem) > 0) return } + if (item._id in ids) { + unreportedChanges = true + return + } for (index in ids.indices) { - val nextItem = cache[ids.lastOrNull() ?: continue]?.value ?: continue + val nextItem = cache[ids.getOrNull(index) ?: continue]?.value ?: continue if (comparator.compare(item, nextItem) < 0) { ids.add(index, item._id) unreportedChanges = true @@ -174,8 +181,7 @@ class ModelCache, ID : Comparable>( } fun onRemoved(id: ID) { - ids.remove(id) - unreportedChanges = true + unreportedChanges = unreportedChanges || ids.remove(id) } fun reset(ids: Collection) { @@ -183,7 +189,7 @@ class ModelCache, ID : Comparable>( lastSet = clockMillis() this.ids.clear() this.ids.addAll(ids) - complete = ids.size < query.limit + hasMorePages = ids.size == query.limit unreportedChanges = true } @@ -207,7 +213,7 @@ class ModelCache, ID : Comparable>( cache.getOrPut(new._id) { WritableModelImpl(new._id) }.apply { value = new } - queries.forEach { it.value.onAdded(new); it.value.refreshIfNeeded() } + queries.forEach { it.value.onNewValue(new); it.value.refreshIfNeeded() } } it.remove.forEach { old -> cache[old]?.virtualDelete() @@ -231,7 +237,7 @@ class ModelCache, ID : Comparable>( cache.getOrPut(new._id) { WritableModelImpl(new._id) }.apply { value = new } - queries.forEach { it.value.onAdded(new); it.value.refreshIfNeeded() } + queries.forEach { it.value.onNewValue(new); it.value.refreshIfNeeded() } } old != null -> { @@ -321,7 +327,7 @@ class ModelCache, ID : Comparable>( val id = new._id val impl = cache.getOrPut(id) { WritableModelImpl(id) } impl.value = new - queries.forEach { it.value.onAdded(new); it.value.refreshIfNeeded() } + queries.forEach { it.value.onNewValue(new); it.value.refreshIfNeeded() } return impl } @@ -334,7 +340,7 @@ class ModelCache, ID : Comparable>( impl.value = new } queries.forEach { - for (item in newItems) it.value.onAdded(item) + for (item in newItems) it.value.onNewValue(item) it.value.refreshIfNeeded() } return newItems @@ -342,15 +348,10 @@ class ModelCache, ID : Comparable>( override suspend fun upsert(item: T): WritableModel { val new = skipCache.upsert(item._id, item) - val id = new._id - val impl = cache.getOrPut(id) { WritableModelImpl(id) } - val old = impl.value + val impl = cache.getOrPut(new._id) { WritableModelImpl(new._id) } impl.value = new queries.forEach { - if (old != null) { - it.value.onRemoved(id) - } - it.value.onAdded(new) + it.value.onNewValue(new) it.value.refreshIfNeeded() } return impl @@ -376,12 +377,13 @@ class ModelCache, ID : Comparable>( skipCache.query(query.query).let { for (item in it) cache.getOrPut(item._id) { WritableModelImpl(item._id) }.value = item query.reset(it.map { it._id }) - query.refreshIfNeeded() } } } - val needsUpdates = cache.values.toList().asSequence().filter { it.inUse && !it.upToDate }.map { it.id }.toSet() + + val needsUpdates = cache.values.toList().asSequence().filter { (it.inUse && !it.upToDate) || it.invalid }.map { it.id }.toSet() if (needsUpdates.isNotEmpty()) { + println("Needs Updating") val limit = 1000 val results = needsUpdates .chunked(limit) @@ -393,10 +395,21 @@ class ModelCache, ID : Comparable>( ) ) } - needsUpdates.forEach { id -> - cache.getOrPut(id) { WritableModelImpl(id) }.value = results.find { it._id == id } + for (id in needsUpdates) { + results.find { it._id == id }?.also { updated -> + cache.getOrPut(id) { WritableModelImpl(id) }.value = updated + for (query in queries.values.toList()) { + query.onNewValue(updated) + } + } ?: run { + cache[id]?.virtualDelete() + } } } + + for (query in queries.values.toList()) { + query.refreshIfNeeded() + } } init {