Skip to content

Commit

Permalink
Version updates, socket
Browse files Browse the repository at this point in the history
  • Loading branch information
UnknownJoe796 committed Apr 18, 2024
1 parent 39ba602 commit 23e7598
Show file tree
Hide file tree
Showing 26 changed files with 197 additions and 308 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ buildscript {
}
dependencies {
classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion")
classpath("org.jetbrains.dokka:dokka-gradle-plugin:1.9.10")
classpath("org.jetbrains.dokka:dokka-gradle-plugin:1.9.20")
classpath("com.lightningkite:deploy-helpers:0.0.7")
classpath("com.android.tools.build:gradle:7.4.2")
classpath("org.owasp:dependency-check-gradle:9.0.9")
classpath("org.owasp:dependency-check-gradle:9.1.0")
classpath("com.github.ben-manes:gradle-versions-plugin:0.51.0")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.lightningkite.kiteui.TypedWebSocket
import com.lightningkite.kiteui.reactive.*
import kotlinx.serialization.KSerializer

interface ModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>> {
interface ClientModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>> {
suspend fun default(): T = throw IllegalArgumentException()
suspend fun query(input: Query<T>): List<T>
suspend fun queryPartial(input: QueryPartial<T>): List<Partial<T>>
Expand All @@ -26,11 +26,11 @@ interface ModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>> {
suspend fun groupAggregate(input: GroupAggregateQuery<T>): Map<String, Double?>
}

interface ModelRestEndpointsPlusWs<T : HasId<ID>, ID : Comparable<ID>> : ModelRestEndpoints<T, ID> {
interface ClientModelRestEndpointsPlusWs<T : HasId<ID>, ID : Comparable<ID>> : ClientModelRestEndpoints<T, ID> {
suspend fun watch(): TypedWebSocket<Query<T>, ListChange<T>>
}

interface ModelRestEndpointsPlusUpdatesWebsocket<T : HasId<ID>, ID : Comparable<ID>> : ModelRestEndpoints<T, ID> {
interface ClientModelRestEndpointsPlusUpdatesWebsocket<T : HasId<ID>, ID : Comparable<ID>> : ClientModelRestEndpoints<T, ID> {
suspend fun updates(): TypedWebSocket<Condition<T>, CollectionUpdates<T, ID>>
}

Expand All @@ -53,7 +53,7 @@ interface ModelCollection<T : HasId<ID>, ID : Comparable<ID>> {
}

interface CachingModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>> : ModelCollection<T, ID> {
val skipCache: ModelRestEndpoints<T, ID>
val skipCache: ClientModelRestEndpoints<T, ID>
fun totallyInvalidate()
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import com.lightningkite.kiteui.launchGlobal
import com.lightningkite.kiteui.reactive.Constant
import com.lightningkite.kiteui.reactive.Readable

class MockModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>>(val log: (String) -> Unit) :
ModelRestEndpointsPlusWs<T, ID>, ModelRestEndpointsPlusUpdatesWebsocket<T, ID> {
class MockClientModelRestEndpoints<T : HasId<ID>, ID : Comparable<ID>>(val log: (String) -> Unit) :
ClientModelRestEndpointsPlusWs<T, ID>, ClientModelRestEndpointsPlusUpdatesWebsocket<T, ID> {
val items = HashMap<ID, T>()
val watchers = ArrayList<(changes: List<EntryChange<T>>) -> Unit>()
override suspend fun query(input: Query<T>): List<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
override val skipCache: ModelRestEndpoints<T, ID>,
override val skipCache: ClientModelRestEndpoints<T, ID>,
val serializer: KSerializer<T>,
cacheTime: Duration = 5.minutes,
) : CachingModelRestEndpoints<T, ID> {
Expand Down Expand Up @@ -40,7 +40,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
listeners.toList().forEach {
try {
it()
} catch(e: Exception) {
} catch (e: Exception) {
e.printStackTrace2()
}
}
Expand All @@ -57,16 +57,16 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}

override val state: ReadableState<T?>
get() = if(upToDate) ReadableState(value) else ReadableState.notReady
get() = if (upToDate) ReadableState(value) else ReadableState.notReady

override suspend infix fun set(value: T?) {
if (value == null) delete()
else {
val result = skipCache.replace(id, value)
this.value = result
for (query in queries) {
query.value.onNewValue(result)
query.value.refreshIfNeeded()
for (query in queries.values.toList()) {
query.onNewValue(result)
query.refreshIfNeeded()
}
}
}
Expand All @@ -79,10 +79,10 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
} // TODO: we can do better than this
// val oldValue = value
value = result
for (query in queries) {
for (query in queries.values.toList()) {
// oldValue?._id?.let { query.value.onRemoved(it) }
result?.let { query.value.onNewValue(it) }
query.value.refreshIfNeeded()
result?.let { query.onNewValue(it) }
query.refreshIfNeeded()
}
return result
}
Expand All @@ -101,17 +101,17 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
// println("Assigned value to null")
value = null
oldValue?.let { value ->
for (query in queries) {
query.value.onRemoved(id)
query.value.refreshIfNeeded()
for (query in queries.values.toList()) {
query.onRemoved(id)
query.refreshIfNeeded()
}
}
}
}

private var totalInvalidation: Double = 0.0

override fun totallyInvalidate(){
override fun totallyInvalidate() {
totalInvalidation = clockMillis()
cache.values.forEach { it.invalidate() }
}
Expand Down Expand Up @@ -144,10 +144,12 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
private val listeners = ArrayList<() -> Unit>()
val inUse: Boolean get() = listeners.isNotEmpty()
override fun addListener(listener: () -> Unit): () -> Unit {
println("Listener to $query added")
listeners.add(listener)
return {
val pos = listeners.indexOfFirst { it === listener }
if (pos != -1) {
println("Listener to $query removed")
listeners.removeAt(pos)
unreportedChanges = true
}
Expand All @@ -160,8 +162,14 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
return
}
if (hasMorePages) {
val lastItem = cache[ids.lastOrNull() ?: return]?.value ?: return
if (comparator.compare(item, lastItem) > 0) return
val lastItem = cache[ids.lastOrNull() ?: run {
return
}]?.value ?: run {
return
}
if (comparator.compare(item, lastItem) > 0) {
return
}
}
if (item._id in ids) {
unreportedChanges = true
Expand Down Expand Up @@ -206,7 +214,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
}

private val currentSocket: Async<LivingSocket<T>?> = asyncGlobal {
(skipCache as? ModelRestEndpointsPlusUpdatesWebsocket<T, ID>)?.updates()?.apply {
(skipCache as? ClientModelRestEndpointsPlusUpdatesWebsocket<T, ID>)?.updates()?.apply {
onMessage {
it.updates.forEach { new ->
cache.getOrPut(new._id) { WritableModelImpl(new._id) }.apply {
Expand All @@ -227,7 +235,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
override val connected: Readable<Boolean> get() = it.connected
override fun send(condition: Condition<T>) = it.send(condition)
}
} ?: (skipCache as? ModelRestEndpointsPlusWs<T, ID>)?.watch()?.apply {
} ?: (skipCache as? ClientModelRestEndpointsPlusWs<T, ID>)?.watch()?.apply {
onMessage {
val old = it.old
val new = it.new
Expand Down Expand Up @@ -326,7 +334,12 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
val id = new._id
val impl = cache.getOrPut(id) { WritableModelImpl(id) }
impl.value = new
queries.forEach { it.value.onNewValue(new); it.value.refreshIfNeeded() }
println("---INSERT $new---")
queries.forEach {
println("Updating query ${it.key}")
it.value.onNewValue(new)
it.value.refreshIfNeeded()
}
return impl
}

Expand Down Expand Up @@ -363,7 +376,7 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(

var locked = false
suspend fun regularly() {
if(locked) return
if (locked) return
locked = true
try {
if (listeningDirty) {
Expand All @@ -376,8 +389,11 @@ class ModelCache<T : HasId<ID>, ID : Comparable<ID>>(
updateSocket(if (subConditions.isEmpty()) Condition.Never() else Condition.Or(subConditions))
}
for (query in queries.values.toList()) {
println("Checkingg query ${query.query}")
println("query.inUse(${query.inUse}) && !query.upToDate(${query.upToDate})")
if (query.inUse && !query.upToDate) {
skipCache.query(query.query).let {
println("Query got result, applying")
for (item in it) cache.getOrPut(item._id) { WritableModelImpl(item._id) }.value = item
query.reset(it.map { it._id })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ class ModelRestEndpointsTest {
prepareModels()
val collectionsToTest = listOf(
MockModelCollection(SampleModel.serializer()),
ModelCache(MockModelRestEndpoints(::println), SampleModel.serializer()),
ModelCache(MockClientModelRestEndpoints(::println), SampleModel.serializer()),
ModelCache(
object : ModelRestEndpointsPlusWs<SampleModel, UUID> by MockModelRestEndpoints(::println) {},
object : ClientModelRestEndpointsPlusWs<SampleModel, UUID> by MockClientModelRestEndpoints(::println) {},
SampleModel.serializer()
),
ModelCache(
object : ModelRestEndpoints<SampleModel, UUID> by MockModelRestEndpoints(::println) {},
object : ClientModelRestEndpoints<SampleModel, UUID> by MockClientModelRestEndpoints(::println) {},
SampleModel.serializer()
)
)
Expand Down
13 changes: 12 additions & 1 deletion demo/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ repositories {

val ktorVersion:String by project
dependencies {
api(project(":server"))
api(project(":server-aws"))
api(project(":server-azure"))
api(project(":server-core"))
api(project(":server-testing"))
api(project(":server-dynamodb"))
api(project(":server-firebase"))
api(project(":server-ktor"))
api(project(":server-memcached"))
api(project(":server-mongo"))
api(project(":server-redis"))
api(project(":server-sentry"))
api(project(":server-sftp"))
ksp(project(":processor"))
implementation("com.lightningkite:kotliner-cli:1.0.3")
implementation("io.ktor:ktor-server-call-logging:$ktorVersion")
Expand Down
Loading

0 comments on commit 23e7598

Please sign in to comment.