diff --git a/src/main/java/org/commonjava/indy/service/archive/controller/ArchiveController.java b/src/main/java/org/commonjava/indy/service/archive/controller/ArchiveController.java index 09a6838..2560f2e 100644 --- a/src/main/java/org/commonjava/indy/service/archive/controller/ArchiveController.java +++ b/src/main/java/org/commonjava/indy/service/archive/controller/ArchiveController.java @@ -29,7 +29,6 @@ import org.apache.http.impl.client.BasicCookieStore; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.commonjava.indy.service.archive.config.PreSeedConfig; import org.commonjava.indy.service.archive.model.ArchiveStatus; import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO; @@ -47,6 +46,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,6 +60,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -94,6 +97,18 @@ public class ArchiveController } } ); + private static final Map buildConfigLocks = new ConcurrentHashMap<>(); + + private static final int threads = 4 * Runtime.getRuntime().availableProcessors(); + + private static final ExecutorService generateExecutor = + Executors.newFixedThreadPool( threads, ( final Runnable r ) -> { + final Thread t = new Thread( r ); + t.setName( "Generate-" + t.getName() ); + t.setDaemon( true ); + return t; + } ); + @Inject HistoricalContentListReader reader; @@ -115,7 +130,7 @@ public class ArchiveController @PostConstruct public void init() - throws IOException + throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException { int threads = 4 * Runtime.getRuntime().availableProcessors(); executorService = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> { @@ -125,11 +140,8 @@ public void init() return t; } ); - final PoolingHttpClientConnectionManager ccm = new PoolingHttpClientConnectionManager(); - ccm.setMaxTotal( 500 ); - RequestConfig rc = RequestConfig.custom().build(); - client = HttpClients.custom().setConnectionManager( ccm ).setDefaultRequestConfig( rc ).build(); + client = HttpClients.custom().setDefaultRequestConfig( rc ).build(); String storeDir = preSeedConfig.storageDir().orElse( "data" ); contentDir = String.format( "%s%s", storeDir, CONTENT_DIR ); @@ -145,21 +157,48 @@ public void destroy() public void generate( HistoricalContentDTO content ) { - int threads = 4 * Runtime.getRuntime().availableProcessors(); - ExecutorService generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> { - final Thread t = new Thread( r ); - t.setName( "Generate-" + t.getName() ); - t.setDaemon( true ); - return t; - } ); - generateExecutor.execute( () -> doGenerate( content ) ); + String buildConfigId = content.getBuildConfigId(); + Object lock = buildConfigLocks.computeIfAbsent( buildConfigId, k -> new Object() ); + synchronized ( lock ) + { + while ( isInProgress( buildConfigId ) ) + { + logger.info( "There is already generation process in progress for buildConfigId {}, try lock wait.", + buildConfigId ); + try + { + lock.wait(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + return; + } + } + recordInProgress( content.getBuildConfigId() ); + generateExecutor.execute( () -> { + try + { + doGenerate( content ); + } + finally + { + synchronized ( lock ) + { + recordCompleted( content.getBuildConfigId() ); + buildConfigLocks.remove( buildConfigId ); + lock.notifyAll(); + logger.info( "lock released, buildConfigId {}", buildConfigId ); + } + } + } ); + } } protected Boolean doGenerate( HistoricalContentDTO content ) { logger.info( "Handle generate event: {}, build config id: {}", EVENT_GENERATE_ARCHIVE, content.getBuildConfigId() ); - recordInProgress( content.getBuildConfigId() ); Map entryDTOs = reader.readEntries( content ); Map downloadPaths = new HashMap<>(); @@ -195,7 +234,6 @@ protected Boolean doGenerate( HistoricalContentDTO content ) created = renderArchive( archive.get(), content.getBuildConfigId() ); } - recordCompleted( content.getBuildConfigId() ); return created; } @@ -239,11 +277,17 @@ public boolean statusExists( final String buildConfigId ) return treated.containsKey( buildConfigId ); } - public String getStatus( String buildConfigId ) + public String getStatus( final String buildConfigId ) { return treated.get( buildConfigId ); } + public boolean isInProgress( final String buildConfigId ) + { + return statusExists( buildConfigId ) && getStatus( buildConfigId ).equals( + ArchiveStatus.inProgress.getArchiveStatus() ); + } + private void downloadArtifacts( final Map entryDTOs, final Map downloadPaths, final HistoricalContentDTO content ) throws InterruptedException, ExecutionException, IOException @@ -533,12 +577,7 @@ private void restoreGenerateStatusFromDisk() throws IOException List contents = walkAllFiles( archiveDir ); for ( File content : contents ) { - if ( content.getName().endsWith( PART_ARCHIVE_SUFFIX ) ) - { - treated.put( content.getName().split( PART_ARCHIVE_SUFFIX )[0], - ArchiveStatus.inProgress.getArchiveStatus() ); - } - else if ( content.getName().endsWith( ARCHIVE_SUFFIX ) ) + if ( content.getName().endsWith( ARCHIVE_SUFFIX ) ) { treated.put( content.getName().split( ARCHIVE_SUFFIX )[0], ArchiveStatus.completed.getArchiveStatus() ); } diff --git a/src/main/java/org/commonjava/indy/service/archive/jaxrs/ArchiveManageResources.java b/src/main/java/org/commonjava/indy/service/archive/jaxrs/ArchiveManageResources.java index 6b0a383..b01d36d 100644 --- a/src/main/java/org/commonjava/indy/service/archive/jaxrs/ArchiveManageResources.java +++ b/src/main/java/org/commonjava/indy/service/archive/jaxrs/ArchiveManageResources.java @@ -18,6 +18,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.smallrye.mutiny.Uni; import io.vertx.core.eventbus.EventBus; +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.ResponseBuilder; +import jakarta.ws.rs.core.UriInfo; import org.apache.commons.io.FileUtils; import org.commonjava.indy.service.archive.controller.ArchiveController; import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO; @@ -31,19 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.ws.rs.PathParam; -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.DELETE; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.core.Context; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; -import jakarta.ws.rs.core.Response.ResponseBuilder; -import jakarta.ws.rs.core.UriInfo; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -72,14 +72,15 @@ public class ArchiveManageResources @Inject EventBus bus; - @Operation( description = "Generate archive based on tracked content" ) @APIResponse( responseCode = "202", description = "The archive created request is accepted" ) - @RequestBody( description = "The tracked content definition JSON", name = "body", required = true, content = @Content( mediaType = APPLICATION_JSON, example = - "{" + "\"buildConfigId\": \"XXX\"," + "\"downloads\":" + "[{" + " \"storeKey\": \"\"," - + " \"path\": \"\"," + " \"md5\": \"\"," + " \"sha256\": \"\"," - + " \"sha1\": \"\"," + " \"size\": 001" + " }," + "..." - + "]}", schema = @Schema( implementation = HistoricalContentDTO.class ) ) ) + @RequestBody( description = "The tracked content definition JSON", name = "body", required = true, + content = @Content( mediaType = APPLICATION_JSON, + example = "{" + "\"buildConfigId\": \"XXX\"," + "\"downloads\":" + "[{" + + " \"storeKey\": \"\"," + " \"path\": \"\"," + " \"md5\": \"\"," + + " \"sha256\": \"\"," + " \"sha1\": \"\"," + " \"size\": 001" + + " }," + "..." + "]}", + schema = @Schema( implementation = HistoricalContentDTO.class ) ) ) @POST @Path( "generate" ) @Consumes( APPLICATION_JSON )