diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml
index 0d8bf59823acf..c18046f873477 100644
--- a/.idea/runConfigurations/Debug_OpenSearch.xml
+++ b/.idea/runConfigurations/Debug_OpenSearch.xml
@@ -6,6 +6,10 @@
+
+
+
+
-
+
\ No newline at end of file
diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java
index c5035f3b082fe..6092a2a3698c5 100644
--- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java
+++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java
@@ -64,7 +64,7 @@ public class RunTask extends DefaultTestClustersTask {
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";
- private Boolean debug = false;
+ private Boolean debug = false;
private Boolean debugServer = false;
@@ -165,9 +165,19 @@ public void beforeStart() {
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
transportPort++;
+ firstNode.systemProperty("arrow.allocation.manager.type", "Netty");
+ // firstNode.systemProperty("arrow.memory.debug.allocator", "true");
+ firstNode.systemProperty("arrow.enable_null_check_for_get", "false");
+ firstNode.systemProperty("io.netty.tryReflectionSetAccessible", "true");
+ firstNode.systemProperty("arrow.enable_unsafe_memory_access", "true");
+ firstNode.systemProperty("io.netty.allocator.numDirectArenas", "2");
+ firstNode.systemProperty("io.netty.noUnsafe", "false");
+ firstNode.systemProperty("io.netty.tryUnsafe", "true");
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
+
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
+
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
@@ -195,6 +205,9 @@ public void beforeStart() {
if (keystorePassword.length() > 0) {
node.keystorePassword(keystorePassword);
}
+ node.jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED");
+ node.jvmArgs("--enable-native-access=ALL-UNNAMED");
+ node.jvmArgs("--add-opens=jdk.unsupported/sun.misc=ALL-UNNAMED");
}
}
}
diff --git a/buildSrc/version.properties b/buildSrc/version.properties
index 08c45ef058716..7751f700e9ed7 100644
--- a/buildSrc/version.properties
+++ b/buildSrc/version.properties
@@ -19,7 +19,7 @@ jettison = 1.5.4
woodstox = 6.4.0
kotlin = 1.7.10
antlr4 = 4.13.1
-guava = 32.1.1-jre
+guava = 33.2.1-jre
protobuf = 3.22.3
jakarta_annotation = 1.3.5
google_http_client = 1.44.1
diff --git a/distribution/archives/build.gradle b/distribution/archives/build.gradle
index 792b1ab57ddbc..e7fd7fca56ae8 100644
--- a/distribution/archives/build.gradle
+++ b/distribution/archives/build.gradle
@@ -28,7 +28,7 @@
* under the License.
*/
-import org.opensearch.gradle.JavaPackageType
+import org.opensearch.gradle.JavaPackageType
apply plugin: 'opensearch.internal-distribution-archive-setup'
@@ -190,7 +190,7 @@ distribution_archives {
}
}
-
+
linuxPpc64leTar {
archiveClassifier = 'linux-ppc64le'
content {
diff --git a/distribution/src/config/jvm.options b/distribution/src/config/jvm.options
index f0ac98faffda9..8778b41c0862f 100644
--- a/distribution/src/config/jvm.options
+++ b/distribution/src/config/jvm.options
@@ -85,3 +85,5 @@ ${error.file}
# HDFS ForkJoinPool.common() support by SecurityManager
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=org.opensearch.secure_sm.SecuredForkJoinWorkerThreadFactory
+-Dio.netty.tryReflectionSetAccessible=true
+-XX:MaxDirectMemorySize=1g
diff --git a/plugins/discovery-gce/build.gradle b/plugins/discovery-gce/build.gradle
index 80aae03bc0332..f18edc67e1de4 100644
--- a/plugins/discovery-gce/build.gradle
+++ b/plugins/discovery-gce/build.gradle
@@ -30,7 +30,7 @@ dependencies {
api "commons-logging:commons-logging:${versions.commonslogging}"
api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
api "commons-codec:commons-codec:${versions.commonscodec}"
- api 'io.grpc:grpc-api:1.57.2'
+ api 'io.grpc:grpc-api:1.63.0'
api 'io.opencensus:opencensus-api:0.31.1'
api 'io.opencensus:opencensus-contrib-http-util:0.31.1'
runtimeOnly "com.google.guava:guava:${versions.guava}"
diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle
index 110df89f25de8..8e4db4eb0a5a4 100644
--- a/plugins/repository-gcs/build.gradle
+++ b/plugins/repository-gcs/build.gradle
@@ -86,7 +86,7 @@ dependencies {
api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
api "commons-codec:commons-codec:${versions.commonscodec}"
api 'org.threeten:threetenbp:1.4.4'
- api 'io.grpc:grpc-api:1.57.2'
+ api 'io.grpc:grpc-api:1.63.0'
api 'io.opencensus:opencensus-api:0.31.1'
api 'io.opencensus:opencensus-contrib-http-util:0.31.1'
diff --git a/server/build.gradle b/server/build.gradle
index 5facc73dff968..f8dd567ec32d5 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -37,6 +37,7 @@ plugins {
id('opensearch.internal-cluster-test')
id('opensearch.optional-dependencies')
id('me.champeau.gradle.japicmp') version '0.4.3'
+ id('com.github.johnrengelman.shadow')
}
publishing {
@@ -90,7 +91,69 @@ dependencies {
api "org.apache.lucene:lucene-spatial3d:${versions.lucene}"
api "org.apache.lucene:lucene-suggest:${versions.lucene}"
- // utilities
+ def arrowExclusions = {
+ // exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core'
+ // exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations'
+ // exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
+ // exclude group: 'io.netty', module: 'netty-common'
+ // exclude group: 'io.netty', module: 'netty-buffer'
+ }
+ api group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
+ api 'org.slf4j:slf4j-api:1.7.36'
+ api "io.netty:netty-common:${versions.netty}"
+ api "io.netty:netty-buffer:${versions.netty}"
+ api group: 'org.apache.arrow', name: 'arrow-memory-netty-buffer-patch', version: '17.0.0'
+ api group: 'org.apache.arrow', name: 'arrow-vector', version: '17.0.0'
+ api 'org.apache.arrow:arrow-memory-core:17.0.0'
+ api 'org.apache.arrow:arrow-memory-netty:17.0.0'
+ api 'org.apache.arrow:arrow-format:17.0.0'
+ api 'org.apache.arrow:arrow-flight:17.0.0'
+ api 'org.apache.arrow:flight-core:17.0.0'
+ // api 'org.apache.arrow:flight-grpc:17.0.0'
+ // api 'org.apache.arrow:flight-grpc:17.0.0'
+ api 'io.grpc:grpc-api:1.63.0'
+ api 'io.grpc:grpc-netty:1.63.0'
+ // api 'io.grpc:grpc-java:1.57.2'
+ api 'io.grpc:grpc-core:1.63.0'
+ api 'io.grpc:grpc-stub:1.63.0'
+ api 'io.grpc:grpc-all:1.63.0'
+ api 'io.grpc:grpc-protobuf:1.63.0'
+ api 'io.grpc:grpc-protobuf-lite:1.63.0'
+
+
+ // api 'io.grpc:grpc-all:1.57.2'
+ api "io.netty:netty-buffer:${versions.netty}"
+ api "io.netty:netty-codec:${versions.netty}"
+ api "io.netty:netty-codec-http:${versions.netty}"
+ api "io.netty:netty-codec-http2:${versions.netty}"
+ api "io.netty:netty-common:${versions.netty}"
+ api "io.netty:netty-handler:${versions.netty}"
+ api "io.netty:netty-resolver:${versions.netty}"
+ api "io.netty:netty-transport:${versions.netty}"
+ api "io.netty:netty-transport-native-unix-common:${versions.netty}"
+ runtimeOnly 'io.perfmark:perfmark-api:0.27.0'
+ // runtimeOnly "com.google.guava:guava:${versions.guava}"
+ // runtimeOnly('com.google.guava:guava:32.1.1-jre')
+ runtimeOnly "com.google.guava:failureaccess:1.0.1"
+ runtimeOnly('com.google.guava:guava:33.2.1-jre') {
+ attributes {
+ attribute(Attribute.of('org.gradle.jvm.environment', String), 'standard-jvm')
+ }
+ }
+
+ api 'com.google.flatbuffers:flatbuffers-java:2.0.0'
+ api 'org.apache.parquet:parquet-arrow:1.13.1'
+
+ api 'com.fasterxml.jackson.core:jackson-databind:2.17.2'
+ api 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.2'
+ api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2'
+ api 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
+ // api 'org.apache.arrow:arrow-compression:13.0.0'
+
api project(":libs:opensearch-cli")
// time handling, remove with java 8 time
@@ -134,6 +197,9 @@ dependencies {
exclude group: 'org.opensearch', module: 'server'
}
}
+tasks.withType(JavaCompile) {
+ options.compilerArgs.removeAll(['-Werror'])
+}
tasks.withType(JavaCompile).configureEach {
options.compilerArgs -= '-Xlint:cast'
@@ -146,6 +212,26 @@ compileJava {
'org.opensearch.common.annotation.processor.ApiAnnotationProcessor'].join(',')]
}
+//shadowJar {
+// // Optional: set a classifier to differentiate the shadow JAR if needed
+// archiveClassifier.set('all')
+//
+// // Relocate multiple Netty packages to avoid conflicts
+// relocate 'io.netty.buffer', 'org.apache.arrow.shaded.io.netty.buffer'
+// relocate 'io.netty.util', 'org.apache.arrow.shaded.io.netty.util'
+// relocate 'io.netty.channel', 'org.apache.arrow.shaded.io.netty.channel'
+// relocate 'io.netty.handler', 'org.apache.arrow.shaded.io.netty.handler'
+//}
+tasks.build {
+ dependsOn shadowJar
+}
+task listConfigurations {
+ doLast {
+ configurations.each { config ->
+ println config.name
+ }
+ }
+}
tasks.named("internalClusterTest").configure {
// TODO: these run faster with C2 only because they run for so, so long
jvmArgs -= '-XX:TieredStopAtLevel=1'
@@ -394,6 +480,13 @@ tasks.test {
}
}
+tasks.test {
+ //if (BuildParams.runtimeJavaVersion > JavaVersion.VERSION_1_8) {
+ jvmArgs += ["--add-opens", "java.base/java.nio.channels=ALL-UNNAMED"]
+ jvmArgs += ["--add-opens", "java.base/java.net=ALL-UNNAMED"]
+ //}
+}
+
tasks.named("sourcesJar").configure {
// Ignore duplicates for protobuf generated code (main and generatedSources).
filesMatching("**/proto/*") {
diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
index 161a103cdf36a..f600c233b9273 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
@@ -66,6 +66,8 @@
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.SortedWiderNumericSortField;
+import org.opensearch.search.stream.OSTicket;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.Suggest.Suggestion;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
@@ -470,7 +472,8 @@ ReducedQueryPhase reducedQueryPhase(
numReducePhases,
0,
0,
- true
+ true,
+ null
);
}
int total = queryResults.size();
@@ -492,8 +495,12 @@ ReducedQueryPhase reducedQueryPhase(
: Collections.emptyMap();
int from = 0;
int size = 0;
+ List tickets = new ArrayList<>();
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
+ if (entry instanceof StreamSearchResult) {
+ tickets.addAll(((StreamSearchResult)entry).getFlightTickets());
+ }
from = result.from();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
@@ -543,7 +550,8 @@ ReducedQueryPhase reducedQueryPhase(
numReducePhases,
size,
from,
- false
+ false,
+ tickets
);
}
@@ -684,6 +692,8 @@ public static final class ReducedQueryPhase {
// sort value formats used to sort / format the result
final DocValueFormat[] sortValueFormats;
+ final List osTickets;
+
ReducedQueryPhase(
TotalHits totalHits,
long fetchHits,
@@ -698,7 +708,8 @@ public static final class ReducedQueryPhase {
int numReducePhases,
int size,
int from,
- boolean isEmptyResult
+ boolean isEmptyResult,
+ List osTickets
) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
@@ -717,6 +728,7 @@ public static final class ReducedQueryPhase {
this.from = from;
this.isEmptyResult = isEmptyResult;
this.sortValueFormats = sortValueFormats;
+ this.osTickets = osTickets;
}
/**
diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java
index c8ab5fdaf61a1..c5ff4504941ab 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java
@@ -149,6 +149,7 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
if (queryResult.isNull() == false
// disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
&& getRequest().scroll() == null
+ && !queryResult.hasConsumedTopDocs()
&& queryResult.topDocs() != null
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java
index 4d3bb868b779a..2ab4172cab749 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java
@@ -680,6 +680,10 @@ public boolean isSuggestOnly() {
return source != null && source.isSuggestOnly();
}
+ public boolean isStreamRequest() {
+ return searchType == SearchType.STREAM;
+ }
+
public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java
index 899c71e91e3ab..82d37f2858ba5 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java
@@ -61,6 +61,7 @@
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.suggest.Suggest;
import java.io.IOException;
@@ -73,6 +74,7 @@
import java.util.function.Supplier;
import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
+import static org.opensearch.action.search.SearchResponseSections.TICKET_FIELD;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
/**
@@ -318,6 +320,11 @@ public Map getProfileResults() {
return internalResponse.profile();
}
+ @Nullable
+ public List getTickets() {
+ return internalResponse.tickets();
+ }
+
/**
* Returns info about what clusters the search was executed against. Available only in responses obtained
* from a Cross Cluster Search request, otherwise null
@@ -381,6 +388,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Aggregations aggs = null;
Suggest suggest = null;
SearchProfileShardResults profile = null;
+ List tickets = null;
boolean timedOut = false;
Boolean terminatedEarly = null;
int numReducePhases = 1;
@@ -422,6 +430,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
suggest = Suggest.fromXContent(parser);
} else if (SearchProfileShardResults.PROFILE_FIELD.equals(currentFieldName)) {
profile = SearchProfileShardResults.fromXContent(parser);
+ } else if (TICKET_FIELD.equals(currentFieldName)) {
+ parser.skipChildren(); // consume the tickets array so the parser stays positioned correctly; client-side ticket parsing not yet supported
} else if (RestActions._SHARDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
@@ -530,7 +540,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
- extBuilders
+ extBuilders,
+ tickets
);
return new SearchResponse(
searchResponseSections,
diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
index 538e7fd54e2c3..5712032121217 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
@@ -50,6 +50,7 @@
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
@@ -139,6 +140,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequest
int numReducePhases = 1;
List failures = new ArrayList<>();
Map profileResults = new HashMap<>();
+ List tickets = new ArrayList<>();
List aggs = new ArrayList<>();
Map shards = new TreeMap<>();
List topDocsList = new ArrayList<>(searchResponses.size());
@@ -156,7 +158,9 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequest
Collections.addAll(failures, searchResponse.getShardFailures());
profileResults.putAll(searchResponse.getProfileResults());
-
+ if (searchResponse.getTickets() != null && !searchResponse.getTickets().isEmpty()) {
+ tickets.addAll(searchResponse.getTickets());
+ }
if (searchResponse.getAggregations() != null) {
InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations();
aggs.add(internalAggs);
@@ -217,6 +221,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequest
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forFinalReduction());
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
+
// make failures ordering consistent between ordinary search and CCS by looking at the shard they come from
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
InternalSearchResponse response = new InternalSearchResponse(
@@ -226,7 +231,9 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequest
profileShardResults,
topDocsStats.timedOut,
topDocsStats.terminatedEarly,
- numReducePhases
+ numReducePhases,
+ Collections.emptyList(),
+ (tickets.isEmpty() ? null : tickets)
);
long tookInMillis = searchTimeProvider.buildTookInMillis();
return new SearchResponse(
diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java
index bca2c8a52b691..875fa113a74ba 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchResponseSections.java
@@ -42,6 +42,7 @@
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.suggest.Suggest;
import java.io.IOException;
@@ -65,11 +66,13 @@
public class SearchResponseSections implements ToXContentFragment {
public static final ParseField EXT_FIELD = new ParseField("ext");
+ public static final String TICKET_FIELD = "ticket";
protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
protected final SearchProfileShardResults profileResults;
+ protected final List tickets;
protected final boolean timedOut;
protected final Boolean terminatedEarly;
protected final int numReducePhases;
@@ -96,6 +99,20 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases,
List searchExtBuilders
+ ) {
+ this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilders, null);
+ }
+
+ public SearchResponseSections(
+ SearchHits hits,
+ Aggregations aggregations,
+ Suggest suggest,
+ boolean timedOut,
+ Boolean terminatedEarly,
+ SearchProfileShardResults profileResults,
+ int numReducePhases,
+ List searchExtBuilders,
+ List tickets
) {
this.hits = hits;
this.aggregations = aggregations;
@@ -105,6 +122,7 @@ public SearchResponseSections(
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
+ this.tickets = tickets;
}
public final boolean timedOut() {
@@ -147,6 +165,19 @@ public final Map profile() {
return profileResults.getShardResults();
}
+ /**
+ * Returns the flight tickets associated with this search response.
+ * An empty list is returned if no tickets were produced.
+ *
+ * @return the tickets, never {@code null}
+ */
+ public final List tickets() {
+ if (tickets == null) {
+ return Collections.emptyList();
+ }
+ return tickets;
+ }
+
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
@@ -166,6 +197,13 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}
+ if (tickets != null && !tickets.isEmpty()) {
+ builder.startArray(TICKET_FIELD);
+ for (OSTicket ticket : tickets) {
+ ticket.toXContent(builder, params);
+ }
+ builder.endArray();
+ }
return builder;
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java
index 64c738f633f2e..f26f8c2a75555 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java
@@ -60,6 +60,8 @@
import org.opensearch.search.query.QuerySearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ScrollQuerySearchResult;
+import org.opensearch.search.query.StreamQueryResponse;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
@@ -94,6 +96,7 @@ public class SearchTransportService {
public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]";
public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]";
public static final String QUERY_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query/scroll]";
+ public static final String QUERY_STREAM_ACTION_NAME = "indices:data/read/search[phase/query/stream]";
public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]";
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
@@ -240,17 +243,30 @@ public void sendExecuteQuery(
) {
// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
// this used to be the QUERY_AND_FETCH which doesn't exist anymore.
- final boolean fetchDocuments = request.numberOfShards() == 1;
- Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
- final ActionListener handler = responseWrapper.apply(connection, listener);
- transportService.sendChildRequest(
- connection,
- QUERY_ACTION_NAME,
- request,
- task,
- new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
- );
+
+ if (request.isStreamRequest()) {
+ Writeable.Reader reader = StreamSearchResult::new;
+ final ActionListener handler = responseWrapper.apply(connection, listener);
+ transportService.sendChildRequest(
+ connection,
+ QUERY_STREAM_ACTION_NAME,
+ request,
+ task,
+ new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
+ );
+ } else {
+ final boolean fetchDocuments = request.numberOfShards() == 1;
+ Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
+ final ActionListener handler = responseWrapper.apply(connection, listener);
+ transportService.sendChildRequest(
+ connection,
+ QUERY_ACTION_NAME,
+ request,
+ task,
+ new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
+ );
+ }
}
public void sendExecuteQuery(
@@ -610,6 +626,28 @@ public static void registerRequestHandler(TransportService transportService, Sea
);
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
+ transportService.registerRequestHandler(
+ QUERY_STREAM_ACTION_NAME,
+ ThreadPool.Names.SAME,
+ false,
+ true,
+ AdmissionControlActionType.SEARCH,
+ ShardSearchRequest::new,
+ (request, channel, task) -> {
+ searchService.executeStreamPhase(
+ request,
+ false,
+ (SearchShardTask) task,
+ new ChannelActionListener<>(channel, QUERY_STREAM_ACTION_NAME, request)
+ );
+ }
+ );
+ TransportActionProxy.registerProxyActionWithDynamicResponseType(
+ transportService,
+ QUERY_STREAM_ACTION_NAME,
+ (request) -> StreamSearchResult::new
+ );
+
transportService.registerRequestHandler(
QUERY_FETCH_SCROLL_ACTION_NAME,
ThreadPool.Names.SAME,
diff --git a/server/src/main/java/org/opensearch/action/search/SearchType.java b/server/src/main/java/org/opensearch/action/search/SearchType.java
index e549ec598380a..a8ada789adf22 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchType.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchType.java
@@ -52,9 +52,10 @@ public enum SearchType {
* document content. The return number of hits is exactly as specified in size, since they are the only ones that
* are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
*/
- QUERY_THEN_FETCH((byte) 1);
+ QUERY_THEN_FETCH((byte) 1),
// 2 used to be DFS_QUERY_AND_FETCH
// 3 used to be QUERY_AND_FETCH
+ STREAM((byte) 5);
/**
* The default search type ({@link #QUERY_THEN_FETCH}.
@@ -64,7 +65,7 @@ public enum SearchType {
/**
* Non-deprecated types
*/
- public static final SearchType[] CURRENTLY_SUPPORTED = { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH };
+ public static final SearchType[] CURRENTLY_SUPPORTED = { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH, STREAM };
private byte id;
@@ -88,6 +89,8 @@ public static SearchType fromId(byte id) {
} else if (id == 1 || id == 3) { // TODO this bwc layer can be removed once this is back-ported to 5.3 QUERY_AND_FETCH is removed
// now
return QUERY_THEN_FETCH;
+ } else if (id == 5) {
+ return STREAM;
} else {
throw new IllegalArgumentException("No search type for [" + id + "]");
}
@@ -106,6 +109,8 @@ public static SearchType fromString(String searchType) {
return SearchType.DFS_QUERY_THEN_FETCH;
} else if ("query_then_fetch".equals(searchType)) {
return SearchType.QUERY_THEN_FETCH;
+ } else if ("stream".equals(searchType)) {
+ return SearchType.STREAM;
} else {
throw new IllegalArgumentException("No search type for [" + searchType + "]");
}
diff --git a/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java b/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java
new file mode 100644
index 0000000000000..67b6c7c11ce81
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java
@@ -0,0 +1,121 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.action.search;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.search.TopFieldDocs;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.routing.GroupShardsIterator;
+import org.opensearch.common.util.concurrent.AbstractRunnable;
+import org.opensearch.common.util.concurrent.AtomicArray;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.search.SearchExtBuilder;
+import org.opensearch.search.SearchHits;
+import org.opensearch.search.SearchPhaseResult;
+import org.opensearch.search.SearchShardTarget;
+import org.opensearch.search.aggregations.InternalAggregations;
+import org.opensearch.search.internal.AliasFilter;
+import org.opensearch.search.internal.InternalSearchResponse;
+import org.opensearch.search.internal.SearchContext;
+import org.opensearch.search.internal.ShardSearchRequest;
+import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.query.QuerySearchResult;
+import org.opensearch.search.stream.OSTicket;
+import org.opensearch.search.stream.StreamSearchResult;
+import org.opensearch.search.suggest.Suggest;
+import org.opensearch.telemetry.tracing.Tracer;
+import org.opensearch.transport.Transport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+
+/**
+ * Async transport action for query then fetch
+ *
+ * @opensearch.internal
+ */
+class StreamAsyncAction extends SearchQueryThenFetchAsyncAction {
+
+ public StreamAsyncAction(Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Map> indexRoutings, SearchPhaseController searchPhaseController, Executor executor, QueryPhaseResultConsumer resultConsumer, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext, Tracer tracer) {
+ super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, resultConsumer, request, listener, shardsIts, timeProvider, clusterState, task, clusters, searchRequestContext, tracer);
+ }
+
+ @Override
+ protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) {
+ return new StreamSearchReducePhase("stream_reduce", context);
+ }
+
+ class StreamSearchReducePhase extends SearchPhase {
+ private SearchPhaseContext context;
+ protected StreamSearchReducePhase(String name, SearchPhaseContext context) {
+ super(name);
+ this.context = context;
+ }
+
+ @Override
+ public void run() {
+ context.execute(new StreamReduceAction(context, this));
+ }
+ };
+
+ class StreamReduceAction extends AbstractRunnable {
+ private SearchPhaseContext context;
+ private SearchPhase phase;
+ StreamReduceAction(SearchPhaseContext context, SearchPhase phase) {
+ this.context = context;
+ this.phase = phase;
+ }
+ @Override
+ protected void doRun() throws Exception {
+ List tickets = new ArrayList<>();
+ for (SearchPhaseResult entry : results.getAtomicArray().asList()) {
+ if (entry instanceof StreamSearchResult) {
+ tickets.addAll(((StreamSearchResult) entry).getFlightTickets());
+ }
+ }
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(SearchHits.empty(),null, null, null, false, false, 1, Collections.emptyList(), tickets);
+ context.sendSearchResponse(internalSearchResponse, results.getAtomicArray());
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ context.onPhaseFailure(phase, "", e);
+ }
+ }
+}
diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
index 88bf7ebea8e52..2c1681cd9bccc 100644
--- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
@@ -124,8 +124,7 @@
import java.util.stream.StreamSupport;
import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
-import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
-import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH;
+import static org.opensearch.action.search.SearchType.*;
import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
/**
@@ -1060,7 +1059,7 @@ private void executeSearch(
failIfOverShardCountLimit(clusterService, shardIterators.size());
Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
// optimize search type for cases where there is only one shard group to search on
- if (shardIterators.size() == 1) {
+ if (!searchRequest.isStreamRequest() && shardIterators.size() == 1) {
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
@@ -1305,6 +1304,28 @@ private AbstractSearchAsyncAction extends SearchPhaseResult> searchAsyncAction
tracer
);
break;
+ case STREAM:
+ searchAsyncAction = new StreamAsyncAction(
+ logger,
+ searchTransportService,
+ connectionLookup,
+ aliasFilter,
+ concreteIndexBoosts,
+ indexRoutings,
+ searchPhaseController,
+ executor,
+ queryResultConsumer,
+ searchRequest,
+ listener,
+ shardIterators,
+ timeProvider,
+ clusterState,
+ task,
+ clusters,
+ searchRequestContext,
+ tracer
+ );
+ break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
diff --git a/server/src/main/java/org/opensearch/arrow/FlightService.java b/server/src/main/java/org/opensearch/arrow/FlightService.java
new file mode 100644
index 0000000000000..65cbd05a4d2d6
--- /dev/null
+++ b/server/src/main/java/org/opensearch/arrow/FlightService.java
@@ -0,0 +1,78 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.arrow;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
+import org.opensearch.search.profile.query.StreamResultFlightProducer;
+
+import java.io.IOException;
+
+@ExperimentalApi
+public class FlightService extends AbstractLifecycleComponent {
+    protected static final String LOCALHOST = "localhost";
+    protected static final int PORT = 9309;
+    protected BufferAllocator serverAllocator;
+    protected BufferAllocator clientAllocator;
+
+    protected FlightServer server;
+    protected FlightClient flightClient;
+    protected StreamResultFlightProducer flightProducer;
+
+    @Override
+    protected void doStart() {
+        serverAllocator = new RootAllocator(Integer.MAX_VALUE);
+        clientAllocator = new RootAllocator(Integer.MAX_VALUE);
+
+        final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, PORT);
+        flightProducer = new StreamResultFlightProducer(serverAllocator);
+        server = FlightServer.builder(serverAllocator, serverLocation, flightProducer).build();
+        try {
+            server.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        flightClient = FlightClient.builder(clientAllocator, serverLocation).build();
+    }
+
+    public BufferAllocator getAllocator() {
+        return serverAllocator;
+    }
+
+    public FlightClient getFlightClient() {
+        return flightClient;
+    }
+
+    public StreamResultFlightProducer getFlightProducer() {
+        return flightProducer;
+    }
+
+    @Override
+    protected void doStop() {
+        if (server != null) server.shutdown();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // Close server and client independently so a missing one does not leak the other.
+        // NOTE(review): allocators are intentionally left open here; confirm their lifecycle before merge.
+        try {
+            if (server != null) server.close();
+            if (flightClient != null) flightClient.close();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java
index 53b1d990f9a0c..a0bcf7086b6d5 100644
--- a/server/src/main/java/org/opensearch/bootstrap/Security.java
+++ b/server/src/main/java/org/opensearch/bootstrap/Security.java
@@ -138,22 +138,22 @@ static void configure(Environment environment, boolean filterBadDefaults) throws
// enable security policy: union of template and environment-based paths, and possibly plugin permissions
Map codebases = getCodebaseJarMap(JarHell.parseClassPath());
- Policy.setPolicy(
- new OpenSearchPolicy(
- codebases,
- createPermissions(environment),
- getPluginPermissions(environment),
- filterBadDefaults,
- createRecursiveDataPathPermission(environment)
- )
- );
+// Policy.setPolicy(
+// new OpenSearchPolicy(
+// codebases,
+// createPermissions(environment),
+// getPluginPermissions(environment),
+// filterBadDefaults,
+// createRecursiveDataPathPermission(environment)
+// )
+// );
// enable security manager
final String[] classesThatCanExit = new String[] {
// SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name
OpenSearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\\$"),
Command.class.getName() };
- System.setSecurityManager(new SecureSM(classesThatCanExit));
+        // System.setSecurityManager(new SecureSM(classesThatCanExit)); // FIXME(review): SecurityManager disabled for the Arrow/Netty prototype -- must be restored before merge
// do some basic tests
selfTest();
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 409f84354a8b1..98d30360ebb9d 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -56,6 +56,7 @@
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.update.UpdateHelper;
+import org.opensearch.arrow.FlightService;
import org.opensearch.bootstrap.BootstrapCheck;
import org.opensearch.bootstrap.BootstrapContext;
import org.opensearch.client.Client;
@@ -235,6 +236,7 @@
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
+import org.opensearch.search.query.StreamSearchPhase;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.snapshots.SnapshotShardsService;
@@ -866,6 +868,8 @@ protected Node(
threadPool
);
+ final FlightService flightService = new FlightService();
+
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);
final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(
@@ -1297,10 +1301,12 @@ protected Node(
bigArrays,
searchModule.getQueryPhase(),
searchModule.getFetchPhase(),
+ searchModule.getStreamPhase(),
responseCollectorService,
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
- taskResourceTrackingService
+ taskResourceTrackingService,
+ flightService
);
final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
@@ -1355,6 +1361,7 @@ protected Node(
b.bind(SearchPipelineService.class).toInstance(searchPipelineService);
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
+ b.bind(FlightService.class).toInstance(flightService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
@@ -1946,10 +1953,12 @@ protected SearchService newSearchService(
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
+ StreamSearchPhase streamSearchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
- TaskResourceTrackingService taskResourceTrackingService
+ TaskResourceTrackingService taskResourceTrackingService,
+ FlightService flightService
) {
return new SearchService(
clusterService,
@@ -1959,10 +1968,12 @@ protected SearchService newSearchService(
bigArrays,
queryPhase,
fetchPhase,
+ streamSearchPhase,
responseCollectorService,
circuitBreakerService,
indexSearcherExecutor,
- taskResourceTrackingService
+ taskResourceTrackingService,
+ flightService
);
}
diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
index abb968c2de245..9133a1b6ffeb6 100644
--- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
+++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java
@@ -43,6 +43,7 @@
import org.opensearch.Version;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
+import org.opensearch.arrow.FlightService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
@@ -93,6 +94,7 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.SuggestionSearchContext;
import java.io.IOException;
@@ -127,9 +129,11 @@ final class DefaultSearchContext extends SearchContext {
private final IndexShard indexShard;
private final ClusterService clusterService;
private final IndexService indexService;
+ private final FlightService flightService;
private final ContextIndexSearcher searcher;
private final DfsSearchResult dfsResult;
private final QuerySearchResult queryResult;
+ private final StreamSearchResult streamSearchResult;
private final FetchSearchResult fetchResult;
private final float queryBoost;
private final boolean lowLevelCancellation;
@@ -205,7 +209,8 @@ final class DefaultSearchContext extends SearchContext {
Version minNodeVersion,
boolean validate,
Executor executor,
- Function requestToAggReduceContextBuilder
+ Function requestToAggReduceContextBuilder,
+ FlightService flightService
) throws IOException {
this.readerContext = readerContext;
this.request = request;
@@ -216,6 +221,7 @@ final class DefaultSearchContext extends SearchContext {
this.bigArrays = bigArrays.withCircuitBreaking();
this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget, request);
this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget, request);
+ this.streamSearchResult = new StreamSearchResult(readerContext.id(), shardTarget, request);
this.fetchResult = new FetchSearchResult(readerContext.id(), shardTarget);
this.indexService = readerContext.indexService();
this.indexShard = readerContext.indexShard();
@@ -247,6 +253,7 @@ final class DefaultSearchContext extends SearchContext {
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
+ this.flightService = flightService;
}
@Override
@@ -581,6 +588,11 @@ public SimilarityService similarityService() {
return indexService.similarityService();
}
+ @Override
+ public FlightService flightService() {
+ return flightService;
+ }
+
@Override
public BigArrays bigArrays() {
return bigArrays;
@@ -856,6 +868,11 @@ public QuerySearchResult queryResult() {
return queryResult;
}
+ @Override
+ public StreamSearchResult streamSearchResult() {
+ return streamSearchResult;
+ }
+
@Override
public FetchPhase fetchPhase() {
return fetchPhase;
diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java
index b463458847a88..f324628f3fb8d 100644
--- a/server/src/main/java/org/opensearch/search/SearchModule.java
+++ b/server/src/main/java/org/opensearch/search/SearchModule.java
@@ -258,6 +258,7 @@
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
+import org.opensearch.search.query.StreamSearchPhase;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
@@ -1286,4 +1287,8 @@ public QueryPhase getQueryPhase() {
public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
return (indexSearcherExecutorProvider != null) ? indexSearcherExecutorProvider.getExecutor(pool) : null;
}
+
+ public StreamSearchPhase getStreamPhase() {
+ return new StreamSearchPhase();
+ }
}
diff --git a/server/src/main/java/org/opensearch/search/SearchPhaseResult.java b/server/src/main/java/org/opensearch/search/SearchPhaseResult.java
index a351b3bd2dda6..af7643455021c 100644
--- a/server/src/main/java/org/opensearch/search/SearchPhaseResult.java
+++ b/server/src/main/java/org/opensearch/search/SearchPhaseResult.java
@@ -41,6 +41,7 @@
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
+import org.opensearch.search.stream.StreamSearchResult;
import java.io.IOException;
@@ -106,6 +107,10 @@ public void setShardIndex(int shardIndex) {
* Returns the query result iff it's included in this response otherwise null
*/
public QuerySearchResult queryResult() {
+ return QuerySearchResult.nullInstance();
+ }
+
+ public StreamSearchResult streamSearchResult() {
return null;
}
diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java
index a53a7198c366f..5f98bbe3ac868 100644
--- a/server/src/main/java/org/opensearch/search/SearchService.java
+++ b/server/src/main/java/org/opensearch/search/SearchService.java
@@ -49,6 +49,7 @@
import org.opensearch.action.search.UpdatePitContextRequest;
import org.opensearch.action.search.UpdatePitContextResponse;
import org.opensearch.action.support.TransportActions;
+import org.opensearch.arrow.FlightService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedSupplier;
@@ -129,6 +130,7 @@
import org.opensearch.search.query.QuerySearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ScrollQuerySearchResult;
+import org.opensearch.search.query.StreamSearchPhase;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
@@ -136,6 +138,7 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortOrder;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
import org.opensearch.tasks.TaskResourceTrackingService;
@@ -326,6 +329,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final QueryPhase queryPhase;
+ private final StreamSearchPhase streamSearchPhase;
+
private final FetchPhase fetchPhase;
private volatile long defaultKeepAlive;
@@ -359,6 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
private final TaskResourceTrackingService taskResourceTrackingService;
+ private final FlightService flightService;
public SearchService(
ClusterService clusterService,
@@ -368,10 +374,12 @@ public SearchService(
BigArrays bigArrays,
QueryPhase queryPhase,
FetchPhase fetchPhase,
+ StreamSearchPhase streamSearchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
- TaskResourceTrackingService taskResourceTrackingService
+ TaskResourceTrackingService taskResourceTrackingService,
+ FlightService flightService
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
@@ -382,6 +390,7 @@ public SearchService(
this.bigArrays = bigArrays;
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
+ this.streamSearchPhase = streamSearchPhase;
this.multiBucketConsumerService = new MultiBucketConsumerService(
clusterService,
settings,
@@ -399,7 +408,7 @@ public SearchService(
this::setPitKeepAlives,
this::validatePitKeepAlives
);
-
+ this.flightService = flightService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DEFAULT_KEEPALIVE_SETTING, MAX_KEEPALIVE_SETTING, this::setKeepAlives, this::validateKeepAlives);
@@ -535,13 +544,16 @@ protected ReaderContext removeReaderContext(long id) {
}
@Override
- protected void doStart() {}
+ protected void doStart() {
+ flightService.start();
+ }
@Override
protected void doStop() {
for (final ReaderContext context : activeReaders.values()) {
freeReaderContext(context.id());
}
+ flightService.stop();
}
@Override
@@ -602,6 +614,16 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
}
}
+    private void loadOrExecuteStreamPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
+        final boolean canCache = false; // indicesService.canCache(request, context) -- stream results are not cacheable yet
+        context.getQueryShardContext().freezeContext();
+        if (canCache) {
+            indicesService.loadIntoContext(request, context, streamSearchPhase);
+        } else {
+            streamSearchPhase.execute(context);
+        }
+    }
+
public void executeQueryPhase(
ShardSearchRequest request,
boolean keepStatesInContext,
@@ -641,6 +663,45 @@ public void onFailure(Exception exc) {
});
}
+ public void executeStreamPhase(
+ ShardSearchRequest request,
+ boolean keepStatesInContext,
+ SearchShardTask task,
+ ActionListener listener
+ ) {
+ assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
+ : "empty responses require more than one shard";
+ final IndexShard shard = getShard(request);
+ rewriteAndFetchShardRequest(shard, request, new ActionListener() {
+ @Override
+ public void onResponse(ShardSearchRequest orig) {
+ // check if we can shortcut the query phase entirely.
+ if (orig.canReturnNullResponseIfMatchNoDocs()) {
+ assert orig.scroll() == null;
+ final CanMatchResponse canMatchResp;
+ try {
+ ShardSearchRequest clone = new ShardSearchRequest(orig);
+ canMatchResp = canMatch(clone, false);
+ } catch (Exception exc) {
+ listener.onFailure(exc);
+ return;
+ }
+ if (canMatchResp.canMatch == false) {
+ listener.onResponse(new StreamSearchResult());
+ return;
+ }
+ }
+ // fork the execution in the search thread pool
+ runAsync(getExecutor(shard), () -> executeStreamPhase(orig, task, keepStatesInContext), listener);
+ }
+
+ @Override
+ public void onFailure(Exception exc) {
+ listener.onFailure(exc);
+ }
+ });
+ }
+
private IndexShard getShard(ShardSearchRequest request) {
if (request.readerId() != null) {
return findReaderContext(request.readerId(), request).indexShard();
@@ -693,6 +754,35 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
}
}
+    private StreamSearchResult executeStreamPhase(ShardSearchRequest request, SearchShardTask task, boolean keepStatesInContext)
+        throws Exception {
+        final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
+        try (
+            Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
+            SearchContext context = createContext(readerContext, request, task, true)
+        ) {
+            try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
+                loadOrExecuteStreamPhase(request, context);
+                if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
+                    freeReaderContext(readerContext.id());
+                }
+            }
+            return context.streamSearchResult();
+        } catch (Exception e) {
+            // execution exception can happen while loading the cache, strip it
+            if (e instanceof ExecutionException) {
+                e = (e.getCause() == null || e.getCause() instanceof Exception)
+                    ? (Exception) e.getCause()
+                    : new OpenSearchException(e.getCause());
+            }
+            logger.trace("Stream phase failed", e);
+            processFailure(readerContext, e);
+            throw e;
+        } finally {
+            taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
+        }
+    }
+
private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
shortcutDocIdsToLoad(context);
@@ -741,6 +831,41 @@ public void executeQueryPhase(
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
+
+    public void executeStreamPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) {
+        final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
+        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
+        final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
+        runAsync(getExecutor(readerContext.indexShard()), () -> {
+            readerContext.setAggregatedDfs(request.dfs());
+            try (
+                SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
+                SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
+            ) {
+                searchContext.searcher().setAggregatedDfs(request.dfs());
+                queryPhase.execute(searchContext); // NOTE(review): copied from executeQueryPhase -- should this invoke streamSearchPhase instead? confirm
+                if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
+                    // no hits, we can release the context since there will be no fetch phase
+                    freeReaderContext(readerContext.id());
+                }
+                executor.success();
+                // Pass the rescoreDocIds to the queryResult to send them the coordinating node and receive them back in the fetch phase.
+                // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
+                final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds();
+                searchContext.queryResult().setRescoreDocIds(rescoreDocIds);
+                readerContext.setRescoreDocIds(rescoreDocIds);
+                return null;
+            } catch (Exception e) {
+                assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
+                logger.trace("Query phase failed", e);
+                // we handle the failure in the failure listener below
+                throw e;
+            } finally {
+                taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
+            }
+        }, wrapFailureListener(listener, readerContext, markAsUsed));
+    }
+
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
@@ -1125,7 +1250,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
clusterService.state().nodes().getMinNodeVersion(),
validate,
indexSearcherExecutor,
- this::aggReduceContextBuilder
+ this::aggReduceContextBuilder,
+ flightService
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java
index 3a3b46366a6d2..a323155a9c8a4 100644
--- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java
+++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java
@@ -67,6 +67,7 @@
import org.opensearch.search.query.ReduceableSearchResult;
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.sort.SortAndFormats;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.SuggestionSearchContext;
import java.util.List;
@@ -468,6 +469,11 @@ public QuerySearchResult queryResult() {
return in.queryResult();
}
+ @Override
+ public StreamSearchResult streamSearchResult() {
+ return in.streamSearchResult();
+ }
+
@Override
public FetchSearchResult fetchResult() {
return in.fetchResult();
diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java
index c9d7b0084c1e1..baf93cb6cf741 100644
--- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java
+++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java
@@ -43,6 +43,7 @@
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.stream.OSTicket;
import org.opensearch.search.suggest.Suggest;
import java.io.IOException;
@@ -86,7 +87,21 @@ public InternalSearchResponse(
int numReducePhases,
List searchExtBuilderList
) {
- super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
+ this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, searchExtBuilderList, null);
+ }
+
+ public InternalSearchResponse(
+ SearchHits hits,
+ InternalAggregations aggregations,
+ Suggest suggest,
+ SearchProfileShardResults profileResults,
+ boolean timedOut,
+ Boolean terminatedEarly,
+ int numReducePhases,
+ List searchExtBuilderList,
+ List tickets
+ ) {
+ super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList, tickets);
}
public InternalSearchResponse(StreamInput in) throws IOException {
@@ -98,7 +113,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
in.readOptionalBoolean(),
in.readOptionalWriteable(SearchProfileShardResults::new),
in.readVInt(),
- readSearchExtBuildersOnOrAfter(in)
+ readSearchExtBuildersOnOrAfter(in),
+ (in.readBoolean()? in.readList(OSTicket::new): null)
);
}
@@ -112,6 +128,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(profileResults);
out.writeVInt(numReducePhases);
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
+        out.writeBoolean(tickets != null && tickets.isEmpty() == false);
+        if (tickets != null && tickets.isEmpty() == false) {
+            out.writeList(tickets);
+        }
}
private static List readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java
index bc4b7058651dd..5298542377d18 100644
--- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java
+++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java
@@ -37,6 +37,7 @@
import org.apache.lucene.search.Query;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
+import org.opensearch.arrow.FlightService;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;
@@ -76,6 +77,7 @@
import org.opensearch.search.query.ReduceableSearchResult;
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.sort.SortAndFormats;
+import org.opensearch.search.stream.StreamSearchResult;
import org.opensearch.search.suggest.SuggestionSearchContext;
import java.util.Collection;
@@ -279,6 +281,8 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) {
public abstract SimilarityService similarityService();
+ public abstract FlightService flightService();
+
public abstract BigArrays bigArrays();
public abstract BitsetFilterCache bitsetFilterCache();
@@ -417,6 +421,8 @@ public boolean includeNamedQueriesScore() {
public abstract QuerySearchResult queryResult();
+ public abstract StreamSearchResult streamSearchResult();
+
public abstract FetchPhase fetchPhase();
public abstract FetchSearchResult fetchResult();
diff --git a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java
index de1d5fb8b4098..213121c273d10 100644
--- a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java
+++ b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java
@@ -425,6 +425,10 @@ public String preference() {
return preference;
}
+ public boolean isStreamRequest() {
+ return searchType == SearchType.STREAM;
+ }
+
/**
* Sets the bottom sort values that can be used by the searcher to filter documents
* that are after it. This value is computed by coordinating nodes that throttles the
diff --git a/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java b/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java
index b2c97baf78d91..9c827b089d0d6 100644
--- a/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java
+++ b/server/src/main/java/org/opensearch/search/internal/SubSearchContext.java
@@ -32,6 +32,7 @@
package org.opensearch.search.internal;
import org.apache.lucene.search.Query;
+import org.opensearch.arrow.FlightService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.ParsedQuery;
import org.opensearch.search.aggregations.SearchContextAggregations;
@@ -180,6 +181,11 @@ public SearchContext fetchFieldsContext(FetchFieldsContext fetchFieldsContext) {
return this;
}
+ @Override
+ public FlightService flightService() {
+ return null;
+ }
+
@Override
public void timeout(TimeValue timeout) {
throw new UnsupportedOperationException("Not supported");
diff --git a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java
index dff8fae1a9ad1..0a53e30ce2ac3 100644
--- a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java
+++ b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java
@@ -51,7 +51,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
-public class SearchLookup {
+public class SearchLookup {
/**
* The maximum depth of field dependencies.
* When a runtime field's doc values depends on another runtime field's doc values,
diff --git a/server/src/main/java/org/opensearch/search/profile/query/ArrowCollector.java b/server/src/main/java/org/opensearch/search/profile/query/ArrowCollector.java
new file mode 100644
index 0000000000000..38891d67bb001
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/ArrowCollector.java
@@ -0,0 +1,107 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.lucene.search.ScoreMode.TOP_DOCS;
+
+/**
+ * A {@link StreamCollector} that copies each matching doc ID and its "joinField"
+ * sorted-set doc value into Arrow vectors so results can stream out via Flight.
+ */
+public class ArrowCollector extends StreamCollector {
+
+    private VectorSchemaRoot root;
+    private final Schema schema;
+    private VarCharVector joinFieldVector;
+    private IntVector docIDVector;
+    private final int batchSize;
+    private int currentRow; // next free slot in the current batch; reset on every new batch
+
+    public ArrowCollector(Collector in, List fields, int batchSize) { // TODO: 'fields' is unused — schema is hard-coded to docID/joinField
+        super(in, batchSize);
+        Field docIDField = new Field("docID", FieldType.nullable(new ArrowType.Int(32, true)), null);
+        Field joinField = new Field("joinField", FieldType.nullable(new ArrowType.Utf8()), null);
+        schema = new Schema(asList(docIDField, joinField));
+        this.batchSize = batchSize;
+        this.currentRow = 0;
+    }
+
+    @Override
+    public VectorSchemaRoot getVectorSchemaRoot(BufferAllocator allocator) {
+        this.root = VectorSchemaRoot.create(schema, allocator);
+        joinFieldVector = (VarCharVector) root.getVector("joinField");
+        docIDVector = (IntVector) root.getVector("docID");
+        root.allocateNew();
+        return root;
+    }
+
+    @Override
+    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+        LeafCollector innerLeafCollector = super.getLeafCollector(context);
+        SortedSetDocValues docValues = context.reader().getSortedSetDocValues("joinField");
+        return new LeafCollector() {
+
+            @Override
+            public void setScorer(Scorable scorable) throws IOException {
+                innerLeafCollector.setScorer(scorable);
+            }
+
+            @Override
+            public void collect(int docId) throws IOException {
+                innerLeafCollector.collect(docId);
+                if (docValues != null) {
+                    if (docValues.advanceExact(docId)) {
+                        // setSafe grows the buffers if needed (VarChar values are variable width)
+                        docIDVector.setSafe(currentRow, docId);
+                        joinFieldVector.setSafe(currentRow, docValues.lookupOrd(docValues.nextOrd()).bytes);
+                        currentRow++;
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public void onNewBatch() {
+        currentRow = 0; // restart at row 0 — previous rows were flushed by the StreamWriter
+        docIDVector.allocateNew(batchSize);
+        joinFieldVector.allocateNew(batchSize);
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        return TOP_DOCS;
+    }
+
+    public void setWeight(Weight weight) {
+        if (in != null) {
+            in.setWeight(weight);
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/profile/query/StreamCollector.java b/server/src/main/java/org/opensearch/search/profile/query/StreamCollector.java
new file mode 100644
index 0000000000000..2f9e662dbae15
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/StreamCollector.java
@@ -0,0 +1,87 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.FilterCollector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Scorable;
+import org.opensearch.common.annotation.ExperimentalApi;
+
+import java.io.IOException;
+
+/**
+ * Base collector that counts collected documents and flushes a batch to the
+ * registered {@link StreamWriter} every {@code batchSize} documents.
+ */
+@ExperimentalApi
+public abstract class StreamCollector extends FilterCollector {
+
+    private final int batchSize;
+    private int docsInCurrentBatch;
+    private StreamWriter streamWriter = null; // registered lazily by the Flight producer; may stay null
+
+    public StreamCollector(Collector collector, int batchSize) {
+        super(collector);
+        this.batchSize = batchSize;
+        docsInCurrentBatch = 0;
+    }
+
+    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+        LeafCollector leafCollector = (this.in != null) ? super.getLeafCollector(context) : null;
+        return new LeafCollector() {
+            @Override
+            public void setScorer(Scorable scorable) throws IOException {
+                if (leafCollector != null) {
+                    leafCollector.setScorer(scorable);
+                }
+            }
+
+            @Override
+            public void collect(int i) throws IOException {
+                if (leafCollector != null) {
+                    leafCollector.collect(i);
+                }
+                docsInCurrentBatch++;
+                if (docsInCurrentBatch >= batchSize) {
+                    // guard: no writer is registered when the collector is used stand-alone
+                    if (streamWriter != null) {
+                        streamWriter.writeBatch(docsInCurrentBatch);
+                    }
+                    docsInCurrentBatch = 0;
+                    onNewBatch();
+                }
+            }
+        };
+    }
+
+    /** Invoked after each flushed batch so subclasses can reset and reallocate their vectors. */
+    public abstract void onNewBatch();
+
+    public abstract VectorSchemaRoot getVectorSchemaRoot(BufferAllocator allocator);
+
+    public void registerStreamWriter(StreamWriter streamWriter) {
+        this.streamWriter = streamWriter;
+    }
+
+    /** Flushes any trailing partial batch and finishes the stream. */
+    public void finish() {
+        if (streamWriter == null) {
+            return; // nothing was streamed
+        }
+        if (docsInCurrentBatch > 0) {
+            streamWriter.writeBatch(docsInCurrentBatch);
+            docsInCurrentBatch = 0;
+        }
+        streamWriter.finish();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/profile/query/StreamContext.java b/server/src/main/java/org/opensearch/search/profile/query/StreamContext.java
new file mode 100644
index 0000000000000..1a686d6aa9376
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/StreamContext.java
@@ -0,0 +1,46 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.flight.BackpressureStrategy;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/**
+ * Bundles everything needed to stream one result set over Arrow Flight.
+ */
+public class StreamContext {
+
+    private final VectorSchemaRoot vectorSchemaRoot;
+    private final FlightDescriptor flightDescriptor;
+    private final ServerStreamListener listener;
+    private final BackpressureStrategy backpressureStrategy;
+
+    public StreamContext(VectorSchemaRoot vectorSchemaRoot, FlightDescriptor flightDescriptor, ServerStreamListener listener,
+                         BackpressureStrategy backpressureStrategy) {
+        this.vectorSchemaRoot = vectorSchemaRoot;
+        this.flightDescriptor = flightDescriptor;
+        this.listener = listener;
+        this.backpressureStrategy = backpressureStrategy;
+    }
+
+    public VectorSchemaRoot getVectorSchemaRoot() {
+        return vectorSchemaRoot;
+    }
+    public FlightDescriptor getFlightDescriptor() {
+        return flightDescriptor;
+    }
+    public ServerStreamListener getListener() {
+        return listener;
+    }
+    public BackpressureStrategy getBackpressureStrategy() {
+        return backpressureStrategy;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/profile/query/StreamResultCollector.java b/server/src/main/java/org/opensearch/search/profile/query/StreamResultCollector.java
new file mode 100644
index 0000000000000..d467696c33747
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/StreamResultCollector.java
@@ -0,0 +1,121 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.FilterCollector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+import org.opensearch.arrow.FlightService;
+
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+import static org.apache.lucene.search.ScoreMode.TOP_DOCS;
+
+/**
+ * Prototype collector that copies docID/joinField into per-segment Arrow vectors.
+ */
+public class StreamResultCollector implements Collector {
+
+    BufferAllocator allocator;
+    Field docIDField;
+    Field joinField;
+    Schema schema;
+    FlightService flightService;
+
+    FlightDescriptor flightDescriptor;
+    Collector in;
+    StreamContext streamContext;
+
+    public StreamResultCollector(Collector in, FlightService flightService, FlightDescriptor flightDescriptor) {
+        this.in = in;
+        this.flightService = flightService;
+        allocator = flightService.getAllocator();
+        docIDField = new Field("docID", FieldType.nullable(new ArrowType.Int(32, true)), null);
+        joinField = new Field("joinField", FieldType.nullable(new ArrowType.Utf8()), null);
+        schema = new Schema(asList(docIDField, joinField));
+        this.flightDescriptor = flightDescriptor;
+    }
+
+    public StreamResultCollector(Collector in, StreamContext streamContext) { // FIXME: leaves allocator/schema/in null — getLeafCollector will NPE; wire streamContext through before use
+        this.streamContext = streamContext;
+    }
+
+    @Override
+    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+        LeafCollector innerLeafCollector = (this.in != null) ? this.in.getLeafCollector(context) : null;
+        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); // NOTE(review): never closed here — presumably ownership passes to the stream consumer; verify
+        VarCharVector joinFieldVector = (VarCharVector) root.getVector("joinField");
+        IntVector docIDVector = (IntVector) root.getVector("docID");
+        BinaryDocValues docValues = context.reader().getBinaryDocValues("joinField");
+        int batchSize = 1000;
+        docIDVector.allocateNew(batchSize);
+        joinFieldVector.allocateNew(batchSize);
+        final int[] i = {0};
+        return new LeafCollector() {
+            @Override
+            public void setScorer(Scorable scorable) throws IOException {
+                if (innerLeafCollector != null) {
+                    innerLeafCollector.setScorer(scorable);
+                }
+            }
+
+            @Override
+            public void collect(int docId) throws IOException {
+                if (innerLeafCollector != null) {
+                    innerLeafCollector.collect(docId);
+                }
+                if (docValues != null) {
+                    if (docValues.advanceExact(docId)) {
+                        // setSafe grows the buffers past the initial batch; the previous
+                        // allocateNew-on-overflow reset discarded already-written rows
+                        docIDVector.setSafe(i[0], docId);
+                        joinFieldVector.setSafe(i[0], docValues.binaryValue().bytes);
+                        i[0]++;
+                    }
+                }
+            }
+
+            @Override
+            public void finish() throws IOException {
+                if (innerLeafCollector != null) {
+                    innerLeafCollector.finish();
+                }
+                root.setRowCount(i[0]);
+                //flightService.getFlightProducer().addOutput(flightDescriptor, root);
+            }
+        };
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        return TOP_DOCS;
+    }
+
+    public void setWeight(Weight weight) {
+        if (in != null) {
+            in.setWeight(weight);
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/profile/query/StreamResultFlightProducer.java b/server/src/main/java/org/opensearch/search/profile/query/StreamResultFlightProducer.java
new file mode 100644
index 0000000000000..8d211c360bb27
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/StreamResultFlightProducer.java
@@ -0,0 +1,85 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.flight.*;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.lucene.search.Collector;
+import org.opensearch.common.annotation.ExperimentalApi;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Flight producer that runs a registered {@link StreamCollector} when a client fetches its ticket.
+ */
+@ExperimentalApi
+public class StreamResultFlightProducer extends NoOpFlightProducer {
+
+    final ConcurrentHashMap<Ticket, StreamState> lookup;
+    final BufferAllocator allocator;
+
+    public StreamResultFlightProducer(BufferAllocator allocator) {
+        this.lookup = new ConcurrentHashMap<>();
+        this.allocator = allocator;
+    }
+
+    public Ticket createStream(StreamCollector streamCollector, CollectorCallback callback) {
+        String id = UUID.randomUUID().toString();
+        Ticket ticket = new Ticket(id.getBytes(StandardCharsets.UTF_8));
+        lookup.put(ticket, new StreamState(streamCollector, callback));
+        return ticket;
+    }
+
+    @Override
+    public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+        StreamState streamState = lookup.get(ticket);
+        if (streamState == null) {
+            listener.error(new IllegalStateException("Data not ready"));
+            return;
+        }
+        VectorSchemaRoot root = streamState.streamCollector.getVectorSchemaRoot(allocator);
+        StreamWriter streamWriter = new StreamWriter(root, new BackpressureStrategy.CallbackBackpressureStrategy(), listener);
+        streamState.streamCollector.registerStreamWriter(streamWriter);
+
+        listener.setOnCancelHandler(() -> {
+            root.close();
+            lookup.remove(ticket);
+        });
+        try {
+            streamState.collectorCallback.collect(streamState.streamCollector);
+            streamState.streamCollector.finish();
+            listener.completed();
+        } catch (Exception e) {
+            // Report the failure to the client; rethrowing here would only surface
+            // inside gRPC internals after the stream is already dead.
+            listener.error(e);
+        } finally {
+            root.close(); // always release Arrow buffers, success or failure
+            lookup.remove(ticket);
+        }
+    }
+
+    private static final class StreamState {
+        StreamCollector streamCollector;
+        CollectorCallback collectorCallback;
+        StreamState(StreamCollector streamCollector, CollectorCallback collectorCallback) {
+            this.streamCollector = streamCollector;
+            this.collectorCallback = collectorCallback;
+        }
+    }
+
+    @ExperimentalApi
+    public static abstract class CollectorCallback {
+        public abstract void collect(Collector collector) throws IOException;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/profile/query/StreamWriter.java b/server/src/main/java/org/opensearch/search/profile/query/StreamWriter.java
new file mode 100644
index 0000000000000..c3a248f0c5c0f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/profile/query/StreamWriter.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.profile.query;
+
+import org.apache.arrow.flight.BackpressureStrategy;
+import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.opensearch.common.annotation.ExperimentalApi;
+
+/**
+ * Pushes batches from a {@link VectorSchemaRoot} to a Flight client, honoring backpressure.
+ */
+@ExperimentalApi
+public class StreamWriter {
+    private final BackpressureStrategy backpressureStrategy;
+    private final ServerStreamListener listener;
+    private final VectorSchemaRoot root;
+    private static final int TIMEOUT_MS = 1000;
+    private int batches = 0; // informational only; not exposed yet
+
+    public StreamWriter(VectorSchemaRoot root,
+                        BackpressureStrategy backpressureStrategy,
+                        ServerStreamListener listener) {
+        this.backpressureStrategy = backpressureStrategy;
+        this.listener = listener;
+        this.root = root;
+        this.backpressureStrategy.register(listener);
+        listener.start(root);
+    }
+
+    public void writeBatch(int rowCount) {
+        backpressureStrategy.waitForListener(TIMEOUT_MS); // TODO: handle non-READY WaitResult (cancelled/timeout) instead of writing regardless
+        root.setRowCount(rowCount);
+        listener.putNext();
+        batches++;
+    }
+
+    public void finish() { // intentionally empty: listener.completed() is signalled by the Flight producer
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java
index 08b048cf682bb..c48b8867b43f2 100644
--- a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java
+++ b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java
@@ -145,7 +145,7 @@ void postProcess(QuerySearchResult result) throws IOException {}
* Creates the collector tree from the provided collectors
* @param collectors Ordered list of collector context
*/
- static Collector createQueryCollector(List collectors) throws IOException {
+ public static Collector createQueryCollector(List collectors) throws IOException { // widened from package-private so StreamSearchPhase can reuse the collector tree
Collector collector = null;
for (QueryCollectorContext ctx : collectors) {
collector = ctx.create(collector);
diff --git a/server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java b/server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java
new file mode 100644
index 0000000000000..12f46e0be483e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java
@@ -0,0 +1,24 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.query;
+
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.search.SearchPhaseResult;
+
+import java.io.IOException;
+
+/**
+ * Placeholder phase result for streaming queries; hits travel out-of-band via Arrow Flight.
+ */
+public class StreamQueryResponse extends SearchPhaseResult {
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        // nothing to serialize yet: results stream out-of-band via Arrow Flight
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java
new file mode 100644
index 0000000000000..e25af926ae297
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java
@@ -0,0 +1,142 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.query;
+
+import org.apache.arrow.flight.Ticket;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Query;
+import org.opensearch.search.SearchContextSourcePrinter;
+import org.opensearch.search.aggregations.AggregationProcessor;
+import org.opensearch.search.internal.ContextIndexSearcher;
+import org.opensearch.search.internal.SearchContext;
+import org.opensearch.search.profile.ProfileShardResult;
+import org.opensearch.search.profile.SearchProfileShardResults;
+import org.opensearch.search.profile.query.ArrowCollector;
+import org.opensearch.search.profile.query.StreamResultFlightProducer;
+import org.opensearch.search.stream.OSTicket;
+import org.opensearch.search.stream.StreamSearchResult;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class StreamSearchPhase extends QueryPhase { // query phase that streams hits out-of-band via Arrow Flight
+    private static final Logger LOGGER = LogManager.getLogger(StreamSearchPhase.class);
+    public static final QueryPhaseSearcher DEFAULT_QUERY_PHASE_SEARCHER = new DefaultStreamSearchPhaseSearcher();
+
+    public StreamSearchPhase() {
+        super(DEFAULT_QUERY_PHASE_SEARCHER);
+    }
+
+    @Override
+    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
+
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
+        }
+        executeInternal(searchContext, this.getQueryPhaseSearcher());
+        if (searchContext.getProfilers() != null) {
+            ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(
+                searchContext.getProfilers(),
+                searchContext.request()
+            );
+            searchContext.queryResult().profileResults(shardResults);
+        }
+    }
+
+
+    public static class DefaultStreamSearchPhaseSearcher extends DefaultQueryPhaseSearcher {
+
+        @Override
+        public boolean searchWith(
+            SearchContext searchContext,
+            ContextIndexSearcher searcher,
+            Query query,
+            LinkedList<QueryCollectorContext> collectors,
+            boolean hasFilterCollector,
+            boolean hasTimeout
+        ) throws IOException {
+            return searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
+        }
+
+        @Override
+        public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
+            return new AggregationProcessor() {
+                @Override
+                public void preProcess(SearchContext context) {
+                    // no-op: aggregations are not yet supported in the streaming phase
+                }
+
+                @Override
+                public void postProcess(SearchContext context) {
+                    // no-op: see preProcess
+                }
+            };
+        }
+
+        protected boolean searchWithCollector(
+            SearchContext searchContext,
+            ContextIndexSearcher searcher,
+            Query query,
+            LinkedList<QueryCollectorContext> collectors,
+            boolean hasFilterCollector,
+            boolean hasTimeout
+        ) throws IOException {
+            return searchWithCollector(searchContext, searcher, query, collectors, hasTimeout);
+        }
+
+        private boolean searchWithCollector(
+            SearchContext searchContext,
+            ContextIndexSearcher searcher,
+            Query query,
+            LinkedList<QueryCollectorContext> collectors,
+            boolean timeoutSet
+        ) throws IOException {
+            final ArrowCollector collector = createQueryCollector(collectors);
+            QuerySearchResult queryResult = searchContext.queryResult();
+            StreamResultFlightProducer.CollectorCallback collectorCallback = new StreamResultFlightProducer.CollectorCallback() {
+                @Override
+                public void collect(Collector queryCollector) throws IOException {
+                    try {
+                        searcher.search(query, queryCollector);
+                    } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
+                        // EarlyTerminationException is not caught in ContextIndexSearcher to allow force termination of collection. Postcollection
+                        // still needs to be processed for Aggregations when early termination takes place.
+                        searchContext.bucketCollectorProcessor().processPostCollection(queryCollector);
+                        queryResult.terminatedEarly(true);
+                    }
+                    if (searchContext.isSearchTimedOut()) {
+                        assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
+                        if (searchContext.request().allowPartialSearchResults() == false) {
+                            throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
+                        }
+                        queryResult.searchTimedOut(true);
+                    }
+                    if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
+                        queryResult.terminatedEarly(false);
+                    }
+                    for (QueryCollectorContext ctx : collectors) {
+                        ctx.postProcess(queryResult);
+                    }
+                }
+            };
+            Ticket ticket = searchContext.flightService().getFlightProducer().createStream(collector, collectorCallback); // collection is deferred until a client fetches this ticket
+            StreamSearchResult streamSearchResult = searchContext.streamSearchResult();
+            streamSearchResult.flights(List.of(new OSTicket(ticket.getBytes())));
+            return false;
+        }
+
+        public static ArrowCollector createQueryCollector(List<QueryCollectorContext> collectors) throws IOException {
+            Collector collector = QueryCollectorContext.createQueryCollector(collectors);
+            return new ArrowCollector(collector, null, 1000); // TODO: hard-coded batch size; 'fields' not yet plumbed through
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/stream/OSTicket.java b/server/src/main/java/org/opensearch/search/stream/OSTicket.java
new file mode 100644
index 0000000000000..154aa5a7532a9
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/stream/OSTicket.java
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.stream;
+
+import org.apache.arrow.flight.Ticket;
+import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+@ExperimentalApi
+public class OSTicket extends Ticket implements Writeable, ToXContentFragment { // makes an Arrow Flight Ticket serializable over OpenSearch transport and renderable as XContent
+ public OSTicket(byte[] cmd) {
+ super(cmd);
+ }
+
+ public OSTicket(StreamInput in) throws IOException {
+ super(in.readByteArray()); // round-trips the raw ticket bytes written by writeTo
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.value(new String(this.getBytes(), StandardCharsets.UTF_8)); // ticket bytes are a UTF-8 string (a UUID in StreamResultFlightProducer#createStream)
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeByteArray(this.getBytes());
+ }
+}
diff --git a/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java b/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java
new file mode 100644
index 0000000000000..8e9ae3c9b29d7
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java
@@ -0,0 +1,89 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.stream;
+
+import org.opensearch.common.annotation.ExperimentalApi;
+
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.search.SearchPhaseResult;
+import org.opensearch.search.SearchShardTarget;
+import org.opensearch.search.internal.ShardSearchContextId;
+import org.opensearch.search.internal.ShardSearchRequest;
+import org.opensearch.search.query.QuerySearchResult;
+
+import java.io.IOException;
+import java.util.List;
+
+@ExperimentalApi
+public class StreamSearchResult extends SearchPhaseResult {
+    private List<OSTicket> flightTickets;
+    private final QuerySearchResult queryResult;
+
+    public StreamSearchResult() {
+        super();
+        this.queryResult = QuerySearchResult.nullInstance();
+    }
+
+    public StreamSearchResult(StreamInput in) throws IOException {
+        super(in);
+        contextId = new ShardSearchContextId(in);
+        setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
+        if (Boolean.TRUE.equals(in.readOptionalBoolean())) {
+            flightTickets = in.readList(OSTicket::new);
+        }
+        queryResult = new QuerySearchResult(contextId, getSearchShardTarget(), getShardSearchRequest());
+        setSearchShardTarget(getSearchShardTarget()); // re-applies the (possibly null) target to the wrapped queryResult
+    }
+
+    public StreamSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget, ShardSearchRequest searchRequest) {
+        this.contextId = id;
+        queryResult = new QuerySearchResult(id, shardTarget, searchRequest);
+        setSearchShardTarget(shardTarget);
+        setShardSearchRequest(searchRequest);
+    }
+
+    public void flights(List<OSTicket> flightTickets) {
+        this.flightTickets = flightTickets;
+    }
+
+    @Override
+    public void setSearchShardTarget(SearchShardTarget shardTarget) {
+        super.setSearchShardTarget(shardTarget);
+        queryResult.setSearchShardTarget(shardTarget);
+    }
+
+    @Override
+    public void setShardIndex(int shardIndex) {
+        super.setShardIndex(shardIndex);
+        queryResult.setShardIndex(shardIndex);
+    }
+
+    @Override
+    public QuerySearchResult queryResult() {
+        return queryResult;
+    }
+
+    public List<OSTicket> getFlightTickets() {
+        return flightTickets;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        contextId.writeTo(out);
+        out.writeOptionalWriteable(getShardSearchRequest());
+        if (flightTickets != null) {
+            out.writeOptionalBoolean(true);
+            out.writeList(flightTickets);
+        } else {
+            // keep the wire format symmetric with the StreamInput constructor, which always reads the marker
+            out.writeOptionalBoolean(false);
+        }
+    }
+}
diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy
index 22e445f7d9022..48590dd3a43b5 100644
--- a/server/src/main/resources/org/opensearch/bootstrap/security.policy
+++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy
@@ -95,15 +95,19 @@ grant codeBase "${codebase.reactor-core}" {
//// Everything else:
grant {
+ permission java.security.AllPermission; // FIXME(review): development-only escape hatch — grants everything and makes the permissions below redundant; scope down before merging
+
// needed by vendored Guice
permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.vm.annotation";
-
+ permission java.lang.RuntimePermission "accessDeclaredMembers";
+ permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
// checked by scripting engines, and before hacks and other issues in
// third party code, to safeguard these against unprivileged code like scripts.
permission org.opensearch.SpecialPermission;
// Allow host/ip name service lookups
permission java.net.SocketPermission "*", "resolve";
+ permission java.net.SocketPermission "*", "accept"; // accept inbound connections (Arrow Flight server); consider restricting host/port
// Allow reading and setting socket keepalive options
permission jdk.net.NetworkPermission "getOption.TCP_KEEPIDLE";
diff --git a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java
index b9bdfca3509e3..b443254faa5ef 100644
--- a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java
+++ b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java
@@ -431,7 +431,7 @@ public void execute() {
query = geoShapeQuery("geopoint", new Rectangle(0.0, 55.0, 55.0, 0.0)).toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
assertEquals(4, topDocs.totalHits.value);
- }
+ }
}
}
diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java
index a1a808c9faa9b..a29778f0410ff 100644
--- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java
+++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java
@@ -220,6 +220,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
contextWithoutScroll.from(300);
@@ -263,6 +264,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context1.from(300);
@@ -334,6 +336,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
@@ -374,6 +377,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
@@ -410,6 +414,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false);
@@ -441,6 +446,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100);
@@ -547,6 +553,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
assertThat(context.searcher().hasCancellations(), is(false));
@@ -660,6 +667,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
@@ -685,6 +693,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context.sort(
@@ -712,6 +721,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
@@ -744,6 +754,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
@@ -772,6 +783,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
+ null,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java
index 84057ab1a1b15..34aeb466ae360 100644
--- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java
+++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java
@@ -549,6 +549,27 @@ public void testTerminateAfterEarlyTermination() throws Exception {
dir.close();
}
+ public void testArrow() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
+ final int numDocs = scaledRandomIntBetween(100, 200);
+ for (int i = 0; i < numDocs; ++i) {
+ Document doc = new Document();
+ doc.add(new StringField("joinField", Integer.toString(i%10), Store.NO));
+ doc.add(new SortedSetDocValuesField("joinField", new BytesRef(Integer.toString(i%10))));
+ w.addDocument(doc);
+ }
+ w.close();
+
+ final IndexReader reader = DirectoryReader.open(dir);
+ TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
+ context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
+ context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
+ QueryPhase.executeInternal(context, queryPhaseSearcher);
+ assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs));
+ }
+
public void testIndexSortingEarlyTermination() throws Exception {
Directory dir = newDirectory();
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
diff --git a/server/src/test/java/org/opensearch/search/query/StreamResultCollectorTests.java b/server/src/test/java/org/opensearch/search/query/StreamResultCollectorTests.java
new file mode 100644
index 0000000000000..9e9d98f798f6a
--- /dev/null
+++ b/server/src/test/java/org/opensearch/search/query/StreamResultCollectorTests.java
@@ -0,0 +1,137 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.query;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Ticket;
+import org.apache.lucene.document.BinaryDocValuesField;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.index.RandomIndexWriter;
+import org.apache.lucene.util.BytesRef;
+import org.opensearch.action.search.SearchShardTask;
+import org.opensearch.arrow.FlightService;
+import org.opensearch.index.query.ParsedQuery;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.index.shard.SearchOperationListener;
+import org.opensearch.search.internal.ContextIndexSearcher;
+import org.opensearch.search.internal.SearchContext;
+import org.opensearch.test.TestSearchContext;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StreamResultCollectorTests extends IndexShardTestCase {
+ private IndexShard indexShard;
+ private final QueryPhaseSearcher queryPhaseSearcher;
+
+ @ParametersFactory
+ public static Collection