Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic stream solution #2

Open
wants to merge 14 commits into
base: reviewed-stream
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.MonitorStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
Expand Down Expand Up @@ -239,6 +240,9 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
rsp.add("explanation", tupleStream.toExplanation(this.streamFactory));
}

if (tupleStream instanceof MonitorStream) {
tupleStream = ((MonitorStream)tupleStream).getDaemonStream();
}
if (tupleStream instanceof DaemonStream) {
DaemonStream daemonStream = (DaemonStream) tupleStream;
if (daemons.containsKey(daemonStream.getId())) {
Expand All @@ -247,7 +251,8 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
daemonStream.setDaemons(daemons);
daemonStream.open(); // This will start the deamonStream
daemons.put(daemonStream.getId(), daemonStream);
rsp.add("result-set", new DaemonResponseStream("Deamon:" + daemonStream.getId() + " started on " + coreName));
rsp.add("result-set", new DaemonResponseStream("Daemon:" + daemonStream.getId() + " started on " + coreName));

} else {
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
}
Expand Down
8 changes: 4 additions & 4 deletions solr/server/solr/configsets/_default/conf/solrconfig.xml
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,11 @@
<query>

<!-- Maximum number of clauses allowed when parsing a boolean query string.

This limit only impacts boolean queries specified by a user as part of a query string,
and provides per-collection controls on how complex user specified boolean queries can
be. Query strings that specify more clauses then this will result in an error.

If this per-collection limit is greater then the global `maxBooleanClauses` limit
specified in `solr.xml`, it will have no effect, as that setting also limits the size
of user specified boolean queries.
Expand Down Expand Up @@ -686,10 +686,10 @@
<str name="echoParams">explicit</str>
<int name="rows">10</int>
<!-- Default search field
<str name="df">text</str>
<str name="df">text</str>
-->
<!-- Change from JSON to XML format (the default prior to Solr 7.0)
<str name="wt">xml</str>
<str name="wt">xml</str>
-->
</lst>
<!-- In addition to defaults, "appends" params can be specified
Expand Down
2 changes: 2 additions & 0 deletions solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public static void register(StreamFactory streamFactory) {
.withFunctionName("commit", CommitStream.class)
.withFunctionName("random", RandomFacadeStream.class)
.withFunctionName("knnSearch", KnnStream.class)
.withFunctionName("monitor", MonitorStream.class)
.withFunctionName("alert", AlertStream.class)


// decorator streams
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.client.solrj.io.stream;

import java.io.IOException;
import java.net.URI;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.SuppressForbidden;


public class AlertStream extends TupleStream implements Expressible {
CloseableHttpClient httpClient;
TupleStream tupleStream;
String targetURL;

public AlertStream(StreamExpression expression, StreamFactory factory) throws IOException {
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);

String targetURL = null;

// parameters by position
TupleStream tupleStream = factory.constructStream(streamExpressions.get(0));
StreamExpressionNamedParameter urlExpression = factory.getNamedOperand(expression, "url");

if (urlExpression != null) {
targetURL = ((StreamExpressionValue) urlExpression.getParameter()).getValue();
jingxihuang411 marked this conversation as resolved.
Show resolved Hide resolved
}

init(tupleStream, targetURL);
}

public AlertStream(TupleStream tupleStream, String targetURL) {
init(tupleStream, targetURL);
}

public void init(TupleStream tupleStream, String targetURL) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method necessary? This logic could go into the second constructor, and the first constructor could call the second constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If calling second inside the first constructor, this(tupleStream, targetURL) should be first line.

this.tupleStream = tupleStream;
this.targetURL = targetURL;
}

@Override
public void setStreamContext(StreamContext context) {
tupleStream.setStreamContext(context);
}

@Override
public List<TupleStream> children() {
ArrayList<TupleStream> list = new ArrayList<>();
list.add(tupleStream);
return list;
}

@Override
public void open() throws IOException {
tupleStream.open();
httpClient = HttpClients.createDefault();
}

@Override
public void close() throws IOException {
tupleStream.close();
httpClient.close();
}

@Override
public Tuple read() throws IOException {
Tuple tuple = tupleStream.read();

if (!tuple.EOF) {
alert(tuple);
}
return tuple;
}

@Override
public StreamComparator getStreamSort() {
return tupleStream.getStreamSort();
}

@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(((Expressible) tupleStream).toExpression(factory));
return expression;
}

@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
tupleStream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(Explanation.ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory).toString());
}

@SuppressForbidden(reason = "Needs currentTimeMillis to send request time")
void alert(Tuple tuple) throws IOException {
try {
Map<String, String> requestBody = new HashMap<String, String>();
requestBody.put("matches", tuple.fields.toString());
requestBody.put("requestTime", new Timestamp(System.currentTimeMillis()).toString());


HttpPost httpPost= new HttpPost((URI.create(targetURL)));
httpPost.setEntity(new StringEntity(requestBody.toString()));

httpClient.execute(httpPost);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice if the user could receive the HTTP statuses of each alert request in the tuple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm not sure which tuple do you mean. Right now the alert request is sent to the endpoint for each matching tuple so users could retrieve the HTTP status for each request received at the endpoint.

I added the timestamp for sending each request in the tuple.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when adding the timestamp, you should also add the HTTP status code and maybe other information so that the users know if certain documents were not processed properly

//TODO: Error handling for non-successful response code
} catch (ClientProtocolException cpe) {
// Currently detecting authentication by string-matching the HTTP response
// Perhaps SolrClient should have thrown an exception itself??
if (cpe.getMessage().contains("HTTP ERROR 401") || cpe.getMessage().contentEquals("HTTP ERROR 403")) {
int code = cpe.getMessage().contains("HTTP ERROR 401") ? 401 : 403;
throw new SolrException(SolrException.ErrorCode.getErrorCode(code),
"Solr requires authentication for " + targetURL + ". Please supply valid credentials. HTTP code=" + code);
} else {
throw new SolrException(SolrException.ErrorCode.UNKNOWN, cpe);
}
}
}
}
Loading