diff --git a/.run/Astra_manager.run.xml b/.run/Astra_manager.run.xml deleted file mode 100644 index 0509dd3f53..0000000000 --- a/.run/Astra_manager.run.xml +++ /dev/null @@ -1,13 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="Astra Manager Node" type="Application" factoryName="Application"> - <envs> - <env name="NODE_ROLES" value="MANAGER" /> - </envs> - <option name="MAIN_CLASS_NAME" value="com.slack.astra.server.Astra" /> - <module name="astra" /> - <option name="PROGRAM_PARAMETERS" value="config/config.yaml" /> - <method v="2"> - <option name="Make" enabled="true" /> - </method> - </configuration> -</component> diff --git a/Dockerfile b/Dockerfile index ea715737fc..bcc6f7444a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,4 +5,4 @@ RUN cd /work; mvn package -DskipTests FROM amazoncorretto:21 COPY --from=build /work/astra/target/astra.jar / COPY --from=build /work/config/config.yaml / -ENTRYPOINT [ "java", "--enable-preview", "-jar", "./astra.jar", "config.yaml" ] +ENTRYPOINT [ "java", "-Xms512m", "-Xmx2g", "--enable-preview", "-jar", "./astra.jar", "config.yaml" ] diff --git a/astra/dependency-reduced-pom.xml b/astra/dependency-reduced-pom.xml new file mode 100644 index 0000000000..017db2cfff --- /dev/null +++ b/astra/dependency-reduced-pom.xml @@ -0,0 +1,409 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.slack.astra</groupId> + <artifactId>astra</artifactId> + <version>0.1-SNAPSHOT</version> + <licenses> + <license> + <name>MIT</name> + <url>https://opensource.org/licenses/MIT</url> + <distribution>repo</distribution> + <comments>A business-friendly OSS license</comments> + </license> + </licenses> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.7.1</version> + </extension> + </extensions> + <defaultGoal>clean install</defaultGoal> + <pluginManagement> + <plugins> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>3.4.0</version> + </plugin> + <plugin> + <artifactId>maven-install-plugin</artifactId> + <version>3.1.3</version> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>3.4.2</version> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>3.10.1</version> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.3.1</version> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.20.0</version> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.3.1</version> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <argLine>--enable-preview</argLine> + <trimStackTrace>false</trimStackTrace> + <systemPropertyVariables> + <log4j.configurationFile>src/test/resources/log4j2.xml</log4j.configurationFile> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.13.0</version> + <configuration> + <compilerVersion>${javac.target}</compilerVersion> + <source>${javac.target}</source> + <testSource>${javac.target}</testSource> + <target>${javac.target}</target> + 
<fork>true</fork> + <compilerArgs> + <arg>-XDcompilePolicy=simple</arg> + <arg>--enable-preview</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.model=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.processing=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED</arg> + <arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED</arg> + <arg>-J--add-opens=jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED</arg> + <arg>-J--add-opens=jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED</arg> + <arg>-Xplugin:ErrorProne \ + -XepDisableWarningsInGeneratedCode \ + -XepExcludedPaths:.*/protobuf/.* \ + -Xep:WildcardImport:ERROR \ + -Xep:AssertEqualsArgumentOrderChecker:ERROR \ + -Xep:AlmostJavadoc:ERROR \ + -Xep:FallThrough:ERROR \ + -Xep:Finally:ERROR \ + -Xep:InconsistentCapitalization:ERROR \ + -Xep:InconsistentHashCode:ERROR \ + -Xep:InlineFormatString:ERROR \ + -Xep:InvalidThrows:ERROR \ + -Xep:JavaInstantGetSecondsGetNano:ERROR \ + -Xep:JavaUtilDate:ERROR \ + -Xep:LockNotBeforeTry:ERROR \ + -Xep:MissingOverride:ERROR \ + -Xep:NullOptional:ERROR \ + -Xep:UnnecessaryParentheses:ERROR \ + -Xep:UnusedMethod:ERROR \ + -Xep:UnusedVariable:ERROR \ + -Xep:StreamResourceLeak:ERROR \ + -Xep:StaticAssignmentInConstructor:ERROR</arg> + </compilerArgs> + <annotationProcessorPaths> + <path> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_core</artifactId> + <version>${error.prone.version}</version> + </path> + </annotationProcessorPaths> + </configuration> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>3.6.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>${uberjar.name}</finalName> + <transformers> + <transformer> + <manifestEntries> + <Main-Class>com.slack.astra.server.Astra</Main-Class> + <Multi-Release>true</Multi-Release> + </manifestEntries> + </transformer> + <transformer /> + <transformer /> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-transform-maven-shade-plugin-extensions</artifactId> + <version>0.1.0</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>com.spotify.fmt</groupId> + <artifactId>fmt-maven-plugin</artifactId> + <version>2.25</version> + <executions> + <execution> + <goals> + <goal>format</goal> + </goals> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>com.google.googlejavaformat</groupId> + <artifactId>google-java-format</artifactId> + <version>1.24.0</version> + </dependency> + </dependencies> + <configuration> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <verbose>true</verbose> + <filesNamePattern>.*\.java</filesNamePattern> + <skip>false</skip> + <skipSourceDirectory>false</skipSourceDirectory> + 
<skipTestSourceDirectory>false</skipTestSourceDirectory> + <skipSortingImports>false</skipSortingImports> + <style>google</style> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + <goal>test-compile</goal> + </goals> + </execution> + </executions> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + <writeDescriptorSet>true</writeDescriptorSet> + <includeDependenciesInDescriptorSet>true</includeDependenciesInDescriptorSet> + <includeSourceInfoInDescriptorSet>true</includeSourceInfoInDescriptorSet> + <descriptorSetOutputDirectory>${project.build.outputDirectory}/META-INF/armeria/grpc</descriptorSetOutputDirectory> + <descriptorSetFileName>${project.build.finalName}.dsc</descriptorSetFileName> + </configuration> + </plugin> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>3.5.0</version> + <configuration> + <rules> + <dependencyConvergence /> + </rules> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-xml</artifactId> + <version>2.18.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>stax2-api</artifactId> + <groupId>org.codehaus.woodstox</groupId> + </exclusion> + <exclusion> + <artifactId>woodstox-core</artifactId> + <groupId>com.fasterxml.woodstox</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>5.11.2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>junit-jupiter-params</artifactId> + <groupId>org.junit.jupiter</groupId> + </exclusion> + <exclusion> + <artifactId>junit-jupiter-engine</artifactId> + <groupId>org.junit.jupiter</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.11.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>4.2.2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>hamcrest</artifactId> + <groupId>org.hamcrest</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <version>1.68.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + </exclusion> + <exclusion> + <artifactId>grpc-inprocess</artifactId> + <groupId>io.grpc</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>3.26.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>5.14.1</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>byte-buddy-agent</artifactId> + <groupId>net.bytebuddy</groupId> + </exclusion> + <exclusion> + <artifactId>objenesis</artifactId> + 
<groupId>org.objenesis</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>5.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.adobe.testing</groupId> + <artifactId>s3mock-junit5</artifactId> + <version>3.11.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>logback-core</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + <exclusion> + <artifactId>logback-classic</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-to-slf4j</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>javax.servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>httpclient</artifactId> + <groupId>org.apache.httpcomponents</groupId> + </exclusion> + <exclusion> + <artifactId>s3mock-testsupport-common</artifactId> + <groupId>com.adobe.testing</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.github.charithe</groupId> + <artifactId>kafka-junit</artifactId> + <version>4.2.7</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>log4j-to-slf4j</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-module-scala_2.13</artifactId> + <groupId>com.fasterxml.jackson.module</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>kafka_2.13</artifactId> + <groupId>org.apache.kafka</groupId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <properties> + <opensearch.version>2.11.1</opensearch.version> + <aws.sdk.version>2.28.24</aws.sdk.version> + <jackson.databind.version>2.18.0</jackson.databind.version> + <jackson.version>2.18.0</jackson.version> + <junit.jupiter.version>5.11.2</junit.jupiter.version> + <kafka.version>3.5.0</kafka.version> + <javac.target>23</javac.target> + <uberjar.name>astra</uberjar.name> + <lucene.version>9.7.0</lucene.version> + <protobuf.version>3.25.5</protobuf.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <http.core.version>4.4.16</http.core.version> + <log4j.version>2.24.1</log4j.version> + <grpc.version>1.68.0</grpc.version> + <armeria.version>1.27.3</armeria.version> + <error.prone.version>2.33.0</error.prone.version> + <curator.version>5.7.1</curator.version> + <micrometer.version>1.13.5</micrometer.version> + </properties> +</project> diff --git a/astra/pom.xml b/astra/pom.xml index 20112a3379..3947cceba2 100644 --- a/astra/pom.xml +++ b/astra/pom.xml @@ -46,6 +46,11 @@ </licenses> <dependencies> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20210307</version> + </dependency> <!-- Logging dependencies --> <dependency> <groupId>org.slf4j</groupId> @@ -330,6 +335,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>auth</artifactId> + <version>${aws.sdk.version}</version> + </dependency> <dependency> <groupId>software.amazon.awssdk.crt</groupId> <artifactId>aws-crt</artifactId> diff --git 
a/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java
new file mode 100644
index 0000000000..3f16ea9096
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/bulkIngestApi/BulkLocalIngestApi.java
@@ -0,0 +1,142 @@
+package com.slack.astra.bulkIngestApi;
+
+import static com.linecorp.armeria.common.HttpStatus.CREATED;
+import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
+
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.server.annotation.Post;
+import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
+import com.slack.astra.chunkManager.ChunkManager;
+import com.slack.astra.logstore.LogMessage;
+import com.slack.astra.proto.schema.Schema;
+import com.slack.service.murron.trace.Trace;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines the HTTP endpoint behavior for local bulk ingest. It is expected to handle
+ * appropriate rate limiting and error handling, and it writes the parsed messages directly into
+ * the local chunk manager rather than submitting them to Kafka.
+ */
+public class BulkLocalIngestApi {
+  private static final Logger LOG = LoggerFactory.getLogger(BulkLocalIngestApi.class);
+
+  // private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
+  // private final DatasetRateLimitingService datasetRateLimitingService;
+  // private final MeterRegistry meterRegistry;
+  // private final Counter incomingByteTotal;
+  // private final Counter incomingDocsTotal;
+  // private final Timer bulkIngestTimer;
+  // private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "astra_preprocessor_incoming_byte";
+  // private final String BULK_INGEST_INCOMING_BYTE_DOCS = "astra_preprocessor_incoming_docs";
+  // private final String BULK_INGEST_ERROR = "astra_preprocessor_error";
+  // private final String BULK_INGEST_TIMER = "astra_preprocessor_bulk_ingest";
+  // private final int rateLimitExceededErrorCode;
+  private final ChunkManager<LogMessage> chunkManager;
+  private final Schema.IngestSchema schema;
+
+  // private final Counter bulkIngestErrorCounter;
+
+  public BulkLocalIngestApi(
+      // MeterRegistry meterRegistry,
+      ChunkManager<LogMessage> chunkManager, Schema.IngestSchema schema) {
+
+    // this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
+    // this.datasetRateLimitingService = datasetRateLimitingService;
+    // this.meterRegistry = meterRegistry;
+    // this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
+    // this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
+    // this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
+    // if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
+    //   this.rateLimitExceededErrorCode = 400;
+    // } else {
+    //   this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
+    // }
+    this.schema = schema;
+    this.chunkManager = chunkManager;
+    // this.bulkIngestErrorCounter = meterRegistry.counter(BULK_INGEST_ERROR);
+  }
+
+  @Post("/_local_bulk")
+  public HttpResponse addDocument(String bulkRequest) {
+    // 1. Astra does not support the concept of "updates". It's always an add.
+    // 2. The "index" is used as the span name
+    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
+    // Timer.Sample sample = Timer.start(meterRegistry);
+    // future.thenRun(() -> sample.stop(bulkIngestTimer));
+
+    int count = 0;
+
+    try {
+      byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
+      // incomingByteTotal.increment(bulkRequestBytes.length);
+      Map<String, List<Trace.Span>> docs = Map.of();
+      try {
+        docs = BulkApiRequestParser.parseRequest(bulkRequestBytes, schema);
+      } catch (Exception e) {
+        LOG.error("Request failed ", e);
+        // bulkIngestErrorCounter.increment();
+        BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+        return HttpResponse.of(future);
+      }
+      LOG.debug("Parsed docs message: {}", docs);
+
+      // todo - our rate limiter doesn't have a way to acquire permits across multiple
+      // datasets
+      // so today as a limitation we reject any request that has documents against
+      // multiple indexes
+      // We think most indexing requests will be against 1 index
+      if (docs.keySet().size() > 1) {
+        BulkIngestResponse response =
+            new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
+        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+        // bulkIngestErrorCounter.increment();
+        return HttpResponse.of(future);
+      }
+
+      // for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+      //   incomingDocsTotal.increment(indexDocs.getValue().size());
+      //   final String index = indexDocs.getKey();
+      //   if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
+      //     BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
+      //     future.complete(
+      //         HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode),
+      // response));
+      //     return HttpResponse.of(future);
+      //   }
+      // }
+
+      // todo - explore the possibility of using the blocking task executor backed by virtual
+      // threads to fulfill this
+
+      for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+        for (Trace.Span span : indexDocs.getValue()) {
+          try {
+            chunkManager.addMessage(span, span.getSerializedSize(), String.valueOf(0), 12345, true);
+            count += 1;
+          } catch (Exception e) {
+            LOG.error("Request failed ", e);
+            // bulkIngestErrorCounter.increment();
+            future.complete(
+                HttpResponse.ofJson(
+                    INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
+            return HttpResponse.of(future);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Request failed ", e);
+      // bulkIngestErrorCounter.increment();
+      BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+      future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
+    }
+
+    future.complete(HttpResponse.ofJson(CREATED, new BulkIngestResponse(count, 0, "")));
+    return HttpResponse.of(future);
+  }
+}
diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
index d3dd5784a0..5aead038eb 100644
--- a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
+++ b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
@@ -139,7 +139,8 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext
   }

   /** Index the message in the logstore and update the chunk data time range.
*/
-  public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) {
+  public void addMessage(
+      Trace.Span message, String kafkaPartitionId, long offset, boolean local_update) {
     if (!this.kafkaPartitionId.equals(kafkaPartitionId)) {
       throw new IllegalArgumentException(
           "All messages for this chunk should belong to partition: "
@@ -158,7 +159,9 @@ public void addMessage(Trace.Span message, String kafkaPartitionId, long offset)
       }
       chunkInfo.updateDataTimeRange(timestamp.toEpochMilli());
-      chunkInfo.updateMaxOffset(offset);
+      if (!local_update) {
+        chunkInfo.updateMaxOffset(offset);
+      }
     } else {
       throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo));
     }
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
index 030dee047e..661555cfa0 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java
@@ -175,7 +175,8 @@ public static CachingChunkManager<LogMessage> fromConfig(
   }

   @Override
-  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(
+      Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
       throws IOException {
     throw new UnsupportedOperationException(
         "Adding messages is not supported on a caching chunk manager");
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
index 502d12a75a..84c66eb84d 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/ChunkManager.java
@@ -9,7 +9,8 @@ import java.util.Map;

 public interface ChunkManager<T> {
-  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  void addMessage(
+      Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_update)
       throws IOException;

   SearchResult<T> query(SearchQuery query, Duration queryTimeout);
diff --git a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
index 590f76d609..5ef2be51d4 100644
--- a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
+++ b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java
@@ -165,7 +165,11 @@ public IndexingChunkManager(
    */
   @Override
   public void addMessage(
-      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      final Trace.Span message,
+      long msgSize,
+      String kafkaPartitionId,
+      long offset,
+      boolean local_insert)
       throws IOException {
     if (stopIngestion) {
       // Currently, this flag is set on only a chunkRollOverException.
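As the hunks above show, the new boolean threads through ChunkManager, its implementations, and ReadWriteChunk so that locally ingested documents, which carry a synthetic Kafka offset, do not advance the chunk's max offset, while Kafka-driven ingestion still does. A standalone sketch of that gating logic follows; the types here are illustrative stand-ins, not Astra classes:

public class OffsetGateSketch {
  static final class ChunkInfo {
    long maxOffset = -1;

    void updateMaxOffset(long offset) {
      maxOffset = Math.max(maxOffset, offset);
    }
  }

  // Mirrors ReadWriteChunk.addMessage: only Kafka-driven ingestion
  // (localUpdate == false) is allowed to advance the recorded max offset.
  static void addMessage(ChunkInfo info, long offset, boolean localUpdate) {
    if (!localUpdate) {
      info.updateMaxOffset(offset);
    }
  }

  public static void main(String[] args) {
    ChunkInfo info = new ChunkInfo();
    addMessage(info, 42, false); // Kafka path: offset is recorded
    addMessage(info, 12345, true); // local bulk path: synthetic offset is ignored
    System.out.println(info.maxOffset); // prints 42
  }
}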
@@ -175,7 +179,7 @@ public void addMessage( // find the active chunk and add a message to it ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId, indexerConfig); - currentChunk.addMessage(message, kafkaPartitionId, offset); + currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert); long currentIndexedMessages = liveMessagesIndexedGauge.incrementAndGet(); long currentIndexedBytes = liveBytesIndexedGauge.addAndGet(msgSize); diff --git a/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java index 1ca98b5732..c38e427f3d 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/RecoveryChunkManager.java @@ -80,7 +80,11 @@ public RecoveryChunkManager( @Override public void addMessage( - final Trace.Span message, long msgSize, String kafkaPartitionId, long offset) + final Trace.Span message, + long msgSize, + String kafkaPartitionId, + long offset, + boolean local_insert) throws IOException { if (readOnly) { LOG.warn("Ingestion is stopped since the chunk is in read only mode."); @@ -89,7 +93,7 @@ public void addMessage( // find the active chunk and add a message to it ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId); - currentChunk.addMessage(message, kafkaPartitionId, offset); + currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert); liveMessagesIndexedGauge.incrementAndGet(); liveBytesIndexedGauge.addAndGet(msgSize); } diff --git a/astra/src/main/java/com/slack/astra/server/ArmeriaService.java b/astra/src/main/java/com/slack/astra/server/ArmeriaService.java index be2576489e..db442a67af 100644 --- a/astra/src/main/java/com/slack/astra/server/ArmeriaService.java +++ b/astra/src/main/java/com/slack/astra/server/ArmeriaService.java @@ -100,6 +100,11 @@ public Builder withRequestTimeout(Duration requestTimeout) { return this; } + public Builder maxContentLength(long maxRequestLength) { + serverBuilder.maxRequestLength(maxRequestLength); + return this; + } + public Builder withTracing(AstraConfigs.TracingConfig tracingConfig) { // span handlers is an ordered list, so we need to be careful with ordering if (tracingConfig.getCommonTagsCount() > 0) { diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java index 90e6f011d1..0e74bd059d 100644 --- a/astra/src/main/java/com/slack/astra/server/Astra.java +++ b/astra/src/main/java/com/slack/astra/server/Astra.java @@ -7,6 +7,7 @@ import com.slack.astra.blobfs.S3AsyncUtil; import com.slack.astra.bulkIngestApi.BulkIngestApi; import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer; +import com.slack.astra.bulkIngestApi.BulkLocalIngestApi; import com.slack.astra.bulkIngestApi.DatasetRateLimitingService; import com.slack.astra.chunkManager.CachingChunkManager; import com.slack.astra.chunkManager.IndexingChunkManager; @@ -160,6 +161,9 @@ private static Set<Service> getServices( HashSet<AstraConfigs.NodeRole> roles = new HashSet<>(astraConfig.getNodeRolesList()); if (roles.contains(AstraConfigs.NodeRole.INDEX)) { + // final AstraConfigs.PreprocessorConfig preprocessorConfig = + // astraConfig.getPreprocessorConfig(); + IndexingChunkManager<LogMessage> chunkManager = IndexingChunkManager.fromConfig( meterRegistry, @@ -185,13 +189,29 @@ private static Set<Service> getServices( final int serverPort = 
astraConfig.getIndexerConfig().getServerConfig().getServerPort(); Duration requestTimeout = Duration.ofMillis(astraConfig.getIndexerConfig().getServerConfig().getRequestTimeoutMs()); - ArmeriaService armeriaService = + ArmeriaService.Builder armeriaServiceBuilder = new ArmeriaService.Builder(serverPort, "astraIndex", meterRegistry) .withRequestTimeout(requestTimeout) + .maxContentLength(2000000000) .withTracing(astraConfig.getTracingConfig()) - .withGrpcService(searcher) - .build(); - services.add(armeriaService); + .withGrpcService(searcher); + // .build(); + // Schema.IngestSchema schema = Schema.IngestSchema.getDefaultInstance(); + // if (!preprocessorConfig.getSchemaFile().isEmpty()) { + // LOG.info("Loading schema file: {}", preprocessorConfig.getSchemaFile()); + Schema.IngestSchema schema = SchemaUtil.parseSchema(Path.of("")); + LOG.info( + "Loaded schema with fields count: {}, defaults count: {}", + schema.getFieldsCount(), + schema.getDefaultsCount()); + // } else { + // LOG.info("No schema file provided, using default schema"); + // } + schema = ReservedFields.addPredefinedFields(schema); + BulkLocalIngestApi localOpenSearchBulkApiService = + new BulkLocalIngestApi(chunkManager, schema); + armeriaServiceBuilder.withAnnotatedService(localOpenSearchBulkApiService); + services.add(armeriaServiceBuilder.build()); } if (roles.contains(AstraConfigs.NodeRole.QUERY)) { diff --git a/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java b/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java index be7c8a244f..d4b906fb64 100644 --- a/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java +++ b/astra/src/main/java/com/slack/astra/writer/LogMessageWriterImpl.java @@ -63,7 +63,8 @@ public boolean insertRecord(ConsumerRecord<String, byte[]> record) throws IOExce Trace.Span.parseFrom(record.value()), record.serializedValueSize(), String.valueOf(record.partition()), - record.offset()); + record.offset(), + false); return true; } } diff --git a/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinService.java b/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinService.java index c7de18ae1f..2153f33150 100644 --- a/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinService.java +++ b/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinService.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +118,7 @@ protected static String convertLogWireMessageToZipkinSpan(List<LogWireMessage> m span.setRemoteEndpoint(remoteEndpoint); } span.setTimestamp(convertToMicroSeconds(message.getTimestamp())); - span.setDuration(Math.toIntExact(duration)); + span.setDuration(duration); span.setTags(messageTags); traces.add(span); } @@ -200,7 +201,11 @@ public HttpResponse getTraceByTraceId( @Param("endTimeEpochMs") Optional<Long> endTimeEpochMs, @Param("maxSpans") Optional<Integer> maxSpans) throws IOException { - String queryString = "trace_id:" + traceId; + JSONObject traceObject = new JSONObject(); + traceObject.put("trace_id", traceId); + JSONObject queryJson = new JSONObject(); + queryJson.put("term", traceObject); + String queryString = queryJson.toString(); long startTime = startTimeEpochMs.orElseGet( () -> Instant.now().minus(LOOKBACK_MINS, ChronoUnit.MINUTES).toEpochMilli()); diff --git a/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinSpanResponse.java 
b/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinSpanResponse.java
index b07ee59525..df7b8d9198 100644
--- a/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinSpanResponse.java
+++ b/astra/src/main/java/com/slack/astra/zipkinApi/ZipkinSpanResponse.java
@@ -23,8 +23,8 @@ public class ZipkinSpanResponse {
   private ZipkinEndpointResponse remoteEndpoint = null;
   @JsonProperty("duration")
-  // Zipkin spec defines this is integer, even though a long seems like it would be more appropriate
-  private int durationMicros;
+  // The Zipkin spec defines duration as int64, so a long is more appropriate
+  private long durationMicros;
   private String kind;
@@ -48,7 +48,7 @@ public void setTimestamp(long timestampMicros) {
     this.timestampMicros = timestampMicros;
   }
-  public void setDuration(int durationMicros) {
+  public void setDuration(long durationMicros) {
     this.durationMicros = durationMicros;
   }
@@ -96,7 +96,7 @@ public long getTimestamp() {
     return timestampMicros;
   }
-  public int getDuration() {
+  public long getDuration() {
     return durationMicros;
   }
diff --git a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java
index 342b0be2f2..41171b5488 100644
--- a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java
+++ b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java
@@ -154,7 +154,7 @@ public void testAddAndSearchChunk() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunk.commit();
@@ -208,7 +208,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
         TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunk.commit();
@@ -256,7 +256,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
     final long newMessageEndTimeEpochMs =
         TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
     for (Trace.Span m : newMessages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunk.commit();
@@ -325,7 +325,7 @@ public void testSearchInReadOnlyChunk() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunk.commit();
@@ -358,7 +358,7 @@ public void testAddMessageToReadOnlyChunk() {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunk.commit();
@@ -370,7 +370,9 @@ public void testAddMessageToReadOnlyChunk() {
     int finalOffset = offset;
     assertThatExceptionOfType(IllegalStateException.class)
         .isThrownBy(
-            () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset));
+            () ->
+                chunk.addMessage(
+                    SpanUtil.makeSpan(101),
TEST_KAFKA_PARTITION_ID, finalOffset, false)); } @Test @@ -378,7 +380,7 @@ public void testMessageFromDifferentPartitionFails() { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -391,7 +393,8 @@ public void testMessageFromDifferentPartitionFails() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> - chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset)); + chunk.addMessage( + SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false)); } @Test @@ -399,7 +402,7 @@ public void testCommitBeforeSnapshot() throws IOException { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -503,7 +506,7 @@ public void testAddInvalidMessagesToChunk() { Trace.Span invalidSpan = Trace.Span.newBuilder().build(); // An Invalid message is dropped but failure counter is incremented. - chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -595,7 +598,7 @@ public void testSnapshotToNonExistentS3BucketFails() List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -653,7 +656,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } diff --git a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java index 8877666120..659a26135e 100644 --- a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java @@ -138,7 +138,7 @@ public void testAddAndSearchChunk() throws IOException { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -195,7 +195,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -245,7 +245,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException { final long newMessageEndTimeEpochMs = TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS); for 
(Trace.Span m : newMessages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -314,7 +314,7 @@ public void testSearchInReadOnlyChunk() throws IOException { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -347,7 +347,7 @@ public void testAddMessageToReadOnlyChunk() { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -359,7 +359,9 @@ public void testAddMessageToReadOnlyChunk() { int finalOffset = offset; assertThatExceptionOfType(IllegalStateException.class) .isThrownBy( - () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset)); + () -> + chunk.addMessage( + SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset, false)); } @Test @@ -367,7 +369,7 @@ public void testMessageFromDifferentPartitionFails() { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunk.commit(); @@ -380,7 +382,8 @@ public void testMessageFromDifferentPartitionFails() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> - chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset)); + chunk.addMessage( + SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false)); } @Test @@ -388,7 +391,7 @@ public void testCommitBeforeSnapshot() throws IOException { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -494,7 +497,7 @@ public void testAddInvalidMessagesToChunk() { // An Invalid message is dropped but failure counter is incremented. 
Trace.Span invalidSpan = Trace.Span.newBuilder().build(); - chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -578,7 +581,7 @@ public void testSnapshotToNonExistentS3BucketFails() throws IOException { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -628,7 +631,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false); offset++; } diff --git a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java index 6284e9aaa6..ca69469182 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java @@ -239,7 +239,8 @@ public void shouldHandleLifecycle() throws Exception { @Test public void testAddMessageIsUnsupported() throws TimeoutException { cachingChunkManager = initChunkManager(); - assertThatThrownBy(() -> cachingChunkManager.addMessage(SpanUtil.makeSpan(1), 10, "1", 1)) + assertThatThrownBy( + () -> cachingChunkManager.addMessage(SpanUtil.makeSpan(1), 10, "1", 1, false)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java index 63609dde3d..1768d2be01 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java @@ -224,7 +224,7 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time int offset = 1; for (Trace.Span m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -236,7 +236,7 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time assertThat(chunk1.info().getChunkSnapshotTimeEpochMs()).isZero(); for (Trace.Span m : messages.subList(9, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -301,7 +301,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0() int offset = 1; for (Trace.Span m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -344,7 +344,7 @@ public void closeDuringCleanerTask() int offset = 1; for (Trace.Span m : messages) { final int msgSize = 
m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); offset++; chunkManager.getActiveChunk().commit(); @@ -387,7 +387,7 @@ public void testAddMessage() throws Exception { int offset = 1; for (Trace.Span m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false); actualChunkSize += msgSize; offset++; } @@ -458,7 +458,8 @@ public void testAddMessage() throws Exception { messageWithHighOffset, messageWithHighOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - veryHighOffset); + veryHighOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -488,7 +489,8 @@ public void testAddMessage() throws Exception { messageWithLowerOffset, messageWithLowerOffset.toString().length(), TEST_KAFKA_PARTITION_ID, - lowerOffset); + lowerOffset, + false); assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); chunkManager.getActiveChunk().commit(); assertThat( @@ -517,7 +519,8 @@ public void testAddMessage() throws Exception { messageWithInvalidTopic, messageWithInvalidTopic.toString().length(), "differentKafkaTopic", - lowerOffset + 1)); + lowerOffset + 1, + false)); } private void testChunkManagerSearch( @@ -597,7 +600,7 @@ public void testAddAndSearchMessageInMultipleSlices() throws Exception { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 15, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -625,7 +628,7 @@ public void testAddAndSearchMessageInSpecificChunks() throws Exception { List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 15, 1, Instant.now()); int offset = 1; for (Trace.Span m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -715,7 +718,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { // Add a message int offset = 1; Trace.Span msg1 = SpanUtil.makeSpan(1); - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; // Add an invalid message @@ -730,7 +733,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { .build()) .build(); chunkManager.addMessage( - invalidSpan, invalidSpan.getSerializedSize(), TEST_KAFKA_PARTITION_ID, offset); + invalidSpan, invalidSpan.getSerializedSize(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; // Commit the new chunk so we can search it. 
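Every pre-existing call site in these tests gains a trailing false. A hypothetical alternative, not part of this diff, would be a default overload on the ChunkManager interface that keeps the old four-argument callers compiling, with the Kafka path as the default; a sketch under that assumption:

import com.slack.service.murron.trace.Trace;
import java.io.IOException;

// Hypothetical sketch: a default method delegating to the new five-argument
// addMessage with local_update = false (the Kafka ingestion path), so the
// existing four-argument call sites would not need to change.
public interface ChunkManagerSketch<T> {
  void addMessage(
      Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean localUpdate)
      throws IOException;

  default void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
      throws IOException {
    addMessage(message, msgSize, kafkaPartitionId, offset, false);
  }
}

That would shrink this diff considerably at the cost of hiding the flag at call sites; the explicit literal used here is more verbose but unambiguous.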
@@ -779,14 +782,14 @@ public void testMessagesAddedToActiveChunks() throws Exception { Trace.Span msg1 = msgs.get(0); Trace.Span msg2 = msgs.get(1); int offset = 1; - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; ReadWriteChunk<LogMessage> chunk1 = chunkManager.getActiveChunk(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(1); assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); assertThat(getValue(LIVE_MESSAGES_INDEXED, metricsRegistry)).isEqualTo(1); - chunkManager.addMessage(msg2, msg2.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg2, msg2.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(1); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2); @@ -800,7 +803,7 @@ public void testMessagesAddedToActiveChunks() throws Exception { Trace.Span msg3 = msgs.get(2); Trace.Span msg4 = msgs.get(3); - chunkManager.addMessage(msg3, msg3.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(msg3, msg3.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(2); @@ -815,7 +818,7 @@ public void testMessagesAddedToActiveChunks() throws Exception { checkMetadata(3, 2, 1, 2, 1); // Inserting in an older chunk throws an exception. So, additions go to active chunks only. assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> chunk1.addMessage(msg4, TEST_KAFKA_PARTITION_ID, 1)); + .isThrownBy(() -> chunk1.addMessage(msg4, TEST_KAFKA_PARTITION_ID, 1, false)); } @Test @@ -830,7 +833,7 @@ public void testMultiThreadedChunkRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -863,7 +866,7 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } @@ -884,7 +887,7 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add remaining messages to create a second chunk. for (Trace.Span m : messages.subList(11, 25)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } chunkManager.getActiveChunk().commit(); @@ -953,7 +956,7 @@ public void testAllChunkFailures() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (Trace.Span m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false); offset++; } // Main chunk is already committed. Commit the new chunk so we can search it. 
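Stepping back from the mechanical test updates: the /_local_bulk endpoint introduced in BulkLocalIngestApi above can be smoke-tested end to end with the JDK's HTTP client. This is a hypothetical example, assuming a local indexer listening on port 8080 and an Elasticsearch-style NDJSON bulk body of the kind BulkApiRequestParser accepts; the index and field names are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LocalBulkClientSketch {
  public static void main(String[] args) throws Exception {
    // Bulk NDJSON convention: an action line, then a source line.
    String body =
        "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
            + "{\"service_name\": \"test\", \"message\": \"hello world\"}\n";
    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8080/_local_bulk"))
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Expect 201 CREATED with a BulkIngestResponse body on success.
    System.out.println(response.statusCode() + " " + response.body());
  }
}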
@@ -972,7 +975,7 @@ public void testAllChunkFailures() throws Exception {
     testChunkManagerSearch(chunkManager, "Message21", 0, 2, 2);
 
     for (Trace.Span m : messages.subList(11, 25)) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunkManager.getActiveChunk().commit();
@@ -1029,7 +1032,7 @@ public void testCommitInvalidChunk() throws Exception {
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1065,7 +1068,7 @@ public void testMultiChunkSearch() throws Exception {
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1192,7 +1195,8 @@ public void testChunkRollOverInProgressExceptionIsThrown() throws Exception {
             () -> {
              int offset = 1;
              for (Trace.Span m : messages) {
-                chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+                chunkManager.addMessage(
+                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
                offset++;
              }
            })
@@ -1228,7 +1232,7 @@ public void testSuccessfulRollOverFinishesOnClose() throws Exception {
     // rollover.
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     ListenableFuture<?> rollOverFuture = chunkManager.getRolloverFuture();
@@ -1279,7 +1283,7 @@ public void testFailedRollOverFinishesOnClose() throws Exception {
     // rollover.
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     await().until(() -> getCount(ROLLOVERS_FAILED, metricsRegistry) == 1);
@@ -1323,7 +1327,7 @@ public void testRollOverFailure()
 
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1347,7 +1351,7 @@ public void testRollOverFailure()
                   SpanUtil.makeSpansWithTimeDifference(11, 12, 1000, startTime);
              for (Trace.Span m : newMessage) {
                 chunkManager.addMessage(
-                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset);
+                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset, false);
                newOffset++;
              }
            })
@@ -1371,7 +1375,7 @@ public void testRollOverFailureWithDirectExecutor()
     // exception.
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -1392,7 +1396,7 @@ public void testRollOverFailureWithDirectExecutor()
                   SpanUtil.makeSpansWithTimeDifference(11, 12, 1000, Instant.now());
              for (Trace.Span m : newMessage) {
                 chunkManager.addMessage(
-                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset);
+                    m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset, false);
                newOffset++;
              }
            })
@@ -1413,7 +1417,7 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException {
 
     int offset = 1;
     for (Trace.Span m : messages1) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunkManager.getActiveChunk().commit();
@@ -1431,7 +1435,7 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException {
         SpanUtil.makeSpan(11, "Message11", Instant.now(), List.of(schemaTestTag));
     chunkManager.addMessage(
-        logMessage, logMessage.toString().length(), TEST_KAFKA_PARTITION_ID, offset++);
+        logMessage, logMessage.toString().length(), TEST_KAFKA_PARTITION_ID, offset++, false);
     chunkManager.rollOverActiveChunk();
 
     await().until(() -> getCount(ROLLOVERS_COMPLETED, metricsRegistry) == 2);
@@ -1561,7 +1565,7 @@ private void insertMessages(
     int offset = 1;
     for (Trace.Span m : messages) {
       final int msgSize = m.toString().length();
-      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
       actualMessagesGauge++;
       actualBytesGauge += msgSize;
diff --git a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java
index 0f841a1ae8..f8b87b323c 100644
--- a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java
+++ b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java
@@ -169,7 +169,7 @@ public void testAddMessageAndRollover() throws Exception {
     int offset = 1;
     for (Trace.Span m : messages) {
       final int msgSize = m.toString().length();
-      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false);
       actualChunkSize += msgSize;
       offset++;
     }
@@ -223,7 +223,8 @@ public void testAddMessageAndRollover() throws Exception {
         messageWithHighOffset,
         messageWithHighOffset.toString().length(),
         TEST_KAFKA_PARTITION_ID,
-        veryHighOffset);
+        veryHighOffset,
+        false);
     assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset);
     chunkManager.getActiveChunk().commit();
     assertThat(
@@ -253,7 +254,8 @@ public void testAddMessageAndRollover() throws Exception {
         messageWithLowerOffset,
         messageWithLowerOffset.toString().length(),
         TEST_KAFKA_PARTITION_ID,
-        lowerOffset);
+        lowerOffset,
+        false);
     assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset);
     chunkManager.getActiveChunk().commit();
     assertThat(
@@ -282,7 +284,8 @@ public void testAddMessageAndRollover() throws Exception {
                 messageWithInvalidTopic,
                 messageWithInvalidTopic.toString().length(),
                 "differentKafkaTopic",
-                lowerOffset + 1));
+                lowerOffset + 1,
+                false));
 
     // Get the count of the amount of indices so that we can confirm we've cleaned them up
     // after the rollover
@@ -311,7 +314,8 @@ public void testAddMessageAndRollover() throws Exception {
     // Can't add messages to current chunk after roll over.
     assertThatThrownBy(
             () ->
-                currentChunk.addMessage(SpanUtil.makeSpan(100000), TEST_KAFKA_PARTITION_ID, 100000))
+                currentChunk.addMessage(
+                    SpanUtil.makeSpan(100000), TEST_KAFKA_PARTITION_ID, 100000, false))
         .isInstanceOf(IllegalStateException.class);
 
     // Ensure data is cleaned up in the manager
@@ -368,7 +372,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception {
     // Add a valid message
     int offset = 1;
     Trace.Span msg1 = SpanUtil.makeSpan(1);
-    chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+    chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
     offset++;
 
     // Add an invalid message
@@ -379,7 +383,8 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception {
             .setFieldType(Schema.SchemaFieldType.INTEGER)
             .build();
     Trace.Span msg100 = SpanUtil.makeSpan(100, "Message100", Instant.now(), List.of(conflictTag));
-    chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+    chunkManager.addMessage(
+        msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
     //noinspection UnusedAssignment
     offset++;
 
@@ -417,7 +422,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 20, 1, Instant.now());
 
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
 
@@ -436,7 +441,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception {
         .isThrownBy(
            () ->
                 chunkManager.addMessage(
-                    SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000));
+                    SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000, false));
 
     // Check metadata.
     List<SnapshotMetadata> snapshots =
@@ -459,7 +464,7 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception {
         .isThrownBy(
            () ->
                 chunkManager.addMessage(
-                    SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000));
+                    SpanUtil.makeSpan(1000), 100, TEST_KAFKA_PARTITION_ID, 1000, false));
 
     chunkManager.awaitTerminated(DEFAULT_START_STOP_DURATION);
     chunkManager = null;
diff --git a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java
index 229b3c42d6..2a3d5ba2ec 100644
--- a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java
+++ b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java
@@ -181,7 +181,7 @@ public void testDiskBasedRolloverWithMaxBytes() throws Exception {
     boolean shouldCheckOnNextMessage = false;
     for (Trace.Span m : SpanUtil.makeSpansWithTimeDifference(1, totalMessages, 1000, startTime)) {
       final int msgSize = m.toString().length();
-      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
       Thread.sleep(DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS);
       if (chunkManager.getActiveChunk() != null) {
@@ -250,11 +250,11 @@ public void testRolloverBasedOnMaxTime() throws Exception {
     // wait for 2+ seconds so that the chunk rollover code will get triggered
     // add 2nd message to trigger chunk rollover
     // add 3rd message to create new chunk
-    chunkManager.addMessage(SpanUtil.makeSpan(1), 100, TEST_KAFKA_PARTITION_ID, 1);
+    chunkManager.addMessage(SpanUtil.makeSpan(1), 100, TEST_KAFKA_PARTITION_ID, 1, false);
     // so that the chunk rollover code will get triggered
     Thread.sleep(
         2_000 + DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS);
-    chunkManager.addMessage(SpanUtil.makeSpan(2), 100, TEST_KAFKA_PARTITION_ID, 1);
-    chunkManager.addMessage(SpanUtil.makeSpan(3), 100, TEST_KAFKA_PARTITION_ID, 1);
+    chunkManager.addMessage(SpanUtil.makeSpan(2), 100, TEST_KAFKA_PARTITION_ID, 1, false);
+    chunkManager.addMessage(SpanUtil.makeSpan(3), 100, TEST_KAFKA_PARTITION_ID, 1, false);
 
     await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, metricsRegistry) == 1);
 
@@ -277,7 +277,7 @@ public void testDiskBasedRolloverWithMaxMessages() throws Exception {
     int offset = 1;
     for (Trace.Span m : SpanUtil.makeSpansWithTimeDifference(1, totalMessages, 1000, startTime)) {
       final int msgSize = m.toString().length();
-      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
       if (chunkManager.getActiveChunk() != null) {
         chunkManager.getActiveChunk().commit();
diff --git a/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java b/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java
index 1c563a2832..4a31bed7f9 100644
--- a/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java
+++ b/astra/src/test/java/com/slack/astra/elasticsearchApi/ElasticsearchApiServiceTest.java
@@ -388,7 +388,7 @@ private void addMessagesToChunkManager(List<Trace.Span> messages) throws IOExcep
     IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset, false);
       offset++;
     }
     chunkManager.getActiveChunk().commit();
diff --git a/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java b/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java
index 761c76cb9f..6dc6926bd6 100644
--- a/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java
+++ b/astra/src/test/java/com/slack/astra/logstore/search/AstraLocalQueryServiceTest.java
@@ -110,7 +110,7 @@ public void testAstraSearch() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
@@ -179,7 +179,7 @@ public void testAstraSearchNoData() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
@@ -225,7 +225,7 @@ public void testAstraSearchNoHits() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
@@ -273,7 +273,7 @@ public void testAstraSearchNoHistogram() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
@@ -328,7 +328,7 @@ public void testAstraBadArgSearch() throws Throwable {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
@@ -363,7 +363,7 @@ public void testAstraGrpcSearch() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
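Every test hunk above, and the one that follows, makes the same mechanical change: the chunk manager's addMessage gains a trailing boolean, and every call site passes false so the pre-change behavior is preserved. A minimal, self-contained sketch of the widened call shape is below; the stand-in types and the parameter name newFlag are assumptions, since this diff never shows the production signature itself.

// Sketch only: stand-in types, not Astra's real classes. The fifth parameter's
// name ("newFlag") is hypothetical; the diff shows only a positional `false`.
class AddMessageCallShape {
  interface Span {} // stand-in for Trace.Span

  interface ChunkManager {
    void addMessage(Span m, int msgSize, String kafkaPartitionId, long offset, boolean newFlag);
  }

  static void ingest(ChunkManager chunkManager, java.util.List<Span> messages) {
    int offset = 1;
    for (Span m : messages) {
      // Mirrors the test loops: size each span by its string form, keep the new flag off.
      chunkManager.addMessage(m, m.toString().length(), "0", offset, false);
      offset++;
    }
  }
}

Because the flag is always false here, these tests exercise only the widened signature, not whatever behavior the flag enables.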
@@ -442,7 +442,7 @@ public void testAstraGrpcSearchThrowsException() throws IOException {
     List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
     int offset = 1;
     for (Trace.Span m : messages) {
-      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset);
+      chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset, false);
       offset++;
     }
     // No need to commit the active chunk since the last chunk is already closed.
diff --git a/benchmarks/dependency-reduced-pom.xml b/benchmarks/dependency-reduced-pom.xml
new file mode 100644
index 0000000000..00890667b0
--- /dev/null
+++ b/benchmarks/dependency-reduced-pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.slack.astra</groupId>
+  <artifactId>astra-benchmarks</artifactId>
+  <name>Astra benchmarks</name>
+  <version>0.1-SNAPSHOT</version>
+  <licenses>
+    <license>
+      <name>MIT</name>
+      <url>https://opensource.org/licenses/MIT</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>com.spotify.fmt</groupId>
+        <artifactId>fmt-maven-plugin</artifactId>
+        <version>2.25</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>format</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <sourceDirectory>src/main/java</sourceDirectory>
+          <testSourceDirectory>src/test/java</testSourceDirectory>
+          <verbose>true</verbose>
+          <filesNamePattern>.*\.java</filesNamePattern>
+          <skip>false</skip>
+          <skipSourceDirectory>false</skipSourceDirectory>
+          <skipTestSourceDirectory>false</skipTestSourceDirectory>
+          <skipSortingImports>false</skipSortingImports>
+          <style>google</style>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.13.0</version>
+        <configuration>
+          <compilerVersion>${javac.target}</compilerVersion>
+          <source>${javac.target}</source>
+          <target>${javac.target}</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>3.5.1</version>
+        <configuration>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.6.0</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${uberjar.name}</finalName>
+              <transformers>
+                <transformer>
+                  <mainClass>org.openjdk.jmh.Main</mainClass>
+                </transformer>
+                <transformer />
+              </transformers>
+              <filters />
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.37</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+  <properties>
+    <jmh.version>1.37</jmh.version>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <javac.target>23</javac.target>
+    <uberjar.name>benchmarks</uberjar.name>
+  </properties>
+</project>
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 6175734451..efa99e3b60 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -20,7 +20,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <jmh.version>1.37</jmh.version>
-    <javac.target>21</javac.target>
+    <javac.target>23</javac.target>
     <uberjar.name>benchmarks</uberjar.name>
   </properties>
 
@@ -85,7 +85,6 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <version>3.5.1</version>
         <configuration>
-          <forkMode>always</forkMode>
           <redirectTestOutputToFile>true</redirectTestOutputToFile>
         </configuration>
       </plugin>
diff --git a/config/config.yaml b/config/config.yaml
index f2c9fb3a0a..d486ef6fff 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -1,9 +1,9 @@
 nodeRoles: [${NODE_ROLES:-QUERY,INDEX,CACHE,MANAGER,RECOVERY,PREPROCESSOR}]
 indexerConfig:
-  maxMessagesPerChunk: ${INDEXER_MAX_MESSAGES_PER_CHUNK:-100000}
-  maxBytesPerChunk: ${INDEXER_MAX_BYTES_PER_CHUNK:-1000000}
-  maxTimePerChunkSeconds: ${INDEXER_MAX_TIME_PER_CHUNK_SECONDS:-5400}
+  maxMessagesPerChunk: ${INDEXER_MAX_MESSAGES_PER_CHUNK:-165346700}
+  maxBytesPerChunk: ${INDEXER_MAX_BYTES_PER_CHUNK:-80000000000}
+  maxTimePerChunkSeconds: ${INDEXER_MAX_TIME_PER_CHUNK_SECONDS:-1296000}
   luceneConfig:
     commitDurationSecs: ${INDEXER_COMMIT_DURATION_SECS:-10}
     refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11}
@@ -11,14 +11,14 @@ indexerConfig:
   staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200}
   dataDirectory: ${INDEXER_DATA_DIR:-/tmp}
   maxOffsetDelayMessages: ${INDEXER_MAX_OFFSET_DELAY_MESSAGES:-10000000}
-  defaultQueryTimeoutMs: ${ASTRA_INDEX_DEFAULT_QUERY_TIMEOUT_MS:-2500}
+  defaultQueryTimeoutMs: ${ASTRA_INDEX_DEFAULT_QUERY_TIMEOUT_MS:-3600000}
   readFromLocationOnStart: ${INDEXER_READ_FROM_LOCATION_ON_START:-LATEST}
   createRecoveryTasksOnStart: ${INDEXER_CREATE_RECOVERY_TASKS_ON_START:-true}
   maxChunksOnDisk: ${INDEXER_MAX_CHUNKS_ON_DISK:-3}
   serverConfig:
     serverPort: ${ASTRA_INDEX_SERVER_PORT:-8080}
     serverAddress: ${ASTRA_INDEX_SERVER_ADDRESS:-localhost}
-    requestTimeoutMs: ${ASTRA_INDEX_REQUEST_TIMEOUT_MS:-5000}
+    requestTimeoutMs: ${ASTRA_INDEX_REQUEST_TIMEOUT_MS:-4200000}
   kafkaConfig:
     kafkaTopic: ${KAFKA_TOPIC:-test-topic}
     kafkaTopicPartition: ${KAFKA_TOPIC_PARTITION:-0}
@@ -56,9 +56,9 @@ metadataStoreConfig:
   zookeeperConfig:
     zkConnectString: ${ASTRA_ZK_CONNECTION_STRING:-localhost:2181}
     zkPathPrefix: ${ASTRA_ZK_PATH_PREFIX:-ASTRA}
-    zkSessionTimeoutMs: ${ASTRA_ZK_SESSION_TIMEOUT_MS:-5000}
-    zkConnectionTimeoutMs: ${ASTRA_ZK_CONNECT_TIMEOUT_MS:-500}
-    sleepBetweenRetriesMs: ${ASTRA_ZK_SLEEP_RETRIES_MS:-100}
+    zkSessionTimeoutMs: ${ASTRA_ZK_SESSION_TIMEOUT_MS:-60000}
+    zkConnectionTimeoutMs: ${ASTRA_ZK_CONNECT_TIMEOUT_MS:-15000}
+    sleepBetweenRetriesMs: ${ASTRA_ZK_SLEEP_RETRIES_MS:-5000}
 
 cacheConfig:
   slotsPerInstance: ${ASTRA_CACHE_SLOTS_PER_INSTANCE:-10}
@@ -80,7 +80,7 @@ managerConfig:
     requestTimeoutMs: ${ASTRA_MANAGER_REQUEST_TIMEOUT_MS:-5000}
   replicaCreationServiceConfig:
     schedulePeriodMins: ${ASTRA_MANAGER_REPLICAS_PERIOD_MINS:-15}
-    replicaLifespanMins: ${ASTRA_MANAGER_REPLICA_LIFESPAN_MINS:-1440}
+    replicaLifespanMins: ${ASTRA_MANAGER_REPLICA_LIFESPAN_MINS:-30}
     replicaSets: [${ASTRA_MANAGER_REPLICA_SETS:-rep1}]
   replicaAssignmentServiceConfig:
     schedulePeriodMins: ${ASTRA_MANAGER_CACHE_SLOT_PERIOD_MINS:-15}
@@ -94,7 +94,7 @@ managerConfig:
     schedulePeriodMins: ${ASTRA_MANAGER_RECOVERY_PERIOD_MINS:-15}
   snapshotDeletionServiceConfig:
     schedulePeriodMins: ${ASTRA_MANAGER_SNAPSHOT_DELETE_PERIOD_MINS:-15}
-    snapshotLifespanMins: ${ASTRA_MANAGER_SNAPSHOT_LIFESPAN_MINS:-10080}
+    snapshotLifespanMins: ${ASTRA_MANAGER_SNAPSHOT_LIFESPAN_MINS:-120}
   replicaRestoreServiceConfig:
     schedulePeriodMins: ${ASTRA_MANAGER_REPLICA_RESTORE_PERIOD_MINS:-15}
     maxReplicasPerRequest: ${ASTRA_MANAGER_REPLICA_RESTORE_MAX_REPLICAS_PER_REQUEST:-200}
@@ -136,7 +136,7 @@ preprocessorConfig:
   serverConfig:
     serverPort: ${ASTRA_PREPROCESSOR_SERVER_PORT:-8086}
     serverAddress: ${ASTRA_PREPROCESSOR_SERVER_ADDRESS:-localhost}
-    requestTimeoutMs: ${ASTRA_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
+    requestTimeoutMs: ${ASTRA_PREPROCESSOR_REQUEST_TIMEOUT_MS:-120000}
   preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
   rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
   rateLimitExceededErrorCode: ${ASTRA_PREPROCESSOR_RATE_LIMIT_EXCEEDED_ERROR_CODE:-400}
diff --git a/docker-compose.yml b/docker-compose.yml
index 4d91c285b7..6e45b5962d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -74,6 +74,10 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
       - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_SESSION_TIMEOUT_MS=60000
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
+      - ASTRA_PREPROCESSOR_REQUEST_TIMEOUT_MS=120000
     depends_on:
       - zookeeper
       - kafka
@@ -94,6 +98,13 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
      - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
+      - INDEXER_MAX_MESSAGES_PER_CHUNK=165346700
+      - INDEXER_MAX_BYTES_PER_CHUNK=80000000000
+      - INDEXER_MAX_TIME_PER_CHUNK_SECONDS=1296000
+      - ASTRA_INDEX_REQUEST_TIMEOUT_MS=4200000
+      - ASTRA_INDEX_DEFAULT_QUERY_TIMEOUT_MS=3600000
     depends_on:
       - astra_preprocessor
@@ -111,6 +122,11 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
       - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_SESSION_TIMEOUT_MS=60000
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
+      - ASTRA_MANAGER_REPLICA_LIFESPAN_MINS=30
+      - ASTRA_MANAGER_SNAPSHOT_LIFESPAN_MINS=120
     depends_on:
       - astra_preprocessor
@@ -129,6 +145,9 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
       - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_SESSION_TIMEOUT_MS=60000
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
     depends_on:
       - astra_preprocessor
@@ -146,6 +165,9 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
       - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_SESSION_TIMEOUT_MS=60000
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
     depends_on:
       - astra_preprocessor
@@ -163,5 +185,8 @@ services:
       - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
       - ZIPKIN_TRACING_ENDPOINT=http://openzipkin:9411/api/v2/spans
       - S3_ENDPOINT=http://dep_s3:9090
+      - ASTRA_ZK_SESSION_TIMEOUT_MS=60000
+      - ASTRA_ZK_CONNECT_TIMEOUT_MS=15000
+      - ASTRA_ZK_SLEEP_RETRIES_MS=5000
     depends_on:
       - astra_preprocessor
diff --git a/pom.xml b/pom.xml
index 9c4d9a5fb4..c213d9deba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
   <packaging>pom</packaging>
 
   <properties>
-    <javac.target>21</javac.target>
+    <javac.target>23</javac.target>
   </properties>
 
   <modules>
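For review, the retuned config.yaml defaults are easier to sanity-check in human units: 1296000 seconds is a 15-day chunk window, 4200000 ms is a 70-minute indexer request timeout (10 minutes above the new 1-hour default query timeout), 80000000000 bytes is an 80 GB chunk cap, and the snapshot lifespan drops from 7 days (10080 minutes) to 2 hours. A throwaway check with the values copied from this diff (the class is illustrative, not part of Astra):

import java.time.Duration;

// Throwaway sanity check of the retuned defaults; every constant is copied from this diff.
class ConfigDefaultsSanityCheck {
  public static void main(String[] args) {
    Duration maxTimePerChunk = Duration.ofSeconds(1_296_000); // PT360H = 15 days
    Duration indexRequestTimeout = Duration.ofMillis(4_200_000); // PT1H10M
    Duration defaultQueryTimeout = Duration.ofMillis(3_600_000); // PT1H
    Duration zkSessionTimeout = Duration.ofMillis(60_000); // PT1M, up from 5 seconds
    Duration snapshotLifespan = Duration.ofMinutes(120); // PT2H, down from 7 days
    long maxBytesPerChunk = 80_000_000_000L; // 80 GB, up from roughly 1 MB

    // The indexer's request timeout stays above its default query timeout.
    System.out.println(indexRequestTimeout.compareTo(defaultQueryTimeout) > 0); // true
    System.out.println("chunk window: " + maxTimePerChunk); // PT360H
    System.out.println("zk session: " + zkSessionTimeout + ", snapshot ttl: " + snapshotLifespan);
    System.out.println("chunk cap (GB): " + maxBytesPerChunk / 1_000_000_000L); // 80
  }
}

The docker-compose.yml additions pin these same numbers as container environment variables, so the ${VAR:-default} fallbacks in config.yaml and the compose services stay in agreement.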