From 7d2fe290a14fc5cac069ac64e52ac96aaf9b40ab Mon Sep 17 00:00:00 2001 From: Armin Zavada Date: Tue, 24 Sep 2024 14:33:38 +0200 Subject: [PATCH] Using docker-java client to run docker containers --- .../gradle/conventions/jvm.gradle.kts | 2 + engine/build.gradle.kts | 4 +- .../src/testFixtures/kotlin/ThetaExecutor.kt | 153 ++++++++++-------- engine/src/testFixtures/kotlin/utils.kt | 85 ++++++++++ gradle/libs.versions.toml | 4 + 5 files changed, 176 insertions(+), 72 deletions(-) create mode 100644 engine/src/testFixtures/kotlin/utils.kt diff --git a/buildSrc/src/main/kotlin/hu/bme/mit/semantifyr/gradle/conventions/jvm.gradle.kts b/buildSrc/src/main/kotlin/hu/bme/mit/semantifyr/gradle/conventions/jvm.gradle.kts index 9bcb92a..47932dc 100644 --- a/buildSrc/src/main/kotlin/hu/bme/mit/semantifyr/gradle/conventions/jvm.gradle.kts +++ b/buildSrc/src/main/kotlin/hu/bme/mit/semantifyr/gradle/conventions/jvm.gradle.kts @@ -7,6 +7,7 @@ package hu.bme.mit.semantifyr.gradle.conventions import org.gradle.accessors.dm.LibrariesForLibs +import org.gradle.api.tasks.testing.logging.TestExceptionFormat plugins { `java-library` @@ -44,6 +45,7 @@ tasks { minHeapSize = "512m" maxHeapSize = "4G" testLogging.showStandardStreams = true + testLogging.exceptionFormat = TestExceptionFormat.FULL finalizedBy(tasks.jacocoTestReport) } diff --git a/engine/build.gradle.kts b/engine/build.gradle.kts index 2ba2cdb..aab9e8d 100644 --- a/engine/build.gradle.kts +++ b/engine/build.gradle.kts @@ -36,9 +36,11 @@ dependencies { runtimeOnly(libs.viatra.query.runtime) runtimeOnly(libs.viatra.transformation.runtime) runtimeOnly(libs.slf4j.simple) - runtimeOnly(libs.slf4j.log4j) testFixturesApi("commons-io:commons-io:2.14.0") testFixturesApi(project(":oxsts.lang")) testFixturesApi(testFixtures(project(":oxsts.lang"))) + + testFixturesImplementation(libs.docker.java.core) + testFixturesImplementation(libs.docker.java.transport) } diff --git a/engine/src/testFixtures/kotlin/ThetaExecutor.kt b/engine/src/testFixtures/kotlin/ThetaExecutor.kt index 05edd46..9e0a997 100644 --- a/engine/src/testFixtures/kotlin/ThetaExecutor.kt +++ b/engine/src/testFixtures/kotlin/ThetaExecutor.kt @@ -4,18 +4,21 @@ * SPDX-License-Identifier: EPL-2.0 */ +import com.github.dockerjava.api.command.CreateContainerResponse +import com.github.dockerjava.api.exception.ConflictException +import com.github.dockerjava.api.model.Bind +import com.github.dockerjava.api.model.HostConfig +import com.github.dockerjava.api.model.Volume +import com.github.dockerjava.api.model.WaitResponse +import com.github.dockerjava.core.DefaultDockerClientConfig +import com.github.dockerjava.core.DockerClientImpl +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.async import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.runInterruptible -import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout import org.slf4j.LoggerFactory import java.io.File @@ -24,24 +27,9 @@ import java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.TimeUnit import kotlin.io.path.absolute -import kotlin.time.Duration import kotlin.time.toDuration import kotlin.time.toDurationUnit -suspend fun List>.awaitAny(): T { - val firstCompleted = CompletableDeferred() - - forEach { job -> - job.invokeOnCompletion { exception -> - if (exception == null && !firstCompleted.isCompleted) { - firstCompleted.complete(job.getCompleted()) - } - } - } - - return firstCompleted.await() -} - class ThetaExecutionResult( val exitCode: Int, val id: Int, @@ -63,16 +51,14 @@ class ThetaExecutor( val logger = LoggerFactory.getLogger(javaClass) - fun initTheta() { - val process = ProcessBuilder( - "docker", - "pull", - "ftsrg/theta-xsts-cli:$version", - ) - .inheritIO() - .start() + private val config = DefaultDockerClientConfig.createDefaultConfigBuilder().build() + private val httpClient = ApacheDockerHttpClient.Builder() + .dockerHost(config.getDockerHost()) + .sslConfig(config.getSSLConfig()).build() + private val dockerClient = DockerClientImpl.getInstance(config, httpClient); - process.waitFor() + fun initTheta() { + dockerClient.pullImageCmd("ftsrg/theta-xsts-cli").withTag(version).start().awaitCompletion() } private suspend fun runTheta( @@ -80,61 +66,86 @@ class ThetaExecutor( name: String, parameter: String, id: Int - ) = withContext(Dispatchers.IO) { + ): ThetaExecutionResult { val model = "$name.xsts" val property = "$name.prop" val cex = "$name$id.cex" val logName = "theta$id.out" val errName = "theta$id.err" - val process = ProcessBuilder( - "docker", - "run", - "--rm", - "-v", "$workingDirectory:/host", - "ftsrg/theta-xsts-cli:$version", - "CEGAR", - "--model", "/host/$model", - "--property", "/host/$property", - "--cexfile", "/host/$cex", - *parameter.split(" ").toTypedArray(), - ) - .redirectOutput(File(workingDirectory, logName)) - .redirectError(File(workingDirectory, errName)) - .start() - - val exitCode = try { - withTimeout(timeout.toDuration(timeUnit.toDurationUnit())) { - runInterruptible { - process.waitFor() + logger.info("Starting container ($id)") + + val container = createContainer(logName, errName, workingDirectory, model, property, cex, parameter) + dockerClient.startContainerCmd(container.id).exec() + + val waitResult = try { + val result = withTimeout(timeout.toDuration(timeUnit.toDurationUnit())) { + runAsync { + dockerClient.waitContainerCmd(container.id).exec(it) } } + + logger.info("Theta finished ($id)") + + result } catch (e: TimeoutCancellationException) { - -1 + logger.info("Theta timed out ($id)") + throw e } catch (e: CancellationException) { - -2 + logger.info("Theta cancelled ($id)") + throw e } finally { - process.destroyForcibly() + try { + logger.info("Removing container ($id)") + dockerClient.removeContainerCmd(container.id).withForce(true).exec() + } catch (e: ConflictException) { + // ignore, it means the container is already being removed + } } - val result = ThetaExecutionResult( - exitCode = exitCode, + return ThetaExecutionResult( + exitCode = waitResult.statusCode, id = id, modelPath = "$workingDirectory${File.separator}$model", propertyPath = "$workingDirectory${File.separator}$property", cexPath = "$workingDirectory${File.separator}$cex", logPath = "$workingDirectory${File.separator}$logName", - errPath = "$workingDirectory${File.separator}$errName" + errPath = "$workingDirectory${File.separator}$errName", ) + } - when (result.exitCode) { - 0 -> logger.info("Theta ($id) finished successfully!") - -1 -> logger.info("Theta ($id) timed out!") - -2 -> logger.info("Theta ($id) has been cancelled!") - else -> logger.error("Theta ($id) failed execution:\n" + File(result.errPath).readText()) - } - - result + private fun createContainer( + logName: String, + errName: String, + workingDirectory: String, + model: String, + property: String, + cex: String, + parameter: String + ): CreateContainerResponse { + val logFile = File(workingDirectory, logName) + val errFile = File(workingDirectory, errName) + + val hostConfig = HostConfig.newHostConfig() + .withBinds(Bind(workingDirectory, Volume("/host"))) + + val container = dockerClient.createContainerCmd("ftsrg/theta-xsts-cli:$version") + .withCmd( + "CEGAR", + "--model", "/host/$model", + "--property", "/host/$property", + "--cexfile", "/host/$cex", + *parameter.split(" ").toTypedArray(), + ) + .withHostConfig(hostConfig) + .exec() + + dockerClient.logContainerCmd(container.id) + .withStdOut(true) + .withStdErr(true) + .exec(StreamLoggerCallback(logFile.outputStream(), errFile.outputStream())) + + return container } private suspend fun runWorkflow(workingDirectory: String, name: String) = coroutineScope { @@ -144,13 +155,13 @@ class ThetaExecutor( } } - val finishedJob = jobs.awaitAny() - - jobs.forEach { - it.cancelAndJoin() + try { + jobs.awaitAny() + } finally { + jobs.forEach { + it.cancelAndJoin() + } } - - finishedJob } fun run(workingDirectory: String, name: String) = runBlocking { diff --git a/engine/src/testFixtures/kotlin/utils.kt b/engine/src/testFixtures/kotlin/utils.kt new file mode 100644 index 0000000..7f70632 --- /dev/null +++ b/engine/src/testFixtures/kotlin/utils.kt @@ -0,0 +1,85 @@ +import com.github.dockerjava.api.async.ResultCallback +import com.github.dockerjava.api.model.Frame +import com.github.dockerjava.api.model.StreamType +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.runInterruptible +import java.io.Closeable +import java.io.OutputStream + +suspend fun List>.awaitAny(): T { + val firstCompleted = CompletableDeferred() + var counter = 0 + + forEach { job -> + job.invokeOnCompletion { exception -> + counter++ + + if (exception == null && !firstCompleted.isCompleted) { + firstCompleted.complete(job.getCompleted()) + } else { + if (counter == size) { + firstCompleted.completeExceptionally(IllegalStateException("All executed jobs failed: ", exception)) + } + } + } + } + + return firstCompleted.await() +} + +class DefferedResultCallback : ResultCallback { + val job = CompletableDeferred() + private lateinit var item: T + + override fun close() { + + } + + override fun onStart(closeable: Closeable?) { + + } + + override fun onError(throwable: Throwable) { + job.completeExceptionally(throwable) + } + + override fun onComplete() { + job.complete(item) + } + + override fun onNext(item: T) { + this.item = item + } + +} + +suspend fun runAsync(block: (DefferedResultCallback) -> Unit): T { + val callback = DefferedResultCallback() + + runInterruptible { + block(callback) + } + + return callback.job.await() +} + +class StreamLoggerCallback( + private val logStream: OutputStream, + private val errorStream: OutputStream +) : ResultCallback.Adapter() { + + override fun onNext(frame: Frame) { + if (frame.streamType == StreamType.STDOUT) { + logStream.write(frame.payload) + } else if (frame.streamType == StreamType.STDERR) { + errorStream.write(frame.payload) + } + } + + override fun close() { + super.close() + logStream.close() + errorStream.close() + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 200d822..7f7c3d2 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,6 +12,7 @@ xtext = "2.32.0" kotlin = '2.0.0' kotlinxCoroutines = "1.9.0" viatra = '2.8.0' +dockerJava = '3.4.0' [libraries] ecore = { group = "org.eclipse.emf", name = "org.eclipse.emf.ecore", version.ref = "ecore" } @@ -42,5 +43,8 @@ viatra-query-language = { group = 'org.eclipse.viatra', name = 'viatra-query-lan viatra-query-runtime = { group = 'org.eclipse.viatra', name = 'viatra-query-runtime', version.ref = "viatra" } viatra-transformation-runtime = { group = 'org.eclipse.viatra', name = 'viatra-transformation-runtime', version.ref = "viatra" } +docker-java-core = { group = "com.github.docker-java" , name = "docker-java-core", version.ref = "dockerJava" } +docker-java-transport = { group = "com.github.docker-java" , name = "docker-java-transport-httpclient5", version.ref = "dockerJava" } + [plugins] kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }