Skip to content

Commit

Permalink
stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Sep 18, 2024
1 parent 68b74ff commit 230d1b6
Show file tree
Hide file tree
Showing 51 changed files with 1,687 additions and 59 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RunTask extends DefaultTestClustersTask {
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

private Boolean debug = false;
private Boolean debug = true;

private Boolean debugServer = false;

Expand Down Expand Up @@ -165,9 +165,19 @@ public void beforeStart() {
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
transportPort++;
firstNode.systemProperty("arrow.allocation.manager.type", "Netty");
// firstNode.systemProperty("arrow.memory.debug.allocator", "true");
firstNode.systemProperty("arrow.enable_null_check_for_get", "false");
firstNode.systemProperty("io.netty.tryReflectionSetAccessible", "true");
firstNode.systemProperty("arrow.enable_unsafe_memory_access", "true");
firstNode.systemProperty("io.netty.allocator.numDirectArenas", "2");
firstNode.systemProperty("io.netty.noUnsafe", "false");
firstNode.systemProperty("io.netty.tryUnsafe", "true");
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);

cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {

if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
Expand Down Expand Up @@ -195,6 +205,9 @@ public void beforeStart() {
if (keystorePassword.length() > 0) {
node.keystorePassword(keystorePassword);
}
node.jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED");
node.jvmArgs("--enable-native-access=ALL-UNNAMED");
node.jvmArgs("--add-opens=jdk.unsupported/sun.misc=ALL-UNNAMED");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jettison = 1.5.4
woodstox = 6.4.0
kotlin = 1.7.10
antlr4 = 4.13.1
guava = 32.1.1-jre
guava = 33.2.1-jre
protobuf = 3.22.3
jakarta_annotation = 1.3.5
google_http_client = 1.44.1
Expand Down
4 changes: 2 additions & 2 deletions distribution/archives/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

// Build script for OpenSearch distribution archives.
// Fix: the JavaPackageType import was accidentally duplicated.
import org.opensearch.gradle.JavaPackageType

apply plugin: 'opensearch.internal-distribution-archive-setup'

Expand Down Expand Up @@ -190,7 +190,7 @@ distribution_archives {
}
}


linuxPpc64leTar {
archiveClassifier = 'linux-ppc64le'
content {
Expand Down
2 changes: 2 additions & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,5 @@ ${error.file}

# HDFS ForkJoinPool.common() support by SecurityManager
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=org.opensearch.secure_sm.SecuredForkJoinWorkerThreadFactory
-Dio.netty.tryReflectionSetAccessible=true
-XX:MaxDirectMemorySize=1g
2 changes: 1 addition & 1 deletion plugins/discovery-gce/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
api "commons-logging:commons-logging:${versions.commonslogging}"
api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
api "commons-codec:commons-codec:${versions.commonscodec}"
api 'io.grpc:grpc-api:1.57.2'
api 'io.grpc:grpc-api:1.63.0'
api 'io.opencensus:opencensus-api:0.31.1'
api 'io.opencensus:opencensus-contrib-http-util:0.31.1'
runtimeOnly "com.google.guava:guava:${versions.guava}"
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-gcs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ dependencies {
api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
api "commons-codec:commons-codec:${versions.commonscodec}"
api 'org.threeten:threetenbp:1.4.4'
api 'io.grpc:grpc-api:1.57.2'
api 'io.grpc:grpc-api:1.63.0'
api 'io.opencensus:opencensus-api:0.31.1'
api 'io.opencensus:opencensus-contrib-http-util:0.31.1'

Expand Down
95 changes: 94 additions & 1 deletion server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ plugins {
id('opensearch.internal-cluster-test')
id('opensearch.optional-dependencies')
id('me.champeau.gradle.japicmp') version '0.4.3'
id('com.github.johnrengelman.shadow')
}

publishing {
Expand Down Expand Up @@ -90,7 +91,69 @@ dependencies {
api "org.apache.lucene:lucene-spatial3d:${versions.lucene}"
api "org.apache.lucene:lucene-suggest:${versions.lucene}"

// utilities
def arrowExclusions = {
// exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core'
// exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations'
// exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
// exclude group: 'io.netty', module: 'netty-common'
// exclude group: 'io.netty', module: 'netty-buffer'
}
api group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
api 'org.slf4j:slf4j-api:1.7.36'
api("io.netty:netty-common:${versions.netty}") {
exclude group: 'io.netty', module: 'netty-common'
}
api("io.netty:netty-buffer:${versions.netty}") {
exclude group: 'io.netty', module: 'netty-buffer'
}
api group: 'org.apache.arrow', name: 'arrow-memory-netty-buffer-patch', version: '17.0.0'
api group: 'org.apache.arrow', name: 'arrow-vector', version: '17.0.0'
api 'org.apache.arrow:arrow-memory-core:17.0.0'
api 'org.apache.arrow:arrow-memory-netty:17.0.0'
api 'org.apache.arrow:arrow-format:17.0.0'
api 'org.apache.arrow:arrow-flight:17.0.0'
api 'org.apache.arrow:flight-core:17.0.0'
    // api 'org.apache.arrow:flight-grpc:17.0.0'
api 'io.grpc:grpc-api:1.63.0'
api 'io.grpc:grpc-netty:1.63.0'
// api 'io.grpc:grpc-java:1.57.2'
api 'io.grpc:grpc-core:1.63.0'
api 'io.grpc:grpc-stub:1.63.0'
api 'io.grpc:grpc-all:1.63.0'
api 'io.grpc:grpc-protobuf:1.63.0'
api 'io.grpc:grpc-protobuf-lite:1.63.0'


// api 'io.grpc:grpc-all:1.57.2'
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
runtimeOnly 'io.perfmark:perfmark-api:0.27.0'
// runtimeOnly "com.google.guava:guava:${versions.guava}"
// runtimeOnly('com.google.guava:guava:32.1.1-jre')
runtimeOnly "com.google.guava:failureaccess:1.0.1"
runtimeOnly('com.google.guava:guava:33.2.1-jre') {
attributes {
attribute(Attribute.of('org.gradle.jvm.environment', String), 'standard-jvm')
}
}

api 'com.google.flatbuffers:flatbuffers-java:2.0.0'
api 'org.apache.parquet:parquet-arrow:1.13.1'

api 'com.fasterxml.jackson.core:jackson-databind:2.17.2'
api 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.2'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2'
api 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
// api 'org.apache.arrow:arrow-compression:13.0.0'

api project(":libs:opensearch-cli")

// time handling, remove with java 8 time
Expand Down Expand Up @@ -134,6 +197,9 @@ dependencies {
exclude group: 'org.opensearch', module: 'server'
}
}
// Drop warnings-as-errors for this module's compilation.
// NOTE(review): presumably needed because the Arrow/gRPC dependencies added in
// this change trigger compiler warnings — confirm, and scope the suppression
// (e.g. via targeted -Xlint flags) rather than disabling -Werror wholesale.
// Use configureEach for lazy task configuration, consistent with the other
// tasks.withType(JavaCompile).configureEach block in this file.
tasks.withType(JavaCompile).configureEach {
    options.compilerArgs.removeAll(['-Werror'])
}

tasks.withType(JavaCompile).configureEach {
options.compilerArgs -= '-Xlint:cast'
Expand All @@ -146,6 +212,26 @@ compileJava {
'org.opensearch.common.annotation.processor.ApiAnnotationProcessor'].join(',')]
}

//shadowJar {
// // Optional: set a classifier to differentiate the shadow JAR if needed
// archiveClassifier.set('all')
//
// // Relocate multiple Netty packages to avoid conflicts
// relocate 'io.netty.buffer', 'org.apache.arrow.shaded.io.netty.buffer'
// relocate 'io.netty.util', 'org.apache.arrow.shaded.io.netty.util'
// relocate 'io.netty.channel', 'org.apache.arrow.shaded.io.netty.channel'
// relocate 'io.netty.handler', 'org.apache.arrow.shaded.io.netty.handler'
//}
// Ensure the shadow (fat) JAR is produced whenever the standard build runs.
tasks.named('build').configure {
    dependsOn 'shadowJar'
}
// Debugging helper: prints the name of every dependency configuration in this
// project (invoke with `gradle listConfigurations`).
tasks.register('listConfigurations') {
    doLast {
        configurations.forEach { println it.name }
    }
}
tasks.named("internalClusterTest").configure {
// TODO: these run faster with C2 only because they run for so, so long
jvmArgs -= '-XX:TieredStopAtLevel=1'
Expand Down Expand Up @@ -394,6 +480,13 @@ tasks.test {
}
}

// Open JDK-internal packages that the test JVM needs — presumably for
// Arrow/Netty direct-memory access added by this change; confirm which
// dependency actually requires each open.
// NOTE(review): this is a second tasks.test configuration block in this file
// (another appears earlier); consider merging them into one.
tasks.named('test').configure {
    jvmArgs '--add-opens', 'java.base/java.nio.channels=ALL-UNNAMED'
    jvmArgs '--add-opens', 'java.base/java.net=ALL-UNNAMED'
}

tasks.named("sourcesJar").configure {
// Ignore duplicates for protobuf generated code (main and generatedSources).
filesMatching("**/proto/*") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.SortedWiderNumericSortField;
import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.Suggest.Suggestion;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
Expand Down Expand Up @@ -470,7 +472,8 @@ ReducedQueryPhase reducedQueryPhase(
numReducePhases,
0,
0,
true
true,
null
);
}
int total = queryResults.size();
Expand All @@ -492,8 +495,12 @@ ReducedQueryPhase reducedQueryPhase(
: Collections.emptyMap();
int from = 0;
int size = 0;
List<OSTicket> tickets = new ArrayList<>();
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
if (entry instanceof StreamSearchResult) {
tickets.addAll(((StreamSearchResult)entry).getFlightTickets());
}
from = result.from();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
Expand Down Expand Up @@ -543,7 +550,8 @@ ReducedQueryPhase reducedQueryPhase(
numReducePhases,
size,
from,
false
false,
tickets
);
}

Expand Down Expand Up @@ -684,6 +692,8 @@ public static final class ReducedQueryPhase {
// sort value formats used to sort / format the result
final DocValueFormat[] sortValueFormats;

final List<OSTicket> osTickets;

ReducedQueryPhase(
TotalHits totalHits,
long fetchHits,
Expand All @@ -698,7 +708,8 @@ public static final class ReducedQueryPhase {
int numReducePhases,
int size,
int from,
boolean isEmptyResult
boolean isEmptyResult,
List<OSTicket> osTickets
) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
Expand All @@ -717,6 +728,7 @@ public static final class ReducedQueryPhase {
this.from = from;
this.isEmptyResult = isEmptyResult;
this.sortValueFormats = sortValueFormats;
this.osTickets = osTickets;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
if (queryResult.isNull() == false
// disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
&& getRequest().scroll() == null
&& !queryResult.hasConsumedTopDocs()
&& queryResult.topDocs() != null
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ public boolean isSuggestOnly() {
return source != null && source.isSuggestOnly();
}

public boolean isStreamRequest() {
return searchType == SearchType.STREAM;
}

public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.suggest.Suggest;

import java.io.IOException;
Expand All @@ -73,6 +74,7 @@
import java.util.function.Supplier;

import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
import static org.opensearch.action.search.SearchResponseSections.TICKET_FIELD;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand Down Expand Up @@ -318,6 +320,11 @@ public Map<String, ProfileShardResult> getProfileResults() {
return internalResponse.profile();
}

@Nullable
public List<OSTicket> getTickets() {
return internalResponse.tickets();
}

/**
* Returns info about what clusters the search was executed against. Available only in responses obtained
* from a Cross Cluster Search request, otherwise <code>null</code>
Expand Down Expand Up @@ -381,6 +388,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Aggregations aggs = null;
Suggest suggest = null;
SearchProfileShardResults profile = null;
List<OSTicket> tickets = null;
boolean timedOut = false;
Boolean terminatedEarly = null;
int numReducePhases = 1;
Expand Down Expand Up @@ -422,6 +430,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
suggest = Suggest.fromXContent(parser);
} else if (SearchProfileShardResults.PROFILE_FIELD.equals(currentFieldName)) {
profile = SearchProfileShardResults.fromXContent(parser);
} else if (TICKET_FIELD.equals(currentFieldName)) {
tickets = null;
} else if (RestActions._SHARDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
Expand Down Expand Up @@ -530,7 +540,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
extBuilders
extBuilders,
tickets
);
return new SearchResponse(
searchResponseSections,
Expand Down
Loading

0 comments on commit 230d1b6

Please sign in to comment.