Skip to content

Commit

Permalink
[fix](arrow-flight-sql) Fix kill timeout FlightSqlConnection and Flig…
Browse files Browse the repository at this point in the history
…htSqlConnectProcessor close (apache#41770)

1. Doris will cancel the connection that has not responded for a long
time, Mysql Conenction will exit directly, but Arrow Flight Conenction
does not exit immediately, may be frequently cancel and print logs.
timeout is `wait_timeout` in session veriable.
2. FlightSqlConnectProcessor Use try-with-resources to close correctly.
  • Loading branch information
xinyiZzz authored Oct 16, 2024
1 parent a74ee9a commit 2973d0f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 68 deletions.
16 changes: 9 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,8 @@ public String getRemoteHostPortString() {

// kill operation with no protect.
public void kill(boolean killConnection) {
LOG.warn("kill query from {}, kill mysql connection: {}", getRemoteHostPortString(), killConnection);
LOG.warn("kill query from {}, kill {} connection: {}", getRemoteHostPortString(), getConnectType(),
killConnection);

if (killConnection) {
isKilled = true;
Expand All @@ -964,10 +965,10 @@ public void kill(boolean killConnection) {

// kill operation with no protect by timeout.
private void killByTimeout(boolean killConnection) {
LOG.warn("kill query from {}, kill mysql connection: {} reason time out", getRemoteHostPortString(),
killConnection);

if (killConnection) {
LOG.warn("kill wait timeout connection, connection type: {}, connectionId: {}, remote: {}, "
+ "wait timeout: {}",
getConnectType(), connectionId, getRemoteHostPortString(), sessionVariable.getWaitTimeoutS());
isKilled = true;
// Close channel to break connection with client
closeChannel();
Expand All @@ -976,6 +977,10 @@ private void killByTimeout(boolean killConnection) {
// cancelQuery by time out
StmtExecutor executorRef = executor;
if (executorRef != null) {
LOG.warn("kill time out query, remote: {}, at the same time kill connection is {},"
+ " connection type: {}, connectionId: {}",
getRemoteHostPortString(), killConnection,
getConnectType(), connectionId);
executorRef.cancel(new Status(TStatusCode.TIMEOUT,
"query is timeout, killed by timeout checker"));
}
Expand All @@ -999,9 +1004,6 @@ public void checkTimeout(long now) {
if (command == MysqlCommand.COM_SLEEP) {
if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
// Need kill this connection.
LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}",
getRemoteHostPortString(), sessionVariable.getWaitTimeoutS());

killFlag = true;
killConnection = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,64 +186,66 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext);

flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
throw new RuntimeException("after executeQueryStatement handleQuery");
}

if (connectContext.isReturnResultFromLocal()) {
// set/use etc. stmt returns an OK result by default.
if (connectContext.getFlightSqlChannel().resultNum() == 0) {
// a random query id and add empty results
String queryId = UUID.randomUUID().toString();
connectContext.getFlightSqlChannel().addOKResult(queryId, query);
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
throw new RuntimeException("after executeQueryStatement handleQuery");
}

final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
if (connectContext.isReturnResultFromLocal()) {
// set/use etc. stmt returns an OK result by default.
if (connectContext.getFlightSqlChannel().resultNum() == 0) {
// a random query id and add empty results
String queryId = UUID.randomUUID().toString();
connectContext.getFlightSqlChannel().addOKResult(queryId, query);

final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot()
.getSchema());
} else {
// A Flight Sql request can only contain one statement that returns result,
// otherwise expected thrown exception during execution.
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);

// The tokens used for authentication between getStreamStatement and getFlightInfoStatement
// are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
// find the correct ConnectContext.
// queryId is used to find query results.
final ByteString handle = ByteString.copyFromUtf8(
peerIdentity + ":" + DebugUtil.printId(connectContext.queryId()));
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
return getFlightInfoForSchema(ticketStatement, descriptor, connectContext.getFlightSqlChannel()
.getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot()
.getSchema());
}
} else {
// A Flight Sql request can only contain one statement that returns result,
// otherwise expected thrown exception during execution.
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);

// The tokens used for authentication between getStreamStatement and getFlightInfoStatement
// are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
// find the correct ConnectContext.
// queryId is used to find query results.
final ByteString handle = ByteString.copyFromUtf8(
peerIdentity + ":" + DebugUtil.printId(connectContext.queryId()));
// Now only query stmt will pull results from BE.
final ByteString handle;
if (connectContext.getSessionVariable().enableParallelResultSink()) {
handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query);
} else {
// only one instance
handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
}
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
.getVectorSchemaRoot().getSchema());
}
} else {
// Now only query stmt will pull results from BE.
final ByteString handle;
if (connectContext.getSessionVariable().enableParallelResultSink()) {
handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + query);
} else {
// only one instance
handle = ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
}
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
Expand Down Expand Up @@ -296,8 +298,7 @@ private ActionCreatePreparedStatementResult buildCreatePreparedStatementResult(B
final ByteString bytes = Objects.isNull(parameterSchema) ? ByteString.EMPTY
: ByteString.copyFrom(serializeMetadata(parameterSchema));
return ActionCreatePreparedStatementResult.newBuilder()
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
.setParameterSchema(bytes)
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes)
.setPreparedStatementHandle(handle).build();
}

Expand Down Expand Up @@ -326,12 +327,11 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
Schema metaData = connectContext.getFlightSqlChannel()
.createOneOneSchemaRoot("ResultMeta", "UNIMPLEMENTED").getSchema();
listener.onNext(new Result(
Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData))
.toByteArray()));
Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)).toByteArray()));
} catch (Exception e) {
String errMsg = "create prepared statement failed, " + e.getMessage() + ", "
+ Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode()
+ ", error msg: " + connectContext.getState().getErrorMessage();
String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

/**
* Process one flgiht sql connection.
*
* Must use try-with-resources.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
Expand Down Expand Up @@ -177,6 +179,7 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
ctx.clear();
// TODO support query profile
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected void closeChannel() {
if (flightSqlChannel != null) {
flightSqlChannel.close();
}
connectScheduler.unregisterConnection(this);
}

// kill operation with no protect.
Expand All @@ -72,8 +73,8 @@ public void kill(boolean killConnection) {

if (killConnection) {
isKilled = true;
// Close channel and break connection with client.
closeChannel();
connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user"));
Expand Down

0 comments on commit 2973d0f

Please sign in to comment.