From 046a123d1aa7009500ab472e47122775853520f2 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 16:33:25 +0100 Subject: [PATCH 1/9] Migration to virtual trheads - phase 1 (#737) This commit introduces the use of BLOCKING tasks executors for controllers, PairingWebSocket and TowerConnector. When running over Java 21 (and later) the blocking tasks executor use virtual thread pool to carry out the tasks execution. Signed-off-by: Paolo Di Tommaso --- .../seqera/wave/controller/BuildController.groovy | 2 +- .../wave/controller/ContainerController.groovy | 6 +++--- .../wave/controller/InspectController.groovy | 2 +- .../wave/controller/MetricsController.groovy | 2 +- .../wave/controller/MirrorController.groovy | 2 +- .../controller/RegistryProxyController.groovy | 2 +- .../seqera/wave/controller/ScanController.groovy | 2 +- .../wave/controller/ServiceInfoController.groovy | 2 +- .../wave/controller/ValidateController.groovy | 2 +- .../seqera/wave/controller/ViewController.groovy | 2 +- .../io/seqera/wave/core/ContainerAugmenter.groovy | 3 +-- .../io/seqera/wave/http/HttpClientFactory.groovy | 15 +++++++++++---- .../data/stream/AbstractMessageStream.groovy | 4 ++++ .../io/seqera/wave/service/job/JobManager.groovy | 1 - .../pairing/socket/PairingWebSocket.groovy | 2 +- .../tower/client/connector/TowerConnector.groovy | 6 +++++- .../connector/WebSocketTowerConnector.groovy | 9 +-------- 17 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy index 06679fa4a..f99549087 100644 --- a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy @@ -42,7 +42,7 @@ import jakarta.inject.Inject @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class BuildController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy index 10f71ddb4..a65568c40 100644 --- a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy @@ -103,7 +103,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ContainerController { @Inject @@ -181,13 +181,13 @@ class ContainerController { @Deprecated @Post('/container-token') - @ExecuteOn(TaskExecutors.IO) + @ExecuteOn(TaskExecutors.BLOCKING) CompletableFuture> getToken(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) { return getContainerImpl(httpRequest, req, false) } @Post('/v1alpha2/container') - @ExecuteOn(TaskExecutors.IO) + @ExecuteOn(TaskExecutors.BLOCKING) CompletableFuture> getTokenV2(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) { return getContainerImpl(httpRequest, req, true) } diff --git a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy index 35e50bc58..6093d4dfb 100644 --- a/src/main/groovy/io/seqera/wave/controller/InspectController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/InspectController.groovy @@ -50,7 +50,7 @@ import static io.seqera.wave.util.ContainerHelper.patchPlatformEndpoint @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class InspectController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy index a9ce6f89a..f8008674e 100644 --- a/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/MetricsController.groovy @@ -52,7 +52,7 @@ import static io.micronaut.http.HttpHeaders.WWW_AUTHENTICATE @Requires(property = 'wave.metrics.enabled', value = 'true') @Secured(SecurityRule.IS_AUTHENTICATED) @Controller -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class MetricsController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy index 8aaf87a51..c9c3bc8f7 100644 --- a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy @@ -36,7 +36,7 @@ import jakarta.inject.Inject @Slf4j @CompileStatic @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class MirrorController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy index ec6416537..263763299 100644 --- a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy @@ -67,7 +67,7 @@ import reactor.core.publisher.Mono @Slf4j @CompileStatic @Controller("/v2") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class RegistryProxyController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy index 230a8b4a9..43fc38a5d 100644 --- a/src/main/groovy/io/seqera/wave/controller/ScanController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ScanController.groovy @@ -39,7 +39,7 @@ import jakarta.inject.Inject @CompileStatic @Requires(bean = ContainerScanService) @Controller("/") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ScanController { @Inject diff --git a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy index 207c6e8d5..7f0a895e4 100644 --- a/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ServiceInfoController.groovy @@ -39,7 +39,7 @@ import io.seqera.wave.util.BuildInfo @Slf4j @Controller("/") @CompileStatic -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ServiceInfoController { @Value('${wave.landing.url}') diff --git a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy index f7136f572..fd23fbdc6 100644 --- a/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ValidateController.groovy @@ -26,8 +26,8 @@ import io.seqera.wave.auth.RegistryAuthService import jakarta.inject.Inject import jakarta.validation.Valid -@ExecuteOn(TaskExecutors.IO) @Controller("/validate-creds") +@ExecuteOn(TaskExecutors.BLOCKING) class ValidateController { @Inject RegistryAuthService loginService diff --git a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy index d30db91d8..68328eb36 100644 --- a/src/main/groovy/io/seqera/wave/controller/ViewController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ViewController.groovy @@ -59,7 +59,7 @@ import static io.seqera.wave.util.DataTimeUtils.formatTimestamp @Slf4j @CompileStatic @Controller("/view") -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) class ViewController { @Inject diff --git a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy index 909cc8d5d..bd3837e87 100644 --- a/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy +++ b/src/main/groovy/io/seqera/wave/core/ContainerAugmenter.groovy @@ -280,7 +280,7 @@ class ContainerAugmenter { return result } - synchronized protected Map layerBlob(String image, ContainerLayer layer) { + protected Map layerBlob(String image, ContainerLayer layer) { log.debug "Adding layer: $layer to image: $client.registry.name/$image" // store the layer blob in the cache final String path = "$client.registry.name/v2/$image/blobs/$layer.gzipDigest" @@ -295,7 +295,6 @@ class ContainerAugmenter { protected Tuple2 updateImageManifest(String imageName, String imageManifest, String newImageConfigDigest, newImageConfigSize, boolean oci) { - // turn the json string into a json map // and append the new layer final manifest = (Map) new JsonSlurper().parseText(imageManifest) diff --git a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy index f1301037e..46c9730d2 100644 --- a/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy +++ b/src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy @@ -22,6 +22,7 @@ import java.net.http.HttpClient import java.time.Duration import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.locks.ReentrantLock import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -39,9 +40,9 @@ class HttpClientFactory { static private Duration timeout = Duration.ofSeconds(20) - static private final Object l1 = new Object() + static private final ReentrantLock l1 = new ReentrantLock() - static private final Object l2 = new Object() + static private final ReentrantLock l2 = new ReentrantLock() private static HttpClient client1 @@ -51,20 +52,26 @@ class HttpClientFactory { static HttpClient followRedirectsHttpClient() { if( client1!=null ) return client1 - synchronized (l1) { + l1.lock() + try { if( client1!=null ) return client1 return client1=followRedirectsHttpClient0() + } finally { + l1.unlock() } } static HttpClient neverRedirectsHttpClient() { if( client2!=null ) return client2 - synchronized (l2) { + l2.lock() + try { if( client2!=null ) return client2 return client2=neverRedirectsHttpClient0() + } finally { + l2.unlock() } } diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy index 560a81306..ae16d7140 100644 --- a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy @@ -98,6 +98,10 @@ abstract class AbstractMessageStream implements Closeable { * The {@link Predicate} to be invoked when a stream message is consumed (read from) the stream. */ void addConsumer(String streamId, MessageConsumer consumer) { + // the use of synchronized block is meant to prevent a race condition while + // updating the 'listeners' from concurrent invocations. + // however, considering the addConsumer is invoked during the initialization phase + // (and therefore in the same thread) in should not be really needed. synchronized (listeners) { if( listeners.containsKey(streamId)) throw new IllegalStateException("Only one consumer can be defined for each stream - offending streamId=$streamId; consumer=$consumer") diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index 1c4ec6c03..1127859f6 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -59,7 +59,6 @@ class JobManager { queue.addConsumer((job)-> processJob(job)) } - protected boolean processJob(JobSpec jobSpec) { try { return processJob0(jobSpec) diff --git a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy index a4f7b0785..3e6682fa5 100644 --- a/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy +++ b/src/main/groovy/io/seqera/wave/service/pairing/socket/PairingWebSocket.groovy @@ -49,7 +49,7 @@ import static io.seqera.wave.util.LongRndKey.rndHex @Slf4j @CompileStatic @Singleton -@ExecuteOn(TaskExecutors.IO) +@ExecuteOn(TaskExecutors.BLOCKING) @ServerWebSocket("/pairing/{service}/token/{token}{?endpoint}") class PairingWebSocket { diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index ec213628a..f9872fc3c 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -82,7 +82,7 @@ abstract class TowerConnector { private SpillwayRateLimiter limiter @Inject - @Named(TaskExecutors.IO) + @Named(TaskExecutors.BLOCKING) private volatile ExecutorService ioExecutor private CacheLoader> loader = new CacheLoader>() { @@ -102,6 +102,10 @@ abstract class TowerConnector { return refreshCache } + protected ExecutorService getIoExecutor() { + return ioExecutor + } + /** * Generic async get with authorization * that converts to the provided json model T diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy index c52646638..04c591a09 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/WebSocketTowerConnector.groovy @@ -19,19 +19,16 @@ package io.seqera.wave.tower.client.connector import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutorService import java.util.function.Function import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Requires -import io.micronaut.scheduling.TaskExecutors import io.seqera.wave.service.pairing.socket.PairingChannel import io.seqera.wave.service.pairing.socket.msg.PairingMessage import io.seqera.wave.service.pairing.socket.msg.ProxyHttpRequest import io.seqera.wave.service.pairing.socket.msg.ProxyHttpResponse import jakarta.inject.Inject -import jakarta.inject.Named import jakarta.inject.Singleton import static io.seqera.wave.service.pairing.PairingService.TOWER_SERVICE /** @@ -49,15 +46,11 @@ class WebSocketTowerConnector extends TowerConnector { @Inject private PairingChannel channel - @Inject - @Named(TaskExecutors.IO) - private ExecutorService ioExecutor - @Override CompletableFuture sendAsync(String endpoint, ProxyHttpRequest request) { return channel .sendRequest(TOWER_SERVICE, endpoint, request) - .thenApplyAsync(Function.identity() as Function, ioExecutor) + .thenApplyAsync(Function.identity() as Function, getIoExecutor()) } } From 1a17e1fc958ab2ac10426ca35087647ef6711034 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 17:29:09 +0100 Subject: [PATCH 2/9] Add jdk.tracePinnedThreads=short to default run options [ci skip] Signed-off-by: Paolo Di Tommaso --- build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 7de40d237..ff02cb7cb 100644 --- a/build.gradle +++ b/build.gradle @@ -156,8 +156,7 @@ jib { run{ def envs = findProperty('micronautEnvs') - // note: "--enable-preview" is required to use virtual threads on Java 19 and 20 - def args = ["-Dmicronaut.environments=$envs","--enable-preview"] + def args = ["-Dmicronaut.environments=$envs","-Djdk.tracePinnedThreads=short"] if( environment['JVM_OPTS'] ) args.add(environment['JVM_OPTS']) jvmArgs args systemProperties 'DOCKER_USER': project.findProperty('DOCKER_USER') ?: environment['DOCKER_USER'], From 7352f79ccdf8537ada477fa6f9035d225dbfbc39 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 17:35:39 +0100 Subject: [PATCH 3/9] Remove deprecated ThreadPoolBuilder Signed-off-by: Paolo Di Tommaso --- .../seqera/wave/util/ThreadPoolBuilder.groovy | 176 ------------------ 1 file changed, 176 deletions(-) delete mode 100644 src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy diff --git a/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy b/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy deleted file mode 100644 index 7afdf0e06..000000000 --- a/src/main/groovy/io/seqera/wave/util/ThreadPoolBuilder.groovy +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Wave, containers provisioning service - * Copyright (c) 2023-2024, Seqera Labs - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package io.seqera.wave.util - - - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.RejectedExecutionHandler -import java.util.concurrent.ThreadFactory -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -/** - * Builder class to create instance of {@link ThreadPoolExecutor} - * - * @author Paolo Di Tommaso - */ -@Slf4j -@CompileStatic -@Deprecated -class ThreadPoolBuilder { - - static AtomicInteger poolCount = new AtomicInteger() - - private String name - - private int minSize - - private int maxSize - - private BlockingQueue workQueue - - private int queueSize = -1 - - private Long keepAliveTime - - private RejectedExecutionHandler rejectionPolicy - - private ThreadFactory threadFactory - - private boolean allowCoreThreadTimeout - - String getName() { name } - - int getMinSize() { minSize } - - int getMaxSize() { maxSize } - - int getQueueSize() { queueSize } - - BlockingQueue getWorkQueue() { workQueue } - - Long getKeepAliveTime() { keepAliveTime } - - RejectedExecutionHandler getRejectionPolicy() { rejectionPolicy } - - ThreadFactory getThreadFactory() { threadFactory } - - boolean getAllowCoreThreadTimeout() { allowCoreThreadTimeout } - - ThreadPoolBuilder withName(String name) { - if( name ) { - this.name = name - this.threadFactory = new CustomThreadFactory(name) - } - return this - } - - ThreadPoolBuilder withThreadFactory(ThreadFactory threadFactory) { - assert !name || !threadFactory, "Property 'threadFactory' or 'name' was already set" - this.threadFactory = threadFactory - return this - } - - ThreadPoolBuilder withRejectionPolicy(RejectedExecutionHandler rejectionPolicy) { - this.rejectionPolicy = rejectionPolicy - return this - } - - ThreadPoolBuilder withMinSize(int min) { - this.minSize = min - return this - } - - ThreadPoolBuilder withMaxSize(int max) { - this.maxSize = max - return this - } - - ThreadPoolBuilder withQueueSize(int size) { - this.queueSize = size - this.workQueue = new LinkedBlockingQueue(size) - return this - } - - ThreadPoolBuilder withQueue(BlockingQueue workQueue) { - this.workQueue = workQueue - return this - } - - ThreadPoolBuilder withKeepAliveTime( long millis ) { - keepAliveTime = millis - return this - } - - ThreadPoolBuilder withAllowCoreThreadTimeout(boolean flag) { - this.allowCoreThreadTimeout = flag - return this - } - - ThreadPoolExecutor build() { - assert minSize <= maxSize - - if( !name ) - name = "nf-thread-pool-${poolCount.getAndIncrement()}" - - if(keepAliveTime==null) - keepAliveTime = 60_000 - if( workQueue==null ) - workQueue = new LinkedBlockingQueue<>() - if( rejectionPolicy==null ) - rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy() - if( threadFactory==null ) - threadFactory = new CustomThreadFactory(name) - - log.debug "Creating thread pool '$name' minSize=$minSize; maxSize=$maxSize; workQueue=${workQueue.getClass().getSimpleName()}[${queueSize}]; allowCoreThreadTimeout=$allowCoreThreadTimeout" - - final result = new ThreadPoolExecutor( - minSize, - maxSize, - keepAliveTime, TimeUnit.MILLISECONDS, - workQueue, - threadFactory, - rejectionPolicy) - - result.allowCoreThreadTimeOut(allowCoreThreadTimeout) - - return result - } - - - static ThreadPoolExecutor io(String name=null) { - io(10, 100, 10_000, name) - } - - - static ThreadPoolExecutor io(int min, int max, int queue, String name=null) { - new ThreadPoolBuilder() - .withMinSize(min) - .withMaxSize(max) - .withQueueSize(queue) - .withName(name) - .build() - } - -} From 1a9f79105e14fc28681df57363a9d8a35b82c3cf Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 18:55:27 +0100 Subject: [PATCH 4/9] Use runAsync instead supplyAsync Signed-off-by: Paolo Di Tommaso --- .../io/seqera/wave/filter/PullMetricsRequestsFilter.groovy | 4 ++-- .../service/builder/impl/ContainerBuildServiceImpl.groovy | 3 ++- .../wave/service/mirror/ContainerMirrorServiceImpl.groovy | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy index da19fc7c6..bb70548ac 100644 --- a/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy +++ b/src/main/groovy/io/seqera/wave/filter/PullMetricsRequestsFilter.groovy @@ -83,10 +83,10 @@ class PullMetricsRequestsFilter implements HttpServerFilter { final contentType = response.headers.get(HttpHeaders.CONTENT_TYPE) if( contentType && contentType in MANIFEST_TYPES ) { final route = routeHelper.parse(request.path) - CompletableFuture.supplyAsync(() -> metricsService.incrementPullsCounter(route.identity), executor) + CompletableFuture.runAsync(() -> metricsService.incrementPullsCounter(route.identity), executor) final version = route.request?.containerConfig?.fusionVersion() if (version) { - CompletableFuture.supplyAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor) + CompletableFuture.runAsync(() -> metricsService.incrementFusionPullsCounter(route.identity), executor) } } } diff --git a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy index 6dbe9f787..895a90210 100644 --- a/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/builder/impl/ContainerBuildServiceImpl.groovy @@ -206,7 +206,8 @@ class ContainerBuildServiceImpl implements ContainerBuildService, JobHandler metricsService.incrementBuildsCounter(request.identity), executor) + CompletableFuture + .runAsync(() -> metricsService.incrementBuildsCounter(request.identity), executor) // launch the build async CompletableFuture diff --git a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy index 42c4bafce..1126fe87d 100644 --- a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy @@ -75,7 +75,7 @@ class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler metricsService.incrementMirrorsCounter(request.identity), ioExecutor) + CompletableFuture.runAsync(() -> metricsService.incrementMirrorsCounter(request.identity), ioExecutor) jobService.launchMirror(request) return new BuildTrack(request.mirrorId, request.targetImage, false, null) } From 4fa09b760d4ceb9cb68167a6e90af0cf2b210ccb Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 21:07:53 +0100 Subject: [PATCH 5/9] Bump guava to version 33.3.1-jre Signed-off-by: Paolo Di Tommaso --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index ff02cb7cb..56c9eb999 100644 --- a/build.gradle +++ b/build.gradle @@ -49,7 +49,7 @@ dependencies { implementation 'io.micronaut:micronaut-websocket' implementation 'org.apache.groovy:groovy-json' implementation 'org.apache.groovy:groovy-nio' - implementation 'com.google.guava:guava:32.1.2-jre' + implementation 'com.google.guava:guava:33.3.1-jre' implementation 'dev.failsafe:failsafe:3.1.0' implementation 'io.micronaut.reactor:micronaut-reactor' implementation 'io.micronaut.reactor:micronaut-reactor-http-client' From 079b7f8b095e4315bd75cba8b16aa6f6158d41e6 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 21:23:46 +0100 Subject: [PATCH 6/9] Fix use of Sync cache with caffeine Signed-off-by: Paolo Di Tommaso --- .../io/seqera/wave/core/RegistryProxyService.groovy | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy index 32cf49604..01ed6bd94 100644 --- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy +++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy @@ -18,6 +18,8 @@ package io.seqera.wave.core +import java.util.concurrent.CompletableFuture + import groovy.transform.CompileStatic import groovy.transform.ToString import groovy.util.logging.Slf4j @@ -193,7 +195,7 @@ class RegistryProxyService { String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) { try { - return getImageDigest0(containerImage, identity, retryOnNotFound) + return getImageDigest0(containerImage, identity, retryOnNotFound).get() } catch(Exception e) { log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}" @@ -203,8 +205,15 @@ class RegistryProxyService { static private List RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404 + // note: return a CompletableFuture to force micronaut to use caffeine AsyncCache + // that provides a workaround about the use of virtual threads with SyncCache + // see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926 @Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image']) - protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) { + protected CompletableFuture getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) { + CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound)) + } + + protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) { final coords = ContainerCoordinates.parse(image) final route = RoutePath.v2manifestPath(coords, identity) final proxyClient = client(route) From 9850c84bd2eb663aa91a6d6d9665621b512063df Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 17 Nov 2024 23:49:40 +0100 Subject: [PATCH 7/9] Fix pinning thread with caffeine Signed-off-by: Paolo Di Tommaso --- .../seqera/wave/auth/RegistryAuthServiceImpl.groovy | 13 ++++++++----- .../wave/auth/RegistryLookupServiceImpl.groovy | 10 ++++++---- .../io/seqera/wave/service/aws/AwsEcrService.groovy | 10 ++++++---- .../service/data/queue/AbstractMessageQueue.groovy | 13 ++++++++----- .../io/seqera/wave/service/job/JobManager.groovy | 12 +++++++++--- .../tower/client/connector/TowerConnector.groovy | 10 +++++----- .../seqera/wave/service/job/JobManagerTest.groovy | 10 +++++----- 7 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy index 00dc5e9fa..7ae087db7 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryAuthServiceImpl.groovy @@ -24,9 +24,9 @@ import java.time.Duration import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.json.JsonSlurper import groovy.transform.Canonical import groovy.transform.CompileStatic @@ -100,11 +100,12 @@ class RegistryAuthServiceImpl implements RegistryAuthService { return result } - private LoadingCache cacheTokens = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cacheTokens = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS) - .build(loader) + .buildAsync(loader) @Inject private RegistryLookupService lookupService @@ -268,7 +269,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService { protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) { final key = new CacheKey(image, auth, creds) try { - return cacheTokens.get(key) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return cacheTokens.synchronous().get(key) } catch (CompletionException e) { // this catches the exception thrown in the cache loader lookup @@ -286,7 +288,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService { */ void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) { final key = new CacheKey(image, auth, creds) - cacheTokens.invalidate(key) + // FIXME https://github.com/seqeralabs/wave/issues/747 + cacheTokens.synchronous().invalidate(key) tokenStore.remove(getStableKey(key)) } diff --git a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy index 58c41f617..2b15b8efa 100644 --- a/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/auth/RegistryLookupServiceImpl.groovy @@ -23,9 +23,9 @@ import java.net.http.HttpResponse import java.util.concurrent.CompletionException import java.util.concurrent.TimeUnit +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.seqera.wave.configuration.HttpClientConfig @@ -73,11 +73,12 @@ class RegistryLookupServiceImpl implements RegistryLookupService { } } - private LoadingCache cache = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterAccess(1, TimeUnit.HOURS) - .build(loader) + .buildAsync(loader) protected RegistryAuth lookup0(URI endpoint) { final httpClient = HttpClientFactory.followRedirectsHttpClient() @@ -116,7 +117,8 @@ class RegistryLookupServiceImpl implements RegistryLookupService { RegistryInfo lookup(String registry) { try { final endpoint = registryEndpoint(registry) - final auth = cache.get(endpoint) + // FIXME https://github.com/seqeralabs/wave/issues/747 + final auth = cache.synchronous().get(endpoint) return new RegistryInfo(registry, endpoint, auth) } catch (CompletionException e) { diff --git a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy index a85b1b354..30f9d0e04 100644 --- a/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy +++ b/src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws import java.util.concurrent.TimeUnit import java.util.regex.Pattern +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.Canonical import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -73,11 +73,12 @@ class AwsEcrService { } } - private LoadingCache cache = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncLoadingCache cache = Caffeine.newBuilder() .newBuilder() .maximumSize(10_000) .expireAfterWrite(3, TimeUnit.HOURS) - .build(loader) + .buildAsync(loader) private EcrClient ecrClient(String accessKey, String secretKey, String region) { @@ -126,7 +127,8 @@ class AwsEcrService { try { // get the token from the cache, if missing the it's automatically // fetch using the AWS ECR client - return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic)) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic)) } catch (Exception e) { final type = isPublic ? "ECR public" : "ECR" diff --git a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy index 3a90e3108..8c2d3fa2c 100644 --- a/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/queue/AbstractMessageQueue.groovy @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -60,10 +60,11 @@ abstract class AbstractMessageQueue implements Runnable { final private String name0 - final private Cache closedClients = Caffeine.newBuilder() + // FIXME https://github.com/seqeralabs/wave/issues/747 + final private AsyncCache closedClients = Caffeine.newBuilder() .newBuilder() .expireAfterWrite(10, TimeUnit.MINUTES) - .build() + .buildAsync() AbstractMessageQueue(MessageQueue broker) { final type = TypeHelper.getGenericType(this, 0) @@ -149,13 +150,15 @@ abstract class AbstractMessageQueue implements Runnable { @Override void run() { + // FIXME https://github.com/seqeralabs/wave/issues/747 + final clientsCache = closedClients.synchronous() while( !thread.isInterrupted() ) { try { int sent=0 final clients = new HashMap>(this.clients) for( Map.Entry> entry : clients ) { // ignore clients marked as closed - if( closedClients.getIfPresent(entry.key)) + if( clientsCache.getIfPresent(entry.key)) continue // infer the target queue from the client key final target = targetFromClientKey(entry.key) @@ -173,7 +176,7 @@ abstract class AbstractMessageQueue implements Runnable { // offer back the value to be processed again broker.offer(target, value) if( e.message?.contains('close') ) { - closedClients.put(entry.key, true) + clientsCache.put(entry.key, true) } } } diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index 1127859f6..e10cde8d4 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -21,6 +21,7 @@ package io.seqera.wave.service.job import java.time.Duration import java.time.Instant +import com.github.benmanes.caffeine.cache.AsyncCache import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import groovy.transform.CompileStatic @@ -50,12 +51,16 @@ class JobManager { @Inject private JobConfig config - private Cache debounceCache + // FIXME https://github.com/seqeralabs/wave/issues/747 + private AsyncCache debounceCache @PostConstruct void init() { log.info "Creating job manager - config=$config" - debounceCache = Caffeine.newBuilder().expireAfterWrite(config.graceInterval.multipliedBy(2)).build() + debounceCache = Caffeine + .newBuilder() + .expireAfterWrite(config.graceInterval.multipliedBy(2)) + .buildAsync() queue.addConsumer((job)-> processJob(job)) } @@ -72,7 +77,8 @@ class JobManager { } protected JobState state(JobSpec job) { - return state0(job, config.graceInterval, debounceCache) + // FIXME https://github.com/seqeralabs/wave/issues/747 + return state0(job, config.graceInterval, debounceCache.synchronous()) } protected JobState state0(final JobSpec job, final Duration graceInterval, final Cache cache) { diff --git a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy index e7df86d0c..27e12d87f 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/connector/TowerConnector.groovy @@ -26,10 +26,10 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.function.Function +import com.github.benmanes.caffeine.cache.AsyncLoadingCache import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.CacheLoader import com.github.benmanes.caffeine.cache.Caffeine -import com.github.benmanes.caffeine.cache.LoadingCache import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -92,14 +92,14 @@ abstract class TowerConnector { } } - private LoadingCache> refreshCache = Caffeine.newBuilder() + private AsyncLoadingCache> refreshCache = Caffeine .newBuilder() .expireAfterWrite(1, TimeUnit.MINUTES) - .build(loader) + .buildAsync(loader) /** Only for testing - do not use */ Cache> refreshCache0() { - return refreshCache + return refreshCache.synchronous() } protected ExecutorService getIoExecutor() { @@ -246,7 +246,7 @@ abstract class TowerConnector { * @return The refreshed {@link JwtAuth} object */ protected CompletableFuture refreshJwtToken(String endpoint, JwtAuth auth) { - return refreshCache.get(new JwtRefreshParams(endpoint,auth)) + return refreshCache.synchronous().get(new JwtRefreshParams(endpoint,auth)) } protected CompletableFuture refreshJwtToken0(String endpoint, JwtAuth auth) { diff --git a/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy b/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy index f58513a13..04ea45d71 100644 --- a/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/job/JobManagerTest.groovy @@ -26,7 +26,6 @@ import java.time.Instant import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine - /** * * @author Munish Chouhan @@ -38,7 +37,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now(), Duration.ofMinutes(10)) @@ -57,7 +56,8 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config) + def cache = Caffeine.newBuilder().buildAsync() + def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now(), Duration.ofMinutes(10)) @@ -75,7 +75,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config:config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now() - Duration.ofMinutes(5), Duration.ofMinutes(2)) @@ -94,7 +94,7 @@ class JobManagerTest extends Specification { def jobService = Mock(JobService) def jobDispatcher = Mock(JobDispatcher) def config = new JobConfig(graceInterval: Duration.ofMillis(1)) - def cache = Caffeine.newBuilder().build() + def cache = Caffeine.newBuilder().buildAsync() def manager = new JobManager(jobService: jobService, dispatcher: jobDispatcher, config: config, debounceCache: cache) and: def jobSpec = JobSpec.transfer('foo', 'scheduler-1', Instant.now().minus(Duration.ofMillis(500)), Duration.ofMinutes(10)) From 47e6eb6315a2b3385804d911db96be12502db84a Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 18 Nov 2024 07:04:47 +0100 Subject: [PATCH 8/9] [release] 1.14.2-B4 Signed-off-by: Paolo Di Tommaso --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 0b963338f..f0ce0eb97 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.14.2-B3 +1.14.2-B4 From be54773642d84519f19d1ce54470cfdc8e18c37f Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 18 Nov 2024 07:43:19 +0100 Subject: [PATCH 9/9] Update VERSION [ci skip] --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index f0ce0eb97..0b963338f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.14.2-B4 +1.14.2-B3