Skip to content

Commit

Permalink
Merged Shane's PR for catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
UnknownJoe796 committed May 7, 2024
1 parent 7cc7216 commit 4dc27de
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 1 deletion.
77 changes: 77 additions & 0 deletions server-cassandra/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import com.lightningkite.deployhelpers.developer
import com.lightningkite.deployhelpers.github
import com.lightningkite.deployhelpers.mit
import com.lightningkite.deployhelpers.standardPublishing

plugins {
kotlin("jvm")
id("com.google.devtools.ksp")
kotlin("plugin.serialization")
id("org.jetbrains.dokka")
id("signing")
`maven-publish`
}

val kotlinVersion: String by project
val khrysalisVersion: String by project
val coroutines: String by project
val awsVersion = "2.25.24"
dependencies {
api(project(":server-core"))
api("com.datastax.oss:java-driver-core:4.17.0")
api("com.datastax.oss:java-driver-query-builder:4.17.0")
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit")
testImplementation("org.testcontainers:cassandra:1.19.7")
testImplementation(project(":server-testing"))

ksp(project(":processor"))
kspTest(project(":processor"))
}

ksp {
arg("generateFields", "true")
}

kotlin {
sourceSets.main {
kotlin.srcDir("build/generated/ksp/main/kotlin")
}
sourceSets.test {
kotlin.srcDir("build/generated/ksp/test/kotlin")
}
}

tasks.withType<JavaCompile>().configureEach {
this.targetCompatibility = "11"
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
kotlinOptions.freeCompilerArgs += "-opt-in=kotlinx.serialization.ExperimentalSerializationApi"
kotlinOptions.freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn"
kotlinOptions.jvmTarget = JavaVersion.VERSION_11.toString()
}


standardPublishing {
name.set("Lightning Server Cassandra")
description.set("Cassandra support for Lightning Server")
github("lightningkite", "lightning-server")

licenses {
mit()
}

developers {
developer(
id = "LightningKiteJoseph",
name = "Joseph Ivie",
email = "[email protected]",
)
developer(
id = "bjsvedin",
name = "Brady Svedin",
email = "[email protected]",
)
}
}
tasks.getByName("sourceJar").dependsOn("kspKotlin")
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package com.lightningkite.lightningserver.cassandra

import com.datastax.oss.driver.api.core.CqlIdentifier
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.querybuilder.QueryBuilder.*
import com.datastax.oss.driver.api.querybuilder.*
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder.*
import com.datastax.oss.driver.api.core.cql.*
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata
import com.datastax.oss.driver.api.core.metadata.schema.IndexKind
import com.datastax.oss.driver.api.core.metadata.schema.IndexMetadata
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
import com.datastax.oss.driver.api.core.type.DataType
import com.datastax.oss.driver.api.querybuilder.schema.*
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.future.await


suspend fun CqlSession.executeSuspending(query: String) = executeAsync(query).await()
suspend fun CqlSession.executeSuspending(statement: Statement<*>) = executeAsync(statement).await()
suspend fun CqlSession.executeSuspending(query: String, vararg values: Any?) = executeAsync(query, *values).await()
suspend fun CqlSession.executeSuspending(query: String, values: Map<String, Any?>) = executeAsync(query, values).await()

suspend fun CqlSession.prepareSuspending(query: String) = prepareAsync(query).await()
suspend fun CqlSession.prepareSuspending(statement: SimpleStatement) = prepareAsync(statement).await()
suspend fun CqlSession.prepareSuspending(request: PrepareRequest) = prepareAsync(request).await()

suspend fun AsyncResultSet.asFlow() = flow<Row> {
var c = this@asFlow
c.currentPage().forEach { emit(it) }
while (hasMorePages()) {
c = fetchNextPage().await()
c.currentPage().forEach { emit(it) }
}
}

data class LsCqlDesiredTable(
val keyspace: CqlIdentifier,
val name: CqlIdentifier,
val columns: List<LsCqlDesiredColumn>,
val indexes: List<LsCqlDesiredIndex>,
val partitionKey: List<LsCqlDesiredColumn>,
val clusteringColumns: Map<LsCqlDesiredColumn, ClusteringOrder>,
val isCompactStorage: Boolean = false,
val isVirtual: Boolean = false,
) {
constructor(meta: TableMetadata) : this(
keyspace = meta.keyspace,
name = meta.name,
columns = meta.columns.map { LsCqlDesiredColumn(it) },
indexes = meta.indexes.map { LsCqlDesiredIndex(it) },
partitionKey = meta.partitionKey.map { LsCqlDesiredColumn(it) },
clusteringColumns = meta.clusteringColumns.mapKeys { LsCqlDesiredColumn(it.key) },
isCompactStorage = meta.isCompactStorage,
isVirtual = meta.isVirtual,
)
}

data class LsCqlDesiredColumn(
val name: CqlIdentifier,
val type: DataType
) {
constructor(meta: ColumnMetadata) : this(
name = meta.name,
type = meta.type,
)
}

