Skip to content

Commit

Permalink
Using docker-java client to run docker containers
Browse files Browse the repository at this point in the history
  • Loading branch information
arminzavada committed Sep 24, 2024
1 parent f4ad65c commit 7d2fe29
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package hu.bme.mit.semantifyr.gradle.conventions

import org.gradle.accessors.dm.LibrariesForLibs
import org.gradle.api.tasks.testing.logging.TestExceptionFormat

plugins {
`java-library`
Expand Down Expand Up @@ -44,6 +45,7 @@ tasks {
minHeapSize = "512m"
maxHeapSize = "4G"
testLogging.showStandardStreams = true
testLogging.exceptionFormat = TestExceptionFormat.FULL

finalizedBy(tasks.jacocoTestReport)
}
Expand Down
4 changes: 3 additions & 1 deletion engine/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ dependencies {
runtimeOnly(libs.viatra.query.runtime)
runtimeOnly(libs.viatra.transformation.runtime)
runtimeOnly(libs.slf4j.simple)
runtimeOnly(libs.slf4j.log4j)

testFixturesApi("commons-io:commons-io:2.14.0")
testFixturesApi(project(":oxsts.lang"))
testFixturesApi(testFixtures(project(":oxsts.lang")))

testFixturesImplementation(libs.docker.java.core)
testFixturesImplementation(libs.docker.java.transport)
}
153 changes: 82 additions & 71 deletions engine/src/testFixtures/kotlin/ThetaExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
* SPDX-License-Identifier: EPL-2.0
*/

import com.github.dockerjava.api.command.CreateContainerResponse
import com.github.dockerjava.api.exception.ConflictException
import com.github.dockerjava.api.model.Bind
import com.github.dockerjava.api.model.HostConfig
import com.github.dockerjava.api.model.Volume
import com.github.dockerjava.api.model.WaitResponse
import com.github.dockerjava.core.DefaultDockerClientConfig
import com.github.dockerjava.core.DockerClientImpl
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.io.File
Expand All @@ -24,24 +27,9 @@ import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import kotlin.io.path.absolute
import kotlin.time.Duration
import kotlin.time.toDuration
import kotlin.time.toDurationUnit

suspend fun <T> List<Deferred<T>>.awaitAny(): T {
val firstCompleted = CompletableDeferred<T>()

forEach { job ->
job.invokeOnCompletion { exception ->
if (exception == null && !firstCompleted.isCompleted) {
firstCompleted.complete(job.getCompleted())
}
}
}

return firstCompleted.await()
}

class ThetaExecutionResult(
val exitCode: Int,
val id: Int,
Expand All @@ -63,78 +51,101 @@ class ThetaExecutor(

val logger = LoggerFactory.getLogger(javaClass)

fun initTheta() {
val process = ProcessBuilder(
"docker",
"pull",
"ftsrg/theta-xsts-cli:$version",
)
.inheritIO()
.start()
private val config = DefaultDockerClientConfig.createDefaultConfigBuilder().build()
private val httpClient = ApacheDockerHttpClient.Builder()
.dockerHost(config.getDockerHost())
.sslConfig(config.getSSLConfig()).build()
private val dockerClient = DockerClientImpl.getInstance(config, httpClient);

process.waitFor()
fun initTheta() {
dockerClient.pullImageCmd("ftsrg/theta-xsts-cli").withTag(version).start().awaitCompletion()
}

private suspend fun runTheta(
workingDirectory: String,
name: String,
parameter: String,
id: Int
) = withContext(Dispatchers.IO) {
): ThetaExecutionResult {
val model = "$name.xsts"
val property = "$name.prop"
val cex = "$name$id.cex"
val logName = "theta$id.out"
val errName = "theta$id.err"

val process = ProcessBuilder(
"docker",
"run",
"--rm",
"-v", "$workingDirectory:/host",
"ftsrg/theta-xsts-cli:$version",
"CEGAR",
"--model", "/host/$model",
"--property", "/host/$property",
"--cexfile", "/host/$cex",
*parameter.split(" ").toTypedArray(),
)
.redirectOutput(File(workingDirectory, logName))
.redirectError(File(workingDirectory, errName))
.start()

val exitCode = try {
withTimeout(timeout.toDuration(timeUnit.toDurationUnit())) {
runInterruptible {
process.waitFor()
logger.info("Starting container ($id)")

val container = createContainer(logName, errName, workingDirectory, model, property, cex, parameter)
dockerClient.startContainerCmd(container.id).exec()

val waitResult = try {
val result = withTimeout(timeout.toDuration(timeUnit.toDurationUnit())) {
runAsync<WaitResponse> {
dockerClient.waitContainerCmd(container.id).exec(it)
}
}

logger.info("Theta finished ($id)")

result
} catch (e: TimeoutCancellationException) {
-1
logger.info("Theta timed out ($id)")
throw e
} catch (e: CancellationException) {
-2
logger.info("Theta cancelled ($id)")
throw e
} finally {
process.destroyForcibly()
try {
logger.info("Removing container ($id)")
dockerClient.removeContainerCmd(container.id).withForce(true).exec()
} catch (e: ConflictException) {
// ignore, it means the container is already being removed
}
}

val result = ThetaExecutionResult(
exitCode = exitCode,
return ThetaExecutionResult(
exitCode = waitResult.statusCode,
id = id,
modelPath = "$workingDirectory${File.separator}$model",
propertyPath = "$workingDirectory${File.separator}$property",
cexPath = "$workingDirectory${File.separator}$cex",
logPath = "$workingDirectory${File.separator}$logName",
errPath = "$workingDirectory${File.separator}$errName"
errPath = "$workingDirectory${File.separator}$errName",
)
}

when (result.exitCode) {
0 -> logger.info("Theta ($id) finished successfully!")
-1 -> logger.info("Theta ($id) timed out!")
-2 -> logger.info("Theta ($id) has been cancelled!")
else -> logger.error("Theta ($id) failed execution:\n" + File(result.errPath).readText())
}

result
private fun createContainer(
logName: String,
errName: String,
workingDirectory: String,
model: String,
property: String,
cex: String,
parameter: String
): CreateContainerResponse {
val logFile = File(workingDirectory, logName)
val errFile = File(workingDirectory, errName)

val hostConfig = HostConfig.newHostConfig()
.withBinds(Bind(workingDirectory, Volume("/host")))

val container = dockerClient.createContainerCmd("ftsrg/theta-xsts-cli:$version")
.withCmd(
"CEGAR",
"--model", "/host/$model",
"--property", "/host/$property",
"--cexfile", "/host/$cex",
*parameter.split(" ").toTypedArray(),
)
.withHostConfig(hostConfig)
.exec()

dockerClient.logContainerCmd(container.id)
.withStdOut(true)
.withStdErr(true)
.exec(StreamLoggerCallback(logFile.outputStream(), errFile.outputStream()))

return container
}

private suspend fun runWorkflow(workingDirectory: String, name: String) = coroutineScope {
Expand All @@ -144,13 +155,13 @@ class ThetaExecutor(
}
}

val finishedJob = jobs.awaitAny()

jobs.forEach {
it.cancelAndJoin()
try {
jobs.awaitAny()
} finally {
jobs.forEach {
it.cancelAndJoin()
}
}

finishedJob
}

fun run(workingDirectory: String, name: String) = runBlocking {
Expand Down
85 changes: 85 additions & 0 deletions engine/src/testFixtures/kotlin/utils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import com.github.dockerjava.api.async.ResultCallback
import com.github.dockerjava.api.model.Frame
import com.github.dockerjava.api.model.StreamType
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.runInterruptible
import java.io.Closeable
import java.io.OutputStream

suspend fun <T> List<Deferred<T>>.awaitAny(): T {
val firstCompleted = CompletableDeferred<T>()
var counter = 0

forEach { job ->
job.invokeOnCompletion { exception ->
counter++

if (exception == null && !firstCompleted.isCompleted) {
firstCompleted.complete(job.getCompleted())
} else {
if (counter == size) {
firstCompleted.completeExceptionally(IllegalStateException("All executed jobs failed: ", exception))
}
}
}
}

return firstCompleted.await()
}

class DefferedResultCallback<T : Any> : ResultCallback<T> {
val job = CompletableDeferred<T>()
private lateinit var item: T

override fun close() {

}

override fun onStart(closeable: Closeable?) {

}

override fun onError(throwable: Throwable) {
job.completeExceptionally(throwable)
}

override fun onComplete() {
job.complete(item)
}

override fun onNext(item: T) {
this.item = item
}

}

suspend fun <T : Any> runAsync(block: (DefferedResultCallback<T>) -> Unit): T {
val callback = DefferedResultCallback<T>()

runInterruptible {
block(callback)
}

return callback.job.await()
}

class StreamLoggerCallback(
private val logStream: OutputStream,
private val errorStream: OutputStream
) : ResultCallback.Adapter<Frame>() {

override fun onNext(frame: Frame) {
if (frame.streamType == StreamType.STDOUT) {
logStream.write(frame.payload)
} else if (frame.streamType == StreamType.STDERR) {
errorStream.write(frame.payload)
}
}

override fun close() {
super.close()
logStream.close()
errorStream.close()
}
}
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ xtext = "2.32.0"
kotlin = '2.0.0'
kotlinxCoroutines = "1.9.0"
viatra = '2.8.0'
dockerJava = '3.4.0'

[libraries]
ecore = { group = "org.eclipse.emf", name = "org.eclipse.emf.ecore", version.ref = "ecore" }
Expand Down Expand Up @@ -42,5 +43,8 @@ viatra-query-language = { group = 'org.eclipse.viatra', name = 'viatra-query-lan
viatra-query-runtime = { group = 'org.eclipse.viatra', name = 'viatra-query-runtime', version.ref = "viatra" }
viatra-transformation-runtime = { group = 'org.eclipse.viatra', name = 'viatra-transformation-runtime', version.ref = "viatra" }

docker-java-core = { group = "com.github.docker-java" , name = "docker-java-core", version.ref = "dockerJava" }
docker-java-transport = { group = "com.github.docker-java" , name = "docker-java-transport-httpclient5", version.ref = "dockerJava" }

[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }

0 comments on commit 7d2fe29

Please sign in to comment.