diff --git a/config/config.yaml b/config/config.yaml
index 9835654318..6c95431fd1 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -125,6 +125,10 @@ preprocessorConfig:
numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
+ kafkaConfig:
+ kafkaTopic: ${KAFKA_TOPIC:-test-topic}
+ kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
+ additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
- bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
diff --git a/kaldb/pom.xml b/kaldb/pom.xml
index 0421035554..2a666aff8d 100644
--- a/kaldb/pom.xml
+++ b/kaldb/pom.xml
@@ -687,6 +687,14 @@
false
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.googlejavaformat</groupId>
+ <artifactId>google-java-format</artifactId>
+ <version>1.19.1</version>
+ </dependency>
+ </dependencies>
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
new file mode 100644
index 0000000000..baf3686fbd
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -0,0 +1,105 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
+import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;
+
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.server.annotation.Post;
+import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
+import com.slack.service.murron.trace.Trace;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines the HTTP endpoint behavior for bulk ingest. It is expected to handle
+ * appropriate rate limiting and error handling, and to submit the parsed messages to Kafka
+ * for ingestion.
+ */
+public class BulkIngestApi {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class);
+ private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
+ private final DatasetRateLimitingService datasetRateLimitingService;
+ private final MeterRegistry meterRegistry;
+ private final Counter incomingByteTotal;
+ private final Timer bulkIngestTimer;
+ private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
+ private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
+
+ public BulkIngestApi(
+ BulkIngestKafkaProducer bulkIngestKafkaProducer,
+ DatasetRateLimitingService datasetRateLimitingService,
+ MeterRegistry meterRegistry) {
+
+ this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
+ this.datasetRateLimitingService = datasetRateLimitingService;
+ this.meterRegistry = meterRegistry;
+ this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
+ this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
+ }
+
+ @Post("/_bulk")
+ public HttpResponse addDocument(String bulkRequest) {
+ // 1. Kaldb does not support the concept of "updates". It's always an add.
+ // 2. The "index" is used as the span name
+ CompletableFuture<HttpResponse> future = new CompletableFuture<>();
+ Timer.Sample sample = Timer.start(meterRegistry);
+ future.thenRun(() -> sample.stop(bulkIngestTimer));
+
+ try {
+ byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
+ incomingByteTotal.increment(bulkRequestBytes.length);
+ Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);
+
+ // todo - our rate limiter doesn't have a way to acquire permits across multiple datasets,
+ // so as a limitation today we reject any request that has documents against multiple
+ // indexes. We think most indexing requests will be against a single index.
+ if (docs.keySet().size() > 1) {
+ BulkIngestResponse response =
+ new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
+ future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+ return HttpResponse.of(future);
+ }
+
+ for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+ final String index = indexDocs.getKey();
+ if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
+ BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
+ future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
+ return HttpResponse.of(future);
+ }
+ }
+
+ // todo - explore the possibility of using the blocking task executor backed by virtual
+ // threads to fulfill this
+ Thread.ofVirtual()
+ .start(
+ () -> {
+ try {
+ BulkIngestResponse response =
+ bulkIngestKafkaProducer.submitRequest(docs).getResponse();
+ future.complete(HttpResponse.ofJson(response));
+ } catch (InterruptedException e) {
+ LOG.error("Request failed ", e);
+ future.complete(
+ HttpResponse.ofJson(
+ INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Request failed ", e);
+ BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+ future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+ }
+
+ return HttpResponse.of(future);
+ }
+}
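As a rough usage sketch (not part of this diff), the new /_bulk endpoint can be exercised with Armeria's WebClient. The port and the NDJSON payload below are illustrative assumptions that mirror the defaults in config.yaml and the bulk test fixtures:

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;

public class BulkIngestClientSketch {
  public static void main(String[] args) {
    // assumed preprocessor address; see KALDB_PREPROCESSOR_SERVER_PORT in config.yaml
    WebClient client = WebClient.of("http://localhost:8086");

    // minimal NDJSON bulk body: one "index" action line followed by its source document
    String body =
        "{ \"index\": { \"_index\": \"test\", \"_id\": \"1\" } }\n"
            + "{ \"service_name\": \"test\", \"message\": \"hello\" }\n";

    // BulkIngestApi replies with a JSON-serialized BulkIngestResponse
    AggregatedHttpResponse response = client.post("/_bulk", body).aggregate().join();
    System.out.println(response.status() + " " + response.contentUtf8());
  }
}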
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java
new file mode 100644
index 0000000000..651c49b6c4
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java
@@ -0,0 +1,298 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE;
+import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
+import com.slack.kaldb.metadata.dataset.DatasetMetadata;
+import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
+import com.slack.kaldb.preprocessor.PreprocessorService;
+import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.util.RuntimeHalterImpl;
+import com.slack.kaldb.writer.KafkaUtils;
+import com.slack.service.murron.trace.Trace;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkIngestKafkaProducer.class);
+
+ private final KafkaProducer<String, byte[]> kafkaProducer;
+
+ private final KafkaClientMetrics kafkaMetrics;
+
+ private final KaldbConfigs.KafkaConfig kafkaConfig;
+
+ private final DatasetMetadataStore datasetMetadataStore;
+ private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
+ (_) -> cacheSortedDataset();
+
+ protected List<DatasetMetadata> throughputSortedDatasets;
+
+ private final BlockingQueue<BulkIngestRequest> pendingRequests;
+
+ private final Integer producerSleepMs;
+
+ public static final String FAILED_SET_RESPONSE_COUNTER =
+ "bulk_ingest_producer_failed_set_response";
+ private final Counter failedSetResponseCounter;
+ public static final String STALL_COUNTER = "bulk_ingest_producer_stall_counter";
+ private final Counter stallCounter;
+
+ public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size";
+ private final AtomicInteger batchSizeGauge;
+
+ private static final Set<String> OVERRIDABLE_CONFIGS =
+ Set.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+
+ public BulkIngestKafkaProducer(
+ final DatasetMetadataStore datasetMetadataStore,
+ final KaldbConfigs.PreprocessorConfig preprocessorConfig,
+ final PrometheusMeterRegistry meterRegistry) {
+
+ this.kafkaConfig = preprocessorConfig.getKafkaConfig();
+
+ checkArgument(
+ !kafkaConfig.getKafkaBootStrapServers().isEmpty(),
+ "Kafka bootstrapServers must be provided");
+
+ checkArgument(!kafkaConfig.getKafkaTopic().isEmpty(), "Kafka topic must be provided");
+
+ this.datasetMetadataStore = datasetMetadataStore;
+ this.pendingRequests = new LinkedBlockingQueue<>();
+
+ // todo - consider making this a configurable value or removing the config
+ this.producerSleepMs =
+ Integer.parseInt(System.getProperty("kalDb.bulkIngest.producerSleepMs", "50"));
+
+ // Since we use a new transaction ID every time we start a preprocessor, there can be some
+ // zombie transactions. We believe they will remain in Kafka until they expire, and they
+ // should never be readable as long as the consumer sets isolation.level to
+ // "read_committed".
+ // See "zombie fencing": https://www.confluent.io/blog/transactions-apache-kafka/
+ this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString());
+ this.kafkaMetrics = new KafkaClientMetrics(kafkaProducer);
+ this.kafkaMetrics.bindTo(meterRegistry);
+
+ this.failedSetResponseCounter = meterRegistry.counter(FAILED_SET_RESPONSE_COUNTER);
+ this.stallCounter = meterRegistry.counter(STALL_COUNTER);
+ this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0));
+
+ this.kafkaProducer.initTransactions();
+ }
+
+ private void cacheSortedDataset() {
+ // we sort the datasets to rank which dataset we start matching candidate service names
+ // against; in the future we can change the ordering from a sort to something else
+ this.throughputSortedDatasets =
+ datasetMetadataStore.listSync().stream()
+ .sorted(Comparator.comparingLong(DatasetMetadata::getThroughputBytes).reversed())
+ .toList();
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ cacheSortedDataset();
+ datasetMetadataStore.addListener(datasetListener);
+ }
+
+ @Override
+ protected void run() throws Exception {
+ while (isRunning()) {
+ List<BulkIngestRequest> requests = new ArrayList<>();
+ pendingRequests.drainTo(requests);
+ batchSizeGauge.set(requests.size());
+ if (requests.isEmpty()) {
+ try {
+ stallCounter.increment();
+ Thread.sleep(producerSleepMs);
+ } catch (InterruptedException e) {
+ return;
+ }
+ } else {
+ produceDocumentsAndCommit(requests);
+ }
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ datasetMetadataStore.removeListener(datasetListener);
+
+ kafkaProducer.close();
+ if (kafkaMetrics != null) {
+ kafkaMetrics.close();
+ }
+ }
+
+ public BulkIngestRequest submitRequest(Map<String, List<Trace.Span>> inputDocs) {
+ BulkIngestRequest request = new BulkIngestRequest(inputDocs);
+ pendingRequests.add(request);
+ return request;
+ }
+
+ protected Map<BulkIngestRequest, BulkIngestResponse> produceDocumentsAndCommit(
+ List<BulkIngestRequest> requests) {
+ Map<BulkIngestRequest, BulkIngestResponse> responseMap = new HashMap<>();
+
+ // KafkaProducer kafkaProducer = null;
+ try {
+ // kafkaProducer = kafkaProducerPool.borrowObject();
+ kafkaProducer.beginTransaction();
+ for (BulkIngestRequest request : requests) {
+ responseMap.put(request, produceDocuments(request.getInputDocs(), kafkaProducer));
+ }
+ kafkaProducer.commitTransaction();
+ // kafkaProducerPool.returnObject(kafkaProducer);
+ } catch (TimeoutException te) {
+ LOG.error("Commit transaction timeout", te);
+ // the commitTransaction waits till "max.block.ms" after which it will time out
+ // in that case we cannot call abort exception because that throws the following error
+ // "Cannot attempt operation `abortTransaction` because the previous
+ // call to `commitTransaction` timed out and must be retried"
+ // so for now we just restart the preprocessor
+ new RuntimeHalterImpl()
+ .handleFatal(
+ new Throwable(
+ "KafkaProducer needs to shutdown as we don't have retry yet and we cannot call abortTxn on timeout",
+ te));
+ } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ // We can't recover from these exceptions, so our only option is to close the producer and
+ // exit.
+ new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
+ } catch (Exception e) {
+ LOG.warn("failed transaction with error", e);
+ if (kafkaProducer != null) {
+ try {
+ kafkaProducer.abortTransaction();
+ // kafkaProducerPool.returnObject(kafkaProducer);
+ } catch (ProducerFencedException err) {
+ LOG.error("Could not abort transaction", err);
+ }
+ }
+
+ for (BulkIngestRequest request : requests) {
+ responseMap.put(
+ request,
+ new BulkIngestResponse(
+ 0,
+ request.getInputDocs().values().stream().mapToInt(List::size).sum(),
+ e.getMessage()));
+ }
+ }
+
+ for (Map.Entry<BulkIngestRequest, BulkIngestResponse> entry : responseMap.entrySet()) {
+ BulkIngestRequest key = entry.getKey();
+ BulkIngestResponse value = entry.getValue();
+ if (!key.setResponse(value)) {
+ LOG.warn("Failed to add result to the bulk ingest request, consumer thread went away?");
+ failedSetResponseCounter.increment();
+ }
+ }
+ return responseMap;
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private BulkIngestResponse produceDocuments(
+ Map<String, List<Trace.Span>> indexDocs, KafkaProducer<String, byte[]> kafkaProducer) {
+ int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum();
+
+ // we cannot create a generic pool of producers because the kafka API expects the transaction ID
+ // to be a property while creating the producer object.
+ for (Map.Entry<String, List<Trace.Span>> indexDoc : indexDocs.entrySet()) {
+ String index = indexDoc.getKey();
+
+ // call once per batch and use the same partition for better batching
+ // todo - this probably shouldn't be tied to the transaction batching logic?
+ int partition = getPartition(index);
+
+ // since there isn't a dataset provisioned for this service/index we will not index this set
+ // of docs
+ if (partition < 0) {
+ LOG.warn("index=" + index + " does not have a provisioned dataset associated with it");
+ continue;
+ }
+
+ // KafkaProducer does not allow creating multiple transactions from a single object -
+ // rightfully so.
+ // Until we fix the producer design to allow multiple /_bulk requests to write to the
+ // same txn, we will limit producing documents to one thread at a time.
+ for (Trace.Span doc : indexDoc.getValue()) {
+ ProducerRecord<String, byte[]> producerRecord =
+ new ProducerRecord<>(kafkaConfig.getKafkaTopic(), partition, index, doc.toByteArray());
+
+ // we intentionally suppress FutureReturnValueIgnored here in errorprone - this is because
+ // we wrap this in a transaction, which is responsible for flushing all of the pending
+ // messages
+ kafkaProducer.send(producerRecord);
+ }
+ }
+
+ return new BulkIngestResponse(totalDocs, 0, "");
+ }
+
+ private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaBootStrapServers());
+ props.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
+
+ // don't override the properties that we have already set explicitly using named properties
+ for (Map.Entry<String, String> additionalProp :
+ kafkaConfig.getAdditionalPropsMap().entrySet()) {
+ props =
+ KafkaUtils.maybeOverrideProps(
+ props,
+ additionalProp.getKey(),
+ additionalProp.getValue(),
+ OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
+ }
+ return new KafkaProducer<>(props);
+ }
+
+ private int getPartition(String index) {
+ for (DatasetMetadata datasetMetadata : throughputSortedDatasets) {
+ String serviceNamePattern = datasetMetadata.getServiceNamePattern();
+
+ if (serviceNamePattern.equals(MATCH_ALL_SERVICE)
+ || serviceNamePattern.equals(MATCH_STAR_SERVICE)
+ || index.equals(serviceNamePattern)) {
+ List<Integer> partitions = PreprocessorService.getActivePartitionList(datasetMetadata);
+ return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
+ }
+ }
+ // We don't have a provisioned service for this index
+ return -1;
+ }
+}
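For orientation, a minimal sketch (not part of this diff) of the handshake between a caller and this service: submitRequest() only enqueues the documents, the run() loop drains the queue and wraps the whole batch in a single Kafka transaction, and each caller blocks on getResponse() until its request is completed via setResponse(). The helper class and variable names are illustrative:

import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
import com.slack.kaldb.bulkIngestApi.BulkIngestRequest;
import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
import com.slack.service.murron.trace.Trace;
import java.util.List;
import java.util.Map;

public class ProducerHandshakeSketch {
  // Assumes the producer service was already started via startAsync()/awaitRunning().
  static BulkIngestResponse ingest(
      BulkIngestKafkaProducer producer, Map<String, List<Trace.Span>> docsByIndex)
      throws InterruptedException {
    // Enqueue only; the service's run() loop batches all pending requests into one transaction.
    BulkIngestRequest request = producer.submitRequest(docsByIndex);
    // Blocks until produceDocumentsAndCommit() hands back a response for this request.
    return request.getResponse();
  }
}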
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java
new file mode 100644
index 0000000000..8b47a0310b
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java
@@ -0,0 +1,32 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import com.slack.service.murron.trace.Trace;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * Wrapper object used to build a bulk request and await an asynchronously populated response.
+ * Because this uses a synchronous queue internally, it expects a thread to already be waiting
+ * on getResponse when setResponse is invoked with the result data.
+ */
+public class BulkIngestRequest {
+ private final Map<String, List<Trace.Span>> inputDocs;
+ private final SynchronousQueue<BulkIngestResponse> internalResponse = new SynchronousQueue<>();
+
+ protected BulkIngestRequest(Map<String, List<Trace.Span>> inputDocs) {
+ this.inputDocs = inputDocs;
+ }
+
+ Map<String, List<Trace.Span>> getInputDocs() {
+ return inputDocs;
+ }
+
+ boolean setResponse(BulkIngestResponse response) {
+ return internalResponse.offer(response);
+ }
+
+ public BulkIngestResponse getResponse() throws InterruptedException {
+ return internalResponse.take();
+ }
+}
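A small standalone illustration (an assumption-level sketch, not from this diff) of the SynchronousQueue behavior the javadoc above relies on: offer() only succeeds if a consumer is already blocked in take(), which is why setResponse() can return false and why BulkIngestKafkaProducer counts such failures:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueSketch {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();

    // No thread is waiting in take() yet, so offer() returns false and the value is dropped.
    System.out.println(queue.offer("lost response")); // false

    Thread consumer =
        Thread.ofVirtual()
            .start(
                () -> {
                  try {
                    System.out.println("got: " + queue.take());
                  } catch (InterruptedException ignored) {
                  }
                });
    Thread.sleep(100); // crude way to ensure the consumer is already blocked in take()
    System.out.println(queue.offer("delivered response")); // true
    consumer.join();
  }
}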
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java
new file mode 100644
index 0000000000..fa1947ae56
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java
@@ -0,0 +1,4 @@
+package com.slack.kaldb.bulkIngestApi;
+
+/** Metadata object for the result of a bulk ingest request */
+public record BulkIngestResponse(int totalDocs, long failedDocs, String errorMsg) {}
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java
new file mode 100644
index 0000000000..c8cd900aba
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java
@@ -0,0 +1,74 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
+import com.slack.kaldb.metadata.dataset.DatasetMetadata;
+import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
+import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
+import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.service.murron.trace.Trace;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import java.util.List;
+import java.util.function.BiPredicate;
+
+/**
+ * Guava service that maintains a rate-limiting object consistent with the value stored in the
+ * dataset metadata store.
+ */
+public class DatasetRateLimitingService extends AbstractIdleService {
+ private final DatasetMetadataStore datasetMetadataStore;
+ private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
+ (_) -> updateRateLimiter();
+
+ private final PreprocessorRateLimiter rateLimiter;
+ private BiPredicate<String, List<Trace.Span>> rateLimiterPredicate;
+
+ private final PrometheusMeterRegistry meterRegistry;
+ public static final String RATE_LIMIT_RELOAD_TIMER =
+ "preprocessor_dataset_rate_limit_reload_timer";
+ private final Timer rateLimitReloadtimer;
+
+ public DatasetRateLimitingService(
+ DatasetMetadataStore datasetMetadataStore,
+ KaldbConfigs.PreprocessorConfig preprocessorConfig,
+ PrometheusMeterRegistry meterRegistry) {
+ this.datasetMetadataStore = datasetMetadataStore;
+ this.meterRegistry = meterRegistry;
+
+ this.rateLimiter =
+ new PreprocessorRateLimiter(
+ meterRegistry,
+ preprocessorConfig.getPreprocessorInstanceCount(),
+ preprocessorConfig.getRateLimiterMaxBurstSeconds(),
+ true);
+
+ this.rateLimitReloadtimer = meterRegistry.timer(RATE_LIMIT_RELOAD_TIMER);
+ }
+
+ private void updateRateLimiter() {
+ Timer.Sample sample = Timer.start(meterRegistry);
+ try {
+ List<DatasetMetadata> datasetMetadataList = datasetMetadataStore.listSync();
+ this.rateLimiterPredicate = rateLimiter.createBulkIngestRateLimiter(datasetMetadataList);
+ } finally {
+ // TODO: re-work this so that we can add success/failure tags and capture them
+ sample.stop(rateLimitReloadtimer);
+ }
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ updateRateLimiter();
+ datasetMetadataStore.addListener(datasetListener);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ datasetMetadataStore.removeListener(datasetListener);
+ }
+
+ public boolean tryAcquire(String index, List<Trace.Span> value) {
+ return rateLimiterPredicate.test(index, value);
+ }
+}
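A hedged sketch of how this service is expected to be driven; the store, config, and registry arguments are assumed to exist elsewhere (Kaldb.java wires the real instances through its ServiceManager):

import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.service.murron.trace.Trace;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.List;

public class RateLimitingSketch {
  static boolean admit(
      DatasetMetadataStore store,
      KaldbConfigs.PreprocessorConfig config,
      PrometheusMeterRegistry registry,
      String index,
      List<Trace.Span> docs) {
    DatasetRateLimitingService service = new DatasetRateLimitingService(store, config, registry);
    service.startAsync();
    service.awaitRunning();
    // The rate limiter predicate is rebuilt whenever dataset metadata changes; callers only
    // interact with tryAcquire(), and a false return maps to HTTP 429 in BulkIngestApi.
    boolean admitted = service.tryAcquire(index, docs);
    service.stopAsync();
    return admitted;
  }
}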
diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
similarity index 79%
rename from kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java
rename to kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
index 891e056ab1..527ecb77c5 100644
--- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
@@ -1,10 +1,9 @@
-package com.slack.kaldb.preprocessor.ingest;
+package com.slack.kaldb.bulkIngestApi.opensearch;
import com.google.protobuf.ByteString;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -22,13 +21,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class OpenSearchBulkApiRequestParser {
+/**
+ * This class uses the OpenSearch libraries to parse the bulk ingest request into documents
+ * that can be inserted into Kafka. The goal is to leverage OpenSearch where possible, while
+ * preventing OpenSearch abstractions from leaking further into KalDB.
+ */
+public class BulkApiRequestParser {
- public static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkApiRequestParser.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BulkApiRequestParser.class);
- private static String SERVICE_NAME_KEY = "service_name";
+ private static final String SERVICE_NAME_KEY = "service_name";
- public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
+ public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws IOException {
+ return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
+ }
+
+ protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
ZonedDateTime timestamp =
(ZonedDateTime)
ingestDocument
@@ -69,9 +77,9 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
return spanBuilder.build();
}
- // key - index. value - list of docs to be indexed
- public static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
+ protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
List<IndexRequest> indexRequests) {
+ // key - index. value - list of docs to be indexed
Map<String, List<Trace.Span>> indexDocs = new HashMap<>();
for (IndexRequest indexRequest : indexRequests) {
@@ -81,7 +89,7 @@ public static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
}
IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
List<Trace.Span> docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>());
- docs.add(OpenSearchBulkApiRequestParser.fromIngestDocument(ingestDocument));
+ docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument));
}
return indexDocs;
}
@@ -101,12 +109,11 @@ protected static IngestDocument convertRequestToDocument(IndexRequest indexReque
// and transform it
}
- public static List<IndexRequest> parseBulkRequest(String postBody) throws IOException {
+ protected static List<IndexRequest> parseBulkRequest(byte[] postBody) throws IOException {
List<IndexRequest> indexRequests = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
// calls parse under the hood
- byte[] bytes = postBody.getBytes(StandardCharsets.UTF_8);
- bulkRequest.add(bytes, 0, bytes.length, null, MediaTypeRegistry.JSON);
+ bulkRequest.add(postBody, 0, postBody.length, null, MediaTypeRegistry.JSON);
List<DocWriteRequest<?>> requests = bulkRequest.requests();
for (DocWriteRequest<?> request : requests) {
if (request.opType() == DocWriteRequest.OpType.INDEX) {
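For reference, a sketch (not part of this diff) of the parser's new byte-oriented entry point; the NDJSON body and index name are invented for illustration and mirror the test resources:

import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class ParserSketch {
  public static void main(String[] args) throws Exception {
    // One "index" action line followed by its source document, as in the bulk test fixtures.
    String ndjson =
        "{ \"index\": { \"_index\": \"test\", \"_id\": \"1\" } }\n"
            + "{ \"service_name\": \"test\", \"field1\": \"value1\" }\n";

    // parseRequest() is the public entry point; the other helpers are now protected.
    Map<String, List<Trace.Span>> docs =
        BulkApiRequestParser.parseRequest(ndjson.getBytes(StandardCharsets.UTF_8));
    System.out.println(docs.get("test").size()); // expected to print 1
  }
}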
diff --git a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java
deleted file mode 100644
index 8d40b9d867..0000000000
--- a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.slack.kaldb.elasticsearchApi;
-
-public record BulkIngestResponse(int totalDocs, long failedDocs, String errorMsg) {}
diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
index 1e87100fbd..5351ec9b01 100644
--- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
+++ b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java
@@ -2,7 +2,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
-import static com.slack.kaldb.writer.kafka.KaldbKafkaConsumer.maybeOverride;
import com.google.common.util.concurrent.AbstractService;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
@@ -10,6 +9,7 @@
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.writer.KafkaUtils;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -313,7 +313,9 @@ protected static Properties makeKafkaStreamsProps(
// don't override any property we already set
for (Map.Entry<String, String> additionalProp :
kafkaStreamConfig.getAdditionalPropsMap().entrySet()) {
- maybeOverride(props, additionalProp.getKey(), additionalProp.getValue(), false);
+ props =
+ KafkaUtils.maybeOverrideProps(
+ props, additionalProp.getKey(), additionalProp.getValue(), false);
}
return props;
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
index 4e2a94ab15..8c93374235 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
@@ -5,6 +5,9 @@
import com.google.common.util.concurrent.ServiceManager;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.blobfs.s3.S3CrtBlobFs;
+import com.slack.kaldb.bulkIngestApi.BulkIngestApi;
+import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
+import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
import com.slack.kaldb.chunkManager.CachingChunkManager;
import com.slack.kaldb.chunkManager.IndexingChunkManager;
import com.slack.kaldb.clusterManager.ClusterHpaMetricService;
@@ -388,12 +391,16 @@ private static Set<Service> getServices(
KaldbConfigs.NodeRole.PREPROCESSOR, List.of(datasetMetadataStore)));
if (preprocessorConfig.getUseBulkApi()) {
- // TODO: using an armeria service that is also a guava service does not look elegant
- // explore ways where we can control the lifecycle without the need for a guava service here
- OpenSearchBulkIngestApi openSearchBulkApiService =
- new OpenSearchBulkIngestApi(datasetMetadataStore, preprocessorConfig, meterRegistry);
+ BulkIngestKafkaProducer bulkIngestKafkaProducer =
+ new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry);
+ services.add(bulkIngestKafkaProducer);
+ DatasetRateLimitingService datasetRateLimitingService =
+ new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
+ services.add(datasetRateLimitingService);
+
+ BulkIngestApi openSearchBulkApiService =
+ new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
- services.add(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry);
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
deleted file mode 100644
index 32aaf4b760..0000000000
--- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
+++ /dev/null
@@ -1,300 +0,0 @@
-package com.slack.kaldb.server;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
-import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;
-import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE;
-import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE;
-import static com.slack.kaldb.preprocessor.PreprocessorService.CONFIG_RELOAD_TIMER;
-import static com.slack.kaldb.preprocessor.PreprocessorService.INITIALIZE_RATE_LIMIT_WARM;
-import static com.slack.kaldb.preprocessor.PreprocessorService.filterValidDatasetMetadata;
-import static com.slack.kaldb.preprocessor.PreprocessorService.sortDatasetsOnThroughput;
-
-import com.google.common.util.concurrent.AbstractService;
-import com.linecorp.armeria.common.HttpResponse;
-import com.linecorp.armeria.server.annotation.Blocking;
-import com.linecorp.armeria.server.annotation.Post;
-import com.slack.kaldb.elasticsearchApi.BulkIngestResponse;
-import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
-import com.slack.kaldb.metadata.dataset.DatasetMetadata;
-import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
-import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
-import com.slack.kaldb.preprocessor.PreprocessorService;
-import com.slack.kaldb.preprocessor.ingest.OpenSearchBulkApiRequestParser;
-import com.slack.kaldb.proto.config.KaldbConfigs;
-import com.slack.kaldb.util.RuntimeHalterImpl;
-import com.slack.service.murron.trace.Trace;
-import io.micrometer.core.instrument.Timer;
-import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiPredicate;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.AuthorizationException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.opensearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * batching is important - if we send one doc a time we will create a transaction per request which
- * is expensive
- */
-public class OpenSearchBulkIngestApi extends AbstractService {
-
- private static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkIngestApi.class);
- private final PrometheusMeterRegistry meterRegistry;
-
- private final KaldbConfigs.PreprocessorConfig preprocessorConfig;
- private final DatasetMetadataStore datasetMetadataStore;
-
- private final KaldbMetadataStoreChangeListener<DatasetMetadata> datasetListener =
- (datasetMetadata) -> load();
-
- private final PreprocessorRateLimiter rateLimiter;
- private BiPredicate<String, List<Trace.Span>> rateLimiterPredicate;
- protected List<DatasetMetadata> throughputSortedDatasets;
-
- private final Timer configReloadTimer;
-
- private final KafkaProducer<String, byte[]> kafkaProducer;
- private final KafkaClientMetrics kafkaMetrics;
-
- private final ReentrantLock lockTransactionalProducer = new ReentrantLock();
-
- @Override
- protected void doStart() {
- try {
- LOG.info("Starting OpenSearchBulkIngestApi service");
- load();
- datasetMetadataStore.addListener(datasetListener);
- LOG.info("OpenSearchBulkIngestAPI service started");
- notifyStarted();
- } catch (Throwable t) {
- notifyFailed(t);
- }
- }
-
- @Override
- protected void doStop() {
- try {
- LOG.info("Stopping OpenSearchBulkIngestApi service");
- datasetMetadataStore.removeListener(datasetListener);
- kafkaProducer.close();
- if (kafkaMetrics != null) {
- kafkaMetrics.close();
- }
- LOG.info("OpenSearchBulkIngestApi service closed");
- notifyStopped();
- } catch (Throwable t) {
- notifyFailed(t);
- }
- }
-
- public void load() {
- Timer.Sample sample = Timer.start(meterRegistry);
- try {
- List<DatasetMetadata> datasetMetadataList = datasetMetadataStore.listSync();
- // only attempt to register stream processing on valid dataset configurations
- List<DatasetMetadata> datasetMetadataToProcesses =
- filterValidDatasetMetadata(datasetMetadataList);
-
- checkState(!datasetMetadataToProcesses.isEmpty(), "dataset metadata list must not be empty");
-
- this.throughputSortedDatasets = sortDatasetsOnThroughput(datasetMetadataToProcesses);
- this.rateLimiterPredicate =
- rateLimiter.createBulkIngestRateLimiter(datasetMetadataToProcesses);
- } catch (Exception e) {
- notifyFailed(e);
- } finally {
- // TODO: re-work this so that we can add success/failure tags and capture them
- sample.stop(configReloadTimer);
- }
- }
-
- public OpenSearchBulkIngestApi(
- DatasetMetadataStore datasetMetadataStore,
- KaldbConfigs.PreprocessorConfig preprocessorConfig,
- PrometheusMeterRegistry meterRegistry) {
- this(datasetMetadataStore, preprocessorConfig, meterRegistry, INITIALIZE_RATE_LIMIT_WARM);
- }
-
- public OpenSearchBulkIngestApi(
- DatasetMetadataStore datasetMetadataStore,
- KaldbConfigs.PreprocessorConfig preprocessorConfig,
- PrometheusMeterRegistry meterRegistry,
- boolean initializeRateLimitWarm) {
-
- checkArgument(
- !preprocessorConfig.getBootstrapServers().isEmpty(),
- "Kafka bootstrapServers must be provided");
-
- checkArgument(
- !preprocessorConfig.getDownstreamTopic().isEmpty(),
- "Kafka downstreamTopic must be provided");
-
- this.datasetMetadataStore = datasetMetadataStore;
- this.preprocessorConfig = preprocessorConfig;
- this.meterRegistry = meterRegistry;
- this.rateLimiter =
- new PreprocessorRateLimiter(
- meterRegistry,
- preprocessorConfig.getPreprocessorInstanceCount(),
- preprocessorConfig.getRateLimiterMaxBurstSeconds(),
- initializeRateLimitWarm);
-
- this.configReloadTimer = meterRegistry.timer(CONFIG_RELOAD_TIMER);
-
- // since we use a new transaction ID every time we start a preprocessor there can be some zombie
- // transactions?
- // I think they will remain in kafka till they expire. They should never be readable if the
- // consumer sets isolation.level as "read_committed"
- // see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/
- this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString());
- kafkaMetrics = new KafkaClientMetrics(kafkaProducer);
- kafkaMetrics.bindTo(meterRegistry);
- this.kafkaProducer.initTransactions();
- }
-
- /**
- * 1. Kaldb does not support the concept of "updates". It's always an add 2. The "index" is used
- * as the span name
- */
- @Blocking
- @Post("/_bulk")
- public HttpResponse addDocument(String bulkRequest) {
- try {
- List<IndexRequest> indexRequests =
- OpenSearchBulkApiRequestParser.parseBulkRequest(bulkRequest);
- Map<String, List<Trace.Span>> docs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
- // our rate limiter doesn't have a way to acquire permits across multiple datasets
- // so today as a limitation we reject any request that has documents against multiple indexes
- // We think most indexing requests will be against 1 index
- if (docs.keySet().size() > 1) {
- BulkIngestResponse response =
- new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
- return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
- }
-
- for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
- final String index = indexDocs.getKey();
- if (!rateLimiterPredicate.test(index, indexDocs.getValue())) {
- BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
- return HttpResponse.ofJson(TOO_MANY_REQUESTS, response);
- }
- }
- BulkIngestResponse response = produceDocuments(docs);
- return HttpResponse.ofJson(response);
- } catch (Exception e) {
- LOG.error("Request failed ", e);
- BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
- return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
- }
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
- int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum();
-
- // we cannot create a generic pool of producers because the kafka API expects the transaction ID
- // to be a property while creating the producer object.
- for (Map.Entry<String, List<Trace.Span>> indexDoc : indexDocs.entrySet()) {
- String index = indexDoc.getKey();
- // call once per batch and use the same partition for better batching
- int partition = getPartition(index);
-
- // since there isn't a dataset provisioned for this service/index we will not index this set
- // of docs
- if (partition < 0) {
- LOG.warn("index=" + index + " does not have a provisioned dataset associated with it");
- continue;
- }
-
- // KafkaProducer does not allow creating multiple transactions from a single object -
- // rightfully so.
- // Till we fix the producer design to allow for multiple /_bulk requests to be able to
- // write to the same txn
- // we will limit producing documents 1 thread at a time
- lockTransactionalProducer.lock();
- try {
- kafkaProducer.beginTransaction();
- for (Trace.Span doc : indexDoc.getValue()) {
- ProducerRecord<String, byte[]> producerRecord =
- new ProducerRecord<>(
- preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
-
- // we intentionally supress FutureReturnValueIgnored here in errorprone - this is because
- // we wrap this in a transaction, which is responsible for flushing all of the pending
- // messages
- kafkaProducer.send(producerRecord);
- }
- kafkaProducer.commitTransaction();
- } catch (TimeoutException te) {
- LOG.error("Commit transaction timeout", te);
- // the commitTransaction waits till "max.block.ms" after which it will time out
- // in that case we cannot call abort exception because that throws the following error
- // "Cannot attempt operation `abortTransaction` because the previous
- // call to `commitTransaction` timed out and must be retried"
- // so for now we just restart the preprocessor
- new RuntimeHalterImpl()
- .handleFatal(
- new Throwable(
- "KafkaProducer needs to shutdown as we don't have retry yet and we cannot call abortTxn on timeout",
- te));
- } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
- // We can't recover from these exceptions, so our only option is to close the producer and
- // exit.
- new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
- } catch (Exception e) {
- LOG.warn("failed transaction with error", e);
- try {
- kafkaProducer.abortTransaction();
- } catch (ProducerFencedException err) {
- LOG.error("Could not abort transaction", err);
- }
- return new BulkIngestResponse(0, totalDocs, e.getMessage());
- } finally {
- lockTransactionalProducer.unlock();
- }
- }
-
- return new BulkIngestResponse(totalDocs, 0, "");
- }
-
- private int getPartition(String index) {
- for (DatasetMetadata datasetMetadata : throughputSortedDatasets) {
- String serviceNamePattern = datasetMetadata.getServiceNamePattern();
-
- if (serviceNamePattern.equals(MATCH_ALL_SERVICE)
- || serviceNamePattern.equals(MATCH_STAR_SERVICE)
- || index.equals(serviceNamePattern)) {
- List<Integer> partitions = PreprocessorService.getActivePartitionList(datasetMetadata);
- return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
- }
- }
- // We don't have a provisioned service for this index
- return -1;
- }
-
- private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
- Properties props = new Properties();
- props.put("bootstrap.servers", preprocessorConfig.getBootstrapServers());
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- props.put("transactional.id", transactionId);
- props.put("linger.ms", 250);
- props.put("max.block.ms", "10000");
- props.put("compression.type", "snappy");
- return new KafkaProducer<>(props);
- }
-}
diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
new file mode 100644
index 0000000000..7c351a6f00
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java
@@ -0,0 +1,35 @@
+package com.slack.kaldb.writer;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Shared Kafka helper functions for producers, consumers, and stream applications */
+public class KafkaUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+ @VisibleForTesting
+ public static Properties maybeOverrideProps(
+ Properties inputProps, String key, String value, boolean override) {
+ Properties changedProps = (Properties) inputProps.clone();
+ String userValue = changedProps.getProperty(key);
+ if (userValue != null) {
+ if (override) {
+ LOG.warn(
+ String.format(
+ "Property %s is provided but will be overridden from %s to %s",
+ key, userValue, value));
+ changedProps.setProperty(key, value);
+ } else {
+ LOG.warn(
+ String.format(
+ "Property %s is provided but won't be overridden from %s to %s",
+ key, userValue, value));
+ }
+ } else {
+ changedProps.setProperty(key, value);
+ }
+ return changedProps;
+ }
+}
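A small sketch of the override semantics: an explicitly set property wins unless the key is flagged as overridable, and the input Properties object is never mutated (the method works on a clone). The property name and values are examples only:

import com.slack.kaldb.writer.KafkaUtils;
import java.util.Properties;

public class KafkaUtilsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("linger.ms", "250");

    // override=false: the explicitly set value is kept and a warning is logged.
    Properties kept = KafkaUtils.maybeOverrideProps(props, "linger.ms", "0", false);
    System.out.println(kept.getProperty("linger.ms")); // 250

    // override=true: the additional property replaces the explicitly set value.
    Properties replaced = KafkaUtils.maybeOverrideProps(props, "linger.ms", "0", true);
    System.out.println(replaced.getProperty("linger.ms")); // 0

    // The original Properties instance is left untouched either way.
    System.out.println(props.getProperty("linger.ms")); // 250
  }
}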
diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
index 368cd158c7..7e151df9d5 100644
--- a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
+++ b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java
@@ -9,6 +9,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.server.KaldbConfig;
+import com.slack.kaldb.writer.KafkaUtils;
import com.slack.kaldb.writer.LogMessageWriterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -82,40 +83,16 @@ public static Properties makeKafkaConsumerProps(KaldbConfigs.KafkaConfig kafkaCo
// don't override the properties that we have already set explicitly using named properties
for (Map.Entry<String, String> additionalProp :
kafkaConfig.getAdditionalPropsMap().entrySet()) {
- maybeOverride(
- props,
- additionalProp.getKey(),
- additionalProp.getValue(),
- OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
+ props =
+ KafkaUtils.maybeOverrideProps(
+ props,
+ additionalProp.getKey(),
+ additionalProp.getValue(),
+ OVERRIDABLE_CONFIGS.contains(additionalProp.getKey()));
}
return props;
}
- @VisibleForTesting
- public static boolean maybeOverride(
- Properties props, String key, String value, boolean override) {
- boolean overridden = false;
- String userValue = props.getProperty(key);
- if (userValue != null) {
- if (override) {
- LOG.warn(
- String.format(
- "Property %s is provided but will be overridden from %s to %s",
- key, userValue, value));
- props.setProperty(key, value);
- overridden = true;
- } else {
- LOG.warn(
- String.format(
- "Property %s is provided but won't be overridden from %s to %s",
- key, userValue, value));
- }
- } else {
- props.setProperty(key, value);
- }
- return overridden;
- }
-
private KafkaConsumer<String, byte[]> kafkaConsumer;
private final TopicPartition topicPartition;
diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto
index 17b1656939..21ac233489 100644
--- a/kaldb/src/main/proto/kaldb_configs.proto
+++ b/kaldb/src/main/proto/kaldb_configs.proto
@@ -266,8 +266,8 @@ message PreprocessorConfig {
// more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers
int32 kafka_partition_sticky_timeout_ms = 8;
- // this value needs to be set if the bulk API is used to bootstrap the producer kafka
+ // Kafka config needs to be set if the bulk API is used, to bootstrap the Kafka producer;
// we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future
- string bootstrap_servers = 9;
+ KafkaConfig kafka_config = 9;
bool use_bulk_api = 10;
}
diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java
new file mode 100644
index 0000000000..3035a09180
--- /dev/null
+++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java
@@ -0,0 +1,264 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import brave.Tracing;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.slack.kaldb.metadata.core.CuratorBuilder;
+import com.slack.kaldb.metadata.dataset.DatasetMetadata;
+import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
+import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
+import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.testlib.TestKafkaServer;
+import com.slack.service.murron.trace.Trace;
+import io.micrometer.prometheus.PrometheusConfig;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BulkIngestKafkaProducerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkIngestKafkaProducerTest.class);
+ private static PrometheusMeterRegistry meterRegistry;
+ private static AsyncCuratorFramework curatorFramework;
+ private static KaldbConfigs.PreprocessorConfig preprocessorConfig;
+ private static DatasetMetadataStore datasetMetadataStore;
+ private static TestingServer zkServer;
+ private static TestKafkaServer kafkaServer;
+
+ private BulkIngestKafkaProducer bulkIngestKafkaProducer;
+
+ static String INDEX_NAME = "testtransactionindex";
+
+ private static String DOWNSTREAM_TOPIC = "test-transaction-topic-out";
+
+ @BeforeEach
+ public void bootstrapCluster() throws Exception {
+ Tracing.newBuilder().build();
+ meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+
+ zkServer = new TestingServer();
+ KaldbConfigs.ZookeeperConfig zkConfig =
+ KaldbConfigs.ZookeeperConfig.newBuilder()
+ .setZkConnectString(zkServer.getConnectString())
+ .setZkPathPrefix("testZK")
+ .setZkSessionTimeoutMs(1000)
+ .setZkConnectionTimeoutMs(1000)
+ .setSleepBetweenRetriesMs(1000)
+ .build();
+ curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig);
+
+ kafkaServer = new TestKafkaServer();
+ kafkaServer.createTopicWithPartitions(DOWNSTREAM_TOPIC, 5);
+
+ KaldbConfigs.ServerConfig serverConfig =
+ KaldbConfigs.ServerConfig.newBuilder()
+ .setServerPort(8080)
+ .setServerAddress("localhost")
+ .build();
+ KaldbConfigs.KafkaConfig kafkaConfig =
+ KaldbConfigs.KafkaConfig.newBuilder()
+ .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
+ .setKafkaTopic(DOWNSTREAM_TOPIC)
+ .build();
+ preprocessorConfig =
+ KaldbConfigs.PreprocessorConfig.newBuilder()
+ .setKafkaConfig(kafkaConfig)
+ .setUseBulkApi(true)
+ .setServerConfig(serverConfig)
+ .setPreprocessorInstanceCount(1)
+ .setRateLimiterMaxBurstSeconds(1)
+ .build();
+
+ datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
+ DatasetMetadata datasetMetadata =
+ new DatasetMetadata(
+ INDEX_NAME,
+ "owner",
+ 1,
+ List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("0"))),
+ INDEX_NAME);
+ // Create an entry while init. Update the entry on every test run
+ datasetMetadataStore.createSync(datasetMetadata);
+
+ bulkIngestKafkaProducer =
+ new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry);
+ bulkIngestKafkaProducer.startAsync();
+ bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);
+ }
+
+ @Test
+ public void testDocumentInKafkaTransactionError() throws Exception {
+ KafkaConsumer<String, byte[]> kafkaConsumer = getTestKafkaConsumer();
+
+ // we want to inject a failure into one of the docs (doc4) and verify that the transaction
+ // is aborted, so none of the documents in this batch get indexed
+ Trace.Span doc1 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error1")).build();
+ Trace.Span doc2 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error2")).build();
+ Trace.Span doc3 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error3")).build();
+ Trace.Span doc4 = spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error4")).build());
+ when(doc4.toByteArray()).thenThrow(new RuntimeException("exception"));
+ Trace.Span doc5 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error5")).build();
+
+ Map<String, List<Trace.Span>> indexDocs =
+ Map.of(INDEX_NAME, List.of(doc1, doc2, doc3, doc4, doc5));
+
+ BulkIngestRequest request1 = new BulkIngestRequest(indexDocs);
+ Thread.ofVirtual()
+ .start(
+ () -> {
+ try {
+ // because of the synchronous queue, we need someone consuming the response before
+ // we attempt to set it
+ request1.getResponse();
+ } catch (InterruptedException ignored) {
+ }
+ });
+ BulkIngestResponse responseObj =
+ (BulkIngestResponse)
+ bulkIngestKafkaProducer
+ .produceDocumentsAndCommit(List.of(request1))
+ .values()
+ .toArray()[0];
+ assertThat(responseObj.totalDocs()).isEqualTo(0);
+ assertThat(responseObj.failedDocs()).isEqualTo(5);
+ assertThat(responseObj.errorMsg()).isNotNull();
+
+ await()
+ .until(
+ () -> {
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ long partitionOffset =
+ (Long)
+ kafkaConsumer
+ .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0)))
+ .values()
+ .stream()
+ .findFirst()
+ .get();
+ LOG.debug(
+ "Current partitionOffset - {}. expecting offset to be less than 5",
+ partitionOffset);
+ return partitionOffset > 0 && partitionOffset < 5;
+ });
+
+ ConsumerRecords<String, byte[]> records =
+ kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS));
+
+ assertThat(records.count()).isEqualTo(0);
+
+ long currentPartitionOffset =
+ (Long)
+ kafkaConsumer
+ .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0)))
+ .values()
+ .stream()
+ .findFirst()
+ .get();
+
+ Trace.Span doc6 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error6")).build();
+ Trace.Span doc7 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error7")).build();
+ Trace.Span doc8 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error8")).build();
+ Trace.Span doc9 =
+ spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error9")).build());
+ Trace.Span doc10 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error10")).build();
+
+ indexDocs = Map.of(INDEX_NAME, List.of(doc6, doc7, doc8, doc9, doc10));
+
+ BulkIngestRequest request2 = new BulkIngestRequest(indexDocs);
+ Thread.ofVirtual()
+ .start(
+ () -> {
+ try {
+ // because of the synchronous queue, we need someone consuming the response before
+ // we attempt to set it
+ request2.getResponse();
+ } catch (InterruptedException ignored) {
+ }
+ });
+ responseObj =
+ (BulkIngestResponse)
+ bulkIngestKafkaProducer
+ .produceDocumentsAndCommit(List.of(request2))
+ .values()
+ .toArray()[0];
+ assertThat(responseObj.totalDocs()).isEqualTo(5);
+ assertThat(responseObj.failedDocs()).isEqualTo(0);
+ assertThat(responseObj.errorMsg()).isNotNull();
+
+ // 5 docs. 1 control batch. initial offset was 1 after the first failed batch
+ validateOffset(kafkaConsumer, currentPartitionOffset + 5 + 1);
+ records = kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS));
+
+ assertThat(records.count()).isEqualTo(5);
+ records.forEach(
+ record ->
+ LOG.info(
+ "Trace= + " + TraceSpanParserSilenceError(record.value()).getId().toStringUtf8()));
+
+ // close the kafka consumer used in the test
+ kafkaConsumer.close();
+ }
+
+ public KafkaConsumer<String, byte[]> getTestKafkaConsumer() {
+ // used to verify the message exist on the downstream topic
+ Properties properties = kafkaServer.getBroker().consumerConfig();
+ properties.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
+ properties.put("isolation.level", "read_committed");
+ KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(properties);
+ kafkaConsumer.subscribe(List.of(DOWNSTREAM_TOPIC));
+ return kafkaConsumer;
+ }
+
+ public void validateOffset(KafkaConsumer<String, byte[]> kafkaConsumer, long expectedOffset) {
+ await()
+ .until(
+ () -> {
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ long partitionOffset =
+ (Long)
+ kafkaConsumer
+ .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0)))
+ .values()
+ .stream()
+ .findFirst()
+ .get();
+ LOG.debug(
+ "Current partitionOffset - {}. expecting offset to be - {}",
+ partitionOffset,
+ expectedOffset);
+ return partitionOffset == expectedOffset;
+ });
+ }
+
+ private static Trace.Span TraceSpanParserSilenceError(byte[] data) {
+ try {
+ return Trace.Span.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ return Trace.Span.newBuilder().build();
+ }
+ }
+}
diff --git a/kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
similarity index 69%
rename from kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java
rename to kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
index 62b3141f38..c25d76a548 100644
--- a/kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
@@ -1,11 +1,13 @@
-package com.slack.kaldb.preprocessor.ingest;
+package com.slack.kaldb.bulkIngestApi.opensearch;
+import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.io.Resources;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
@@ -15,26 +17,27 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ingest.IngestDocument;
-public class OpenSearchBulkRequestTest {
+public class BulkApiRequestParserTest {
- private String getRawQueryString(String filename) throws IOException {
+ private byte[] getRawQueryBytes(String filename) throws IOException {
return Resources.toString(
- Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)),
- Charset.defaultCharset());
+ Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)),
+ Charset.defaultCharset())
+ .getBytes(StandardCharsets.UTF_8);
}
@Test
public void testSimpleIndexRequest() throws Exception {
- String rawRequest = getRawQueryString("index_simple");
+ byte[] rawRequest = getRawQueryBytes("index_simple");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
assertThat(indexRequests.get(0).index()).isEqualTo("test");
assertThat(indexRequests.get(0).id()).isEqualTo("1");
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);
@@ -52,12 +55,12 @@ public void testSimpleIndexRequest() throws Exception {
@Test
public void testIndexNoFields() throws Exception {
- String rawRequest = getRawQueryString("index_no_fields");
+ byte[] rawRequest = getRawQueryBytes("index_no_fields");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);
@@ -75,12 +78,12 @@ public void testIndexNoFields() throws Exception {
@Test
public void testIndexNoFieldsNoId() throws Exception {
- String rawRequest = getRawQueryString("index_no_fields_no_id");
+ byte[] rawRequest = getRawQueryBytes("index_no_fields_no_id");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);
@@ -98,29 +101,29 @@ public void testIndexNoFieldsNoId() throws Exception {
@Test
public void testIndexEmptyRequest() throws Exception {
- String rawRequest = getRawQueryString("index_empty_request");
+ byte[] rawRequest = getRawQueryBytes("index_empty_request");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(0);
}
@Test
public void testOtherBulkRequests() throws Exception {
- String rawRequest = getRawQueryString("non_index");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ byte[] rawRequest = getRawQueryBytes("non_index");
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(0);
}
@Test
public void testIndexRequestWithSpecialChars() throws Exception {
- String rawRequest = getRawQueryString("index_request_with_special_chars");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ byte[] rawRequest = getRawQueryBytes("index_request_with_special_chars");
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("index_name").size()).isEqualTo(1);
@@ -138,12 +141,12 @@ public void testIndexRequestWithSpecialChars() throws Exception {
@Test
public void testBulkRequests() throws Exception {
- String rawRequest = getRawQueryString("bulk_requests");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ byte[] rawRequest = getRawQueryBytes("bulk_requests");
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);
@@ -161,12 +164,12 @@ public void testBulkRequests() throws Exception {
@Test
public void testUpdatesAgainstTwoIndexes() throws Exception {
- String rawRequest = getRawQueryString("two_indexes");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ byte[] rawRequest = getRawQueryBytes("two_indexes");
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(2);
Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
+ BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(2);
assertThat(indexDocs.get("test1").size()).isEqualTo(1);
assertThat(indexDocs.get("test2").size()).isEqualTo(1);
@@ -177,14 +180,13 @@ public void testUpdatesAgainstTwoIndexes() throws Exception {
@Test
public void testTraceSpanGeneratedTimestamp() throws IOException {
- String rawRequest = getRawQueryString("index_simple");
+ byte[] rawRequest = getRawQueryBytes("index_simple");
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest);
+ List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
- IngestDocument ingestDocument =
- OpenSearchBulkApiRequestParser.convertRequestToDocument(indexRequests.get(0));
- Trace.Span span = OpenSearchBulkApiRequestParser.fromIngestDocument(ingestDocument);
+ IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0));
+ Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument);
// timestamp is in microseconds based on the trace.proto definition
Instant ingestDocumentTime =
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
similarity index 55%
rename from kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java
rename to kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
index 3581237c96..c112d35bc0 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java
@@ -5,32 +5,34 @@
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
import brave.Tracing;
-import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
-import com.slack.kaldb.elasticsearchApi.BulkIngestResponse;
+import com.slack.kaldb.bulkIngestApi.BulkIngestApi;
+import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer;
+import com.slack.kaldb.bulkIngestApi.BulkIngestResponse;
+import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService;
import com.slack.kaldb.metadata.core.CuratorBuilder;
import com.slack.kaldb.metadata.dataset.DatasetMetadata;
import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata;
-import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
-import com.slack.kaldb.preprocessor.ingest.OpenSearchBulkApiRequestParser;
import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.testlib.MetricsUtil;
import com.slack.kaldb.testlib.TestKafkaServer;
import com.slack.kaldb.util.JsonUtil;
import com.slack.service.murron.trace.Trace;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -40,21 +42,21 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.opensearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class OpenSearchBulkIngestApiTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkIngestApi.class);
-
+public class BulkIngestApiTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class);
private static PrometheusMeterRegistry meterRegistry;
private static AsyncCuratorFramework curatorFramework;
private static KaldbConfigs.PreprocessorConfig preprocessorConfig;
private static DatasetMetadataStore datasetMetadataStore;
private static TestingServer zkServer;
private static TestKafkaServer kafkaServer;
- private OpenSearchBulkIngestApi openSearchBulkAPI;
+ private BulkIngestApi bulkApi;
+
+ private BulkIngestKafkaProducer bulkIngestKafkaProducer;
+ private DatasetRateLimitingService datasetRateLimitingService;
static String INDEX_NAME = "testindex";
@@ -84,14 +86,18 @@ public void bootstrapCluster() throws Exception {
.setServerPort(8080)
.setServerAddress("localhost")
.build();
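+ // the preprocessor's downstream Kafka topic and bootstrap servers are supplied via a nested KafkaConfig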
+ KaldbConfigs.KafkaConfig kafkaConfig =
+ KaldbConfigs.KafkaConfig.newBuilder()
+ .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get())
+ .setKafkaTopic(DOWNSTREAM_TOPIC)
+ .build();
preprocessorConfig =
KaldbConfigs.PreprocessorConfig.newBuilder()
- .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get())
+ .setKafkaConfig(kafkaConfig)
.setUseBulkApi(true)
.setServerConfig(serverConfig)
.setPreprocessorInstanceCount(1)
.setRateLimiterMaxBurstSeconds(1)
- .setDownstreamTopic(DOWNSTREAM_TOPIC)
.build();
datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true);
@@ -105,18 +111,29 @@ public void bootstrapCluster() throws Exception {
// Create an entry while init. Update the entry on every test run
datasetMetadataStore.createSync(datasetMetadata);
- openSearchBulkAPI =
- new OpenSearchBulkIngestApi(datasetMetadataStore, preprocessorConfig, meterRegistry, false);
+ datasetRateLimitingService =
+ new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
+ bulkIngestKafkaProducer =
+ new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry);
- openSearchBulkAPI.startAsync();
- openSearchBulkAPI.awaitRunning(DEFAULT_START_STOP_DURATION);
+ datasetRateLimitingService.startAsync();
+ bulkIngestKafkaProducer.startAsync();
+
+ datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION);
+ bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION);
+
+ bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);
}
// I looked at making this a @BeforeEach. It's possible if you annotate a test with a @Tag and
// pass throughputBytes.
// However, I decided not to go with that because it involved hardcoding the throughput bytes
// when defining the test. We need it to be dynamic based on the size of the docs.
- public void updateDatasetThroughput(int throughputBytes) throws Exception {
+ public void updateDatasetThroughput(int throughputBytes) {
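+ // capture the current rate-limit reload count so we can wait for the limiter to pick up the new throughput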
+ double timerCount =
+ MetricsUtil.getTimerCount(
+ DatasetRateLimitingService.RATE_LIMIT_RELOAD_TIMER, meterRegistry);
+
// dataset metadata already exists. Update with the throughput value
DatasetMetadata datasetMetadata =
new DatasetMetadata(
@@ -127,17 +144,13 @@ public void updateDatasetThroughput(int throughputBytes) throws Exception {
INDEX_NAME);
datasetMetadataStore.updateSync(datasetMetadata);
- // Need to wait until we've verified the rate limit has been correctly loaded
+ // Need to wait until the rate limit has been loaded
await()
.until(
() ->
- openSearchBulkAPI.throughputSortedDatasets.stream()
- .filter(datasetMetadata1 -> datasetMetadata1.name.equals(INDEX_NAME))
- .findFirst(),
- (storedDatasetMetadata) -> {
- //noinspection OptionalGetWithoutIsPresent
- return storedDatasetMetadata.get().getThroughputBytes() == throughputBytes;
- });
+ MetricsUtil.getTimerCount(
+ DatasetRateLimitingService.RATE_LIMIT_RELOAD_TIMER, meterRegistry)
+ > timerCount);
}
@AfterEach
@@ -145,9 +158,13 @@ public void updateDatasetThroughput(int throughputBytes) throws Exception {
// Instead of calling stop from every test and ensuring it's part of a finally block we just call
// the shutdown code with the @AfterEach annotation
public void shutdownOpenSearchAPI() throws Exception {
- if (openSearchBulkAPI != null) {
- openSearchBulkAPI.stopAsync();
- openSearchBulkAPI.awaitTerminated(DEFAULT_START_STOP_DURATION);
+ if (datasetRateLimitingService != null) {
+ datasetRateLimitingService.stopAsync();
+ datasetRateLimitingService.awaitTerminated(DEFAULT_START_STOP_DURATION);
+ }
+ if (bulkIngestKafkaProducer != null) {
+ bulkIngestKafkaProducer.stopAsync();
+ bulkIngestKafkaProducer.awaitTerminated(DEFAULT_START_STOP_DURATION);
}
kafkaServer.close();
curatorFramework.unwrap().close();
@@ -178,19 +195,10 @@ public void testBulkApiBasic() throws Exception {
{ "index": {"_index": "testindex", "_id": "1"} }
{ "field1" : "value1" }
""";
- // get num bytes that can be used to create the dataset. When we make 2 successive calls the
- // second one should fail
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(request1);
- Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
- assertThat(indexDocs.keySet().size()).isEqualTo(1);
- assertThat(indexDocs.get("testindex").size()).isEqualTo(1);
- assertThat(indexDocs.get("testindex").get(0).getId().toStringUtf8()).isEqualTo("1");
- int throughputBytes = PreprocessorRateLimiter.getSpanBytes(indexDocs.get("testindex"));
- updateDatasetThroughput(throughputBytes);
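+ // size the dataset throughput to the raw request payload so the first call succeeds and an identical second call is rate limited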
+ updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
// test with empty causes a parse exception
- AggregatedHttpResponse response = openSearchBulkAPI.addDocument("{}\n").aggregate().join();
+ AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join();
assertThat(response.status().isSuccess()).isEqualTo(false);
assertThat(response.status().code()).isEqualTo(INTERNAL_SERVER_ERROR.code());
BulkIngestResponse responseObj =
@@ -200,20 +208,49 @@ public void testBulkApiBasic() throws Exception {
// test with request1 twice. first one should succeed, second one will fail because of rate
// limiter
- response = openSearchBulkAPI.addDocument(request1).aggregate().join();
- assertThat(response.status().isSuccess()).isEqualTo(true);
- assertThat(response.status().code()).isEqualTo(OK.code());
- responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class);
- assertThat(responseObj.totalDocs()).isEqualTo(1);
- assertThat(responseObj.failedDocs()).isEqualTo(0);
-
- response = openSearchBulkAPI.addDocument(request1).aggregate().join();
- assertThat(response.status().isSuccess()).isEqualTo(false);
- assertThat(response.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
- responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class);
- assertThat(responseObj.totalDocs()).isEqualTo(0);
- assertThat(responseObj.failedDocs()).isEqualTo(0);
- assertThat(responseObj.errorMsg()).isEqualTo("rate limit exceeded");
+ CompletableFuture<AggregatedHttpResponse> response1 =
+ bulkApi
+ .addDocument(request1)
+ .aggregate()
+ .thenApply(
+ httpResponse -> {
+ assertThat(httpResponse.status().isSuccess()).isEqualTo(true);
+ assertThat(httpResponse.status().code()).isEqualTo(OK.code());
+ BulkIngestResponse httpResponseObj = null;
+ try {
+ httpResponseObj =
+ JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+ } catch (IOException e) {
+ fail("", e);
+ }
+ assertThat(httpResponseObj.totalDocs()).isEqualTo(1);
+ assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+ return httpResponse;
+ });
+
+ CompletableFuture<AggregatedHttpResponse> response2 =
+ bulkApi
+ .addDocument(request1)
+ .aggregate()
+ .thenApply(
+ httpResponse -> {
+ assertThat(httpResponse.status().isSuccess()).isEqualTo(false);
+ assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code());
+ BulkIngestResponse httpResponseObj = null;
+ try {
+ httpResponseObj =
+ JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class);
+ } catch (IOException e) {
+ fail("", e);
+ }
+ assertThat(httpResponseObj.totalDocs()).isEqualTo(0);
+ assertThat(httpResponseObj.failedDocs()).isEqualTo(0);
+ assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded");
+ return httpResponse;
+ });
+
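+ // wait for both asynchronous assertion chains to complete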
+ await().until(response1::isDone);
+ await().until(response2::isDone);
// test with multiple indexes
String request2 =
@@ -223,7 +260,7 @@ public void testBulkApiBasic() throws Exception {
{ "index": {"_index": "testindex2", "_id": "1"} }
{ "field1" : "value1" }
""";
- response = openSearchBulkAPI.addDocument(request2).aggregate().join();
+ response = bulkApi.addDocument(request2).aggregate().join();
assertThat(response.status().isSuccess()).isEqualTo(false);
assertThat(response.status().code()).isEqualTo(INTERNAL_SERVER_ERROR.code());
responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class);
@@ -241,17 +278,11 @@ public void testDocumentInKafkaSimple() throws Exception {
{ "index": {"_index": "testindex", "_id": "2"} }
{ "field1" : "value2" }
""";
- List<IndexRequest> indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(request1);
- Map<String, List<Trace.Span>> indexDocs =
- OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
- assertThat(indexDocs.keySet().size()).isEqualTo(1);
- assertThat(indexDocs.get("testindex").size()).isEqualTo(2);
- int throughputBytes = PreprocessorRateLimiter.getSpanBytes(indexDocs.get("testindex"));
- updateDatasetThroughput(throughputBytes);
+ updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length);
KafkaConsumer<String, byte[]> kafkaConsumer = getTestKafkaConsumer();
- AggregatedHttpResponse response = openSearchBulkAPI.addDocument(request1).aggregate().join();
+ AggregatedHttpResponse response = bulkApi.addDocument(request1).aggregate().join();
assertThat(response.status().isSuccess()).isEqualTo(true);
assertThat(response.status().code()).isEqualTo(OK.code());
BulkIngestResponse responseObj =
@@ -279,89 +310,6 @@ record ->
kafkaConsumer.close();
}
- @Test
- public void testDocumentInKafkaTransactionError() throws Exception {
- updateDatasetThroughput(100_1000);
-
- KafkaConsumer<String, byte[]> kafkaConsumer = getTestKafkaConsumer();
-
- // we want to inject a failure in the second doc and test if the abort transaction works and we
- // don't index the first document
- Trace.Span doc1 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error1")).build();
- Trace.Span doc2 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error2")).build();
- Trace.Span doc3 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error3")).build();
- Trace.Span doc4 = spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error4")).build());
- when(doc4.toByteArray()).thenThrow(new RuntimeException("exception"));
- Trace.Span doc5 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error5")).build();
-
- Map<String, List<Trace.Span>> indexDocs =
- Map.of("testindex", List.of(doc1, doc2, doc3, doc4, doc5));
-
- BulkIngestResponse responseObj = openSearchBulkAPI.produceDocuments(indexDocs);
- assertThat(responseObj.totalDocs()).isEqualTo(0);
- assertThat(responseObj.failedDocs()).isEqualTo(5);
- assertThat(responseObj.errorMsg()).isNotNull();
-
- await()
- .until(
- () -> {
- @SuppressWarnings("OptionalGetWithoutIsPresent")
- long partitionOffset =
- (Long)
- kafkaConsumer
- .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0)))
- .values()
- .stream()
- .findFirst()
- .get();
- LOG.debug(
- "Current partitionOffset - {}. expecting offset to be less than 5",
- partitionOffset);
- return partitionOffset > 0 && partitionOffset < 5;
- });
-
- ConsumerRecords<String, byte[]> records =
- kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS));
-
- assertThat(records.count()).isEqualTo(0);
-
- long currentPartitionOffset =
- (Long)
- kafkaConsumer
- .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0)))
- .values()
- .stream()
- .findFirst()
- .get();
-
- Trace.Span doc6 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error6")).build();
- Trace.Span doc7 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error7")).build();
- Trace.Span doc8 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error8")).build();
- Trace.Span doc9 =
- spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error9")).build());
- Trace.Span doc10 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error10")).build();
-
- indexDocs = Map.of("testindex", List.of(doc6, doc7, doc8, doc9, doc10));
-
- responseObj = openSearchBulkAPI.produceDocuments(indexDocs);
- assertThat(responseObj.totalDocs()).isEqualTo(5);
- assertThat(responseObj.failedDocs()).isEqualTo(0);
- assertThat(responseObj.errorMsg()).isNotNull();
-
- // 5 docs. 1 control batch. initial offset was 1 after the first failed batch
- validateOffset(kafkaConsumer, currentPartitionOffset + 5 + 1);
- records = kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS));
-
- assertThat(records.count()).isEqualTo(5);
- records.forEach(
- record ->
- LOG.info(
- "Trace= + " + TraceSpanParserSilenceError(record.value()).getId().toStringUtf8()));
-
- // close the kafka consumer used in the test
- kafkaConsumer.close();
- }
-
public void validateOffset(KafkaConsumer<String, byte[]> kafkaConsumer, long expectedOffset) {
await()
.until(
diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
index 9f72902287..adb80f0fea 100644
--- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java
@@ -295,7 +295,11 @@ public void testParseKaldbJsonConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false);
- assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");
+
+ final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
+ config.getPreprocessorConfig().getKafkaConfig();
+ assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
+ assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");
final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
@@ -466,8 +470,12 @@ public void testParseKaldbYamlConfigFile() throws IOException {
assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log");
assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2);
+ final KaldbConfigs.KafkaConfig preprocessorKafkaConfig =
+ config.getPreprocessorConfig().getKafkaConfig();
+ assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092");
+ assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic");
+
assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true);
- assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092");
final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig();
assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085);
diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java
new file mode 100644
index 0000000000..e3e425044b
--- /dev/null
+++ b/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java
@@ -0,0 +1,48 @@
+package com.slack.kaldb.writer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.kaldb.testlib.TestKafkaServer;
+import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.jupiter.api.Test;
+
+class KafkaUtilsTest {
+ public static final String TEST_KAFKA_CLIENT_GROUP = "test_kaldb_consumer";
+
+ @Test
+ public void testOverridingProperties() {
+ KaldbConfigs.KafkaConfig kafkaConfig =
+ KaldbConfigs.KafkaConfig.newBuilder()
+ .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
+ .setKafkaTopicPartition("0")
+ .setKafkaBootStrapServers("bootstrap_server")
+ .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
+ .setEnableKafkaAutoCommit("true")
+ .setKafkaAutoCommitInterval("5000")
+ .setKafkaSessionTimeout("5000")
+ .build();
+
+ Properties properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
+ assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ .isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
+
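+ // entries in additionalProps should override the defaults that makeKafkaConsumerProps sets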
+ kafkaConfig =
+ KaldbConfigs.KafkaConfig.newBuilder()
+ .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
+ .setKafkaTopicPartition("0")
+ .setKafkaBootStrapServers("bootstrap_server")
+ .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
+ .setEnableKafkaAutoCommit("true")
+ .setKafkaAutoCommitInterval("5000")
+ .setKafkaSessionTimeout("5000")
+ .putAdditionalProps(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "test_serializer")
+ .build();
+
+ properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
+ assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+ .isEqualTo("test_serializer");
+ }
+}
diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
index fae04c0700..2185beb1bf 100644
--- a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
@@ -117,40 +117,6 @@ public void tearDown() throws Exception {
metricsRegistry.close();
}
- @Test
- public void testOverridingProperties() {
- KaldbConfigs.KafkaConfig kafkaConfig =
- KaldbConfigs.KafkaConfig.newBuilder()
- .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
- .setKafkaTopicPartition("0")
- .setKafkaBootStrapServers("bootstrap_server")
- .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
- .setEnableKafkaAutoCommit("true")
- .setKafkaAutoCommitInterval("5000")
- .setKafkaSessionTimeout("5000")
- .build();
-
- Properties properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
- assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
- .isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
-
- kafkaConfig =
- KaldbConfigs.KafkaConfig.newBuilder()
- .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
- .setKafkaTopicPartition("0")
- .setKafkaBootStrapServers("bootstrap_server")
- .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
- .setEnableKafkaAutoCommit("true")
- .setKafkaAutoCommitInterval("5000")
- .setKafkaSessionTimeout("5000")
- .putAdditionalProps(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "test_serializer")
- .build();
-
- properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
- assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
- .isEqualTo("test_serializer");
- }
-
@Test
public void testGetEndOffsetForPartition() throws Exception {
EphemeralKafkaBroker broker = kafkaServer.getBroker();
diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json
index fa675fa2a8..1d1f4e6833 100644
--- a/kaldb/src/test/resources/test_config.json
+++ b/kaldb/src/test/resources/test_config.json
@@ -142,6 +142,10 @@
"numStreamThreads": 2,
"processingGuarantee": "at_least_once"
},
+ "kafkaConfig": {
+ "kafkaTopic": "test-topic",
+ "kafkaBootStrapServers": "localhost:9092"
+ },
"serverConfig": {
"serverPort": 8085,
"serverAddress": "localhost",
diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml
index 00c96a30da..e5c62cd740 100644
--- a/kaldb/src/test/resources/test_config.yaml
+++ b/kaldb/src/test/resources/test_config.yaml
@@ -114,6 +114,10 @@ preprocessorConfig:
applicationId: kaldb_preprocessor
numStreamThreads: 2
processingGuarantee: at_least_once
+ kafkaConfig:
+ kafkaTopic: test-topic
+ kafkaBootStrapServers: "localhost:9092"
+
serverConfig:
serverPort: 8085
serverAddress: localhost