Skip to content

Commit

Permalink
Fix hanging files at instance destroy
Browse files Browse the repository at this point in the history
  • Loading branch information
Yann Diorcet committed Dec 9, 2015
1 parent f9ceff2 commit f823115
Showing 1 changed file with 179 additions and 30 deletions.
209 changes: 179 additions & 30 deletions src/main/java/com/jcabi/mysql/maven/plugin/Instances.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -97,8 +98,8 @@ public final class Instances {
/**
* Running processes.
*/
private final transient ConcurrentMap<Integer, Process> processes =
new ConcurrentHashMap<Integer, Process>(0);
private final transient ConcurrentMap<Integer, DatabaseInstance> processes =
new ConcurrentHashMap<Integer, DatabaseInstance>(0);

/**
* If true, always create a new database. If false, check if there is an
Expand Down Expand Up @@ -127,18 +128,14 @@ public void start(@NotNull final Config config, @NotNull final File dist,
String.format("port %d is already busy", config.port())
);
}
final Process proc = this.process(config, dist, target, socket);
this.processes.put(config.port(), proc);
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable() {
@Override
public void run() {
Instances.this.stop(config.port());
}
}
)
final DatabaseInstance instance = this.process(
config,
dist,
target,
socket
);
this.processes.put(config.port(), instance);
instance.setShutdownHook(config.port());
}
}

Expand All @@ -148,9 +145,10 @@ public void run() {
*/
public void stop(final int port) {
synchronized (this.processes) {
final Process proc = this.processes.remove(port);
if (proc != null) {
proc.destroy();
final DatabaseInstance instance = this.processes.remove(port);
if (instance != null) {
instance.removeShutdownHook();
instance.halt();
}
}
}
Expand All @@ -170,11 +168,11 @@ public boolean reusedExistingDatabase() {
* @param dist Path to MySQL distribution
* @param target Where to keep temp data
* @param socketfile Alternative socket location for mysql (may be null)
* @return Process started
* @return DatabaseInstance containing the process and the thread
* @throws IOException If fails to start
* @checkstyle ParameterNumberCheck (10 lines)
*/
private Process process(@NotNull final Config config,
private DatabaseInstance process(@NotNull final Config config,
final File dist, final File target, final File socketfile)
throws IOException {
final File temp = this.prepareFolders(target);
Expand Down Expand Up @@ -211,24 +209,14 @@ private Process process(@NotNull final Config config,
}
}
final Process proc = builder.start();
final Thread thread = new Thread(
new VerboseRunnable(
new Callable<Void>() {
@Override
public Void call() throws Exception {
new VerboseProcess(proc).stdoutQuietly();
return null;
}
}
)
);
final VerboseProcessThread thread = new VerboseProcessThread(proc);
thread.setDaemon(true);
thread.start();
this.waitFor(socket, config.port());
if (this.clean) {
this.configure(config, dist, socket);
}
return proc;
return new DatabaseInstance(proc, thread);
}

/**
Expand Down Expand Up @@ -441,4 +429,165 @@ private void setClean(final File target, final boolean deldir) {
Logger.info(this, "reuse existing database %s", !this.clean);
}

private class DatabaseInstance {

/**
* Database process.
*/
private final Process process;

/**
* Database verbose thread.
*/
private final VerboseProcessThread thread;

/**
* Database shutdown thread.
*/
private Thread shutdownthread;

/**
* Create a Database instance.
* @param gprocess The process of the database.
* @param gthread The monitor thread of the database.
*/
public DatabaseInstance(
final Process gprocess,
final VerboseProcessThread gthread
) {
this.process = gprocess;
this.thread = gthread;
}

/**
* Return the process associated to the database.
* @return The process
*/
public Process getProcess() {
return this.process;
}

/**
* Return the verbose thread associated to the database.
* @return The thread
*/
public VerboseProcessThread getThread() {
return this.thread;
}

/**
* Set the thread to call on the shutdown.
* @param port The port of the database to kill on shutdown.
*/
public void setShutdownHook(final int port) {
if (this.shutdownthread == null) {
this.shutdownthread = new Thread(
new Runnable() {
@Override
public void run() {
Instances.this.stop(port);
}
}
);
Runtime.getRuntime().addShutdownHook(this.shutdownthread);
}
}

/**
* Remove the thread to call on the shutdown.
*/
public void removeShutdownHook() {
if (this.shutdownthread != null) {
Runtime.getRuntime().removeShutdownHook(this.shutdownthread);
this.shutdownthread = null;
}
}

/**
* Halt the database.
*/
public void halt() {
this.thread.halt();
if (this.process != null) {
IOUtils.closeQuietly(this.process.getInputStream());
IOUtils.closeQuietly(this.process.getOutputStream());
IOUtils.closeQuietly(this.process.getErrorStream());
this.process.destroy();
}
}
}

/**
* Stoppable process proxy.
*/
private class VerboseProcessThread extends Thread {

/**
* Waiting time for the process thread after a halt.
*/
private static final long HALT_TIMEOUT = 3L;

/**
* The proxy to the process in order to correctly handle the halt.
*/
private final VerboseRunnable runnable;

/**
* Stop status.
*/
private transient boolean stop;

/**
* Create a proxy of a process in order to be able to halt it.
* @param process The process to watch.
*/
public VerboseProcessThread(final Process process) {
this.runnable = new VerboseRunnable(
new Callable<Void>() {
@Override
public Void call() throws Exception {
final VerboseProcess verboseProcess =
new VerboseProcess(process);
try {
verboseProcess.stdoutQuietly();
} catch (final RuntimeException exception) {
if (!VerboseProcessThread.this.stop) {
throw exception;
}
} finally {
verboseProcess.close();
}
return null;
}
}
);
}

/**
* Run the process thread.
*/
@Override
public void run() {
try {
this.runnable.run();
} catch (final RuntimeException exception) {
if (!this.stop) {
throw exception;
}
}
}

/**
* Halt the process thread.
*/
public void halt() {
this.stop = true;
this.interrupt();
try {
TimeUnit.SECONDS.timedJoin(this, HALT_TIMEOUT);
} catch (final InterruptedException exception) {
Logger.warn(this, "Can't stop the thread", exception);
}
}
}
}

0 comments on commit f823115

Please sign in to comment.