Indexer local Bulk ingest API #15

Open
wants to merge 1 commit into airbnb-main
141 changes: 141 additions & 0 deletions BulkLocalIngestApi.java (new file)
@@ -0,0 +1,141 @@
package com.slack.astra.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.CREATED;
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.astra.chunkManager.ChunkManager;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is responsible for defining the http endpoint behavior for the bulk ingest. It is
 * expected to handle appropriate rate limiting, error handling, and submit the parsed messages to
 * Kafka for ingestion.
 */
public class BulkLocalIngestApi {
  private static final Logger LOG = LoggerFactory.getLogger(BulkLocalIngestApi.class);

  // private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
  // private final DatasetRateLimitingService datasetRateLimitingService;
  // private final MeterRegistry meterRegistry;
  // private final Counter incomingByteTotal;
  // private final Counter incomingDocsTotal;
  // private final Timer bulkIngestTimer;
  // private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "astra_preprocessor_incoming_byte";
  // private final String BULK_INGEST_INCOMING_BYTE_DOCS = "astra_preprocessor_incoming_docs";
  // private final String BULK_INGEST_ERROR = "astra_preprocessor_error";
  // private final String BULK_INGEST_TIMER = "astra_preprocessor_bulk_ingest";
  // private final int rateLimitExceededErrorCode;
  private final ChunkManager<LogMessage> chunkManager;
  private final Schema.IngestSchema schema;

  // private final Counter bulkIngestErrorCounter;

  public BulkLocalIngestApi(
      // MeterRegistry meterRegistry,
      ChunkManager<LogMessage> chunkManager, Schema.IngestSchema schema) {

    // this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
    // this.datasetRateLimitingService = datasetRateLimitingService;
    // this.meterRegistry = meterRegistry;
    // this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
    // this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
    // this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
    // if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
    //   this.rateLimitExceededErrorCode = 400;
    // } else {
    //   this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
    // }
    this.schema = schema;
    this.chunkManager = chunkManager;
    // this.bulkIngestErrorCounter = meterRegistry.counter(BULK_INGEST_ERROR);
  }

  @Post("/_local_bulk")
  public HttpResponse addDocument(String bulkRequest) {
    // 1. Astra does not support the concept of "updates". It's always an add.
    // 2. The "index" is used as the span name
    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
    // Timer.Sample sample = Timer.start(meterRegistry);
    // future.thenRun(() -> sample.stop(bulkIngestTimer));

    int count = 0;

    try {
      byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
      // incomingByteTotal.increment(bulkRequestBytes.length);
      Map<String, List<Trace.Span>> docs = Map.of();
      try {
        docs = BulkApiRequestParser.parseRequest(bulkRequestBytes, schema);
      } catch (Exception e) {
        LOG.error("Request failed ", e);
        // bulkIngestErrorCounter.increment();
        BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
      }
      LOG.info("Parsed docs message: {}", docs);

      // todo - our rate limiter doesn't have a way to acquire permits across multiple datasets,
      // so today as a limitation we reject any request that has documents against multiple
      // indexes. We think most indexing requests will be against 1 index.
      if (docs.keySet().size() > 1) {
        BulkIngestResponse response =
            new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
        // bulkIngestErrorCounter.increment();
        return HttpResponse.of(future);
      }

      // for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
      //   incomingDocsTotal.increment(indexDocs.getValue().size());
      //   final String index = indexDocs.getKey();
      //   if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
      //     BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
      //     future.complete(
      //         HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode), response));
      //     return HttpResponse.of(future);
      //   }
      // }

      // todo - explore the possibility of using the blocking task executor backed by virtual
      // threads to fulfill this

      for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
        for (Trace.Span span : indexDocs.getValue()) {
          try {
            chunkManager.addMessage(span, span.getSerializedSize(), String.valueOf(0), 12345, true);
            count += 1;
            // return HttpResponse.of(future);
          } catch (Exception e) {
            LOG.error("Request failed ", e);
            // bulkIngestErrorCounter.increment();
            future.complete(
                HttpResponse.ofJson(
                    INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
          }
        }
      }
    } catch (Exception e) {
      LOG.error("Request failed ", e);
      // bulkIngestErrorCounter.increment();
      BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
      future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
    }

    future.complete(HttpResponse.ofJson(CREATED, new BulkIngestResponse(count, 0, "")));
    return HttpResponse.of(future);
  }
}
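For context, here is a minimal sketch of how the new endpoint could be exercised once the indexer is running. It is an illustration only: the port, index name, document fields, and Content-Type header are assumptions, and the NDJSON body follows the usual OpenSearch bulk convention that BulkApiRequestParser is presumed to accept. On success the handler above responds with 201 CREATED and a BulkIngestResponse carrying the ingested document count.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LocalBulkIngestClientExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical indexer address; substitute the real server port from the indexer config.
    String url = "http://localhost:8080/_local_bulk";

    // One action line per document, followed by the document source.
    // Astra treats every action as an add, and the index name is used as the span name.
    String bulkBody =
        "{\"index\": {\"_index\": \"test-index\", \"_id\": \"1\"}}\n"
            + "{\"service_name\": \"example\", \"message\": \"hello from local bulk\"}\n";

    HttpRequest request =
        HttpRequest.newBuilder(URI.create(url))
            .header("Content-Type", "application/x-ndjson")
            .POST(HttpRequest.BodyPublishers.ofString(bulkBody))
            .build();

    // Send synchronously and print the status plus the JSON response body.
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}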
6 changes: 4 additions & 2 deletions astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
@@ -139,7 +139,7 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext
}

/** Index the message in the logstore and update the chunk data time range. */
public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) {
public void addMessage(Trace.Span message, String kafkaPartitionId, long offset, boolean local_update) {
if (!this.kafkaPartitionId.equals(kafkaPartitionId)) {
throw new IllegalArgumentException(
"All messages for this chunk should belong to partition: "
@@ -158,7 +158,7 @@ public void addMessage(Trace.Span message, String kafkaPartitionId, long offset)
}
chunkInfo.updateDataTimeRange(timestamp.toEpochMilli());

chunkInfo.updateMaxOffset(offset);
if (local_update) {
chunkInfo.updateMaxOffset(offset);
}
} else {
throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo));
}

@@ -175,7 +175,7 @@ public static CachingChunkManager<LogMessage> fromConfig(
}

@Override
public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
throw new UnsupportedOperationException(
"Adding messages is not supported on a caching chunk manager");

@@ -9,7 +9,7 @@
import java.util.Map;

public interface ChunkManager<T> {
void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_update)
throws IOException;

SearchResult<T> query(SearchQuery query, Duration queryTimeout);

@@ -165,7 +165,7 @@ public IndexingChunkManager(
*/
@Override
public void addMessage(
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
if (stopIngestion) {
// Currently, this flag is set on only a chunkRollOverException.
@@ -175,7 +175,7 @@ public void addMessage(

// find the active chunk and add a message to it
ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId, indexerConfig);
currentChunk.addMessage(message, kafkaPartitionId, offset);
currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert);
long currentIndexedMessages = liveMessagesIndexedGauge.incrementAndGet();
long currentIndexedBytes = liveBytesIndexedGauge.addAndGet(msgSize);


@@ -80,7 +80,7 @@ public RecoveryChunkManager(

@Override
public void addMessage(
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
if (readOnly) {
LOG.warn("Ingestion is stopped since the chunk is in read only mode.");
@@ -89,7 +89,7 @@ public void addMessage(

// find the active chunk and add a message to it
ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId);
currentChunk.addMessage(message, kafkaPartitionId, offset);
currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert);
liveMessagesIndexedGauge.incrementAndGet();
liveBytesIndexedGauge.addAndGet(msgSize);
}

@@ -100,6 +100,11 @@ public Builder withRequestTimeout(Duration requestTimeout) {
return this;
}

public Builder maxContentLength(long maxRequestLength) {
serverBuilder.maxRequestLength(maxRequestLength);
return this;
}

public Builder withTracing(AstraConfigs.TracingConfig tracingConfig) {
// span handlers is an ordered list, so we need to be careful with ordering
if (tracingConfig.getCommonTagsCount() > 0) {
18 changes: 14 additions & 4 deletions astra/src/main/java/com/slack/astra/server/Astra.java
@@ -7,6 +7,7 @@
import com.slack.astra.blobfs.S3AsyncUtil;
import com.slack.astra.bulkIngestApi.BulkIngestApi;
import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer;
import com.slack.astra.bulkIngestApi.BulkLocalIngestApi;
import com.slack.astra.bulkIngestApi.DatasetRateLimitingService;
import com.slack.astra.chunkManager.CachingChunkManager;
import com.slack.astra.chunkManager.IndexingChunkManager;
@@ -185,13 +186,22 @@ private static Set<Service> getServices(
final int serverPort = astraConfig.getIndexerConfig().getServerConfig().getServerPort();
Duration requestTimeout =
Duration.ofMillis(astraConfig.getIndexerConfig().getServerConfig().getRequestTimeoutMs());
ArmeriaService armeriaService =
ArmeriaService.Builder armeriaServiceBuilder =
new ArmeriaService.Builder(serverPort, "astraIndex", meterRegistry)
.withRequestTimeout(requestTimeout)
.maxContentLength(2000000000)
.withTracing(astraConfig.getTracingConfig())
.withGrpcService(searcher)
.build();
services.add(armeriaService);
.withGrpcService(searcher);
Schema.IngestSchema schema = SchemaUtil.parseSchema(Path.of(""));
LOG.info(
"Loaded schema with fields count: {}, defaults count: {}",
schema.getFieldsCount(),
schema.getDefaultsCount());
schema = ReservedFields.addPredefinedFields(schema);
BulkLocalIngestApi localOpenSearchBulkApiService =
new BulkLocalIngestApi(chunkManager, schema);
armeriaServiceBuilder.withAnnotatedService(localOpenSearchBulkApiService);
services.add(armeriaServiceBuilder.build());
}

if (roles.contains(AstraConfigs.NodeRole.QUERY)) {

@@ -63,7 +63,8 @@ public boolean insertRecord(ConsumerRecord<String, byte[]> record) throws IOExce
Trace.Span.parseFrom(record.value()),
record.serializedValueSize(),
String.valueOf(record.partition()),
record.offset());
record.offset(),
false);
return true;
}
}

@@ -154,7 +154,7 @@ public void testAddAndSearchChunk() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -208,7 +208,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -256,7 +256,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
final long newMessageEndTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
for (Trace.Span m : newMessages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -325,7 +325,7 @@ public void testSearchInReadOnlyChunk() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -358,7 +358,7 @@ public void testAddMessageToReadOnlyChunk() {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -370,15 +370,15 @@ public void testAddMessageToReadOnlyChunk() {
int finalOffset = offset;
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
() -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset));
() -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset, false));
}

@Test
public void testMessageFromDifferentPartitionFails() {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -391,15 +391,15 @@ public void testMessageFromDifferentPartitionFails() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset));
chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false));
}

@Test
public void testCommitBeforeSnapshot() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
assertThat(chunk.isReadOnly()).isFalse();
@@ -503,7 +503,7 @@ public void testAddInvalidMessagesToChunk() {
Trace.Span invalidSpan = Trace.Span.newBuilder().build();

// An Invalid message is dropped but failure counter is incremented.
chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1);
chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false);
chunk.commit();

assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1);
@@ -595,7 +595,7 @@ public void testSnapshotToNonExistentS3BucketFails()
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}

@@ -653,7 +653,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
