Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental API and disk format for task caching #210

Merged
merged 13 commits into from
Aug 15, 2023
7 changes: 7 additions & 0 deletions build-caching/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
<version>0.15.0</version>
</dependency>

<!-- JSON serialization -->
<dependency>
niloc132 marked this conversation as resolved.
Show resolved Hide resolved
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.vertispan.j2cl.build;

import com.vertispan.j2cl.build.task.CachedPath;
import com.vertispan.j2cl.build.task.Input;

import java.nio.file.Path;
import java.util.Optional;

public class ChangedCachedPath implements Input.ChangedCachedPath {
private final ChangeType type;
private final Path sourcePath;
private final Optional<CachedPath> newIfAny;

public ChangedCachedPath(ChangeType type, Path sourcePath, CachedPath newPath) {
this.type = type;
this.sourcePath = sourcePath;
this.newIfAny = Optional.ofNullable(newPath);
}

@Override
public ChangeType changeType() {
return type;
}

@Override
public Path getSourcePath() {
return sourcePath;
}

@Override
public Optional<Path> getNewAbsolutePath() {
return newIfAny.map(CachedPath::getAbsolutePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,8 @@ public DefaultDiskCache(File cacheDir, Executor executor) throws IOException {
}

@Override
protected Path taskDir(CollectedTaskInputs inputs) {
String projectName = inputs.getProject().getKey();
Murmur3F hash = new Murmur3F();

hash.update(inputs.getTaskFactory().getClass().toString().getBytes(StandardCharsets.UTF_8));
hash.update(inputs.getTaskFactory().getTaskName().getBytes(StandardCharsets.UTF_8));
hash.update(inputs.getTaskFactory().getVersion().getBytes(StandardCharsets.UTF_8));

for (Input input : inputs.getInputs()) {
input.updateHash(hash);
}

for (Map.Entry<String, String> entry : inputs.getUsedConfigs().entrySet()) {
hash.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
if (entry.getValue() == null) {
hash.update(0);
} else {
hash.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
}
}

return cacheDir.toPath().resolve(projectName.replaceAll("[^\\-_a-zA-Z0-9.]", "-")).resolve(hash.getValueHexString() + "-" + inputs.getTaskFactory().getOutputType());
protected Path taskDir(String projectName, String hashString, String outputType) {
return cacheDir.toPath().resolve(projectName.replaceAll("[^\\-_a-zA-Z0-9.]", "-")).resolve(hashString + "-" + outputType);
}

@Override
Expand All @@ -72,4 +52,9 @@ protected Path logFile(Path taskDir) {
protected Path outputDir(Path taskDir) {
return taskDir.resolve("results");
}

@Override
protected Path cacheSummary(Path taskDir) {
return taskDir.resolve("cacheSummary.json");
niloc132 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
package com.vertispan.j2cl.build;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.vertispan.j2cl.build.impl.CollectedTaskInputs;
import com.vertispan.j2cl.build.task.CachedPath;
import io.methvin.watcher.PathUtils;
import io.methvin.watcher.hashing.FileHash;
import io.methvin.watcher.hashing.FileHasher;
import io.methvin.watcher.hashing.Murmur3F;
import io.methvin.watchservice.MacOSXListeningWatchService;
import io.methvin.watchservice.WatchablePath;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.WatchService;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
* Manages the cached task inputs and outputs.
* Manages the cached task inputs and outputs, without direct knowledge of the project or task apis.
*/
public abstract class DiskCache {
private static final boolean IS_MAC = System.getProperty("os.name").toLowerCase().contains("mac");
Expand All @@ -38,6 +41,10 @@ public CacheResult(Path taskDir) {
this.taskDir = taskDir;
}

public Path taskDir() {
return taskDir;
}

public Path logFile() {
//TODO finish building a logger that will write to this
return DiskCache.this.logFile(taskDir);
Expand All @@ -54,6 +61,10 @@ public TaskOutput output() {
return taskOutput;
}

public Path cachedSummary() {
return DiskCache.this.cacheSummary(taskDir);
}

public void markSuccess() {
markFinished(this);
runningTasks.remove(taskDir);
Expand Down Expand Up @@ -340,16 +351,57 @@ private void deleteRecursively(Path path) throws IOException {
}
}

protected abstract Path taskDir(CollectedTaskInputs inputs);
private String taskSummaryContents(CollectedTaskInputs inputs) {
TaskSummaryDiskFormat src = new TaskSummaryDiskFormat();
src.setProjectKey(inputs.getProject().getKey());
src.setOutputType(inputs.getTaskFactory().getOutputType());
src.setTaskImpl(inputs.getTaskFactory().getClass().getName());
src.setTaskImplVersion(inputs.getTaskFactory().getVersion());

src.setInputs(inputs.getInputs().stream()
.map(Input::makeDiskFormat)
.collect(Collectors.groupingBy(i -> i.getProjectKey() + "-" + i.getOutputType()))
.values().stream()
.map(list -> {
TaskSummaryDiskFormat.InputDiskFormat result = new TaskSummaryDiskFormat.InputDiskFormat();
result.setProjectKey(list.get(0).getProjectKey());
result.setOutputType(list.get(0).getOutputType());

result.setFileHashes(
list.stream().flatMap(i -> i.getFileHashes().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (left, right) -> {
if (left.equals(right)) {
return left;
}
throw new IllegalStateException("Two hashes for one file! " + left + " vs " + right);
}))
);

return result;
})
.collect(Collectors.toList()));

src.setConfigs(inputs.getUsedConfigs());

return new GsonBuilder().serializeNulls().setPrettyPrinting().create().toJson(src);
}

protected abstract Path taskDir(String projectName, String hashString, String outputType);

protected abstract Path successMarker(Path taskDir);
protected abstract Path failureMarker(Path taskDir);
protected abstract Path logFile(Path taskDir);
protected abstract Path outputDir(Path taskDir);
protected abstract Path cacheSummary(Path taskDir);

interface Listener {
/** Ready for the current listener to do the work */
void onReady(CacheResult result);
/** Someone else did it, but failed for some reason, not re-runnable */
void onFailure(CacheResult result);
/** Someone else tried to do it, but ran into an error, possibly recoverable if we try again */
void onError(Throwable throwable);
/** Someone else finished it, successfully, notify listeners */
void onSuccess(CacheResult result);
}
public class PendingCacheResult implements Cancelable {
Expand Down Expand Up @@ -431,7 +483,14 @@ private synchronized void failure() {
*/
public void waitForTask(CollectedTaskInputs taskDetails, Listener listener) {
assert taskDetails.getInputs().stream().allMatch(Input::hasContents);
final Path taskDir = taskDir(taskDetails);

Murmur3F murmur3F = new Murmur3F();
byte[] taskSummaryContents = taskSummaryContents(taskDetails).getBytes(StandardCharsets.UTF_8);
murmur3F.update(taskSummaryContents);
String hashString = murmur3F.getValueHexString();

final Path taskDir = taskDir(taskDetails.getProject().getKey(), hashString, taskDetails.getTaskFactory().getOutputType());

PendingCacheResult cancelable = new PendingCacheResult(taskDir, listener);
taskFutures.computeIfAbsent(taskDir, ignore -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(cancelable);
try {
Expand All @@ -449,6 +508,7 @@ public void waitForTask(CollectedTaskInputs taskDetails, Listener listener) {
// caller can begin work right away
Files.createDirectory(outputDir);
Files.createFile(logFile(taskDir));
Files.write(cacheSummary(taskDir), taskSummaryContents);
cancelable.ready();
return;
}
Expand Down Expand Up @@ -546,4 +606,13 @@ public void markFailed(CacheResult failedResult) {
}
}

public Optional<CacheResult> getCacheResult(Path taskDir) {
if (Files.exists(taskDir) || Files.exists(successMarker(taskDir))) {
CacheResult result = new CacheResult(taskDir);
knownOutputs.computeIfAbsent(taskDir, this::makeOutput);
return Optional.of(result);
}
return Optional.empty();
}

}
67 changes: 58 additions & 9 deletions build-caching/src/main/java/com/vertispan/j2cl/build/Input.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@
* interested in, and take the hash of the hashes to represent
*/
public class Input implements com.vertispan.j2cl.build.task.Input {
public interface BuildSpecificChanges {
List<? extends ChangedCachedPath> compute();
static BuildSpecificChanges memoize(BuildSpecificChanges changes) {
return new BuildSpecificChanges() {
private List<ChangedCachedPath> results;
@Override
public List<ChangedCachedPath> compute() {
if (results == null) {
List<? extends ChangedCachedPath> computed = changes.compute();
assert computed != null;
results = new ArrayList<>(computed);
}

return results;
}
};
}
}
private final Project project;
private final String outputType;

private TaskOutput contents;
private BuildSpecificChanges buildSpecificChanges;

public Input(Project project, String outputType) {
this.project = project;
Expand Down Expand Up @@ -66,6 +85,13 @@ public Collection<DiskCache.CacheEntry> getFilesAndHashes() {
.collect(Collectors.toList());
}

@Override
public Collection<? extends ChangedCachedPath> getChanges() {
return wrapped.getChanges().stream()
.filter(entry -> Arrays.stream(filters).anyMatch(f -> f.matches(entry.getSourcePath())))
.collect(Collectors.toList());
}

@Override
public com.vertispan.j2cl.build.task.Project getProject() {
return wrapped.getProject();
Expand Down Expand Up @@ -101,19 +127,34 @@ public void setCurrentContents(TaskOutput contents) {
}

/**
* Internal API.
* Internal API
*
* Updates the given hash object with the filtered file inputs - their paths and their
* hashes, so that if files are moved or changed we change the hash value, but we don't
* re-hash each file every time we ask.
* Once a task is finished, we can let it be used by other tasks as an input. In
* order for those tasks to execute incrementally, each particular Input must
* only have changed relative to the last time that its owner task ran successfully.
* Instead of being set all at once, this is re-assigned before each task actually
* executes.
*/
public void updateHash(Murmur3F hash) {
for (DiskCache.CacheEntry fileAndHash : getFilesAndHashes()) {
hash.update(fileAndHash.getSourcePath().toString().getBytes(StandardCharsets.UTF_8));
hash.update(fileAndHash.getHash().asBytes());
}
public void setBuildSpecificChanges(BuildSpecificChanges buildSpecificChanges) {
this.buildSpecificChanges = buildSpecificChanges;
}

public TaskSummaryDiskFormat.InputDiskFormat makeDiskFormat() {
TaskSummaryDiskFormat.InputDiskFormat out = new TaskSummaryDiskFormat.InputDiskFormat();

out.setProjectKey(getProject().getKey());
out.setOutputType(getOutputType());

out.setFileHashes(getFilesAndHashes().stream().collect(Collectors.toMap(
e -> e.getSourcePath().toString(),
e -> e.getHash().asString()
)));

return out;
}



@Override
public Project getProject() {
return project;
Expand All @@ -139,6 +180,14 @@ public Collection<DiskCache.CacheEntry> getFilesAndHashes() {
return contents.filesAndHashes();
}

@Override
public Collection<? extends ChangedCachedPath> getChanges() {
if (buildSpecificChanges == null) {
throw new NullPointerException("Changes not yet provided " + this);
}
return buildSpecificChanges.compute();
}

@Override
public String toString() {
return "Input{" +
Expand Down
Loading