Skip to content

Commit

Permalink
Merge pull request #916 from komamitsu/unit-test-for-backup
Browse files Browse the repository at this point in the history
Add unit tests for FileBackup
  • Loading branch information
komamitsu authored Nov 4, 2024
2 parents e4631f0 + 35fbc63 commit a088925
Show file tree
Hide file tree
Showing 17 changed files with 486 additions and 308 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ subprojects {
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.10.2")
testImplementation("ch.qos.logback:logback-classic:1.3.14")
testImplementation("org.hamcrest:hamcrest-all:1.3")
testImplementation("org.assertj:assertj-core:3.26.3")
testImplementation("org.mockito:mockito-core:4.11.0")
testImplementation("com.google.guava:guava:33.1.0-jre")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
Expand Down Expand Up @@ -54,7 +55,7 @@ public FileBackup(File backupDir, Buffer userBuffer, String prefix)
this.backupDir = backupDir;
this.userBuffer = userBuffer;
this.prefix = prefix;
this.pattern = Pattern.compile(userBuffer.bufferFormatType() + prefix() + PARAM_DELIM_IN_FILENAME + "([\\w\\.\\-" + PARAM_DELIM_IN_FILENAME + "]+)" + EXT_FILENAME);
this.pattern = Pattern.compile(String.format("^%s%s%s([\\w.\\-%s]+)%s$", userBuffer.bufferFormatType(), prefix(), PARAM_DELIM_IN_FILENAME, PARAM_DELIM_IN_FILENAME, EXT_FILENAME));
LOG.debug(this.toString());
}

Expand Down Expand Up @@ -83,9 +84,9 @@ public List<SavedBuffer> getSavedFiles()
}

LOG.debug("Checking backup files. files.length={}", files.length);
ArrayList<SavedBuffer> savedBuffers = new ArrayList<>();
List<SavedBuffer> savedBuffers = new ArrayList<>();
for (File f : files) {
Matcher matcher = pattern.matcher(f.getName());
Matcher matcher = pattern.matcher(f.toPath().getFileName().toString());
if (matcher.find()) {
if (matcher.groupCount() != 1) {
LOG.warn("Invalid backup filename: file={}", f.getName());
Expand Down Expand Up @@ -133,15 +134,15 @@ public void saveBuffer(List<String> params, ByteBuffer buffer)
channel.write(buffer);
}
catch (Exception e) {
LOG.error("Failed to save buffer to file: params=" + copiedParams + ", path=" + file.getAbsolutePath() + ", buffer=" + buffer, e);
LOG.error("Failed to save buffer to file: params={}, path={}, buffer={}", copiedParams, file.getAbsolutePath(), buffer, e);
}
finally {
if (channel != null) {
try {
channel.close();
}
catch (IOException e) {
LOG.warn("Failed to close Channel: channel=" + channel);
LOG.warn("Failed to close Channel: channel={}", channel);
}
}
}
Expand All @@ -168,22 +169,22 @@ public void open(Callback callback)
success();
}
catch (Exception e) {
LOG.error("Failed to process file. Skipping the file: file=" + savedFile, e);
LOG.error("Failed to process file. Skipping the file: file={}", savedFile, e);
}
finally {
try {
close();
}
catch (IOException e) {
LOG.warn("Failed to close file: file=" + savedFile, e);
LOG.warn("Failed to close file: file={}", savedFile, e);
}
}
}

public void remove()
{
if (!savedFile.delete()) {
LOG.warn("Failed to delete file: file=" + savedFile);
LOG.warn("Failed to delete file: file={}", savedFile);
}
}

Expand All @@ -193,7 +194,7 @@ private void success()
close();
}
catch (IOException e) {
LOG.warn("Failed to close file: file=" + savedFile, e);
LOG.warn("Failed to close file: file={}", savedFile, e);
}
finally {
remove();
Expand All @@ -210,6 +211,18 @@ public void close()
}
}

@Override
public String toString() {
return "SavedBuffer{" +
"params=" + params +
", savedFile=" + savedFile +
'}';
}

public Path getPath() {
return savedFile.toPath();
}

