diff --git a/src/main/java/net/ripe/rpki/rsyncit/rrdp/RpkiObject.java b/src/main/java/net/ripe/rpki/rsyncit/rrdp/RpkiObject.java index 3b53362..db10d08 100644 --- a/src/main/java/net/ripe/rpki/rsyncit/rrdp/RpkiObject.java +++ b/src/main/java/net/ripe/rpki/rsyncit/rrdp/RpkiObject.java @@ -3,6 +3,6 @@ import java.net.URI; import java.time.Instant; -public record RpkiObject(URI url, byte[] bytes, Instant createdAt) { +public record RpkiObject(URI url, byte[] bytes, Instant modificationTime) { } diff --git a/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java b/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java index 1c210af..b35c1da 100644 --- a/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java +++ b/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java @@ -5,14 +5,18 @@ import lombok.Data; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; +@Slf4j @Getter public class State { @@ -25,14 +29,15 @@ public State() { } // Remove entries that were not mentioned in RRDP repository for a while - public synchronized void removeOldObject(Instant cutOffTime) { - var hashesToDelete = new ArrayList<>(); - times.forEach((hash, t) -> { - if (t.getLastMentioned().isBefore(cutOffTime)) { - hashesToDelete.add(hash); - } - }); - hashesToDelete.forEach(times::remove); + public void removeOldObject(Instant cutOffTime) { + var expired = times.entrySet().stream().filter(entry -> + entry.getValue().getLastMentioned().isBefore(cutOffTime) + ); + + var removed = expired.mapToInt(entry -> Boolean.compare(times.remove(entry.getKey(), entry.getValue()), false)); + if (log.isInfoEnabled()) { + log.debug("Cleaned {} items from timestamp cache", removed.sum()); + } } public Instant cacheTimestamps(String hash, Instant now, Supplier createdAt) { diff --git a/src/main/java/net/ripe/rpki/rsyncit/rsync/RsyncWriter.java b/src/main/java/net/ripe/rpki/rsyncit/rsync/RsyncWriter.java index 0c18e5f..c38aeaf 100644 --- a/src/main/java/net/ripe/rpki/rsyncit/rsync/RsyncWriter.java +++ b/src/main/java/net/ripe/rpki/rsyncit/rsync/RsyncWriter.java @@ -100,7 +100,7 @@ private Path writeObjectToNewDirectory(List objects, Instant now) th try { Files.write(fullPath, o.bytes()); // rsync relies on the correct timestamp for fast synchronization - Files.setLastModifiedTime(fullPath, FileTime.from(o.createdAt())); + Files.setLastModifiedTime(fullPath, FileTime.from(o.modificationTime())); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/src/test/java/net/ripe/rpki/rsyncit/rrdp/StateTest.java b/src/test/java/net/ripe/rpki/rsyncit/rrdp/StateTest.java index dd1a860..d29b95a 100644 --- a/src/test/java/net/ripe/rpki/rsyncit/rrdp/StateTest.java +++ b/src/test/java/net/ripe/rpki/rsyncit/rrdp/StateTest.java @@ -1,45 +1,61 @@ package net.ripe.rpki.rsyncit.rrdp; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Instant; -import static org.junit.jupiter.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; class StateTest { + private State subject; + + @BeforeEach + public void setUp() { + subject = new State(); + } + + @Test + public void testBaseCase() { + assertThat(subject.getTimes()).isEmpty(); + } @Test public void testCachedTimestamps() { - var state = new State(); + var now = Instant.now(); + + var subject = new State(); var createdAt1 = Instant.ofEpochMilli(1000_000_000); var createdAt2 = Instant.ofEpochMilli(1000_100_000); - var now = Instant.now(); - assertEquals(createdAt1, state.cacheTimestamps("hash1", now, () -> createdAt1)); - assertEquals(createdAt1, state.cacheTimestamps("hash1", now, () -> createdAt1)); - assertEquals(createdAt2, state.cacheTimestamps("hash2", now, () -> createdAt2)); - assertEquals(createdAt2, state.cacheTimestamps("hash2", now, () -> createdAt2)); + + assertThat(subject.cacheTimestamps("hash1", now, () -> createdAt1)).isEqualTo(createdAt1); + assertThat(subject.cacheTimestamps("hash1", now, () -> createdAt1)).isEqualTo(createdAt1); + + assertThat(subject.cacheTimestamps("hash2", now, () -> createdAt2)).isEqualTo(createdAt2); + assertThat(subject.cacheTimestamps("hash2", now, () -> createdAt2)).isEqualTo(createdAt2); + + assertThat(subject.getTimes()).hasSize(2); } @Test public void testRemoveOldEntries() { - var state = new State(); - for (int i = 0; i < 10 ; i++) { var createdAt = Instant.ofEpochMilli(1000_000_000 + 10_000_000 * i); var now = Instant.ofEpochMilli(1000_000_000 + 10_000 * i); - state.cacheTimestamps("hash" + i, now, () -> createdAt); + subject.cacheTimestamps("hash" + i, now, () -> createdAt); } - assertEquals(10, state.getTimes().size()); - state.removeOldObject(Instant.ofEpochMilli(1000_000_000 + 10_000 * 5)); - assertEquals(5, state.getTimes().size()); + assertThat(subject.getTimes()).hasSize(10); + + subject.removeOldObject(Instant.ofEpochMilli(1000_000_000 + 10_000 * 5)); + assertThat(subject.getTimes()).hasSize(5); for (int i = 5; i < 10; i++) { var createdAt = Instant.ofEpochMilli(1000_000_000 + 10_000_000 * i); var mentioned = Instant.ofEpochMilli(1000_000_000 + 10_000 * i); - final State.Times times = state.getTimes().get("hash" + i); - assertEquals(createdAt, times.getCreatedAt()); - assertEquals(mentioned, times.getLastMentioned()); + final State.Times times = subject.getTimes().get("hash" + i); + assertThat(times.getCreatedAt()).isEqualTo(createdAt); + assertThat(times.getLastMentioned()).isEqualTo(mentioned); } } diff --git a/src/test/java/net/ripe/rpki/rsyncit/rsync/RsyncWriterTest.java b/src/test/java/net/ripe/rpki/rsyncit/rsync/RsyncWriterTest.java index 56e06ca..79ce266 100644 --- a/src/test/java/net/ripe/rpki/rsyncit/rsync/RsyncWriterTest.java +++ b/src/test/java/net/ripe/rpki/rsyncit/rsync/RsyncWriterTest.java @@ -3,6 +3,7 @@ import net.ripe.rpki.rsyncit.config.Config; import net.ripe.rpki.rsyncit.rrdp.RpkiObject; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; @@ -44,13 +45,13 @@ public void testRegexPublished() { } @Test - public void testWriteNoObjects() { - withRsyncWriter(rsyncWriter -> rsyncWriter.writeObjects(Collections.emptyList())); + public void testWriteNoObjects(@TempDir Path tmpPath) throws Exception { + withRsyncWriter(tmpPath, rsyncWriter -> rsyncWriter.writeObjects(Collections.emptyList())); } @Test - public void testWriteMultipleObjects() { - withRsyncWriter(rsyncWriter -> { + public void testWriteMultipleObjects(@TempDir Path tmpPath) throws Exception { + withRsyncWriter(tmpPath, rsyncWriter -> { var o1 = new RpkiObject(URI.create("rsync://bla.net/path1/a.cer"), someBytes(), Instant.now()); var o2 = new RpkiObject(URI.create("rsync://bla.net/path1/b.cer"), someBytes(), Instant.now()); var o3 = new RpkiObject(URI.create("rsync://bla.net/path1/nested/c.cer"), someBytes(), Instant.now()); @@ -69,13 +70,14 @@ public void testWriteMultipleObjects() { } @Test - public void testRemoveOldDirectoriesWhenTheyAreOld() { + public void testRemoveOldDirectoriesWhenTheyAreOld(@TempDir Path tmpPath) throws Exception { final Function writeSomeObjects = rsyncWriter -> rsyncWriter.writeObjects(IntStream.range(0, 10).mapToObj(i -> new RpkiObject(URI.create("rsync://bla.net/path1/" + i + ".cer"), someBytes(), Instant.now()) ).collect(Collectors.toList())); withRsyncWriter( + tmpPath, // make it ridiculous so that we clean up everything except for the last directory config -> config.withTargetDirectoryRetentionPeriodMs(100).withTargetDirectoryRetentionCopiesCount(0), rsyncWriter -> { @@ -94,13 +96,14 @@ public void testRemoveOldDirectoriesWhenTheyAreOld() { } @Test - public void testRemoveOldDirectoriesButKeepSomeNumberOfThem() { + public void testRemoveOldDirectoriesButKeepSomeNumberOfThem(@TempDir Path tmpDir) throws Exception { final Function writeSomeObjects = rsyncWriter -> rsyncWriter.writeObjects(IntStream.range(0, 10).mapToObj(i -> new RpkiObject(URI.create("rsync://bla.net/path1/" + i + ".cer"), someBytes(), Instant.now()) ).collect(Collectors.toList())); withRsyncWriter( + tmpDir, // make it ridiculous so that we clean up everything except for the last directory config -> config.withTargetDirectoryRetentionPeriodMs(100).withTargetDirectoryRetentionCopiesCount(2), rsyncWriter -> { @@ -121,37 +124,19 @@ private static void sleep(long millis) { } } - private static void checkFile(Path path, byte[] bytes) { - try { - final File file = path.toFile(); - final byte[] readBackBytes = Files.readAllBytes(path); - assertThat(file.exists()).isTrue(); - assertThat(Arrays.equals(bytes, readBackBytes)).isTrue(); - } catch (IOException e) { - throw new RuntimeException(e); - } + private static void checkFile(Path path, byte[] bytes) throws IOException { + final byte[] readBackBytes = Files.readAllBytes(path); + assertThat(path.toFile().exists()).isTrue(); + assertThat(readBackBytes).isEqualTo(bytes); } - static void withRsyncWriter(Function transformConfig, Consumer actualTest) { - Path tmp = null; - try { - final Config config = defaultConfig(); - final Path rsyncitDir = Files.createTempDirectory("rsyncit"); - final Config config1 = config.withRsyncPath(rsyncitDir.toAbsolutePath().toString()); - tmp = Files.createTempDirectory(Path.of(config1.rsyncPath()), "rsync-writer-test-"); - final Config config2 = config.withRsyncPath(tmp.toAbsolutePath().toString()); - actualTest.accept(new RsyncWriter(transformConfig.apply(config2))); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (tmp != null) { - tmp.toFile().delete(); - } - } + static void withRsyncWriter(Path tmpPath, Function transformConfig, ThrowingConsumer actualTest) throws Exception { + final Config config = defaultConfig().withRsyncPath(tmpPath.toString()); + actualTest.accept(new RsyncWriter(transformConfig.apply(config))); } - static void withRsyncWriter(Consumer actualTest) { - withRsyncWriter(Function.identity(), actualTest); + static void withRsyncWriter(Path tmpPath, ThrowingConsumer actualTest) throws Exception { + withRsyncWriter(tmpPath, Function.identity(), actualTest); } private static final Random random = new Random(); @@ -163,4 +148,7 @@ private static byte[] someBytes() { return bytes; } + interface ThrowingConsumer { + void accept(T t) throws Exception; + } } \ No newline at end of file