data class LsCqlDesiredIndex(
val name: CqlIdentifier,
val kind: IndexKind,
val target: String,
val options: Map<String, String>
) {
constructor(meta: IndexMetadata) : this(
name = meta.name,
kind = meta.kind,
target = meta.target,
options = meta.options
)
}

suspend fun CqlSession.updateSchema(from: LsCqlDesiredTable?, to: LsCqlDesiredTable) {
if (from == null) {
executeSuspending(
createTable(to.keyspace, to.name).ifNotExists().let {
var x: OngoingPartitionKey = it
to.partitionKey.forEach { k ->
x = x.withPartitionKey(k.name, k.type)
}
x as CreateTable
}.let {
var x = it
to.clusteringColumns.forEach { (k, o) ->
x = x.withClusteringColumn(k.name, k.type)
}
x
}.let {
var x = it
to.columns.forEach { c ->
x = x.withColumn(c.name, c.type)
}
x
}.let {
var x = it as CreateTableWithOptions
to.clusteringColumns.forEach { (k, o) ->
x = x.withClusteringOrder(k.name, o)
}
x
}
.let {
if(to.isCompactStorage) it.withCompactStorage()
else it
}
.build()
)
} else {
val newColumns = (to.columns.map { it.name }.toSet() - from.columns.map { it.name }.toSet()).map { to.columns.find { c -> it == c.name }!! }
val oldColumns = (from.columns.map { it.name }.toSet() - to.columns.map { it.name }.toSet()).map { from.columns.find { c -> it == c.name }!! }
val changedColumns = (to.columns.filter {
val old = from.columns.find { c -> c.name == it.name }
old != null && old.type != it.type
})
if(changedColumns.isNotEmpty()) throw Exception("Cannot change a column's type without blowing something up!")
// Add columns
executeSuspending(alterTable(to.keyspace, to.name).let {
var x = it as AlterTableAddColumnEnd
newColumns.forEach { c ->
x = x.addColumn(c.name, c.type)
}
x
}.build())
// TODO: Drop columns?

}
// Update indicies
val newIndexes = to.indexes.toSet() - (from?.indexes?.toSet() ?: setOf())
val oldIndexes = (from?.indexes?.toSet() ?: setOf()) - to.indexes.toSet()
for(index in oldIndexes) {
executeSuspending(dropIndex(index.name).build())
}
for(index in newIndexes) {
executeSuspending(createIndex().onTable(index.name).let {
val x = it
index.target.forEach {

}
x
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.lightningkite.lightningserver.cassandra

import kotlin.test.Test
import com.datastax.oss.driver.api.core.*
import com.datastax.oss.driver.api.querybuilder.QueryBuilder.*
import com.datastax.oss.driver.api.querybuilder.*
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder.*
import com.datastax.oss.driver.api.core.*
import com.datastax.oss.driver.api.core.cql.*
import com.datastax.oss.driver.api.core.type.DataTypes
import com.lightningkite.lightningserver.logging.LoggingSettings
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import org.testcontainers.containers.CassandraContainer

class Experiments {
@Test fun test() {
val container = CassandraContainer("cassandra:5")
LoggingSettings()
if(!container.isRunning()) container.start()
CqlSession.builder()
.addContactPoint(container.contactPoint)
.withLocalDatacenter(container.localDatacenter)
.build()
.use { session ->
runBlocking {
session.executeSuspending(createKeyspace("test").ifNotExists().withSimpleStrategy(1).build())
session.executeSuspending(createTable("test", "TestTable")
.withPartitionKey("id", DataTypes.UUID)
.withColumn("id", DataTypes.UUID)
.withColumn("title", DataTypes.TEXT)
.build()
)
session.executeSuspending(insertInto("test", "TestTable").value("id", literal(com.lightningkite.uuid())).value("title", literal("Test")).build())
session.executeSuspending(selectFrom("test", "TestTable").columns("id", "title").where().limit(1).build()).currentPage().forEach {
println("Got item $it")
}
session.metadata.getKeyspace("test").get().tables.forEach { (key, value) ->
println("---$key---")
value.columns.forEach { key, value ->
println("$key: ${value.type}")
}
value.indexes.values.forEach {
it.name
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class OauthProofEndpoints(
property = "email",
value = email,
at = now()
)).encodeURLQueryComponent()}")
)).encodeURLQueryComponent()}&backend=${generalSettings().publicUrl.encodeURLQueryComponent()}")
}
override val indirectLink: ServerPath = path("open").get.handler {
HttpResponse.redirectToGet(callback.loginUrl(uuid()))
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pluginManagement {
google()
gradlePluginPortal()
mavenCentral()
maven("https://jitpack.io")
}

versionCatalogs {
Expand All @@ -41,6 +42,7 @@ include(":shared")
include(":server")
include(":server-aws")
include(":server-azure")
//include(":server-cassandra")
include(":server-core")
include(":server-testing")
include(":server-dynamodb")
Expand Down

0 comments on commit 4dc27de

Please sign in to comment.