Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to virtual threads - phase 1 #746

Merged
merged 10 commits into from
Nov 18, 2024
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.14.2-B3
1.14.2-B4
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ jib {

run{
def envs = findProperty('micronautEnvs')
// note: "--enable-preview" is required to use virtual threads on Java 19 and 20
def args = ["-Dmicronaut.environments=$envs","--enable-preview"]
def args = ["-Dmicronaut.environments=$envs","-Djdk.tracePinnedThreads=short"]
if( environment['JVM_OPTS'] ) args.add(environment['JVM_OPTS'])
jvmArgs args
systemProperties 'DOCKER_USER': project.findProperty('DOCKER_USER') ?: environment['DOCKER_USER'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import java.time.Duration
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
Expand Down Expand Up @@ -100,11 +100,12 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
return result
}

private LoadingCache<CacheKey, String> cacheTokens = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<CacheKey, String> cacheTokens = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(_1_HOUR.toMillis(), TimeUnit.MILLISECONDS)
.build(loader)
.buildAsync(loader)

@Inject
private RegistryLookupService lookupService
Expand Down Expand Up @@ -268,7 +269,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
protected String getAuthToken(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
try {
return cacheTokens.get(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
return cacheTokens.synchronous().get(key)
}
catch (CompletionException e) {
// this catches the exception thrown in the cache loader lookup
Expand All @@ -286,7 +288,8 @@ class RegistryAuthServiceImpl implements RegistryAuthService {
*/
void invalidateAuthorization(String image, RegistryAuth auth, RegistryCredentials creds) {
final key = new CacheKey(image, auth, creds)
cacheTokens.invalidate(key)
// FIXME https://github.com/seqeralabs/wave/issues/747
cacheTokens.synchronous().invalidate(key)
tokenStore.remove(getStableKey(key))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import java.net.http.HttpResponse
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.configuration.HttpClientConfig
Expand Down Expand Up @@ -73,11 +73,12 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
}
}

private LoadingCache<URI, RegistryAuth> cache = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<URI, RegistryAuth> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)

protected RegistryAuth lookup0(URI endpoint) {
final httpClient = HttpClientFactory.followRedirectsHttpClient()
Expand Down Expand Up @@ -116,7 +117,8 @@ class RegistryLookupServiceImpl implements RegistryLookupService {
RegistryInfo lookup(String registry) {
try {
final endpoint = registryEndpoint(registry)
final auth = cache.get(endpoint)
// FIXME https://github.com/seqeralabs/wave/issues/747
final auth = cache.synchronous().get(endpoint)
return new RegistryInfo(registry, endpoint, auth)
}
catch (CompletionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class BuildController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ContainerController {

@Inject
Expand Down Expand Up @@ -181,13 +181,13 @@ class ContainerController {

@Deprecated
@Post('/container-token')
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
CompletableFuture<HttpResponse<SubmitContainerTokenResponse>> getToken(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, false)
}

@Post('/v1alpha2/container')
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
CompletableFuture<HttpResponse<SubmitContainerTokenResponse>> getTokenV2(HttpRequest httpRequest, @Body SubmitContainerTokenRequest req) {
return getContainerImpl(httpRequest, req, true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import static io.seqera.wave.util.ContainerHelper.patchPlatformEndpoint
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class InspectController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import static io.micronaut.http.HttpHeaders.WWW_AUTHENTICATE
@Requires(property = 'wave.metrics.enabled', value = 'true')
@Secured(SecurityRule.IS_AUTHENTICATED)
@Controller
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class MetricsController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import jakarta.inject.Inject
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class MirrorController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import reactor.core.publisher.Mono
@Slf4j
@CompileStatic
@Controller("/v2")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class RegistryProxyController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import jakarta.inject.Inject
@CompileStatic
@Requires(bean = ContainerScanService)
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ScanController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import io.seqera.wave.util.BuildInfo
@Slf4j
@Controller("/")
@CompileStatic
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ServiceInfoController {

@Value('${wave.landing.url}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import io.seqera.wave.auth.RegistryAuthService
import jakarta.inject.Inject
import jakarta.validation.Valid

@ExecuteOn(TaskExecutors.IO)
@Controller("/validate-creds")
@ExecuteOn(TaskExecutors.BLOCKING)
class ValidateController {

@Inject RegistryAuthService loginService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import static io.seqera.wave.util.DataTimeUtils.formatTimestamp
@Slf4j
@CompileStatic
@Controller("/view")
@ExecuteOn(TaskExecutors.IO)
@ExecuteOn(TaskExecutors.BLOCKING)
class ViewController {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class ContainerAugmenter {
return result
}

synchronized protected Map layerBlob(String image, ContainerLayer layer) {
protected Map layerBlob(String image, ContainerLayer layer) {
log.debug "Adding layer: $layer to image: $client.registry.name/$image"
// store the layer blob in the cache
final String path = "$client.registry.name/v2/$image/blobs/$layer.gzipDigest"
Expand All @@ -295,7 +295,6 @@ class ContainerAugmenter {


protected Tuple2<String,Integer> updateImageManifest(String imageName, String imageManifest, String newImageConfigDigest, newImageConfigSize, boolean oci) {

// turn the json string into a json map
// and append the new layer
final manifest = (Map) new JsonSlurper().parseText(imageManifest)
Expand Down
13 changes: 11 additions & 2 deletions src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package io.seqera.wave.core

import java.util.concurrent.CompletableFuture

import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -193,7 +195,7 @@ class RegistryProxyService {

String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
return getImageDigest0(containerImage, identity, retryOnNotFound)
return getImageDigest0(containerImage, identity, retryOnNotFound).get()
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
Expand All @@ -203,8 +205,15 @@ class RegistryProxyService {

static private List<Integer> RETRY_ON_NOT_FOUND = HTTP_RETRYABLE_ERRORS + 404

// note: return a CompletableFuture to force micronaut to use caffeine AsyncCache
// that provides a workaround about the use of virtual threads with SyncCache
// see https://github.com/ben-manes/caffeine/issues/1468#issuecomment-1906733926
@Cacheable(value = 'cache-registry-proxy', atomic = true, parameters = ['image'])
protected String getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
protected CompletableFuture<String> getImageDigest0(String image, PlatformId identity, boolean retryOnNotFound) {
CompletableFuture.completedFuture(getImageDigest1(image, identity, retryOnNotFound))
}

protected String getImageDigest1(String image, PlatformId identity, boolean retryOnNotFound) {
final coords = ContainerCoordinates.parse(image)
final route = RoutePath.v2manifestPath(coords, identity)
final proxyClient = client(route)
Expand Down
15 changes: 11 additions & 4 deletions src/main/groovy/io/seqera/wave/http/HttpClientFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.http.HttpClient
import java.time.Duration
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -39,9 +40,9 @@ class HttpClientFactory {

static private Duration timeout = Duration.ofSeconds(20)

static private final Object l1 = new Object()
static private final ReentrantLock l1 = new ReentrantLock()

static private final Object l2 = new Object()
static private final ReentrantLock l2 = new ReentrantLock()

private static HttpClient client1

Expand All @@ -51,20 +52,26 @@ class HttpClientFactory {
static HttpClient followRedirectsHttpClient() {
if( client1!=null )
return client1
synchronized (l1) {
l1.lock()
try {
if( client1!=null )
return client1
return client1=followRedirectsHttpClient0()
} finally {
l1.unlock()
}
}

static HttpClient neverRedirectsHttpClient() {
if( client2!=null )
return client2
synchronized (l2) {
l2.lock()
try {
if( client2!=null )
return client2
return client2=neverRedirectsHttpClient0()
} finally {
l2.unlock()
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/groovy/io/seqera/wave/service/aws/AwsEcrService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ package io.seqera.wave.service.aws
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -73,11 +73,12 @@ class AwsEcrService {
}
}

private LoadingCache<AwsCreds, String> cache = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncLoadingCache<AwsCreds, String> cache = Caffeine.newBuilder()
.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(3, TimeUnit.HOURS)
.build(loader)
.buildAsync(loader)


private EcrClient ecrClient(String accessKey, String secretKey, String region) {
Expand Down Expand Up @@ -126,7 +127,8 @@ class AwsEcrService {
try {
// get the token from the cache, if missing the it's automatically
// fetch using the AWS ECR client
return cache.get(new AwsCreds(accessKey,secretKey,region,isPublic))
// FIXME https://github.com/seqeralabs/wave/issues/747
return cache.synchronous().get(new AwsCreds(accessKey,secretKey,region,isPublic))
}
catch (Exception e) {
final type = isPublic ? "ECR public" : "ECR"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -60,10 +60,11 @@ abstract class AbstractMessageQueue<M> implements Runnable {

final private String name0

final private Cache<String,Boolean> closedClients = Caffeine.newBuilder()
// FIXME https://github.com/seqeralabs/wave/issues/747
final private AsyncCache<String,Boolean> closedClients = Caffeine.newBuilder()
.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.build()
.buildAsync()

AbstractMessageQueue(MessageQueue<String> broker) {
final type = TypeHelper.getGenericType(this, 0)
Expand Down Expand Up @@ -149,13 +150,15 @@ abstract class AbstractMessageQueue<M> implements Runnable {

@Override
void run() {
// FIXME https://github.com/seqeralabs/wave/issues/747
final clientsCache = closedClients.synchronous()
while( !thread.isInterrupted() ) {
try {
int sent=0
final clients = new HashMap<String,MessageSender<String>>(this.clients)
for( Map.Entry<String,MessageSender<String>> entry : clients ) {
// ignore clients marked as closed
if( closedClients.getIfPresent(entry.key))
if( clientsCache.getIfPresent(entry.key))
continue
// infer the target queue from the client key
final target = targetFromClientKey(entry.key)
Expand All @@ -173,7 +176,7 @@ abstract class AbstractMessageQueue<M> implements Runnable {
// offer back the value to be processed again
broker.offer(target, value)
if( e.message?.contains('close') ) {
closedClients.put(entry.key, true)
clientsCache.put(entry.key, true)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ abstract class AbstractMessageStream<M> implements Closeable {
* The {@link Predicate} to be invoked when a stream message is consumed (read from) the stream.
*/
void addConsumer(String streamId, MessageConsumer<M> consumer) {
// the use of synchronized block is meant to prevent a race condition while
// updating the 'listeners' from concurrent invocations.
// however, considering the addConsumer is invoked during the initialization phase
// (and therefore in the same thread) in should not be really needed.
synchronized (listeners) {
if( listeners.containsKey(streamId))
throw new IllegalStateException("Only one consumer can be defined for each stream - offending streamId=$streamId; consumer=$consumer")
Expand Down
Loading