diff --git a/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java
new file mode 100644
index 0000000000..e0aa086895
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java
@@ -0,0 +1,141 @@
+package com.slack.astra.bulkIngestApi;
+
+import static com.linecorp.armeria.common.HttpStatus.CREATED;
+import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
+
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.server.annotation.Post;
+import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
+import com.slack.astra.chunkManager.ChunkManager;
+import com.slack.astra.logstore.LogMessage;
+import com.slack.astra.proto.schema.Schema;
+import com.slack.service.murron.trace.Trace;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines the http endpoint behavior for local bulk ingest. It parses the bulk
+ * request, handles error responses, and writes the resulting messages directly into the local
+ * chunk manager, bypassing Kafka entirely.
+ */
+public class BulkLocalIngestApi {
+  private static final Logger LOG = LoggerFactory.getLogger(BulkLocalIngestApi.class);
+
+  //  private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
+  //  private final DatasetRateLimitingService datasetRateLimitingService;
+  //  private final MeterRegistry meterRegistry;
+  //  private final Counter incomingByteTotal;
+  //  private final Counter incomingDocsTotal;
+  //  private final Timer bulkIngestTimer;
+  //  private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "astra_preprocessor_incoming_byte";
+  //  private final String BULK_INGEST_INCOMING_BYTE_DOCS = "astra_preprocessor_incoming_docs";
+  //  private final String BULK_INGEST_ERROR = "astra_preprocessor_error";
+  //  private final String BULK_INGEST_TIMER = "astra_preprocessor_bulk_ingest";
+  //  private final int rateLimitExceededErrorCode;
+  private final ChunkManager<LogMessage> chunkManager;
+  private final Schema.IngestSchema schema;
+
+  //  private final Counter bulkIngestErrorCounter;
+
+  public BulkLocalIngestApi(
+      //      MeterRegistry meterRegistry,
+      ChunkManager<LogMessage> chunkManager, Schema.IngestSchema schema) {
+
+    //    this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
+    //    this.datasetRateLimitingService = datasetRateLimitingService;
+    //    this.meterRegistry = meterRegistry;
+    //    this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
+    //    this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
+    //    this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
+    //    if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
+    //      this.rateLimitExceededErrorCode = 400;
+    //    } else {
+    //      this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
+    //    }
+    this.schema = schema;
+    this.chunkManager = chunkManager;
+    //    this.bulkIngestErrorCounter = meterRegistry.counter(BULK_INGEST_ERROR);
+  }
+
+  @Post("/_local_bulk")
+  public HttpResponse addDocument(String bulkRequest) {
+    // 1. Astra does not support the concept of "updates". It's always an add.
+    // 2. The "index" is used as the span name
+    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
+    // Timer.Sample sample = Timer.start(meterRegistry);
+    // future.thenRun(() -> sample.stop(bulkIngestTimer));
+
+    int count = 0;
+
+    try {
+      byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
+      // incomingByteTotal.increment(bulkRequestBytes.length);
+      Map<String, List<Trace.Span>> docs;
+      try {
+        docs = BulkApiRequestParser.parseRequest(bulkRequestBytes, schema);
+      } catch (Exception e) {
+        LOG.error("Request failed", e);
+        // bulkIngestErrorCounter.increment();
+        BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+        // A parse failure is terminal for this request, so respond immediately.
+        return HttpResponse.of(future);
+      }
+      LOG.debug("Parsed docs message: {}", docs);
+
+      // todo - our rate limiter doesn't have a way to acquire permits across multiple
+      // datasets
+      // so today as a limitation we reject any request that has documents against
+      // multiple indexes
+      // We think most indexing requests will be against 1 index
+      if (docs.keySet().size() > 1) {
+        BulkIngestResponse response =
+            new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
+        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+        // bulkIngestErrorCounter.increment();
+        return HttpResponse.of(future);
+      }
+
+      //      for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+      //        incomingDocsTotal.increment(indexDocs.getValue().size());
+      //        final String index = indexDocs.getKey();
+      //        if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
+      //          BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
+      //          future.complete(
+      //              HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode),
+      // response));
+      //          return HttpResponse.of(future);
+      //        }
+      //      }
+
+      // todo - explore the possibility of using the blocking task executor backed by virtual
+      // threads to fulfill this
+
+      for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+        for (Trace.Span span : indexDocs.getValue()) {
+          try {
+            // Local inserts have no Kafka provenance: partition "0" and offset 12345 are
+            // placeholders, and localInsert=true tells the chunk not to record the offset.
+            chunkManager.addMessage(span, span.getSerializedSize(), String.valueOf(0), 12345, true);
+            count += 1;
+            // return HttpResponse.of(future);
+          } catch (Exception e) {
+            LOG.error("Request failed", e);
+            // bulkIngestErrorCounter.increment();
+            future.complete(
+                HttpResponse.ofJson(
+                    INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Request failed", e);
+      // bulkIngestErrorCounter.increment();
+      BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+      future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+    }
+
+    future.complete(HttpResponse.ofJson(CREATED, new BulkIngestResponse(count, 0, "")));
+    return HttpResponse.of(future);
+  }
+}
\ No newline at end of file
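For manual testing, the endpoint accepts an Elasticsearch-style bulk body. A minimal smoke-test sketch using java.net.http follows; the port 8080 and the dataset/field names are assumptions for illustration, not values taken from this change:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Sketch only: posts one NDJSON action/document pair to the new endpoint.
    public class LocalBulkSmokeTest {
      public static void main(String[] args) throws Exception {
        String bulkBody =
            "{\"index\": {\"_index\": \"test-dataset\", \"_id\": \"1\"}}\n"
                + "{\"service_name\": \"test\", \"message\": \"hello from local bulk\"}\n";
        HttpRequest request =
            HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/_local_bulk"))
                .POST(HttpRequest.BodyPublishers.ofString(bulkBody))
                .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // 201 CREATED with a BulkIngestResponse body is the success path.
        System.out.println(response.statusCode() + " " + response.body());
      }
    }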
The "index" is used as the span name + CompletableFuture future = new CompletableFuture<>(); + // Timer.Sample sample = Timer.start(meterRegistry); + // future.thenRun(() -> sample.stop(bulkIngestTimer)); + + int count = 0; + + try { + byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8); + // incomingByteTotal.increment(bulkRequestBytes.length); + Map> docs = Map.of(); + try { + docs = BulkApiRequestParser.parseRequest(bulkRequestBytes, schema); + } catch (Exception e) { + LOG.error("Request failed ", e); + // bulkIngestErrorCounter.increment(); + BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage()); + future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response)); + } + LOG.info("Parsed docs message: {}", docs); + + // todo - our rate limiter doesn't have a way to acquire permits across multiple + // datasets + // so today as a limitation we reject any request that has documents against + // multiple indexes + // We think most indexing requests will be against 1 index + if (docs.keySet().size() > 1) { + BulkIngestResponse response = + new BulkIngestResponse(0, 0, "request must contain only 1 unique index"); + future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response)); + // bulkIngestErrorCounter.increment(); + return HttpResponse.of(future); + } + + // for (Map.Entry> indexDocs : docs.entrySet()) { + // incomingDocsTotal.increment(indexDocs.getValue().size()); + // final String index = indexDocs.getKey(); + // if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) { + // BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded"); + // future.complete( + // HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), + // response)); + // return HttpResponse.of(future); + // } + // } + + // todo - explore the possibility of using the blocking task executor backed by virtual + // threads to fulfill this + + for (Map.Entry> indexDocs : docs.entrySet()) { + for (Trace.Span span : indexDocs.getValue()) { + try { + chunkManager.addMessage(span, span.getSerializedSize(), String.valueOf(0), 12345, true); + count += 1; + // return HttpResponse.of(future); + } catch (Exception e) { + LOG.error("Request failed ", e); + // bulkIngestErrorCounter.increment(); + future.complete( + HttpResponse.ofJson( + INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage()))); + } + } + } + } catch (Exception e) { + LOG.error("Request failed ", e); + // bulkIngestErrorCounter.increment(); + BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage()); + future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response)); + } + + future.complete(HttpResponse.ofJson(CREATED, new BulkIngestResponse(count, 0, ""))); + return HttpResponse.of(future); + } +} \ No newline at end of file diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java index d3dd5784a0..ec2bd1ee25 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java +++ b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java @@ -139,7 +139,7 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext } /** Index the message in the logstore and update the chunk data time range. 
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
index 030dee047e..9b923a7ce6 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
@@ -175,7 +175,7 @@ public static CachingChunkManager<LogMessage> fromConfig(
   }
 
   @Override
-  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean localInsert)
       throws IOException {
     throw new UnsupportedOperationException(
         "Adding messages is not supported on a caching chunk manager");
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
index 502d12a75a..38a7adb017 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
@@ -9,7 +9,7 @@ import java.util.Map;
 
 public interface ChunkManager<T> {
-  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean localInsert)
       throws IOException;
 
   SearchResult<T> query(SearchQuery query, Duration queryTimeout);
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
index 590f76d609..a0c146701f 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
@@ -165,7 +165,7 @@ public IndexingChunkManager(
    */
   @Override
   public void addMessage(
-      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean localInsert)
       throws IOException {
     if (stopIngestion) {
       // Currently, this flag is set on only a chunkRollOverException.
@@ -175,7 +175,7 @@ public void addMessage(
 
     // find the active chunk and add a message to it
     ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId, indexerConfig);
-    currentChunk.addMessage(message, kafkaPartitionId, offset);
+    currentChunk.addMessage(message, kafkaPartitionId, offset, localInsert);
     long currentIndexedMessages = liveMessagesIndexedGauge.incrementAndGet();
     long currentIndexedBytes = liveBytesIndexedGauge.addAndGet(msgSize);
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java
index 1ca98b5732..d16e20dc1b 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java
@@ -80,7 +80,7 @@ public RecoveryChunkManager(
 
   @Override
   public void addMessage(
-      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean localInsert)
       throws IOException {
     if (readOnly) {
       LOG.warn("Ingestion is stopped since the chunk is in read only mode.");
@@ -89,7 +89,7 @@ public void addMessage(
 
     // find the active chunk and add a message to it
     ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId);
-    currentChunk.addMessage(message, kafkaPartitionId, offset);
+    currentChunk.addMessage(message, kafkaPartitionId, offset, localInsert);
     liveMessagesIndexedGauge.incrementAndGet();
     liveBytesIndexedGauge.addAndGet(msgSize);
   }
diff --git a/astra/src/main/java/com/slack/astra/server/ArmeriaService.java b/astra/src/main/java/com/slack/astra/server/ArmeriaService.java
index be2576489e..db442a67af 100644
--- a/astra/src/main/java/com/slack/astra/server/ArmeriaService.java
+++ b/astra/src/main/java/com/slack/astra/server/ArmeriaService.java
@@ -100,6 +100,11 @@ public Builder withRequestTimeout(Duration requestTimeout) {
       return this;
     }
 
+    public Builder maxContentLength(long maxRequestLength) {
+      serverBuilder.maxRequestLength(maxRequestLength);
+      return this;
+    }
+
     public Builder withTracing(AstraConfigs.TracingConfig tracingConfig) {
       // span handlers is an ordered list, so we need to be careful with ordering
       if (tracingConfig.getCommonTagsCount() > 0) {
diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java
index 90e6f011d1..8bc9a410e8 100644
--- a/astra/src/main/java/com/slack/astra/server/Astra.java
+++ b/astra/src/main/java/com/slack/astra/server/Astra.java
@@ -7,6 +7,7 @@ import com.slack.astra.blobfs.S3AsyncUtil;
 import com.slack.astra.bulkIngestApi.BulkIngestApi;
 import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer;
+import com.slack.astra.bulkIngestApi.BulkLocalIngestApi;
 import com.slack.astra.bulkIngestApi.DatasetRateLimitingService;
 import com.slack.astra.chunkManager.CachingChunkManager;
 import com.slack.astra.chunkManager.IndexingChunkManager;
@@ -185,13 +186,23 @@ private static Set<Service> getServices(
       final int serverPort = astraConfig.getIndexerConfig().getServerConfig().getServerPort();
       Duration requestTimeout =
           Duration.ofMillis(astraConfig.getIndexerConfig().getServerConfig().getRequestTimeoutMs());
-      ArmeriaService armeriaService =
+      ArmeriaService.Builder armeriaServiceBuilder =
           new ArmeriaService.Builder(serverPort, "astraIndex", meterRegistry)
               .withRequestTimeout(requestTimeout)
+              // 2,000,000,000 bytes (~2 GB): local bulk request bodies can be very large
+              .maxContentLength(2000000000)
               .withTracing(astraConfig.getTracingConfig())
-
.withGrpcService(searcher) - .build(); - services.add(armeriaService); + .withGrpcService(searcher); + Schema.IngestSchema schema = SchemaUtil.parseSchema(Path.of("")); + LOG.info( + "Loaded schema with fields count: {}, defaults count: {}", + schema.getFieldsCount(), + schema.getDefaultsCount()); + schema = ReservedFields.addPredefinedFields(schema); + BulkLocalIngestApi localOpenSearchBulkApiService = + new BulkLocalIngestApi(chunkManager, schema); + armeriaServiceBuilder.withAnnotatedService(localOpenSearchBulkApiService); + services.add(armeriaServiceBuilder.build()); } if (roles.contains(AstraConfigs.NodeRole.QUERY)) { diff --git a/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java b/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java index be7c8a244f..d4b906fb64 100644 --- a/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java +++ b/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java @@ -63,7 +63,8 @@ public boolean insertRecord(ConsumerRecord record) throws IOExce Trace.Span.parseFrom(record.value()), record.serializedValueSize(), String.valueOf(record.partition()), - record.offset()); + record.offset(), + false); return true; } } diff --git a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java index 342b0be2f2..f1bdc2ee11 100644 --- a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java @@ -154,7 +154,7 @@ public void testAddAndSearchChunk() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -208,7 +208,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -256,7 +256,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { final long newMessageEndTimeEpochMs = TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS); for (Trace.Span m : newMessages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -325,7 +325,7 @@ public void testSearchInReadOnlyChunk() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -358,7 +358,7 @@ public void testAddMessageToReadOnlyChunk() { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -370,7 +370,7 @@ public void testAddMessageToReadOnlyChunk() { int finalOffset = offset; assertThatExceptionOfType(IllegalStateException.class) .isThrownBy( - () -> 
chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset)); + () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset, false)); } @Test @@ -378,7 +378,7 @@ public void testMessageFromDifferentPartitionFails() { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -391,7 +391,7 @@ public void testMessageFromDifferentPartitionFails() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> - chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset)); + chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false)); } @Test @@ -399,7 +399,7 @@ public void testCommitBeforeSnapshot() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -503,7 +503,7 @@ public void testAddInvalidMessagesToChunk() { Trace.Span invalidSpan = Trace.Span.newBuilder().build(); // An Invalid message is dropped but failure counter is incremented. - chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -595,7 +595,7 @@ public void testSnapshotToNonExistentS3BucketFails() List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -653,7 +653,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } diff --git a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java index 8877666120..7b3b5bc688 100644 --- a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java @@ -138,7 +138,7 @@ public void testAddAndSearchChunk() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -195,7 +195,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -245,7 +245,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { final long newMessageEndTimeEpochMs = 
TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS); for (Trace.Span m : newMessages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -314,7 +314,7 @@ public void testSearchInReadOnlyChunk() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -347,7 +347,7 @@ public void testAddMessageToReadOnlyChunk() { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -359,7 +359,7 @@ public void testAddMessageToReadOnlyChunk() { int finalOffset = offset; assertThatExceptionOfType(IllegalStateException.class) .isThrownBy( - () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset)); + () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset, false)); } @Test @@ -367,7 +367,7 @@ public void testMessageFromDifferentPartitionFails() { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -380,7 +380,7 @@ public void testMessageFromDifferentPartitionFails() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> - chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset)); + chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false)); } @Test @@ -388,7 +388,7 @@ public void testCommitBeforeSnapshot() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -494,7 +494,7 @@ public void testAddInvalidMessagesToChunk() { // An Invalid message is dropped but failure counter is incremented. 
Trace.Span invalidSpan = Trace.Span.newBuilder().build(); - chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -578,7 +578,7 @@ public void testSnapshotToNonExistentS3BucketFails() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -628,7 +628,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } diff --git a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java index 6284e9aaa6..1a1a9591fb 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java @@ -239,7 +239,7 @@ public void shouldHandleLifecycle() throws Exception { @Test public void testAddMessageIsUnsupported() throws TimeoutException { cachingChunkManager = initChunkManager(); - assertThatThrownBy(() -> cachingChunkManager.addMessage(SpanUtil.makeSpan(1), 10, "1", 1)) + assertThatThrownBy(() -> cachingChunkManager.addMessage(SpanUtil.makeSpan(1), 10, "1", 1, false)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java index 63609dde3d..c419ec0c99 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java @@ -224,7 +224,7 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time int offset = 1; for (Trace.Span m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -236,7 +236,7 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time assertThat(chunk1.info().getChunkSnapshotTimeEpochMs()).isZero(); for (Trace.Span m : messages.subList(9, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -301,7 +301,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0() int offset = 1; for (Trace.Span m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -344,7 +344,7 @@ public void closeDuringCleanerTask() int offset = 1; for (Trace.Span m : messages) { final int msgSize = m.toString().length(); - 
chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); offset++; chunkManager.getActiveChunk().commit(); @@ -387,7 +387,7 @@ public void testAddMessage() throws Exception { int offset = 1; for (Trace.Span m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); actualChunkSize += msgSize; offset++; } @@ -458,7 +458,8 @@ public void testAddMessage() throws Exception { messageWithHighOffset, messageWithHighOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - veryHighOffset); + veryHighOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -488,7 +489,8 @@ public void testAddMessage() throws Exception { messageWithLowerOffset, messageWithLowerOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - lowerOffset); + lowerOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -517,7 +519,8 @@ public void testAddMessage() throws Exception { messageWithInvalidTopic, messageWithInvalidTopic.toString().length(), "differentKafkaTopic", - lowerOffset + 1)); + lowerOffset + 1, + false)); } private void testChunkManagerSearch( @@ -597,7 +600,7 @@ public void testAddAndSearchMessageInMultipleSlices() throws Exception { List messages = SpanUtil.makeSpansWithTimeDifference(1, 15, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -625,7 +628,7 @@ public void testAddAndSearchMessageInSpecificChunks() throws Exception { List messages = SpanUtil.makeSpansWithTimeDifference(1, 15, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -715,7 +718,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { // Add a message int offset = 1; Trace.Span msg1 = SpanUtil.makeSpan(1); - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; // Add an invalid message @@ -730,7 +733,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { .build()) .build(); chunkManager.addMessage( - invalidSpan, invalidSpan.getSerializedSize(), TEST_KAFKA_PARTITION_ID, offset); + invalidSpan, invalidSpan.getSerializedSize(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; // Commit the new chunk so we can search it. 
@@ -779,14 +782,14 @@ public void testMessagesAddedToActiveChunks() throws Exception { Trace.Span msg1 = msgs.get(0); Trace.Span msg2 = msgs.get(1); int offset = 1; - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; ReadWriteChunk chunk1 = chunkManager.getActiveChunk(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(1); assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); assertThat(getValue(LIVE_MESSAGES_INDEXED, metricsRegistry)).isEqualTo(1); - chunkManager.addMessage(msg2, msg2.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg2, msg2.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(1); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2); @@ -800,7 +803,7 @@ public void testMessagesAddedToActiveChunks() throws Exception { Trace.Span msg3 = msgs.get(2); Trace.Span msg4 = msgs.get(3); - chunkManager.addMessage(msg3, msg3.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg3, msg3.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(2); @@ -815,7 +818,7 @@ public void testMessagesAddedToActiveChunks() throws Exception { checkMetadata(3, 2, 1, 2, 1); // Inserting in an older chunk throws an exception. So, additions go to active chunks only. assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> chunk1.addMessage(msg4, TEST_KAFKA_PARTITION_ID, 1)); + .isThrownBy(() -> chunk1.addMessage(msg4, TEST_KAFKA_PARTITION_ID, 1, false)); } @Test @@ -830,7 +833,7 @@ public void testMultiThreadedChunkRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -863,7 +866,7 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -884,7 +887,7 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add remaining messages to create a second chunk. for (Trace.Span m : messages.subList(11, 25)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -953,7 +956,7 @@ public void testAllChunkFailures() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } // Main chunk is already committed. Commit the new chunk so we can search it. 
@@ -972,7 +975,7 @@ public void testAllChunkFailures() throws Exception {
     testChunkManagerSearch(chunkManager, "Message21", 0, 2, 2);
 
     for (Trace.Span m : messages.subList(11, 25)) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunkManager.getActiveChunk().commit();
@@ -1029,7 +1032,7 @@ public void testCommitInvalidChunk() throws Exception {
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1065,7 +1068,7 @@ public void testMultiChunkSearch() throws Exception {
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1192,7 +1195,7 @@ public void testChunkRollOverInProgressExceptionIsThrown() throws Exception {
             () -> {
               int offset = 1;
               for (Trace.Span m : messages) {
-                chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+                chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
                 offset++;
               }
             })
@@ -1228,7 +1231,7 @@ public void testSuccessfulRollOverFinishesOnClose() throws Exception {
     // rollover.
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     ListenableFuture rollOverFuture = chunkManager.getRolloverFuture();
@@ -1279,7 +1282,7 @@ public void testFailedRollOverFinishesOnClose() throws Exception {
     // rollover.
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     await().until(() -> getCount(ROLLOVERS_FAILED, metricsRegistry) == 1);
@@ -1323,7 +1326,7 @@ public void testRollOverFailure()
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
      offset++;
     }
 
@@ -1347,7 +1350,7 @@
                   SpanUtil.makeSpansWithTimeDifference(11, 12, 1000, startTime);
               for (Trace.Span m : newMessage) {
                 chunkManager.addMessage(
-                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset);
+                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset, false);
                 newOffset++;
               }
             })
@@ -1371,7 +1374,7 @@ public void testRollOverFailureWithDirectExecutor()
     // exception.
int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -1392,7 +1395,7 @@ public void testRollOverFailureWithDirectExecutor() SpanUtil.makeSpansWithTimeDifference(11, 12, 1000, Instant.now()); for (Trace.Span m : newMessage) { chunkManager.addMessage( - m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset); + m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset, false); newOffset++; } }) @@ -1413,7 +1416,7 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException { int offset = 1; for (Trace.Span m : messages1) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -1431,7 +1434,7 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException { SpanUtil.makeSpan(11, "Message11", Instant.now(), List.of(schemaTestTag)); chunkManager.addMessage( - logMessage, logMessage.toString().length(), TEST_KAFKA_PARTITION_ID, offset++); + logMessage, logMessage.toString().length(), TEST_KAFKA_PARTITION_ID, offset++, false); chunkManager.rollOverActiveChunk(); await().until(() -> getCount(ROLLOVERS_COMPLETED, metricsRegistry) == 2); @@ -1561,7 +1564,7 @@ private void insertMessages( int offset = 1; for (Trace.Span m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); offset++; actualMessagesGauge++; actualBytesGauge += msgSize; diff --git a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java index 0f841a1ae8..840123a136 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java @@ -169,7 +169,7 @@ public void testAddMessageAndRollover() throws Exception { int offset = 1; for (Trace.Span m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); actualChunkSize += msgSize; offset++; } @@ -223,7 +223,8 @@ public void testAddMessageAndRollover() throws Exception { messageWithHighOffset, messageWithHighOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - veryHighOffset); + veryHighOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -253,7 +254,8 @@ public void testAddMessageAndRollover() throws Exception { messageWithLowerOffset, messageWithLowerOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - lowerOffset); + lowerOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -282,7 +284,8 @@ public void testAddMessageAndRollover() throws Exception { messageWithInvalidTopic, messageWithInvalidTopic.toString().length(), "differentKafkaTopic", - lowerOffset + 1)); + lowerOffset + 1, + false)); // Get the count of the amount of indices so that we can 
confirm we've cleaned them up // after the rollover @@ -311,7 +314,7 @@ public void testAddMessageAndRollover() throws Exception { // Can't add messages to current chunk after roll over. assertThatThrownBy( () -> - currentChunk.addMessage(SpanUtil.makeSpan(100000), TEST_KAFKA_PARTITION_ID, 100000)) + currentChunk.addMessage(SpanUtil.makeSpan(100000), TEST_KAFKA_PARTITION_ID, 100000, false)) .isInstanceOf(IllegalStateException.class); // Ensure data is cleaned up in the manager @@ -368,7 +371,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { // Add a valid message int offset = 1; Trace.Span msg1 = SpanUtil.makeSpan(1); - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; // Add an invalid message @@ -379,7 +382,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); Trace.Span msg100 = SpanUtil.makeSpan(100, "Message100", Instant.now(), List.of(conflictTag)); - chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); //noinspection UnusedAssignment offset++; @@ -417,7 +420,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { List messages = SpanUtil.makeSpansWithTimeDifference(1, 20, 1, Instant.now()); for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -436,7 +439,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { .isThrownBy( () -> chunkManager.addMessage( - SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000)); + SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000, false)); // Check metadata. 
List snapshots = @@ -459,7 +462,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { .isThrownBy( () -> chunkManager.addMessage( - SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000)); + SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000, false)); chunkManager.awaitTerminated(DEFAULT_START_STOP_DURATION); chunkManager = null; diff --git a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java index 229b3c42d6..2a3d5ba2ec 100644 --- a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java +++ b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java @@ -181,7 +181,7 @@ public void testDiskBasedRolloverWithMaxBytes() throws Exception { boolean shouldCheckOnNextMessage = false; for (Trace.Span m : SpanUtil.makeSpansWithTimeDifference(1, totalMessages, 1000, startTime)) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); offset++; Thread.sleep(DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS); if (chunkManager.getActiveChunk() != null) { @@ -250,11 +250,11 @@ public void testRolloverBasedOnMaxTime() throws Exception { // wait for 2+ seconds so that the chunk rollover code will get triggered // add 2nd message to trigger chunk rollover // add 3rd message to create new chunk - chunkManager.addMessage(SpanUtil.makeSpan(1), 100, TEST_KAFKA_PARTITION_ID, 1); + chunkManager.addMessage(SpanUtil.makeSpan(1), 100, TEST_KAFKA_PARTITION_ID, 1, false); // so that the chunk rollover code will get triggered Thread.sleep(2_000 + DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS); - chunkManager.addMessage(SpanUtil.makeSpan(2), 100, TEST_KAFKA_PARTITION_ID, 1); - chunkManager.addMessage(SpanUtil.makeSpan(3), 100, TEST_KAFKA_PARTITION_ID, 1); + chunkManager.addMessage(SpanUtil.makeSpan(2), 100, TEST_KAFKA_PARTITION_ID, 1, false); + chunkManager.addMessage(SpanUtil.makeSpan(3), 100, TEST_KAFKA_PARTITION_ID, 1, false); await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, metricsRegistry) == 1); @@ -277,7 +277,7 @@ public void testDiskBasedRolloverWithMaxMessages() throws Exception { int offset = 1; for (Trace.Span m : SpanUtil.makeSpansWithTimeDifference(1, totalMessages, 1000, startTime)) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); offset++; if (chunkManager.getActiveChunk() != null) { chunkManager.getActiveChunk().commit(); diff --git a/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java b/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java index 1c563a2832..4a31bed7f9 100644 --- a/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java +++ b/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java @@ -388,7 +388,7 @@ private void addMessagesToChunkManager(List messages) throws IOExcep IndexingChunkManager chunkManager = chunkManagerUtil.chunkManager; int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), 
TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); diff --git a/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java b/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java index 761c76cb9f..6dc6926bd6 100644 --- a/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java +++ b/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java @@ -110,7 +110,7 @@ public void testAstraSearch() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -179,7 +179,7 @@ public void testAstraSearchNoData() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -225,7 +225,7 @@ public void testAstraSearchNoHits() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -273,7 +273,7 @@ public void testAstraSearchNoHistogram() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -328,7 +328,7 @@ public void testAstraBadArgSearch() throws Throwable { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -363,7 +363,7 @@ public void testAstraGrpcSearch() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed. 
@@ -442,7 +442,7 @@ public void testAstraGrpcSearchThrowsException() throws IOException { List messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false); offset++; } // No need to commit the active chunk since the last chunk is already closed.