diff --git a/fluency-core/src/main/java/org/komamitsu/fluency/buffer/Buffer.java b/fluency-core/src/main/java/org/komamitsu/fluency/buffer/Buffer.java index cd0ad716..b0335f67 100644 --- a/fluency-core/src/main/java/org/komamitsu/fluency/buffer/Buffer.java +++ b/fluency-core/src/main/java/org/komamitsu/fluency/buffer/Buffer.java @@ -189,8 +189,9 @@ private RetentionBuffer prepareBuffer(String tag, int writeSize) RetentionBuffer newBuffer = new RetentionBuffer(acquiredBuffer, System.currentTimeMillis()); if (retentionBuffer != null) { - retentionBuffer.getByteBuffer().flip(); - newBuffer.getByteBuffer().put(retentionBuffer.getByteBuffer()); + ByteBuffer buf = retentionBuffer.getByteBuffer().duplicate(); + buf.flip(); + newBuffer.getByteBuffer().put(buf); bufferPool.returnBuffer(retentionBuffer.getByteBuffer()); } LOG.trace("prepareBuffer(): allocate a new buffer. tag={}, buffer={}", tag, newBuffer); @@ -334,8 +335,9 @@ private void moveRetentionBufferToFlushable(String tag, RetentionBuffer buffer) { try { LOG.trace("moveRetentionBufferToFlushable(): tag={}, buffer={}", tag, buffer); - buffer.getByteBuffer().flip(); - flushableBuffers.put(new TaggableBuffer(tag, buffer.getByteBuffer())); + ByteBuffer buf = buffer.getByteBuffer().duplicate(); + buf.flip(); + flushableBuffers.put(new TaggableBuffer(tag, buf)); retentionBuffers.put(tag, null); } catch (InterruptedException e) { diff --git a/fluency-core/src/test/java/org/komamitsu/fluency/buffer/BufferTest.java b/fluency-core/src/test/java/org/komamitsu/fluency/buffer/BufferTest.java index 4fa54b30..35c83248 100644 --- a/fluency-core/src/test/java/org/komamitsu/fluency/buffer/BufferTest.java +++ b/fluency-core/src/test/java/org/komamitsu/fluency/buffer/BufferTest.java @@ -27,7 +27,6 @@ import org.komamitsu.fluency.recordformat.RecordFormatter; import org.komamitsu.fluency.util.Tuple; import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +38,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -192,6 +192,43 @@ void flush() verify(ingester, times(1)).ingest(eq("foodb.bartbl"), any(ByteBuffer.class)); } + @Test + void flushWithInterrupt() + throws IOException + { + Buffer buffer = new Buffer(bufferConfig, recordFormatter); + + Ingester ingester = mock(Ingester.class); + AtomicReference receivedBuffer = new AtomicReference<>(); + doAnswer((Answer) invocation -> { + receivedBuffer.set(((ByteBuffer)invocation.getArgument(1)).duplicate()); + return null; + }).when(ingester).ingest(anyString(), any(ByteBuffer.class)); + + Map data = new HashMap<>(); + data.put("name", "komamitsu"); + buffer.append("foodb.bartbl", 1420070400, data); + + try { + Thread.currentThread().interrupt(); + buffer.flush(ingester, true); + fail(); + } + catch (IOException e) { + // Caused by interruption. This is expected. + } + + // Retry + buffer.flush(ingester, true); + + verify(ingester, times(1)).ingest(eq("foodb.bartbl"), any(ByteBuffer.class)); + ObjectMapper objectMapper = new ObjectMapper(); + byte[] receivedData = new byte[receivedBuffer.get().remaining()]; + receivedBuffer.get().get(receivedData); + Map deserialized = objectMapper.readValue(receivedData, new TypeReference>() {}); + System.out.println(deserialized); + } + @Test void testFileBackup() throws IOException