Skip to content

Commit

Permalink
Fix archive generation process with synchronized lock
Browse files Browse the repository at this point in the history
  • Loading branch information
yma96 committed Dec 2, 2024
1 parent 1ffe209 commit fb9068e
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.commonjava.indy.service.archive.config.PreSeedConfig;
import org.commonjava.indy.service.archive.model.ArchiveStatus;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
Expand All @@ -47,6 +46,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -58,6 +60,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -94,6 +97,18 @@ public class ArchiveController
}
} );

private static final Map<String, Object> buildConfigLocks = new ConcurrentHashMap<>();

private static final int threads = 4 * Runtime.getRuntime().availableProcessors();

private static final ExecutorService generateExecutor =
Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );

@Inject
HistoricalContentListReader reader;

Expand All @@ -115,7 +130,7 @@ public class ArchiveController

@PostConstruct
public void init()
throws IOException
throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException
{
int threads = 4 * Runtime.getRuntime().availableProcessors();
executorService = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
Expand All @@ -125,11 +140,8 @@ public void init()
return t;
} );

final PoolingHttpClientConnectionManager ccm = new PoolingHttpClientConnectionManager();
ccm.setMaxTotal( 500 );

RequestConfig rc = RequestConfig.custom().build();
client = HttpClients.custom().setConnectionManager( ccm ).setDefaultRequestConfig( rc ).build();
client = HttpClients.custom().setDefaultRequestConfig( rc ).build();

String storeDir = preSeedConfig.storageDir().orElse( "data" );
contentDir = String.format( "%s%s", storeDir, CONTENT_DIR );
Expand All @@ -145,21 +157,48 @@ public void destroy()

public void generate( HistoricalContentDTO content )
{
int threads = 4 * Runtime.getRuntime().availableProcessors();
ExecutorService generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor.execute( () -> doGenerate( content ) );
String buildConfigId = content.getBuildConfigId();
Object lock = buildConfigLocks.computeIfAbsent( buildConfigId, k -> new Object() );
synchronized ( lock )
{
while ( isInProgress( buildConfigId ) )
{
logger.info( "There is already generation process in progress for buildConfigId {}, try lock wait.",
buildConfigId );
try
{
lock.wait();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return;
}
}
recordInProgress( content.getBuildConfigId() );
generateExecutor.execute( () -> {
try
{
doGenerate( content );
}
finally
{
synchronized ( lock )
{
recordCompleted( content.getBuildConfigId() );
buildConfigLocks.remove( buildConfigId );
lock.notifyAll();
logger.info( "lock released, buildConfigId {}", buildConfigId );
}
}
} );
}
}

protected Boolean doGenerate( HistoricalContentDTO content )
{
logger.info( "Handle generate event: {}, build config id: {}", EVENT_GENERATE_ARCHIVE,
content.getBuildConfigId() );
recordInProgress( content.getBuildConfigId() );

Map<String, HistoricalEntryDTO> entryDTOs = reader.readEntries( content );
Map<String, String> downloadPaths = new HashMap<>();
Expand Down Expand Up @@ -195,7 +234,6 @@ protected Boolean doGenerate( HistoricalContentDTO content )
created = renderArchive( archive.get(), content.getBuildConfigId() );
}

recordCompleted( content.getBuildConfigId() );
return created;
}

Expand Down Expand Up @@ -239,11 +277,17 @@ public boolean statusExists( final String buildConfigId )
return treated.containsKey( buildConfigId );
}

public String getStatus( String buildConfigId )
public String getStatus( final String buildConfigId )
{
return treated.get( buildConfigId );
}

public boolean isInProgress( final String buildConfigId )
{
return statusExists( buildConfigId ) && getStatus( buildConfigId ).equals(
ArchiveStatus.inProgress.getArchiveStatus() );
}

private void downloadArtifacts( final Map<String, HistoricalEntryDTO> entryDTOs,
final Map<String, String> downloadPaths, final HistoricalContentDTO content )
throws InterruptedException, ExecutionException, IOException
Expand Down Expand Up @@ -533,12 +577,7 @@ private void restoreGenerateStatusFromDisk() throws IOException
List<File> contents = walkAllFiles( archiveDir );
for ( File content : contents )
{
if ( content.getName().endsWith( PART_ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( PART_ARCHIVE_SUFFIX )[0],
ArchiveStatus.inProgress.getArchiveStatus() );
}
else if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( ARCHIVE_SUFFIX )[0], ArchiveStatus.completed.getArchiveStatus() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.EventBus;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.ResponseBuilder;
import jakarta.ws.rs.core.UriInfo;
import org.apache.commons.io.FileUtils;
import org.commonjava.indy.service.archive.controller.ArchiveController;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
Expand All @@ -31,19 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.ws.rs.PathParam;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.ResponseBuilder;
import jakarta.ws.rs.core.UriInfo;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -72,14 +72,15 @@ public class ArchiveManageResources
@Inject
EventBus bus;


@Operation( description = "Generate archive based on tracked content" )
@APIResponse( responseCode = "202", description = "The archive created request is accepted" )
@RequestBody( description = "The tracked content definition JSON", name = "body", required = true, content = @Content( mediaType = APPLICATION_JSON, example =
"{" + "\"buildConfigId\": \"XXX\"," + "\"downloads\":" + "[{" + " \"storeKey\": \"\","
+ " \"path\": \"\"," + " \"md5\": \"\"," + " \"sha256\": \"\","
+ " \"sha1\": \"\"," + " \"size\": 001" + " }," + "..."
+ "]}", schema = @Schema( implementation = HistoricalContentDTO.class ) ) )
@RequestBody( description = "The tracked content definition JSON", name = "body", required = true,
content = @Content( mediaType = APPLICATION_JSON,
example = "{" + "\"buildConfigId\": \"XXX\"," + "\"downloads\":" + "[{"
+ " \"storeKey\": \"\"," + " \"path\": \"\"," + " \"md5\": \"\","
+ " \"sha256\": \"\"," + " \"sha1\": \"\"," + " \"size\": 001"
+ " }," + "..." + "]}",
schema = @Schema( implementation = HistoricalContentDTO.class ) ) )
@POST
@Path( "generate" )
@Consumes( APPLICATION_JSON )
Expand Down

0 comments on commit fb9068e

Please sign in to comment.