diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 503bce24f..4cf422e24 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- java_version: [19]
+ java_version: [21]
steps:
- name: Environment
@@ -66,7 +66,7 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{secrets.TOWER_CI_AWS_SECRET}}
DOCKER_USER: ${{ secrets.DOCKER_USER }}
DOCKER_PAT: ${{ secrets.DOCKER_PAT }}
- QUAY_USER: ${{ secrets.QUAY_USER }}
+ QUAY_USER: "pditommaso+wave_ci_tests"
QUAY_PAT: ${{ secrets.QUAY_PAT }}
AZURECR_USER: ${{ secrets.AZURECR_USER }}
AZURECR_PAT: ${{ secrets.AZURECR_PAT }}
diff --git a/.github/workflows/seqera_docs_changelog.yml b/.github/workflows/seqera_docs_changelog.yml
new file mode 100644
index 000000000..e38ead83d
--- /dev/null
+++ b/.github/workflows/seqera_docs_changelog.yml
@@ -0,0 +1,61 @@
+name: Push changelog to Seqera Docs
+on:
+ release:
+ types: [published]
+ workflow_dispatch:
+ inputs:
+ release_name:
+ description: 'Release version (e.g. 1.0.0)'
+ required: true
+ release_body:
+ description: 'Release changelog content'
+ required: true
+
+jobs:
+ update-docs:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Clone seqeralabs/docs
+ run: |
+ git clone https://github.com/seqeralabs/docs.git seqeralabs-docs
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Create changelog file
+ run: |
+ mkdir -p seqeralabs-docs/changelog/wave
+ cat << EOF > seqeralabs-docs/changelog/wave/${{ github.event.release.name || inputs.release_name }}.mdx
+ ---
+ title: Wave ${{ github.event.release.name || inputs.release_name }}
+ date: $(date +%Y-%m-%d)
+ tags: [wave]
+ ---
+
+ ${{ github.event.release.body || inputs.release_body }}
+ EOF
+
+ - uses: actions/create-github-app-token@v1
+ id: generate-token
+ with:
+ app-id: ${{ secrets.DOCS_BOT_APP_ID }}
+ private-key: ${{ secrets.DOCS_BOT_APP_PRIVATE_KEY }}
+ owner: seqeralabs
+ repositories: docs
+
+ - name: Create Pull Request
+ uses: peter-evans/create-pull-request@v7
+ with:
+ token: ${{ steps.generate-token.outputs.token }}
+ branch-token: ${{ steps.generate-token.outputs.token }}
+ path: seqeralabs-docs
+ commit-message: "Changelog: Wave ${{ github.event.release.name || inputs.release_name }}"
+ title: "Changelog: Wave ${{ github.event.release.name || inputs.release_name }}"
+ body: |
+ This PR adds the changelog for Wave ${{ github.event.release.name || inputs.release_name }} to the Seqera documentation.
+
+ This is an automated PR created from the Wave repository.
+ branch: changelog-wave-${{ github.event.release.name || inputs.release_name }}
+ base: master
+ delete-branch: true
diff --git a/.gitignore b/.gitignore
index 41b2ff645..be360a502 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,6 @@ deployment-url.txt
tsp-output/
node_modules/
package-lock.json
+
+# Seqera Docs clone
+seqeralabs-docs
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 23895e870..ab044536b 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-config ?= compileClasspath
+config ?= runtimeClasspath
ifdef module
mm = :${module}:
diff --git a/README.md b/README.md
index 10d853924..13acb4415 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ images.
* Push and cache built containers to a user-provided container repository;
* Build Singularity native containers both using a Singularity spec file, Conda package(s);
* Push Singularity native container images to OCI-compliant registries;
-
+* Scan container images for security vulnerabilities
### How it works
@@ -34,7 +34,7 @@ container registry where the image is stored, while the instrumented layers are
### Requirements
-* Java 19 or later
+* Java 21 or later
* Linux or macOS
* Redis 6.2 (or later)
* Docker engine (for development)
diff --git a/VERSION b/VERSION
index 43ded9062..d32434904 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.13.5
+1.15.5
diff --git a/build.gradle b/build.gradle
index 28442ab3d..71d6bf6c2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2,10 +2,10 @@ import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
plugins {
- id 'java-library'
+ id 'io.seqera.wave.java-library-conventions'
id 'io.seqera.wave.groovy-application-conventions'
- id "com.github.johnrengelman.shadow" version "7.1.1"
- id "io.micronaut.minimal.application" version "3.7.0"
+ id "com.github.johnrengelman.shadow" version "8.1.1"
+ id "io.micronaut.minimal.application" version "4.1.1"
id "com.google.cloud.tools.jib" version "3.4.2"
id 'org.asciidoctor.jvm.convert' version '3.3.2'
id 'jacoco'
@@ -29,73 +29,83 @@ repositories {
}
dependencies {
- annotationProcessor("io.micronaut:micronaut-http-validation")
- compileOnly("io.micronaut.data:micronaut-data-processor")
- compileOnly("io.micronaut:micronaut-inject-groovy")
- compileOnly("io.micronaut:micronaut-http-validation")
- implementation("jakarta.persistence:jakarta.persistence-api:3.0.0")
- api 'io.seqera:lib-mail:1.0.0'
- api 'io.seqera:wave-api:0.13.3'
- api 'io.seqera:wave-utils:0.14.1'
- implementation("io.micronaut:micronaut-http-client")
- implementation("io.micronaut:micronaut-jackson-databind")
- implementation("io.micronaut.groovy:micronaut-runtime-groovy")
- implementation("io.micronaut.reactor:micronaut-reactor")
- implementation("io.micronaut.reactor:micronaut-reactor-http-client")
- implementation("jakarta.annotation:jakarta.annotation-api")
- implementation("io.micronaut:micronaut-validation")
+ annotationProcessor 'io.micronaut.validation:micronaut-validation-processor'
+ annotationProcessor 'io.micronaut:micronaut-http-validation'
+ compileOnly 'io.micronaut.data:micronaut-data-processor'
+ compileOnly 'io.micronaut:micronaut-inject-groovy'
+ compileOnly 'io.micronaut:micronaut-http-validation'
+ implementation 'jakarta.persistence:jakarta.persistence-api:3.0.0'
+ api 'io.seqera:lib-mail:1.2.1'
+ api 'io.seqera:wave-api:0.14.0'
+ api 'io.seqera:wave-utils:0.15.0'
+ implementation 'io.seqera:lib-crypto:1.0.0'
+ implementation 'io.micronaut:micronaut-http-client'
+ implementation 'io.micronaut:micronaut-jackson-databind'
+ implementation 'io.micronaut.groovy:micronaut-runtime-groovy'
+ implementation 'io.micronaut.reactor:micronaut-reactor'
+ implementation 'io.micronaut.reactor:micronaut-reactor-http-client'
+ implementation 'jakarta.annotation:jakarta.annotation-api'
+ implementation 'io.micronaut.validation:micronaut-validation'
implementation 'io.micronaut.security:micronaut-security'
- implementation "org.codehaus.groovy:groovy-json"
- implementation "org.codehaus.groovy:groovy-nio"
- implementation 'com.google.guava:guava:32.1.2-jre'
+ implementation 'io.micronaut:micronaut-websocket'
+ implementation 'org.apache.groovy:groovy-json'
+ implementation 'org.apache.groovy:groovy-nio'
+ implementation 'com.google.guava:guava:33.3.1-jre'
implementation 'dev.failsafe:failsafe:3.1.0'
- implementation('io.projectreactor:reactor-core')
- implementation("io.seqera:tower-crypto:22.4.0-watson") { transitive = false } // to be replaced with 22.4.0 once released
- implementation 'org.apache.commons:commons-compress:1.24.0'
- implementation 'org.apache.commons:commons-lang3:3.12.0'
- implementation 'io.kubernetes:client-java:19.0.0'
- implementation 'io.kubernetes:client-java-api-fluent:18.0.1'
- implementation 'com.google.code.gson:gson:2.9.0'
- implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
+ implementation 'io.micronaut.reactor:micronaut-reactor'
+ implementation 'io.micronaut.reactor:micronaut-reactor-http-client'
+ implementation 'org.apache.commons:commons-compress:1.27.1'
+ implementation 'org.apache.commons:commons-lang3:3.17.0'
+ implementation 'io.kubernetes:client-java:21.0.1'
+ implementation 'io.kubernetes:client-java-api-fluent:21.0.1'
+ implementation 'com.google.code.gson:gson:2.10.1'
+ implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
- implementation 'com.squareup.moshi:moshi:1.14.0'
- implementation 'com.squareup.moshi:moshi-adapters:1.14.0'
- implementation 'redis.clients:jedis:5.0.2'
- implementation "io.github.resilience4j:resilience4j-ratelimiter:0.17.0"
+ implementation 'com.squareup.moshi:moshi:1.15.1'
+ implementation 'com.squareup.moshi:moshi-adapters:1.15.1'
+ implementation 'redis.clients:jedis:5.1.3'
+ implementation 'io.github.resilience4j:resilience4j-ratelimiter:0.17.0'
+ implementation 'io.micronaut:micronaut-retry'
// caching deps
- implementation("io.micronaut.cache:micronaut-cache-core")
- implementation("io.micronaut.cache:micronaut-cache-caffeine")
- implementation("io.micronaut.aws:micronaut-aws-parameter-store")
- implementation "software.amazon.awssdk:ecr"
- implementation "software.amazon.awssdk:ecrpublic"
+ implementation 'io.micronaut.cache:micronaut-cache-core'
+ implementation 'io.micronaut.cache:micronaut-cache-caffeine'
+ implementation 'io.micronaut.aws:micronaut-aws-parameter-store'
+ implementation 'software.amazon.awssdk:ecr'
+ implementation 'software.amazon.awssdk:ecrpublic'
implementation 'software.amazon.awssdk:ses'
- implementation 'org.yaml:snakeyaml:2.0'
+ implementation 'org.yaml:snakeyaml:2.2'
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
implementation 'org.luaj:luaj-jse:3.0.1'
//object storage dependency
- implementation("io.micronaut.objectstorage:micronaut-object-storage-aws")
+ implementation 'io.micronaut.objectstorage:micronaut-object-storage-aws'
// include sts to allow the use of service account role - https://stackoverflow.com/a/73306570
// this sts dependency is require by micronaut-aws-parameter-store,
// not directly used by the app, for this reason keeping `runtimeOnly`
- runtimeOnly "software.amazon.awssdk:sts"
-
- runtimeOnly("io.netty:netty-tcnative-boringssl-static:2.0.0.Final")
- runtimeOnly("javax.xml.bind:jaxb-api:2.3.1")
- testImplementation("org.testcontainers:testcontainers")
- testImplementation("org.testcontainers:mysql:1.17.3")
+ runtimeOnly 'software.amazon.awssdk:sts'
+ runtimeOnly 'io.netty:netty-tcnative-boringssl-static:2.0.0.Final'
+ runtimeOnly 'javax.xml.bind:jaxb-api:2.3.1'
+ testImplementation 'org.testcontainers:testcontainers'
+ testImplementation 'org.testcontainers:mysql:1.17.3'
// --
- implementation("ch.qos.logback:logback-classic:1.4.8")
+ implementation 'ch.qos.logback:logback-classic:1.5.12'
// rate limit
- implementation 'com.github.seqeralabs:spillway:7b72700293'
+ implementation 'com.coveo:spillway:3.0.0'
// monitoring
- implementation "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
+ implementation 'io.micronaut.micrometer:micronaut-micrometer-core'
+ implementation 'io.micronaut.micrometer:micronaut-micrometer-registry-prometheus'
// Also required to enable endpoint
- implementation "io.micronaut:micronaut-management"
+ implementation 'io.micronaut:micronaut-management'
//views
- implementation("io.micronaut.views:micronaut-views-handlebars")
+ implementation 'io.micronaut.views:micronaut-views-handlebars'
+
+ // upgrade indirect dependencies
+ runtimeOnly 'org.bouncycastle:bcpkix-jdk18on:1.78'
+ runtimeOnly 'org.bitbucket.b_c:jose4j:0.9.4'
+ runtimeOnly 'io.netty:netty-bom:4.1.115.Final'
+ runtimeOnly 'com.google.protobuf:protobuf-java:4.27.5'
}
application {
@@ -148,8 +158,7 @@ jib {
run{
def envs = findProperty('micronautEnvs')
- // note: "--enable-preview" is required to use virtual threads on Java 19 and 20
- def args = ["-Dmicronaut.environments=$envs","--enable-preview"]
+ def args = ["-Dmicronaut.environments=$envs","-Djdk.tracePinnedThreads=short", "--add-opens","java.base/java.lang=ALL-UNNAMED"]
if( environment['JVM_OPTS'] ) args.add(environment['JVM_OPTS'])
jvmArgs args
systemProperties 'DOCKER_USER': project.findProperty('DOCKER_USER') ?: environment['DOCKER_USER'],
diff --git a/buildSrc/src/main/groovy/io.seqera.wave.groovy-application-conventions.gradle b/buildSrc/src/main/groovy/io.seqera.wave.groovy-application-conventions.gradle
index 0cf39fc5e..4ff807f4c 100644
--- a/buildSrc/src/main/groovy/io.seqera.wave.groovy-application-conventions.gradle
+++ b/buildSrc/src/main/groovy/io.seqera.wave.groovy-application-conventions.gradle
@@ -11,8 +11,3 @@ plugins {
}
group = 'io.seqera'
-
-tasks.withType(Test) {
- // note: "--enable-preview" is required to use virtual thread on Java 19 and 20
- jvmArgs (["--enable-preview"])
-}
diff --git a/buildSrc/src/main/groovy/io.seqera.wave.groovy-common-conventions.gradle b/buildSrc/src/main/groovy/io.seqera.wave.groovy-common-conventions.gradle
index da3059f91..45696fae9 100644
--- a/buildSrc/src/main/groovy/io.seqera.wave.groovy-common-conventions.gradle
+++ b/buildSrc/src/main/groovy/io.seqera.wave.groovy-common-conventions.gradle
@@ -14,17 +14,17 @@ repositories {
java {
toolchain {
- languageVersion = JavaLanguageVersion.of(19)
+ languageVersion = JavaLanguageVersion.of(21)
}
}
compileJava {
- options.release.set(11)
+ options.release.set(17)
}
-tasks.withType(GroovyCompile) {
- sourceCompatibility = '11'
- targetCompatibility = '11'
+tasks.withType(GroovyCompile).configureEach {
+ sourceCompatibility = '17'
+ targetCompatibility = '17'
}
group = 'io.seqera'
diff --git a/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle b/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle
index f6197b641..5b7eb0d1e 100644
--- a/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle
+++ b/buildSrc/src/main/groovy/io.seqera.wave.java-library-conventions.gradle
@@ -16,17 +16,17 @@ repositories {
java {
toolchain {
- languageVersion = JavaLanguageVersion.of(19)
+ languageVersion = JavaLanguageVersion.of(21)
}
}
compileJava {
- options.release.set(11)
+ options.release.set(17)
}
-tasks.withType(GroovyCompile) {
- sourceCompatibility = '11'
- targetCompatibility = '11'
+tasks.withType(GroovyCompile).configureEach {
+ sourceCompatibility = '17'
+ targetCompatibility = '17'
}
test {
@@ -40,22 +40,22 @@ java {
}
dependencies {
- implementation 'org.slf4j:slf4j-api:1.7.36'
+ implementation 'org.slf4j:slf4j-api:2.0.16'
+ implementation 'org.slf4j:slf4j-jdk-platform-logging:2.0.16'
- testImplementation 'ch.qos.logback:logback-core:1.2.11'
- testImplementation 'ch.qos.logback:logback-classic:1.2.11'
- testImplementation "org.codehaus.groovy:groovy:3.0.15"
- testImplementation "org.codehaus.groovy:groovy-nio:3.0.15"
- testImplementation ("org.codehaus.groovy:groovy-test:3.0.17")
- testImplementation ("cglib:cglib-nodep:3.3.0")
- testImplementation ("org.objenesis:objenesis:3.2")
- testImplementation ("org.spockframework:spock-core:2.3-groovy-3.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
- testImplementation ('org.spockframework:spock-junit4:2.3-groovy-3.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
+ testImplementation 'ch.qos.logback:logback-core:1.5.12'
+ testImplementation 'ch.qos.logback:logback-classic:1.5.12'
+ testImplementation 'org.apache.groovy:groovy:4.0.15'
+ testImplementation 'org.apache.groovy:groovy-nio:4.0.15'
+ testImplementation 'org.apache.groovy:groovy-test:4.0.15'
+ testImplementation 'org.objenesis:objenesis:3.4'
+ testImplementation 'net.bytebuddy:byte-buddy:1.14.17'
+ testImplementation 'org.spockframework:spock-core:2.3-groovy-4.0'
+ testImplementation 'org.spockframework:spock-junit4:2.3-groovy-4.0'
}
-tasks.withType(Test) {
- jvmArgs ([
- '--enable-preview',
+tasks.withType(Test).configureEach {
+ jvmArgs([
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
diff --git a/changelog.txt b/changelog.txt
index 960bfe42f..e6cc1d01e 100644
--- a/changelog.txt
+++ b/changelog.txt
@@ -1,4 +1,90 @@
# Wave changelog
+1.15.5 - 9 Dec 2024
+- Add Jedis pool metrics binder (#756) [a6b2833d]
+- Add jul-to-slf4j dependency [2874187b]
+- Add slf4j backend for Java sys logger [ec5c67fe]
+- Add support for common pool and virtual threads pool metrics (#762) [8aa9075c]
+- Add GH Workflow to sync changelog with Seqera docs (#761) [c8bb7b03]
+- Fix handling error for known http statuses [513a1ef2]
+- Improve container validation [1c36fd4c]
+- Remove unused stream-executor [61357048]
+- Suppress Caffeine log warnings [2fd351d3]
+- Tune Tower connector delay and retry [3fd5643f]
+- Unwrap Failsafe target exception [6fb214a4]
+
+1.15.4 - 27 Nov 2024
+- Add blocking executor to async caches (#759) [86f7d3e3]
+- Add ExecutesOn annotation to error controller [a2db2b00]
+
+1.15.3 - 25 Nov 2024
+- Add Support Redis support for SSL and password (#717) [bf63599d]
+- Bump MN 4.7.1 (#741) [203e5dd0]
+- Remove double cache builder invocation [994b722c]
+- Bump lib-mail version 1.2.1 [4da98a23]
+- Bump io.seqera:lib-crypto [36f5b24d]
+- Bump gson 2.10.1 [117703b9]
+- Bump protobuf-java version 4.27.5 [3ea758b4]
+- Bump k8s client to version 21.0.1 (#553) [51788578]
+
+1.15.2 [skipped]
+
+1.15.1 - 20 Nov 2024
+- Check block existence with object operation (#750) [86ef526c]
+- Add /v1alpha2/validate-creds (#752) [e24ec62c]
+- Switched env vars around (#748) [080d5cce]
+
+1.15.0 - 18 Nov 2024
+- Migration to virtual threads - phase 1 (#746) [aaf0420c]
+- Use runAsync instead supplyAsync [ffd0dacd]
+- Remove deprecated ThreadPoolBuilder [7af3046f]
+- Replace Guava cache with Caffeine (#745) [cf813e0a]
+- Update project deps [f24b684d]
+- Bump guava to version 33.3.1-jre [328e9ea3]
+- Bump Netty version 4.1.115.Final [9ba433ce]
+- Bump gradle 8.10.2 [52272fe1]
+
+1.14.1 - 14 Nov 2024
+- Fix creds validation endpoint (#740) [8c0f3a4c]
+
+1.14.0 - 10 Nov 2024
+- Fix K8s env propagation [76f0a456]
+- Remove deprecated K8s methods (#734) [481298bf]
+- Bump to Micronaut 4.6 (#318) [f67e8556]
+- Bump Java 21 as build requirement (#519) [132f9491]
+- Bump bitbucket.b_c:jose4j:0.9.4 [2e10416a]
+- Bump bouncycastle:bcpkix-jdk18on:1.78 [ede22ce5]
+- Bump jedis 5.1.3 (#732) [2ee0854e]
+- Bump logback 1.5.12 [f5fe3fa4]
+- Bump make deps runtimeclasspath [2a342b18]
+- Bump snakeyaml 2.2 [6aeb3c33]
+- Bump spillway 3.0.0 (#731) [1502696d]
+- Bump explicit dep to websocket module [2e413ac2]
+- Enables EKS Pod identity via AWS SDK 2.27.8
+
+1.13.11 - 2 Nov 2024
+- Rename async methods for semantic consistency [38114d75]
+- Save scan record async (#730) [3ad82a3a]
+- Cap number of vulnerabilities reported in scan report to 100 (#728) [2f0d8f9f]
+- Bump org.apache.commons:commons-compress:1.27.1 (#722) [adb75007]
+
+1.13.10 - 29 Oct 2024
+- Log slow processing stream messages [e8a6b7ee]
+- Prevent scan when mode is not defined [d42bcae1]
+
+1.13.9 - 29 Oct 2024
+- Fix inspect view (#725) [dcf41dea] [e38e2c44]
+
+1.13.8 - 26 Oct 2024
+- Fix update scan status synchronously [e767c367]
+- Bump scan warn colour [705141f0]
+- Improve scan logging [f01e4dba]
+
+1.13.7 - 25 Oct 2024
+- Add ability to configure trivy environment & DBs (#720) [0f600306]
+
+1.13.6 - 25 Oct 2024
+- Add scan color for different vuls (#719) [ab81b6dc]
+
1.13.5 - 23 Oct 2024
- Fix Do not render inspect url on fail [d96275a1]
- Fix inspect view empty nodes (#706) [b3473b7e]
diff --git a/configuration.md b/configuration.md
index d162a98b5..d119cae16 100644
--- a/configuration.md
+++ b/configuration.md
@@ -184,6 +184,16 @@ Rate limit configuration controls the limits of anonymous and authenticated user
- **`redis.pool.enabled`**: whether to enable the Redis pool. It is set to `true` by default, enabling the use of a connection pool for efficient management of connections to the Redis server. *Optional*.
+- **`redis.pool.minIdle`**: Specifies the minimum number of idle connections to maintain in the Redis connection pool. The default value is `0`. This ensures that connections are readily available for use. *Optional*.
+
+- **`redis.pool.maxIdle`**: Specifies the maximum number of idle connections to maintain in the Redis connection pool. The default value is `10`. *Optional*.
+
+- **`redis.pool.maxTotal`**: Specifies the maximum number of connections that can be maintained in the Redis connection pool. The default value is `50`. This helps to manage resource usage efficiently while supporting high demand. *Optional*.
+
+- **`redis.client.timeout`**: Defines the timeout duration (in milliseconds) for Redis client operations. The default value is `5000` (5 seconds). *Optional*.
+
+- **`redis.password`**: Specifies the password used to authenticate with the Redis server. This is needed when redis authentication is enabled. *Optional*.
+
- **`surreal.default.ns`**: the namespace for the Surreal database. It can be set using `${SURREALDB_NS}` environment variable. *Mandatory*.
- **`surreal.default.db`**: the name of the Surreal database. It can be set using`${SURREALDB_DB}` environment variable. This setting defines the target database within the Surreal database system that Wave should interact with. *Mandatory*.
diff --git a/docs/cli/index.mdx b/docs/cli/index.mdx
index 6581f628d..45f1239e2 100644
--- a/docs/cli/index.mdx
+++ b/docs/cli/index.mdx
@@ -27,8 +27,8 @@ The following CLI arguments are available for Seqera Platform integration:
The following environment variables are available for Seqera Platform integration:
-- `TOWER_API_ENDPOINT`: A Seqera Platform auth token so that Wave can access your private registry credentials.
-- `TOWER_ACCESS_TOKEN`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`.
+- `TOWER_ACCESS_TOKEN`: A Seqera Platform auth token so that Wave can access your private registry credentials.
+- `TOWER_API_ENDPOINT`: For Enterprise customers, the URL endpoint for your instance, such as `https://api.cloud.seqera.io`.
- `TOWER_WORKSPACE_ID`: A Seqera Platform workspace ID, such as `1234567890`, where credentials may be stored.
## Usage limits
diff --git a/gradle.properties b/gradle.properties
index 645585ab7..9c2c3bcbb 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,5 +16,5 @@
# along with this program. If not, see .
#
-micronautVersion=3.10.3
+micronautVersion=4.7.1
micronautEnvs=dev,h2,mail,aws-ses
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index a4413138c..df97d72b8 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/settings.gradle b/settings.gradle
index 395412606..0605b3681 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,10 @@
+plugins {
+ // required to download the toolchain (jdk) from a remote repository
+ // https://github.com/gradle/foojay-toolchains
+ // https://docs.gradle.org/current/userguide/toolchains.html#sub:download_repositories
+ id("org.gradle.toolchains.foojay-resolver-convention") version "0.7.0"
+}
+
rootProject.name="wave"
// only for development
diff --git a/src/main/groovy/io/seqera/wave/ErrorHandler.groovy b/src/main/groovy/io/seqera/wave/ErrorHandler.groovy
index 0d872918b..69a4ab430 100644
--- a/src/main/groovy/io/seqera/wave/ErrorHandler.groovy
+++ b/src/main/groovy/io/seqera/wave/ErrorHandler.groovy
@@ -24,6 +24,7 @@ import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.HttpResponseFactory
import io.micronaut.http.HttpStatus
+import io.micronaut.http.exceptions.HttpStatusException
import io.micronaut.security.authentication.AuthorizationException
import io.seqera.wave.exception.BuildTimeoutException
import io.seqera.wave.exception.DockerRegistryException
@@ -55,8 +56,9 @@ class ErrorHandler {
def HttpResponse handle(HttpRequest httpRequest, Throwable t, Mapper responseFactory) {
final errId = LongRndKey.rndHex()
final request = httpRequest?.toString()
+ final knownException = t instanceof WaveException || t instanceof HttpStatusException
def msg = t.message
- if( t instanceof WaveException && msg ) {
+ if( knownException && msg ) {
// the the error cause
if( t.cause ) msg += " - Cause: ${t.cause.message ?: t.cause}".toString()
// render the message for logging
@@ -81,6 +83,13 @@ class ErrorHandler {
log.error(render, t)
}
+ if( t instanceof HttpStatusException ) {
+ final body = (t.body.isPresent() ? t.body.get() : t.message) as T
+ return HttpResponse
+ .status(t.status)
+ .body(body)
+ }
+
if( t instanceof RegistryForwardException ) {
// report this error as it has been returned by the target registry
return HttpResponse
diff --git a/src/main/groovy/io/seqera/wave/auth/BasicAuthenticationProvider.groovy b/src/main/groovy/io/seqera/wave/auth/BasicAuthenticationProvider.groovy
index b76abda4e..d76ff89ee 100644
--- a/src/main/groovy/io/seqera/wave/auth/BasicAuthenticationProvider.groovy
+++ b/src/main/groovy/io/seqera/wave/auth/BasicAuthenticationProvider.groovy
@@ -19,18 +19,17 @@
package io.seqera.wave.auth
import groovy.util.logging.Slf4j
+import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.HttpRequest
-import io.micronaut.security.authentication.AuthenticationProvider
+import io.micronaut.security.authentication.AuthenticationFailureReason
import io.micronaut.security.authentication.AuthenticationRequest
import io.micronaut.security.authentication.AuthenticationResponse
+import io.micronaut.security.authentication.provider.HttpRequestAuthenticationProvider
import io.seqera.wave.service.account.AccountService
import io.seqera.wave.util.StringUtils
import jakarta.inject.Inject
import jakarta.inject.Singleton
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.FluxSink
/**
* Basic Authentication provider
*
@@ -38,25 +37,22 @@ import reactor.core.publisher.FluxSink
*/
@Slf4j
@Singleton
-class BasicAuthenticationProvider implements AuthenticationProvider {
+class BasicAuthenticationProvider implements HttpRequestAuthenticationProvider {
@Inject
private AccountService accountService
@Override
- Publisher authenticate(@Nullable HttpRequest> httpRequest, AuthenticationRequest, ?> authRequest) {
- Flux.create(emitter -> {
- final user = authRequest.identity?.toString()
- final pass = authRequest.secret?.toString()
- if (accountService.isAuthorised(user, pass)) {
- log.trace "Auth request OK - user '$user'; password: '${StringUtils.redact(pass)}'"
- emitter.next(AuthenticationResponse.success((String) authRequest.identity))
- emitter.complete()
- }
- else {
- log.trace "Auth request FAILED - user '$user'; password: '${StringUtils.redact(pass)}'"
- emitter.error(AuthenticationResponse.exception())
- }
- }, FluxSink.OverflowStrategy.ERROR)
+ AuthenticationResponse authenticate(@Nullable HttpRequest httpRequest, @NonNull AuthenticationRequest authRequest) {
+ final user = authRequest.identity?.toString()
+ final pass = authRequest.secret?.toString()
+ if (accountService.isAuthorised(user, pass)) {
+ log.trace "Auth request OK - user '$user'; password: '${StringUtils.redact(pass)}'"
+ return AuthenticationResponse.success(authRequest.identity)
+ }
+ else {
+ log.trace "Auth request FAILED - user '$user'; password: '${StringUtils.redact(pass)}'"
+ return AuthenticationResponse.failure(AuthenticationFailureReason.CREDENTIALS_DO_NOT_MATCH)
+ }
}
}
diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
index d8ea12d26..a4252c4f3 100644
--- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy
@@ -21,19 +21,20 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
+import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
-import com.google.common.cache.LoadingCache
-import com.google.common.util.concurrent.UncheckedExecutionException
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.ToString
import groovy.util.logging.Slf4j
+import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.exception.RegistryUnauthorizedAccessException
@@ -41,7 +42,9 @@ import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.RegHelper
import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
+import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
+import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.auth.RegistryUtils.isServerError
@@ -65,6 +68,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
@Inject
private RegistryTokenStore tokenStore
+ @Inject
+ @Named(TaskExecutors.BLOCKING)
+ private ExecutorService ioExecutor
+
@Canonical
@ToString(includePackage = false, includeNames = true)
static private class CacheKey {
@@ -101,16 +108,24 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
return result
}
- private LoadingCache cacheTokens = CacheBuilder
- .newBuilder()
- .maximumSize(10_000)
- .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
- .build(loader)
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ private AsyncLoadingCache cacheTokens
@Inject
private RegistryLookupService lookupService
- @Inject RegistryCredentialsFactory credentialsFactory
+ @Inject
+ private RegistryCredentialsFactory credentialsFactory
+
+ @PostConstruct
+ private void init() {
+ cacheTokens = Caffeine
+ .newBuilder()
+ .maximumSize(10_000)
+ .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
+ .executor(ioExecutor)
+ .buildAsync(loader)
+ }
/**
* Implements container registry login
@@ -269,9 +284,10 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
try {
- return cacheTokens.get(key)
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ return cacheTokens.synchronous().get(key)
}
- catch (UncheckedExecutionException | ExecutionException e) {
+ catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
// and throws the causing exception that should be `RegistryUnauthorizedAccessException`
throw e.cause
@@ -287,7 +303,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
*/
void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
- cacheTokens.invalidate(key)
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ cacheTokens.synchronous().invalidate(key)
tokenStore.remove(getStableKey(key))
}
diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryConfig.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryConfig.groovy
index eb43f3dad..560a784a5 100644
--- a/src/main/groovy/io/seqera/wave/auth/RegistryConfig.groovy
+++ b/src/main/groovy/io/seqera/wave/auth/RegistryConfig.groovy
@@ -47,7 +47,7 @@ class RegistryConfig {
* io: [ ... ]
* ]
*/
- private Map registries
+ Map registries
RegistryKeys getRegistryKeys(String registryName) {
final String defaultRegistry = registries.get('default')?.toString() ?: 'docker.io'
diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy
index 0dc641879..13775d4a4 100644
--- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy
@@ -20,20 +20,23 @@ package io.seqera.wave.auth
import java.net.http.HttpRequest
import java.net.http.HttpResponse
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
+import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
-import com.google.common.cache.LoadingCache
-import com.google.common.util.concurrent.UncheckedExecutionException
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.exception.RegistryForwardException
import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.util.Retryable
+import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
+import jakarta.inject.Named
import jakarta.inject.Singleton
import static io.seqera.wave.WaveDefault.DOCKER_IO
import static io.seqera.wave.WaveDefault.DOCKER_REGISTRY_1
@@ -56,6 +59,10 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
@Inject
private RegistryAuthStore store
+ @Inject
+ @Named(TaskExecutors.BLOCKING)
+ private ExecutorService ioExecutor
+
private CacheLoader loader = new CacheLoader() {
@Override
RegistryAuth load(URI endpoint) throws Exception {
@@ -74,11 +81,18 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}
}
- private LoadingCache cache = CacheBuilder
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ private AsyncLoadingCache cache
+
+ @PostConstruct
+ void init() {
+ cache = Caffeine
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
- .build(loader)
+ .executor(ioExecutor)
+ .buildAsync(loader)
+ }
protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
@@ -117,10 +131,11 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
RegistryInfo lookup(String registry) {
try {
final endpoint = registryEndpoint(registry)
- final auth = cache.get(endpoint)
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ final auth = cache.synchronous().get(endpoint)
return new RegistryInfo(registry, endpoint, auth)
}
- catch (UncheckedExecutionException | ExecutionException e) {
+ catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
// and throws the causing exception that should be `RegistryUnauthorizedAccessException`
throw e.cause
diff --git a/src/main/groovy/io/seqera/wave/configuration/ScanConfig.groovy b/src/main/groovy/io/seqera/wave/configuration/ScanConfig.groovy
index a0f833f27..0ff20d39c 100644
--- a/src/main/groovy/io/seqera/wave/configuration/ScanConfig.groovy
+++ b/src/main/groovy/io/seqera/wave/configuration/ScanConfig.groovy
@@ -21,16 +21,14 @@ package io.seqera.wave.configuration
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
-
-import io.micronaut.context.annotation.Requires
-import io.micronaut.core.annotation.Nullable
import javax.annotation.PostConstruct
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
+import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
-import io.seqera.wave.util.StringUtils
+import io.micronaut.core.annotation.Nullable
import jakarta.inject.Singleton
/**
* Container Scan service settings
@@ -83,8 +81,12 @@ class ScanConfig {
Duration scanIdDuration
@Nullable
- @Value('${wave.scan.github-token}')
- String githubToken
+ @Value('${wave.scan.environment}')
+ List environment
+
+ @Nullable
+ @Value('${wave.scan.vulnerability.limit:100}')
+ Integer vulnerabilityLimit
String getScanImage() {
return scanImage
@@ -93,7 +95,11 @@ class ScanConfig {
@Memoized
Path getCacheDirectory() {
final result = Path.of(buildDirectory).toAbsolutePath().resolve('.trivy-cache')
- Files.createDirectories(result)
+ try {
+ Files.createDirectories(result)
+ } catch (IOException e) {
+ log.error "Unable to create scan cache directory=${result} - cause: ${e.message}"
+ }
return result
}
@@ -118,8 +124,23 @@ class ScanConfig {
return severity
}
+ List> getEnvironmentAsTuples() {
+ if( !environment )
+ return List.of()
+ final result = new ArrayList>()
+ for( String entry : environment ) {
+ final p=entry.indexOf('=')
+ final name = p!=-1 ? entry.substring(0,p) : entry
+ final value = p!=-1 ? entry.substring(p+1) : ''
+ if( !value )
+ log.warn "Invalid 'wave.scan.environment' value -- offending entry: '$entry'"
+ result.add(new Tuple2(name,value))
+ }
+ return result
+ }
+
@PostConstruct
private void init() {
- log.info("Scanner config: docker image name: ${scanImage}; cache directory: ${cacheDirectory}; timeout=${timeout}; cpus: ${requestsCpu}; mem: ${requestsMemory}; severity: $severity; retry-attempts: $retryAttempts; github-token=${StringUtils.redact(githubToken)}")
+ log.info("Scan config: docker image name: ${scanImage}; cache directory: ${cacheDirectory}; timeout=${timeout}; cpus: ${requestsCpu}; mem: ${requestsMemory}; severity: $severity; vulnerability-limit: $vulnerabilityLimit; retry-attempts: $retryAttempts; env=${environment}")
}
}
diff --git a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy
index 06679fa4a..f99549087 100644
--- a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy
@@ -42,7 +42,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class BuildController {
@Inject
diff --git a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy
index 9001ac458..4145d4385 100644
--- a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy
@@ -29,6 +29,7 @@ import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
+import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Delete
import io.micronaut.http.annotation.Error
@@ -102,7 +103,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture
@Slf4j
@CompileStatic
@Controller("/")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class ContainerController {
@Inject
@@ -180,14 +181,14 @@ class ContainerController {
@Deprecated
@Post('/container-token')
- @ExecuteOn(TaskExecutors.IO)
- CompletableFuture> getToken(HttpRequest httpRequest, SubmitContainerTokenRequest req) {
+ @ExecuteOn(TaskExecutors.BLOCKING)
+ CompletableFuture> getToken(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, false)
}
@Post('/v1alpha2/container')
- @ExecuteOn(TaskExecutors.IO)
- CompletableFuture> getTokenV2(HttpRequest httpRequest, SubmitContainerTokenRequest req) {
+ @ExecuteOn(TaskExecutors.BLOCKING)
+ CompletableFuture> getTokenV2(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, true)
}
@@ -264,7 +265,7 @@ class ContainerController {
final ip = addressResolver.resolve(httpRequest)
// check the rate limit before continuing
if( rateLimiterService )
- rateLimiterService.acquirePull(new AcquireRequest(identity.userId as String, ip))
+ rateLimiterService.acquirePull(new AcquireRequest(identity.userEmail, ip))
// create request data
final data = makeRequestData(req, identity, ip)
final token = containerService.computeToken(data)
@@ -284,7 +285,7 @@ class ContainerController {
protected void storeContainerRequest0(SubmitContainerTokenRequest req, ContainerRequest data, TokenData token, String target, String ip) {
try {
final recrd = new WaveContainerRecord(req, data, target, ip, token.expiration)
- persistenceService.saveContainerRequest(recrd)
+ persistenceService.saveContainerRequestAsync(recrd)
}
catch (Throwable e) {
log.error("Unable to store container request with token: ${token}", e)
diff --git a/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy b/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy
index bceda35d2..af4ad61b3 100644
--- a/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ErrorController.groovy
@@ -25,6 +25,8 @@ import io.micronaut.http.HttpResponse
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Error
import io.micronaut.http.hateoas.JsonError
+import io.micronaut.scheduling.TaskExecutors
+import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.ErrorHandler
import jakarta.inject.Inject
/**
@@ -35,9 +37,11 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller('/error')
+@ExecuteOn(TaskExecutors.BLOCKING)
class ErrorController {
- @Inject ErrorHandler handler
+ @Inject
+ private ErrorHandler handler
@Error(global = true)
HttpResponse handleException(HttpRequest request, Throwable exception) {
diff --git a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy
index 88d66f9b8..6093d4dfb 100644
--- a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy
@@ -25,6 +25,7 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.HttpResponse
+import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.QueryValue
@@ -49,7 +50,7 @@ import static io.seqera.wave.util.ContainerHelper.patchPlatformEndpoint
@Slf4j
@CompileStatic
@Controller("/")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class InspectController {
@Inject
@@ -70,7 +71,7 @@ class InspectController {
private String serverUrl
@Post("/v1alpha1/inspect")
- CompletableFuture> inspect(ContainerInspectRequest req, @Nullable @QueryValue String platform) {
+ CompletableFuture> inspect(@Body ContainerInspectRequest req, @Nullable @QueryValue String platform) {
if( !req.containerImage )
throw new BadRequestException("Missing 'containerImage' attribute")
diff --git a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy
index a9ce6f89a..f8008674e 100644
--- a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy
@@ -52,7 +52,7 @@ import static io.micronaut.http.HttpHeaders.WWW_AUTHENTICATE
@Requires(property = 'wave.metrics.enabled', value = 'true')
@Secured(SecurityRule.IS_AUTHENTICATED)
@Controller
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class MetricsController {
@Inject
diff --git a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy
index 8aaf87a51..c9c3bc8f7 100644
--- a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy
@@ -36,7 +36,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class MirrorController {
@Inject
diff --git a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy
index ec6416537..634e7cad3 100644
--- a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy
@@ -67,7 +67,7 @@ import reactor.core.publisher.Mono
@Slf4j
@CompileStatic
@Controller("/v2")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class RegistryProxyController {
@Inject
@@ -123,7 +123,7 @@ class RegistryProxyController {
if( route.manifest && route.digest ){
String ip = addressResolver.resolve(httpRequest)
- rateLimiterService?.acquirePull( new AcquireRequest(route.identity.userId as String, ip) )
+ rateLimiterService?.acquirePull( new AcquireRequest(route.identity.userEmail, ip) )
}
// check if it's a container under build
diff --git a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy
index 230a8b4a9..43fc38a5d 100644
--- a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy
@@ -39,7 +39,7 @@ import jakarta.inject.Inject
@CompileStatic
@Requires(bean = ContainerScanService)
@Controller("/")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class ScanController {
@Inject
diff --git a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy
index 207c6e8d5..7f0a895e4 100644
--- a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy
@@ -39,7 +39,7 @@ import io.seqera.wave.util.BuildInfo
@Slf4j
@Controller("/")
@CompileStatic
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class ServiceInfoController {
@Value('${wave.landing.url}')
diff --git a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy
index 631d0bcc8..cebc541b5 100644
--- a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy
@@ -18,27 +18,30 @@
package io.seqera.wave.controller
-import javax.validation.Valid
-
+import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Post
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.auth.RegistryAuthService
import jakarta.inject.Inject
-import reactor.core.publisher.Mono
+import jakarta.validation.Valid
-@ExecuteOn(TaskExecutors.IO)
-@Controller("/validate-creds")
+@Controller("/")
+@ExecuteOn(TaskExecutors.BLOCKING)
class ValidateController {
@Inject RegistryAuthService loginService
- @Post
- Mono validateCreds(@Valid ValidateRegistryCredsRequest request){
- Mono.just(
- loginService.validateUser(request.registry, request.userName, request.password)
- )
+ @Deprecated
+ @Post("/validate-creds")
+ Boolean validateCreds(@Valid ValidateRegistryCredsRequest request){
+ loginService.validateUser(request.registry, request.userName, request.password)
+ }
+
+ @Post("/v1alpha2/validate-creds")
+ Boolean validateCredsV2(@Valid @Body ValidateRegistryCredsRequest request){
+ loginService.validateUser(request.registry, request.userName, request.password)
}
}
diff --git a/src/main/groovy/io/seqera/wave/controller/ValidateRegistryCredsRequest.groovy b/src/main/groovy/io/seqera/wave/controller/ValidateRegistryCredsRequest.groovy
index db465ef0a..3fbe92d07 100644
--- a/src/main/groovy/io/seqera/wave/controller/ValidateRegistryCredsRequest.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ValidateRegistryCredsRequest.groovy
@@ -18,10 +18,8 @@
package io.seqera.wave.controller
-import io.micronaut.core.annotation.Nullable
-import javax.validation.constraints.NotBlank
-
import io.micronaut.core.annotation.Introspected
+import jakarta.validation.constraints.NotBlank
@Introspected
class ValidateRegistryCredsRequest {
diff --git a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
index 69d37d94b..68328eb36 100644
--- a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
+++ b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy
@@ -20,6 +20,7 @@ package io.seqera.wave.controller
import java.util.regex.Pattern
+import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
@@ -45,6 +46,7 @@ import io.seqera.wave.service.persistence.WaveBuildRecord
import io.seqera.wave.service.persistence.WaveScanRecord
import io.seqera.wave.service.scan.ContainerScanService
import io.seqera.wave.service.scan.ScanEntry
+import io.seqera.wave.service.scan.ScanVulnerability
import io.seqera.wave.util.JacksonHelper
import jakarta.inject.Inject
import static io.seqera.wave.util.DataTimeUtils.formatDuration
@@ -57,7 +59,7 @@ import static io.seqera.wave.util.DataTimeUtils.formatTimestamp
@Slf4j
@CompileStatic
@Controller("/view")
-@ExecuteOn(TaskExecutors.IO)
+@ExecuteOn(TaskExecutors.BLOCKING)
class ViewController {
@Inject
@@ -408,7 +410,10 @@ class ViewController {
}
Map makeScanViewBinding(WaveScanRecord result, Map binding=new HashMap(10)) {
+ final color = getScanColor(result.vulnerabilities)
binding.should_refresh = !result.done()
+ binding.scan_color_bg = color.background
+ binding.scan_color_fg = color.foreground
binding.scan_id = result.id
binding.scan_container_image = result.containerImage ?: '-'
binding.scan_platform = result.platform?.toString() ?: '-'
@@ -437,4 +442,24 @@ class ViewController {
return binding
}
+ @Canonical
+ static class Colour {
+ final background
+ final foreground
+ }
+
+ protected static Colour getScanColor(List vulnerabilities){
+ boolean hasMedium = vulnerabilities.stream()
+ .anyMatch(v -> v.severity.equals("MEDIUM"))
+ boolean hasHighOrCritical = vulnerabilities.stream()
+ .anyMatch(v -> v.severity.equals("HIGH") || v.severity.equals("CRITICAL"))
+ if(hasHighOrCritical){
+ return new Colour('#ffe4e2', '#e00404')
+ }
+ else if(hasMedium){
+ return new Colour('#fff8c5', "#000000")
+ }
+ return new Colour('#dff0d8', '#3c763d')
+ }
+
}
diff --git a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy
index 909cc8d5d..bd3837e87 100644
--- a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy
+++ b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy
@@ -280,7 +280,7 @@ class ContainerAugmenter {
return result
}
- synchronized protected Map layerBlob(String image, ContainerLayer layer) {
+ protected Map layerBlob(String image, ContainerLayer layer) {
log.debug "Adding layer: $layer to image: $client.registry.name/$image"
// store the layer blob in the cache
final String path = "$client.registry.name/v2/$image/blobs/$layer.gzipDigest"
@@ -295,7 +295,6 @@ class ContainerAugmenter {
protected Tuple2 updateImageManifest(String imageName, String imageManifest, String newImageConfigDigest, newImageConfigSize, boolean oci) {
-
// turn the json string into a json map
// and append the new layer
final manifest = (Map) new JsonSlurper().parseText(imageManifest)
diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
index 1d9bf61de..01ed6bd94 100644
--- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
+++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
@@ -18,6 +18,8 @@
package io.seqera.wave.core
+import java.util.concurrent.CompletableFuture
+
import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
@@ -133,7 +135,7 @@ class RegistryProxyService {
return
try {
- persistenceService.updateContainerRequest(route.token, digest)
+ persistenceService.updateContainerRequestAsync(route.token, digest)
} catch (Throwable t) {
log.error("Unable store container request for token: $route.token", t)
}
@@ -193,7 +195,7 @@ class RegistryProxyService {
String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
- return getImageDigest0(containerImage, identity, retryOnNotFound)
+ return getImageDigest0(containerImage, identity, retryOnNotFound).get()
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
@@ -203,8 +205,15 @@ class RegistryProxyService {
static private List RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404
+ // note: return a CompletableFuture to force micronaut to use caffeine AsyncCache
+ // that provides a workaround about the use of virtual threads with SyncCache
+ // see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926
@Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image'])
- protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
+ protected CompletableFuture getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
+ CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound))
+ }
+
+ protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) {
final coords = ContainerCoordinates.parse(image)
final route = RoutePath.v2manifestPath(coords, identity)
final proxyClient = client(route)
diff --git a/src/main/groovy/io/seqera/wave/exchange/PairingRequest.groovy b/src/main/groovy/io/seqera/wave/exchange/PairingRequest.groovy
index b54ac876d..fb17c68ec 100644
--- a/src/main/groovy/io/seqera/wave/exchange/PairingRequest.groovy
+++ b/src/main/groovy/io/seqera/wave/exchange/PairingRequest.groovy
@@ -18,11 +18,10 @@
package io.seqera.wave.exchange
-import javax.validation.constraints.NotBlank
-import javax.validation.constraints.NotNull
-
import groovy.transform.CompileStatic
import io.micronaut.core.annotation.Introspected
+import jakarta.validation.constraints.NotBlank
+import jakarta.validation.constraints.NotNull
/**
* Model the request for a remote service instance to register
diff --git a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy
index da19fc7c6..bb70548ac 100644
--- a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy
+++ b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy
@@ -83,10 +83,10 @@ class PullMetricsRequestsFilter implements HttpServerFilter {
final contentType = response.headers.get(HttpHeaders.CONTENT_TYPE)
if( contentType && contentType in MANIFEST_TYPES ) {
final route = routeHelper.parse(request.path)
- CompletableFuture.supplyAsync(() -> metricsService.incrementPullsCounter(route.identity), executor)
+ CompletableFuture.runAsync(() -> metricsService.incrementPullsCounter(route.identity), executor)
final version = route.request?.containerConfig?.fusionVersion()
if (version) {
- CompletableFuture.supplyAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor)
+ CompletableFuture.runAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor)
}
}
}
diff --git a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
index f1301037e..46c9730d2 100644
--- a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
+++ b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
@@ -22,6 +22,7 @@ import java.net.http.HttpClient
import java.time.Duration
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
+import java.util.concurrent.locks.ReentrantLock
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
@@ -39,9 +40,9 @@ class HttpClientFactory {
static private Duration timeout = Duration.ofSeconds(20)
- static private final Object l1 = new Object()
+ static private final ReentrantLock l1 = new ReentrantLock()
- static private final Object l2 = new Object()
+ static private final ReentrantLock l2 = new ReentrantLock()
private static HttpClient client1
@@ -51,20 +52,26 @@ class HttpClientFactory {
static HttpClient followRedirectsHttpClient() {
if( client1!=null )
return client1
- synchronized (l1) {
+ l1.lock()
+ try {
if( client1!=null )
return client1
return client1=followRedirectsHttpClient0()
+ } finally {
+ l1.unlock()
}
}
static HttpClient neverRedirectsHttpClient() {
if( client2!=null )
return client2
- synchronized (l2) {
+ l2.lock()
+ try {
if( client2!=null )
return client2
return client2=neverRedirectsHttpClient0()
+ } finally {
+ l2.unlock()
}
}
diff --git a/src/main/groovy/io/seqera/wave/metrics/ExecutorsMetricsBinder.groovy b/src/main/groovy/io/seqera/wave/metrics/ExecutorsMetricsBinder.groovy
new file mode 100644
index 000000000..2f1a1000e
--- /dev/null
+++ b/src/main/groovy/io/seqera/wave/metrics/ExecutorsMetricsBinder.groovy
@@ -0,0 +1,73 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.metrics
+
+import java.lang.reflect.Field
+import java.util.concurrent.ForkJoinPool
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
+import io.micronaut.context.annotation.Context
+import jakarta.annotation.PostConstruct
+import jakarta.inject.Inject
+/**
+ * Register Micrometer metrics for ForkJoin commonPool and virtual threads scheduler
+ *
+ * @author Paolo Di Tommaso
+ */
+@Slf4j
+@Context
+@CompileStatic
+class ExecutorsMetricsBinder {
+
+ @Inject
+ private MeterRegistry registry
+
+ @PostConstruct
+ void register() {
+ log.info "+ Registering executor metrics binder"
+ registerCommonPoolMetrics(registry)
+ registerVirtualThreadPoolMetrics(registry)
+ }
+
+ void registerCommonPoolMetrics(MeterRegistry registry) {
+ final commonPool = ForkJoinPool.commonPool()
+ ExecutorServiceMetrics.monitor(registry, commonPool, "ForkJoin.commonPool")
+ }
+
+ void registerVirtualThreadPoolMetrics(MeterRegistry registry) {
+ try {
+ // Create a virtual thread executor
+ Class> VirtualThread = Class.forName("java.lang.VirtualThread");
+
+ // Use reflection to get the internal ForkJoinPool
+ Field poolField = VirtualThread.getDeclaredField("DEFAULT_SCHEDULER");
+ poolField.setAccessible(true);
+ ForkJoinPool virtualThreadPool = (ForkJoinPool) poolField.get(null);
+
+ // Register metrics for the virtual thread pool
+ ExecutorServiceMetrics.monitor(registry, virtualThreadPool, "ForkJoin.virtualPool")
+ }
+ catch (Exception e) {
+ log.warn "Unable to registry carrier threads pool metrics", e
+ }
+ }
+}
diff --git a/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy b/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy
index 7dbd76fd3..0a0d30f1a 100644
--- a/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy
+++ b/src/main/groovy/io/seqera/wave/model/ContainerCoordinates.groovy
@@ -53,7 +53,8 @@ class ContainerCoordinates implements ContainerPath {
static ContainerCoordinates parse(String path) {
if( !path )
throw new IllegalArgumentException("Container image name is not provided")
-
+ if( path.contains(' ') )
+ throw new IllegalArgumentException("Invalid container name - offending image: '$path'")
final scheme = StringUtils.getUrlProtocol(path)
if( scheme ) {
if( scheme!='oras') throw new IllegalArgumentException("Invalid container scheme: '$scheme' - offending image: '$path'")
diff --git a/src/main/groovy/io/seqera/wave/proxy/ErrResponse.groovy b/src/main/groovy/io/seqera/wave/proxy/ErrResponse.groovy
index 7283fb205..5905f539e 100644
--- a/src/main/groovy/io/seqera/wave/proxy/ErrResponse.groovy
+++ b/src/main/groovy/io/seqera/wave/proxy/ErrResponse.groovy
@@ -90,17 +90,17 @@ class ErrResponse implements HttpResponse {
}
static ErrResponse forString(String msg, HttpRequest request) {
- final head = HttpHeaders.of('Content-Type': ['text/plain'], {true})
+ final head = HttpHeaders.of('Content-Type': ['text/plain'], (a, b) -> true)
new ErrResponse(statusCode: 400, body: msg, request: request, uri: request.uri(), headers: head)
}
static ErrResponse forStream(String msg, HttpRequest request) {
- final head = HttpHeaders.of('Content-Type': ['text/plain'], {true})
+ final head = HttpHeaders.of('Content-Type': ['text/plain'], (a, b) -> true)
new ErrResponse(statusCode: 400, body: new ByteArrayInputStream(msg.bytes), request: request, uri: request.uri(), headers: head)
}
static ErrResponse forByteArray(String msg, HttpRequest request) {
- final head = HttpHeaders.of('Content-Type': ['text/plain'], {true})
+ final head = HttpHeaders.of('Content-Type': ['text/plain'], (a, b) -> true)
new ErrResponse(statusCode: 400, body: msg.bytes, request: request, uri: request.uri(), headers: head)
}
}
diff --git a/src/main/groovy/io/seqera/wave/ratelimit/AcquireRequest.groovy b/src/main/groovy/io/seqera/wave/ratelimit/AcquireRequest.groovy
index 5404af93f..ae6354245 100644
--- a/src/main/groovy/io/seqera/wave/ratelimit/AcquireRequest.groovy
+++ b/src/main/groovy/io/seqera/wave/ratelimit/AcquireRequest.groovy
@@ -35,7 +35,7 @@ class AcquireRequest {
/**
* Principal key to use in the search. Can be null
*/
- String userId
+ String user
/**
* Secondary key to use if principal is not present
diff --git a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy
index c7431642f..337e5656b 100644
--- a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy
+++ b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillWayStorageFactory.groovy
@@ -18,8 +18,6 @@
package io.seqera.wave.ratelimit.impl
-import javax.validation.constraints.NotNull
-
import com.coveo.spillway.storage.InMemoryStorage
import com.coveo.spillway.storage.LimitUsageStorage
import com.coveo.spillway.storage.RedisStorage
@@ -27,9 +25,10 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
-import io.seqera.wave.configuration.RateLimiterConfig
import io.seqera.wave.configuration.RedisConfig
+import jakarta.inject.Inject
import jakarta.inject.Singleton
+import jakarta.validation.constraints.NotNull
import redis.clients.jedis.JedisPool
/**
@@ -53,9 +52,7 @@ class SpillWayStorageFactory {
@Singleton
@Requires(property = 'redis.uri')
- LimitUsageStorage redisStorage(@NotNull RedisConfig redisConfig){
- log.info "Using redis $redisConfig.uri as storage for rate limit"
- def jedisPool = new JedisPool(redisConfig.uri)
- return RedisStorage.builder().withJedisPool(jedisPool).build()
+ LimitUsageStorage redisStorage(JedisPool pool){
+ return RedisStorage.builder().withJedisPool(pool).build()
}
}
diff --git a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillwayRateLimiter.groovy b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillwayRateLimiter.groovy
index 358e951dd..83ee12bc3 100644
--- a/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillwayRateLimiter.groovy
+++ b/src/main/groovy/io/seqera/wave/ratelimit/impl/SpillwayRateLimiter.groovy
@@ -18,8 +18,6 @@
package io.seqera.wave.ratelimit.impl
-import javax.validation.constraints.NotNull
-
import com.coveo.spillway.Spillway
import com.coveo.spillway.SpillwayFactory
import com.coveo.spillway.limit.Limit
@@ -34,6 +32,7 @@ import io.seqera.wave.exception.SlowDownException
import io.seqera.wave.ratelimit.AcquireRequest
import io.seqera.wave.ratelimit.RateLimiterService
import jakarta.inject.Singleton
+import jakarta.validation.constraints.NotNull
/**
* This class manage how many requests can be requested from an user during a configurable period
*
@@ -60,7 +59,7 @@ class SpillwayRateLimiter implements RateLimiterService {
init(storage, config)
}
- protected void init(@NotNull LimitUsageStorage storage, @NotNull RateLimiterConfig config){
+ protected void init(LimitUsageStorage storage, RateLimiterConfig config){
SpillwayFactory spillwayFactory = new SpillwayFactory(storage)
initBuilds(spillwayFactory, config)
initPulls(spillwayFactory, config)
@@ -69,20 +68,20 @@ class SpillwayRateLimiter implements RateLimiterService {
@Override
void acquireBuild(AcquireRequest request) throws SlowDownException {
- Spillway resource = request.userId ? authsBuilds : anonymousBuilds
- String key = request.userId ?: request.ip
+ Spillway resource = request.user ? authsBuilds : anonymousBuilds
+ String key = request.user ?: request.ip
if (!resource.tryCall(key)) {
- final prefix = request.userId ? 'user' : 'IP'
+ final prefix = request.user ? 'user' : 'IP'
throw new SlowDownException("Request exceeded build rate limit for $prefix $key")
}
}
@Override
void acquirePull(AcquireRequest request) throws SlowDownException {
- Spillway resource = request.userId ? authsPulls : anonymousPulls
- String key = request.userId ?: request.ip
+ Spillway resource = request.user ? authsPulls : anonymousPulls
+ String key = request.user ?: request.ip
if (!resource.tryCall(key)) {
- final prefix = request.userId ? 'user' : 'IP'
+ final prefix = request.user ? 'user' : 'IP'
throw new SlowDownException("Request exceeded pull rate limit for $prefix $key")
}
}
diff --git a/src/main/groovy/io/seqera/wave/redis/JedisPoolMetricsBinder.groovy b/src/main/groovy/io/seqera/wave/redis/JedisPoolMetricsBinder.groovy
new file mode 100644
index 000000000..e1e8a9d83
--- /dev/null
+++ b/src/main/groovy/io/seqera/wave/redis/JedisPoolMetricsBinder.groovy
@@ -0,0 +1,60 @@
+/*
+ * Wave, containers provisioning service
+ * Copyright (c) 2023-2024, Seqera Labs
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package io.seqera.wave.redis
+
+import groovy.transform.CompileStatic;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.binder.MeterBinder;
+import redis.clients.jedis.JedisPool;
+
+/**
+ * Implements {@link MeterBinder} for {@link redis.clients.jedis.JedisPool}
+ *
+ * @author Paolo Di Tommaso
+ */
+@CompileStatic
+class JedisPoolMetricsBinder implements MeterBinder {
+
+ private final JedisPool pool
+
+ JedisPoolMetricsBinder(JedisPool pool) {
+ this.pool = pool;
+ }
+
+ @Override
+ void bindTo(MeterRegistry registry) {
+ registry.gauge("jedis.pool.active", pool, JedisPool::getNumActive);
+ registry.gauge("jedis.pool.idle", pool, JedisPool::getNumIdle);
+ registry.gauge("jedis.pool.waiters", pool, JedisPool::getNumWaiters);
+
+ // Connection lifecycle metrics
+ registry.gauge("jedis.pool.created", pool, JedisPool::getCreatedCount);
+ registry.gauge("jedis.pool.destroyed", pool, JedisPool::getDestroyedCount);
+
+ // Borrow/Return statistics
+ registry.gauge("jedis.pool.borrowed", pool, JedisPool::getBorrowedCount);
+ registry.gauge("jedis.pool.returned", pool, JedisPool::getReturnedCount);
+
+ // Additional metrics (resets, evictions, etc.)
+ registry.gauge("jedis.pool.max.borrow.wait.millis", pool, (p)-> p.maxBorrowWaitDuration.toMillis() as double)
+ registry.gauge("jedis.pool.mean.borrow.wait.millis", pool, (p)-> p.meanBorrowWaitDuration.toMillis() as double)
+ registry.gauge("jedis.pool.mean.active.millis", pool, (p)-> p.meanActiveDuration.toMillis() as double)
+ registry.gauge("jedis.pool.mean.idle.millis", pool, (p)-> p.meanIdleDuration.toMillis() as double)
+ }
+}
diff --git a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
index c11ff2e85..f17c330c1 100644
--- a/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
+++ b/src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
@@ -20,12 +20,19 @@ package io.seqera.wave.redis
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+import io.micrometer.core.instrument.MeterRegistry
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
+import io.micronaut.core.annotation.Nullable
+import jakarta.inject.Inject
import jakarta.inject.Singleton
+import redis.clients.jedis.DefaultJedisClientConfig
+import redis.clients.jedis.JedisClientConfig
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPoolConfig
+import redis.clients.jedis.exceptions.InvalidURIException
+import redis.clients.jedis.util.JedisURIHelper
/**
* Redis connection pool factory
*
@@ -37,19 +44,50 @@ import redis.clients.jedis.JedisPoolConfig
@CompileStatic
class RedisFactory {
+ @Inject
+ private MeterRegistry meterRegistry
+
@Singleton
JedisPool createRedisPool(
- @Value('${redis.uri}') String uri,
+ @Value('${redis.uri}') String connection,
@Value('${redis.pool.minIdle:0}') int minIdle,
@Value('${redis.pool.maxIdle:10}') int maxIdle,
- @Value('${redis.pool.maxTotal:50}') int maxTotal
+ @Value('${redis.pool.maxTotal:50}') int maxTotal,
+ @Value('${redis.client.timeout:5000}') int timeout,
+ @Nullable @Value('${redis.password}') String password
) {
- log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
+ log.info "Using redis ${connection} as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}; timeout: ${timeout}"
+
+ final uri = URI.create(connection)
+ // pool config
final config = new JedisPoolConfig()
config.setMinIdle(minIdle)
config.setMaxIdle(maxIdle)
config.setMaxTotal(maxTotal)
- return new JedisPool(config, URI.create(uri))
+ // client config
+ final clientConfig = clientConfig(uri, password, timeout)
+ // create the jedis pool
+ final result = new JedisPool(config, JedisURIHelper.getHostAndPort(uri), clientConfig)
+ // Instrument the internal pool
+ new JedisPoolMetricsBinder(result).bindTo(meterRegistry);
+ // final return the jedis pool
+ return result
+ }
+
+ protected JedisClientConfig clientConfig(URI uri, String password, int timeout) {
+ if (!JedisURIHelper.isValid(uri)) {
+ throw new InvalidURIException("Invalid Redis connection URI: ${uri}")
+ }
+
+ return DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout)
+ .socketTimeoutMillis(timeout)
+ .blockingSocketTimeoutMillis(timeout)
+ .user(JedisURIHelper.getUser(uri))
+ .password(password?:JedisURIHelper.getPassword(uri))
+ .database(JedisURIHelper.getDBIndex(uri))
+ .protocol(JedisURIHelper.getRedisProtocol(uri))
+ .ssl(JedisURIHelper.isRedisSSLScheme(uri))
+ .build()
}
}
diff --git a/src/main/groovy/io/seqera/wave/service/account/AccountServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/account/AccountServiceImpl.groovy
index 3cccda9ab..f8d6bb4f1 100644
--- a/src/main/groovy/io/seqera/wave/service/account/AccountServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/account/AccountServiceImpl.groovy
@@ -36,7 +36,7 @@ import jakarta.annotation.PostConstruct
@ConfigurationProperties('wave')
class AccountServiceImpl implements AccountService {
- private Map accounts = Map.of()
+ Map accounts = Map.of()
@PostConstruct
private dumpAccounts() {
diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
index 17fdb3720..3db479f4b 100644
--- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
+++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
@@ -18,16 +18,21 @@
package io.seqera.wave.service.aws
+import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
-import com.google.common.cache.LoadingCache
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.util.StringUtils
+import jakarta.annotation.PostConstruct
+import jakarta.inject.Inject
+import jakarta.inject.Named
import jakarta.inject.Singleton
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
@@ -73,12 +78,22 @@ class AwsEcrService {
}
}
- private LoadingCache cache = CacheBuilder
- .newBuilder()
- .maximumSize(10_000)
- .expireAfterWrite(3, TimeUnit.HOURS)
- .build(loader)
-
+ @Inject
+ @Named(TaskExecutors.BLOCKING)
+ private ExecutorService ioExecutor
+
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ private AsyncLoadingCache cache
+
+ @PostConstruct
+ private void init() {
+ cache = Caffeine
+ .newBuilder()
+ .maximumSize(10_000)
+ .expireAfterWrite(3, TimeUnit.HOURS)
+ .executor(ioExecutor)
+ .buildAsync(loader)
+ }
private EcrClient ecrClient(String accessKey, String secretKey, String region) {
EcrClient.builder()
@@ -126,7 +141,8 @@ class AwsEcrService {
try {
// get the token from the cache, if missing the it's automatically
// fetch using the AWS ECR client
- return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic))
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic))
}
catch (Exception e) {
final type = isPublic ? "ECR public" : "ECR"
diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy
index 606a4ecd3..c6c3b61d7 100644
--- a/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/blob/impl/BlobCacheServiceImpl.groovy
@@ -17,9 +17,6 @@
*/
package io.seqera.wave.service.blob.impl
-import java.net.http.HttpClient
-import java.net.http.HttpRequest
-import java.net.http.HttpResponse
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
@@ -29,7 +26,6 @@ import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.HttpClientConfig
import io.seqera.wave.core.RegistryProxyService
import io.seqera.wave.core.RoutePath
-import io.seqera.wave.http.HttpClientFactory
import io.seqera.wave.service.blob.BlobCacheService
import io.seqera.wave.service.blob.BlobEntry
import io.seqera.wave.service.blob.BlobSigningService
@@ -38,14 +34,16 @@ import io.seqera.wave.service.job.JobHandler
import io.seqera.wave.service.job.JobService
import io.seqera.wave.service.job.JobSpec
import io.seqera.wave.service.job.JobState
+import io.seqera.wave.util.BucketTokenizer
import io.seqera.wave.util.Escape
-import io.seqera.wave.util.Retryable
import io.seqera.wave.util.StringUtils
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Named
import jakarta.inject.Singleton
-import static io.seqera.wave.WaveDefault.HTTP_SERVER_ERRORS
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest
+import software.amazon.awssdk.services.s3.model.S3Exception
/**
* Implements cache for container image layer blobs
*
@@ -79,11 +77,12 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler {
@Inject
private HttpClientConfig httpConfig
- private HttpClient httpClient
+ @Inject
+ @Named('BlobS3Client')
+ private S3Client s3Client
@PostConstruct
private void init() {
- httpClient = HttpClientFactory.followRedirectsHttpClient()
log.info "Creating Blob cache service - $blobConfig"
}
@@ -98,7 +97,7 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler {
// therefore it's safe to check and return directly
// if it exists (no risk of returning a partial upload)
// https://developers.cloudflare.com/r2/reference/consistency/
- if( blobExists(info.locationUri) && !debug ) {
+ if( blobExists(info.objectUri) && !debug ) {
log.debug "== Blob cache exists for object '${info.locationUri}'"
return info.cached()
}
@@ -113,21 +112,24 @@ class BlobCacheServiceImpl implements BlobCacheService, JobHandler {
return result?.withLocation(locationUri)
}
- protected boolean blobExists(String uri) {
- final request = HttpRequest
- .newBuilder(new URI(uri))
- .method("HEAD", HttpRequest.BodyPublishers.noBody())
- .build()
-
- // retry strategy
- final retryable = Retryable
- .>of(httpConfig)
- .retryIf((response) -> response.statusCode() in HTTP_SERVER_ERRORS)
- .onRetry((event) -> log.warn("Unable to connect '$uri' - event: $event"))
-
- // submit the request
- final resp = retryable.apply(()-> httpClient.send(request, HttpResponse.BodyHandlers.ofString()))
- return resp.statusCode() == 200
+ protected boolean blobExists(String blobLocation) {
+ try {
+ final object = BucketTokenizer.from(blobLocation)
+ final request = HeadObjectRequest
+ .builder()
+ .bucket(object.bucket)
+ .key(object.key)
+ .build() as HeadObjectRequest
+ // Execute the request
+ s3Client.headObject(request)
+ return true
+ }
+ catch (S3Exception e) {
+ if (e.statusCode() != 404) {
+ log.error "Unexpected response=${e.statusCode()} checking existence for object=${blobLocation} - cause: ${e.message}"
+ }
+ return false
+ }
}
/**
diff --git a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy
index 0d36875d5..63793b1f8 100644
--- a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy
@@ -29,7 +29,6 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.event.ApplicationEventPublisher
import io.micronaut.core.annotation.Nullable
-import io.micronaut.runtime.event.annotation.EventListener
import io.micronaut.scheduling.TaskExecutors
import io.seqera.wave.api.BuildContext
import io.seqera.wave.auth.RegistryCredentialsProvider
@@ -191,8 +190,7 @@ class ContainerBuildServiceImpl implements ContainerBuildService, JobHandler metricsService.incrementBuildsCounter(request.identity), executor)
+ CompletableFuture
+ .runAsync(() -> metricsService.incrementBuildsCounter(request.identity), executor)
// launch the build async
CompletableFuture
@@ -326,16 +325,14 @@ class ContainerBuildServiceImpl implements ContainerBuildService, JobHandler implements FutureStore {
private volatile Duration pollInterval
@Inject
- @Named('future-store-executor')
+ @Named(TaskExecutors.BLOCKING)
private ExecutorService executor
AbstractFutureStore(FutureHash store, EncodingStrategy encodingStrategy) {
diff --git a/src/main/groovy/io/seqera/wave/service/data/future/impl/RedisFutureHash.groovy b/src/main/groovy/io/seqera/wave/service/data/future/impl/RedisFutureHash.groovy
index 0541486ef..5a6870dce 100644
--- a/src/main/groovy/io/seqera/wave/service/data/future/impl/RedisFutureHash.groovy
+++ b/src/main/groovy/io/seqera/wave/service/data/future/impl/RedisFutureHash.groovy
@@ -50,7 +50,7 @@ class RedisFutureHash implements FutureHash {
@Override
void put(String key, String value, Duration expiration) {
try (Jedis conn = pool.getResource()) {
- final params = new SetParams().ex(expiration.toSeconds())
+ final params = new SetParams().px(expiration.toMillis())
conn.set(key, value, params)
}
}
diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy
index 0f6ddc9a0..feb9f2508 100644
--- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy
+++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy
@@ -20,11 +20,12 @@ package io.seqera.wave.service.data.queue
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
-import com.google.common.cache.Cache
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.AsyncCache
+import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.websocket.exceptions.WebSocketSessionException
@@ -60,21 +61,28 @@ abstract class AbstractMessageQueue implements Runnable {
final private String name0
- final private Cache closedClients = CacheBuilder
- .newBuilder()
- .expireAfterWrite(10, TimeUnit.MINUTES)
- .build()
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ final private AsyncCache closedClients
- AbstractMessageQueue(MessageQueue broker) {
+ AbstractMessageQueue(MessageQueue broker, ExecutorService ioExecutor) {
final type = TypeHelper.getGenericType(this, 0)
this.encoder = new MoshiEncodeStrategy(type) {}
this.broker = broker
+ this.closedClients = createCache(ioExecutor)
this.name0 = name() + '-thread-' + count.getAndIncrement()
this.thread = new Thread(this, name0)
this.thread.setDaemon(true)
this.thread.start()
}
+ private AsyncCache createCache(ExecutorService ioExecutor) {
+ Caffeine
+ .newBuilder()
+ .executor(ioExecutor)
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .buildAsync()
+ }
+
protected abstract String name()
protected abstract Duration pollInterval()
@@ -149,13 +157,15 @@ abstract class AbstractMessageQueue implements Runnable {
@Override
void run() {
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ final clientsCache = closedClients.synchronous()
while( !thread.isInterrupted() ) {
try {
int sent=0
final clients = new HashMap>(this.clients)
for( Map.Entry> entry : clients ) {
// ignore clients marked as closed
- if( closedClients.getIfPresent(entry.key))
+ if( clientsCache.getIfPresent(entry.key))
continue
// infer the target queue from the client key
final target = targetFromClientKey(entry.key)
@@ -173,7 +183,7 @@ abstract class AbstractMessageQueue implements Runnable {
// offer back the value to be processed again
broker.offer(target, value)
if( e.message?.contains('close') ) {
- closedClients.put(entry.key, true)
+ clientsCache.put(entry.key, true)
}
}
}
diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy
index 560a81306..ae16d7140 100644
--- a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy
+++ b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy
@@ -98,6 +98,10 @@ abstract class AbstractMessageStream implements Closeable {
* The {@link Predicate} to be invoked when a stream message is consumed (read from) the stream.
*/
void addConsumer(String streamId, MessageConsumer consumer) {
+ // the use of synchronized block is meant to prevent a race condition while
+ // updating the 'listeners' from concurrent invocations.
+ // however, considering the addConsumer is invoked during the initialization phase
+ // (and therefore in the same thread) in should not be really needed.
synchronized (listeners) {
if( listeners.containsKey(streamId))
throw new IllegalStateException("Only one consumer can be defined for each stream - offending streamId=$streamId; consumer=$consumer")
diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy
index 609b71061..c417568b3 100644
--- a/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy
+++ b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy
@@ -62,6 +62,9 @@ class RedisMessageStream implements MessageStream {
@Value('${wave.message-stream.claim-timeout:5s}')
private Duration claimTimeout
+ @Value('${wave.message-stream.consume-warn-timeout-millis:4000}')
+ private long consumeWarnTimeoutMillis
+
private String consumerName
@PostConstruct
@@ -102,11 +105,17 @@ class RedisMessageStream implements MessageStream {
@Override
boolean consume(String streamId, MessageConsumer consumer) {
try (Jedis jedis = pool.getResource()) {
+ String msg
+ final long begin = System.currentTimeMillis()
final entry = claimMessage(jedis,streamId) ?: readMessage(jedis, streamId)
- if( entry && consumer.accept(entry.getFields().get(DATA_FIELD)) ) {
+ if( entry && consumer.accept(msg=entry.getFields().get(DATA_FIELD)) ) {
final tx = jedis.multi()
// acknowledge the entry has been processed so that it cannot be claimed anymore
tx.xack(streamId, CONSUMER_GROUP_NAME, entry.getID())
+ final delta = System.currentTimeMillis()-begin
+ if( delta>consumeWarnTimeoutMillis ) {
+ log.warn "Redis message stream - consume processing took ${Duration.ofMillis(delta)} - offending entry=${entry.getID()}; message=${msg}"
+ }
// this remove permanently the entry from the stream
tx.xdel(streamId, entry.getID())
tx.exec()
diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
index 1c4ec6c03..16a3c52e7 100644
--- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
+++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy
@@ -20,14 +20,19 @@ package io.seqera.wave.service.job
import java.time.Duration
import java.time.Instant
+import java.util.concurrent.ExecutorService
+import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Context
+import io.micronaut.scheduling.TaskExecutors
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
+import jakarta.inject.Named
+
/**
* Implement the logic to handle Blob cache transfer (uploads)
*
@@ -50,16 +55,34 @@ class JobManager {
@Inject
private JobConfig config
- private Cache debounceCache
+ @Inject
+ @Named(TaskExecutors.BLOCKING)
+ private ExecutorService ioExecutor
+
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ private AsyncCache debounceCache
@PostConstruct
void init() {
log.info "Creating job manager - config=$config"
- debounceCache = Caffeine.newBuilder().expireAfterWrite(config.graceInterval.multipliedBy(2)).build()
+ debounceCache = Caffeine
+ .newBuilder()
+ .expireAfterWrite(config.graceInterval.multipliedBy(2))
+ .executor(ioExecutor)
+ .buildAsync()
queue.addConsumer((job)-> processJob(job))
}
-
+ /**
+ * Process a job entry aorrding the state modelled by the {@link JobSpec} object.
+ *
+ * @param jobSpec
+ * A {@link JobSpec} object representing the job to be processed
+ * @return
+ * {@code true} to signal the process has been processed successfully and it should
+ * be removed from the underlying queue, or {@code false} if the job execution has
+ * not yet completed.
+ */
protected boolean processJob(JobSpec jobSpec) {
try {
return processJob0(jobSpec)
@@ -73,7 +96,8 @@ class JobManager {
}
protected JobState state(JobSpec job) {
- return state0(job, config.graceInterval, debounceCache)
+ // FIXME https://github.com/seqeralabs/wave/issues/747
+ return state0(job, config.graceInterval, debounceCache.synchronous())
}
protected JobState state0(final JobSpec job, final Duration graceInterval, final Cache cache) {
diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
index 0d8de9015..ccaedec07 100644
--- a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
+++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
@@ -23,11 +23,9 @@ import java.time.Duration
import io.kubernetes.client.openapi.models.V1Job
import io.kubernetes.client.openapi.models.V1Pod
-import io.kubernetes.client.openapi.models.V1PodList
import io.seqera.wave.configuration.BlobCacheConfig
-import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.configuration.MirrorConfig
-
+import io.seqera.wave.configuration.ScanConfig
/**
* Defines Kubernetes operations
*
@@ -43,23 +41,6 @@ interface K8sService {
void deletePod(String name)
- @Deprecated
- V1Pod buildContainer(String name, String containerImage, List args, Path workDir, Path creds, Duration timeout, Map nodeSelector)
-
- @Deprecated
- V1Pod scanContainer(String name, String containerImage, List args, Path workDir, Path creds, ScanConfig scanConfig, Map nodeSelector)
-
- @Deprecated
- Integer waitPodCompletion(V1Pod pod, long timeout)
-
- @Deprecated
- void deletePodWhenReachStatus(String podName, String statusName, long timeout)
-
- @Deprecated
- V1Job createJob(String name, String containerImage, List args)
-
- V1Job getJob(String name)
-
JobStatus getJobStatus(String name)
void deleteJob(String name)
@@ -72,9 +53,6 @@ interface K8sService {
V1Job launchMirrorJob(String name, String containerImage, List args, Path workDir, Path creds, MirrorConfig config)
- @Deprecated
- V1PodList waitJob(V1Job job, Long timeout)
-
V1Pod getLatestPodForJob(String jobName)
}
diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
index ac46c912d..199c788ac 100644
--- a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
@@ -22,12 +22,10 @@ import java.nio.file.Path
import java.time.Duration
import javax.annotation.PostConstruct
-import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.kubernetes.client.custom.Quantity
import io.kubernetes.client.openapi.models.V1ContainerBuilder
-import io.kubernetes.client.openapi.models.V1DeleteOptions
import io.kubernetes.client.openapi.models.V1EnvVar
import io.kubernetes.client.openapi.models.V1HostPathVolumeSource
import io.kubernetes.client.openapi.models.V1Job
@@ -36,7 +34,6 @@ import io.kubernetes.client.openapi.models.V1JobStatus
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.openapi.models.V1PodBuilder
-import io.kubernetes.client.openapi.models.V1PodList
import io.kubernetes.client.openapi.models.V1ResourceRequirements
import io.kubernetes.client.openapi.models.V1Volume
import io.kubernetes.client.openapi.models.V1VolumeMount
@@ -46,9 +43,9 @@ import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.BuildConfig
+import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.core.ContainerPlatform
-import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.service.scan.Trivy
import jakarta.inject.Inject
import jakarta.inject.Singleton
@@ -136,61 +133,6 @@ class K8sServiceImpl implements K8sService {
}
}
- /**
- * Create a K8s job with the specified name
- *
- * @param name
- * The K8s job name. It must be unique
- * @param containerImage
- * The container image to be used to run the job
- * @param args
- * The command to be executed by the job
- * @return
- * An instance of {@link V1Job}
- */
- @Override
- @CompileDynamic
- @Deprecated
- V1Job createJob(String name, String containerImage, List args) {
-
- V1Job body = new V1JobBuilder()
- .withNewMetadata()
- .withNamespace(namespace)
- .withName(name)
- .endMetadata()
- .withNewSpec()
- .withBackoffLimit(0)
- .withNewTemplate()
- .editOrNewSpec()
- .addNewContainer()
- .withName(name)
- .withImage(containerImage)
- .withArgs(args)
- .endContainer()
- .withRestartPolicy("Never")
- .endSpec()
- .endTemplate()
- .endSpec()
- .build()
-
- return k8sClient
- .batchV1Api()
- .createNamespacedJob(namespace, body, null, null, null,null)
- }
-
- /**
- * Get a Jobs Job.
- *
- * @param name The job name
- * @return An instance of {@link V1Job}
- */
- @Override
- V1Job getJob(String name) {
- k8sClient
- .batchV1Api()
- .readNamespacedJob(name, namespace, null)
- }
-
/**
* Get a Job status
*
@@ -201,7 +143,8 @@ class K8sServiceImpl implements K8sService {
JobStatus getJobStatus(String name) {
final job = k8sClient
.batchV1Api()
- .readNamespacedJob(name, namespace, null)
+ .readNamespacedJob(name, namespace)
+ .execute()
if( !job ) {
log.warn "K8s job=$name - unknown"
return null
@@ -235,7 +178,8 @@ class K8sServiceImpl implements K8sService {
V1Pod getPod(String name) {
return k8sClient
.coreV1Api()
- .readNamespacedPod(name, namespace, null)
+ .readNamespacedPod(name, namespace)
+ .execute()
}
/**
@@ -307,31 +251,7 @@ class K8sServiceImpl implements K8sService {
.subPath(rel)
}
- /**
- * Create a container for container image building via buildkit
- *
- * @param name
- * The name of pod
- * @param containerImage
- * The container image to be used
- * @param args
- * The build command to be performed
- * @param workDir
- * The build context directory
- * @param creds
- * The target container repository credentials
- * @return
- * The {@link V1Pod} description the submitted pod
- */
- @Override
@Deprecated
- V1Pod buildContainer(String name, String containerImage, List args, Path workDir, Path creds, Duration timeout, Map nodeSelector) {
- final spec = buildSpec(name, containerImage, args, workDir, creds, timeout, nodeSelector)
- return k8sClient
- .coreV1Api()
- .createNamespacedPod(namespace, spec, null, null, null,null)
- }
-
V1Pod buildSpec(String name, String containerImage, List args, Path workDir, Path credsFile, Duration timeout, Map nodeSelector) {
// dirty dependency to avoid introducing another parameter
@@ -374,7 +294,6 @@ class K8sServiceImpl implements K8sService {
.withRestartPolicy("Never")
.addAllToVolumes(volumes)
-
final requests = new V1ResourceRequirements()
if( requestsCpu )
requests.putRequestsItem('cpu', new Quantity(requestsCpu))
@@ -408,47 +327,6 @@ class K8sServiceImpl implements K8sService {
builder.build()
}
- /**
- * Wait for a pod a completion.
- *
- * NOTE: this method assumes the pod is running exactly *one* container.
- *
- * @param pod
- * The pod name
- * @param timeout
- * Max wait time in milliseconds
- * @return
- * An Integer value representing the container exit code or {@code null} if the state cannot be determined
- * or timeout was reached.
- */
- @Override
- @Deprecated
- Integer waitPodCompletion(V1Pod pod, long timeout) {
- final start = System.currentTimeMillis()
- // wait for termination
- while( true ) {
- final phase = pod.status?.phase
- if( phase && phase != 'Pending' ) {
- final status = pod.status.containerStatuses.first()
- if( !status )
- return null
- if( !status.state )
- return null
- if( status.state.terminated ) {
- return status.state.terminated.exitCode
- }
- }
-
- if( phase == 'Failed' )
- return null
- final delta = System.currentTimeMillis()-start
- if( delta > timeout )
- return null
- sleep 5_000
- pod = getPod(pod.metadata.name)
- }
- }
-
/**
* Fetch the logs of a pod.
*
@@ -478,37 +356,8 @@ class K8sServiceImpl implements K8sService {
void deletePod(String name) {
k8sClient
.coreV1Api()
- .deleteNamespacedPod(name, namespace, (String)null, (String)null, (Integer)null, (Boolean)null, (String)null, (V1DeleteOptions)null)
- }
-
- /**
- * Delete a pod where the status is reached
- *
- * @param name The name of the pod to be deleted
- * @param statusName The status to be reached
- * @param timeout The max wait time in milliseconds
- */
- @Override
- @Deprecated
- void deletePodWhenReachStatus(String podName, String statusName, long timeout){
- final pod = getPod(podName)
- final start = System.currentTimeMillis()
- while( (System.currentTimeMillis() - start) < timeout ) {
- if( pod?.status?.phase == statusName ) {
- deletePod(podName)
- return
- }
- sleep 5_000
- }
- }
-
- @Override
- @Deprecated
- V1Pod scanContainer(String name, String containerImage, List args, Path workDir, Path creds, ScanConfig scanConfig, Map nodeSelector) {
- final spec = scanSpec(name, containerImage, args, workDir, creds, scanConfig, nodeSelector)
- return k8sClient
- .coreV1Api()
- .createNamespacedPod(namespace, spec, null, null, null,null)
+ .deleteNamespacedPod(name, namespace)
+ .execute()
}
@Deprecated
@@ -543,7 +392,6 @@ class K8sServiceImpl implements K8sService {
.withRestartPolicy("Never")
.addAllToVolumes(volumes)
-
final requests = new V1ResourceRequirements()
if( scanConfig.requestsCpu )
requests.putRequestsItem('cpu', new Quantity(scanConfig.requestsCpu))
@@ -583,7 +431,8 @@ class K8sServiceImpl implements K8sService {
return k8sClient
.batchV1Api()
- .createNamespacedJob(namespace, spec, null, null, null,null)
+ .createNamespacedJob(namespace, spec)
+ .execute()
}
V1Job createTransferJobSpec(String name, String containerImage, List args, BlobCacheConfig blobConfig) {
@@ -646,7 +495,8 @@ class K8sServiceImpl implements K8sService {
final spec = buildJobSpec(name, containerImage, args, workDir, creds, timeout, nodeSelector)
return k8sClient
.batchV1Api()
- .createNamespacedJob(namespace, spec, null, null, null,null)
+ .createNamespacedJob(namespace, spec)
+ .execute()
}
V1Job buildJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, Duration timeout, Map nodeSelector) {
@@ -735,7 +585,8 @@ class K8sServiceImpl implements K8sService {
final spec = scanJobSpec(name, containerImage, args, workDir, creds, scanConfig)
return k8sClient
.batchV1Api()
- .createNamespacedJob(namespace, spec, null, null, null,null)
+ .createNamespacedJob(namespace, spec)
+ .execute()
}
V1Job scanJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, ScanConfig scanConfig) {
@@ -784,8 +635,11 @@ class K8sServiceImpl implements K8sService {
.withVolumeMounts(mounts)
.withResources(requests)
- if( scanConfig.githubToken ) {
- container.withEnv(new V1EnvVar().name('GITHUB_TOKEN').value(scanConfig.githubToken))
+ final env = scanConfig.environmentAsTuples
+ for( Tuple2 entry : env ) {
+ final String k = entry.v1
+ final String v = entry.v2
+ container.addToEnv(new V1EnvVar().name(k).value(v))
}
// spec section
@@ -799,7 +653,8 @@ class K8sServiceImpl implements K8sService {
final spec = mirrorJobSpec(name, containerImage, args, workDir, creds, config)
return k8sClient
.batchV1Api()
- .createNamespacedJob(namespace, spec, null, null, null,null)
+ .createNamespacedJob(namespace, spec)
+ .execute()
}
V1Job mirrorJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, MirrorConfig config) {
@@ -862,33 +717,6 @@ class K8sServiceImpl implements K8sService {
return result
}
- /**
- * Wait for a job to complete
- *
- * @param k8s job
- * @param timeout
- * Max wait time in milliseconds
- * @return list of pods created by the job
- */
- @Deprecated
- @Override
- V1PodList waitJob(V1Job job, Long timeout) {
- sleep 5_000
- final startTime = System.currentTimeMillis()
- // wait for termination
- while (System.currentTimeMillis() - startTime < timeout) {
- final name = job.metadata.name
- final status = getJobStatus(name)
- if (status != JobStatus.Pending) {
- return k8sClient
- .coreV1Api()
- .listNamespacedPod(namespace, null, null, null, null, "job-name=$name", null, null, null, null, null, null)
- }
- job = getJob(name)
- }
- return null
- }
-
/**
* Delete a job
*
@@ -898,7 +726,9 @@ class K8sServiceImpl implements K8sService {
void deleteJob(String name) {
k8sClient
.batchV1Api()
- .deleteNamespacedJob(name, namespace, null, null, null, null,"Foreground", null)
+ .deleteNamespacedJob(name, namespace)
+ .propagationPolicy("Foreground")
+ .execute()
}
@Override
@@ -906,7 +736,9 @@ class K8sServiceImpl implements K8sService {
// list all pods for the given job
final allPods = k8sClient
.coreV1Api()
- .listNamespacedPod(namespace, null, null, null, null, "job-name=${jobName}", null, null, null, null, null, null)
+ .listNamespacedPod(namespace)
+ .labelSelector("job-name=${jobName}")
+ .execute()
if( !allPods || !allPods.items )
return null
diff --git a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy
index 8be138dd1..5fadd46c9 100644
--- a/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/logs/BuildLogServiceImpl.groovy
@@ -80,7 +80,7 @@ class BuildLogServiceImpl implements BuildLogService {
@Inject
@Named(TaskExecutors.IO)
- private volatile ExecutorService ioExecutor
+ private ExecutorService ioExecutor
@PostConstruct
private void init() {
diff --git a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy
index 8d061a86e..1126fe87d 100644
--- a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy
+++ b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy
@@ -75,7 +75,7 @@ class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler metricsService.incrementMirrorsCounter(request.identity), ioExecutor)
+ CompletableFuture.runAsync(() -> metricsService.incrementMirrorsCounter(request.identity), ioExecutor)
jobService.launchMirror(request)
return new BuildTrack(request.mirrorId, request.targetImage, false, null)
}
@@ -124,9 +124,14 @@ class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler {
PairingOutboundQueue(
MessageQueue broker,
- @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval
+ @Value('${wave.pairing.channel.awaitTimeout:100ms}') Duration pollInterval,
+ @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor
) {
- super(broker)
+ super(broker, ioExecutor)
this.pollInterval = pollInterval
}
diff --git a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy
index 1605e3b96..2f83ed648 100644
--- a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy
+++ b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy
@@ -25,6 +25,8 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.client.exceptions.HttpClientResponseException
+import io.micronaut.scheduling.TaskExecutors
+import io.micronaut.scheduling.annotation.ExecuteOn
import io.micronaut.websocket.CloseReason
import io.micronaut.websocket.WebSocketSession
import io.micronaut.websocket.annotation.OnClose
@@ -47,6 +49,7 @@ import static io.seqera.wave.util.LongRndKey.rndHex
@Slf4j
@CompileStatic
@Singleton
+@ExecuteOn(TaskExecutors.BLOCKING)
@ServerWebSocket("/pairing/{service}/token/{token}{?endpoint}")
class PairingWebSocket {
@@ -76,7 +79,7 @@ class PairingWebSocket {
// Register the client and the sender callback that it's needed to deliver
// the message to the remote client
channel.registerClient(service, endpoint, session.id,(pairingMessage) -> {
- log.trace "Websocket send message id=$pairingMessage.msgId"
+ log.trace "Sendind message=${pairingMessage} - endpoint: ${endpoint} [sessionId: $session.id]"
session .sendAsync(pairingMessage)
})
diff --git a/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy b/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy
index 0cccdde92..91a23523f 100644
--- a/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy
+++ b/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy
@@ -40,7 +40,7 @@ interface PersistenceService {
*
* @param build A {@link WaveBuildRecord} object
*/
- void saveBuild(WaveBuildRecord build)
+ void saveBuildAsync(WaveBuildRecord build)
/**
* Retrieve a {@link WaveBuildRecord} object for the given build id
@@ -80,7 +80,7 @@ interface PersistenceService {
*
* @param data A {@link WaveContainerRecord} object representing a Wave request record
*/
- void saveContainerRequest(WaveContainerRecord data)
+ void saveContainerRequestAsync(WaveContainerRecord data)
/**
* Update a container request with the digest of the resolved request
@@ -88,7 +88,7 @@ interface PersistenceService {
* @param token The request unique token
* @param digest The resolved digest
*/
- void updateContainerRequest(String token, ContainerDigestPair digest)
+ void updateContainerRequestAsync(String token, ContainerDigestPair digest)
/**
* Retrieve a {@link WaveContainerRecord} object corresponding to the a specified request token
@@ -103,7 +103,7 @@ interface PersistenceService {
*
* @param data A {@link WaveScanRecord} object representing the a container scan request
*/
- void saveScanRecord(WaveScanRecord scanRecord)
+ void saveScanRecordAsync(WaveScanRecord scanRecord)
/**
* Retrieve a {@link WaveScanRecord} object for the specified build ID
@@ -144,7 +144,7 @@ interface PersistenceService {
*
* @param mirror {@link MirrorEntry} object
*/
- void saveMirrorResult(MirrorResult mirror)
+ void saveMirrorResultAsync(MirrorResult mirror)
/**
* Retrieve all {@link WaveScanRecord} object for the given partial scan id
diff --git a/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy b/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy
index a9b60fa69..002f60752 100644
--- a/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy
+++ b/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy
@@ -42,7 +42,7 @@ class LocalPersistenceService implements PersistenceService {
private Map mirrorStore = new HashMap<>()
@Override
- void saveBuild(WaveBuildRecord record) {
+ void saveBuildAsync(WaveBuildRecord record) {
buildStore[record.buildId] = record
}
@@ -76,12 +76,12 @@ class LocalPersistenceService implements PersistenceService {
}
@Override
- void saveContainerRequest(WaveContainerRecord data) {
+ void saveContainerRequestAsync(WaveContainerRecord data) {
requestStore.put(data.id, data)
}
@Override
- void updateContainerRequest(String token, ContainerDigestPair digest) {
+ void updateContainerRequestAsync(String token, ContainerDigestPair digest) {
final data = requestStore.get(token)
if( data ) {
requestStore.put(token, new WaveContainerRecord(data, digest.source, digest.target))
@@ -99,7 +99,7 @@ class LocalPersistenceService implements PersistenceService {
}
@Override
- void saveScanRecord(WaveScanRecord scanRecord) {
+ void saveScanRecordAsync(WaveScanRecord scanRecord) {
scanStore.put(scanRecord.id, scanRecord)
}
@@ -129,7 +129,7 @@ class LocalPersistenceService implements PersistenceService {
}
@Override
- void saveMirrorResult(MirrorResult mirror) {
+ void saveMirrorResultAsync(MirrorResult mirror) {
mirrorStore.put(mirror.mirrorId, mirror)
}
diff --git a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy
index badd7c4aa..2901ec22f 100644
--- a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy
+++ b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy
@@ -54,9 +54,15 @@ interface SurrealClient {
@Post("/sql")
Flux