public interface Callback
{
void process(List<String> params, FileChannel channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -83,9 +82,9 @@ void testGetAllocatedBufferSize()
Buffer buffer = new Buffer(bufferConfig, new JsonRecordFormatter());
Flusher flusher = new Flusher(flusherConfig, buffer, ingester);
try (Fluency fluency = new Fluency(buffer, flusher)) {
assertThat(fluency.getAllocatedBufferSize(), is(0L));
assertThat(fluency.getAllocatedBufferSize()).isEqualTo(0L);
fluency.emit("foodb.bartbl", ImmutableMap.of("comment", "hello, world"));
assertThat(fluency.getAllocatedBufferSize(), is(1024L));
assertThat(fluency.getAllocatedBufferSize()).isEqualTo(1024L);
}
}

Expand Down Expand Up @@ -122,7 +121,7 @@ void testWaitUntilFlusherTerminated(int waitUntilFlusherTerm, boolean expected)

fluency.emit("foo.bar", new HashMap<>());
fluency.close();
assertThat(fluency.waitUntilFlusherTerminated(waitUntilFlusherTerm), is(expected));
assertThat(fluency.waitUntilFlusherTerminated(waitUntilFlusherTerm)).isEqualTo(expected);
}

@ParameterizedTest
Expand All @@ -136,7 +135,7 @@ void testWaitUntilFlushingAllBuffer(int waitUntilFlusherTerm, boolean expected)
Flusher flusher = new Flusher(flusherConfig, buffer, ingester);
try (Fluency fluency = new Fluency(buffer, flusher)) {
fluency.emit("foo.bar", new HashMap<>());
assertThat(fluency.waitUntilAllBufferFlushed(waitUntilFlusherTerm), is(expected));
assertThat(fluency.waitUntilAllBufferFlushed(waitUntilFlusherTerm)).isEqualTo(expected);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -337,13 +334,13 @@ void testGetAllocatedSize()
{
bufferConfig.setChunkInitialSize(256 * 1024);
try (Buffer buffer = new Buffer(bufferConfig, recordFormatter)) {
assertThat(buffer.getAllocatedSize(), is(0L));
assertThat(buffer.getAllocatedSize()).isEqualTo(0L);
Map<String, Object> map = new HashMap<>();
map.put("name", "komamitsu");
for (int i = 0; i < 10; i++) {
buffer.append("foo.bar", new Date().getTime(), map);
}
assertThat(buffer.getAllocatedSize(), is(256 * 1024L));
assertThat(buffer.getAllocatedSize()).isEqualTo(256 * 1024L);
}
}

Expand All @@ -353,18 +350,18 @@ void testGetBufferedDataSize()
{
bufferConfig.setChunkInitialSize(256 * 1024);
try (Buffer buffer = new Buffer(bufferConfig, recordFormatter)) {
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);

Map<String, Object> map = new HashMap<>();
map.put("name", "komamitsu");
for (int i = 0; i < 10; i++) {
buffer.append("foo.bar", new Date().getTime(), map);
}
assertThat(buffer.getBufferedDataSize(), is(greaterThan(0L)));
assertThat(buffer.getBufferedDataSize(), is(lessThan(512L)));
assertThat(buffer.getBufferedDataSize()).isGreaterThan(0L);
assertThat(buffer.getBufferedDataSize()).isLessThan(512L);

buffer.flush(ingester, true);
assertThat(buffer.getBufferedDataSize(), is(0L));
assertThat(buffer.getBufferedDataSize()).isEqualTo(0L);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package org.komamitsu.fluency.buffer;

import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

class FileBackupTest {

private void createTempFile(File dir, String filename, String content) throws IOException {
Path tempfilePath = Files.createFile(dir.toPath().resolve(filename));
Files.write(tempfilePath, content.getBytes(StandardCharsets.UTF_8));
tempfilePath.toFile().deleteOnExit();
}

private void assertSavedBuffer(FileBackup.SavedBuffer savedBuffer, Path expectedPath, byte[] expectedContent, String... expectedParams) {
assertThat(savedBuffer.getPath()).isEqualTo(expectedPath);
savedBuffer.open((params, channel) -> {
assertThat(params.toArray()).isEqualTo(expectedParams);
try {
long size = channel.size();
ByteBuffer buf = ByteBuffer.allocate((int) size);
channel.read(buf);
assertThat(buf.array()).isEqualTo(expectedContent);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

private void assertSavedFile(File savedFile,
String bufferFormatType,
String prefix,
long startNanos,
long endNanos,
String param1,
String param2,
byte[] expectedContent) throws IOException {
String fileName = savedFile.toPath().getFileName().toString();
assertThat(fileName).endsWith(".buf");

String[] partsOfPath = fileName.substring(0, fileName.length() - ".buf".length()).split("#");
assertThat(partsOfPath).hasSize(4);
assertThat(partsOfPath[0]).isEqualTo(bufferFormatType + "_" + prefix);
assertThat(partsOfPath[1]).isEqualTo(param1);
assertThat(partsOfPath[2]).isEqualTo(param2);
assertThat(Long.valueOf(partsOfPath[3])).isBetween(startNanos, endNanos);
assertThat(Files.readAllBytes(savedFile.toPath())).isEqualTo(expectedContent);
}

@Test
void getSavedFiles_GivenEmptyFiles_ShouldReturnEmpty() throws IOException {
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
Buffer buffer = mock(Buffer.class);
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);
assertThat(fileBackup.getSavedFiles()).isEmpty();
}

@Test
void getSavedFiles_GivenSomeFiles_ShouldReturnThem() throws IOException {
long nanoSeconds1 = System.nanoTime();
long nanoSeconds2 = System.nanoTime();
long nanoSeconds3 = System.nanoTime();
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
createTempFile(backupDir,
String.format("xmy_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("xmy_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("y_buf_type_my_prefix#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#1paramA#1paramB#%d.buf", nanoSeconds1),
"content1");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#2param-a#2param-b#%d.buf", nanoSeconds2),
"content2");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#3param_a#3param_b#%d.buf", nanoSeconds3),
"content3");
createTempFile(backupDir,
String.format("my_buf_type_my_prefixz#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefi#param_a#param_b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param:a#param:b#%d.buf", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param_a#param_b#%d", System.nanoTime()),
"ignored");
createTempFile(backupDir,
String.format("my_buf_type_my_prefix#param_a#param_b#%d.buff", System.nanoTime()),
"ignored");
Buffer buffer = mock(Buffer.class);
doReturn("my_buf_type").when(buffer).bufferFormatType();
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);

List<FileBackup.SavedBuffer> savedFiles = fileBackup.getSavedFiles().stream().sorted(
Comparator.comparing(FileBackup.SavedBuffer::getPath)).collect(Collectors.toList());
System.out.println(savedFiles);
assertThat(savedFiles).size().isEqualTo(3);
assertSavedBuffer(savedFiles.get(0),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#1paramA#1paramB#%d.buf", nanoSeconds1)),
"content1".getBytes(StandardCharsets.UTF_8),
"1paramA",
"1paramB");
assertSavedBuffer(savedFiles.get(1),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#2param-a#2param-b#%d.buf", nanoSeconds2)),
"content2".getBytes(StandardCharsets.UTF_8),
"2param-a",
"2param-b");
assertSavedBuffer(savedFiles.get(2),
backupDir.toPath().resolve(String.format("my_buf_type_my_prefix#3param_a#3param_b#%d.buf", nanoSeconds3)),
"content3".getBytes(StandardCharsets.UTF_8),
"3param_a",
"3param_b");
}

@Test
void saveBuffer() throws IOException {
File backupDir = Files.createTempDirectory("test").toFile();
backupDir.deleteOnExit();
Buffer buffer = mock(Buffer.class);
doReturn("my_buf_type").when(buffer).bufferFormatType();
String prefix = "my_prefix";
FileBackup fileBackup = new FileBackup(backupDir, buffer, prefix);
long startNanos = System.nanoTime();
fileBackup.saveBuffer(
Arrays.asList("1paramA", "1paramB"),
ByteBuffer.wrap("content1".getBytes(StandardCharsets.UTF_8)));
fileBackup.saveBuffer(
Arrays.asList("2param-a", "2param-b"),
ByteBuffer.wrap("content2".getBytes(StandardCharsets.UTF_8)));
fileBackup.saveBuffer(
Arrays.asList("3param_a", "3param_b"),
ByteBuffer.wrap("content3".getBytes(StandardCharsets.UTF_8)));
long endNanos = System.nanoTime();

List<File> savedFiles = Arrays.stream(Objects.requireNonNull(backupDir.listFiles()))
.sorted(Comparator.comparing(File::toString))
.collect(Collectors.toList());
assertThat(savedFiles).size().isEqualTo(3);
assertSavedFile(savedFiles.get(0),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"1paramA",
"1paramB",
"content1".getBytes(StandardCharsets.UTF_8));
assertSavedFile(savedFiles.get(1),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"2param-a",
"2param-b",
"content2".getBytes(StandardCharsets.UTF_8));
assertSavedFile(savedFiles.get(2),
"my_buf_type",
"my_prefix",
startNanos,
endNanos,
"3param_a",
"3param_b",
"content3".getBytes(StandardCharsets.UTF_8));
}
}
Loading

0 comments on commit a088925

Please sign in to comment.