Skip to content

Commit

Permalink
feat: distinguish early close from kill/cancel with new LogType::Clos…
Browse files Browse the repository at this point in the history
…ed. (#15206)

* feat: distinguish early close from kill/cancel with new LogType::Closed.

* extract Executor::get_response_state

* feat: http handler allow redundant final/kill.

* revert #15022

#15094 (comment)

* fix test.

* ignore flaky test.
  • Loading branch information
youngsofun authored Apr 10, 2024
1 parent 515f99e commit a883bc0
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 220 deletions.
1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ build_exceptions! {
TooManyUserConnections(1041),
AbortedSession(1042),
AbortedQuery(1043),
ClosedQuery(1044),
CannotListenerPort(1045),
BadBytes(1046),
InitPrometheusFailure(1047),
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/src/interpreters/common/query_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ fn error_fields(log_type: LogType, err: Option<ErrorCode>) -> (LogType, i32, Str
e.to_string(),
e.backtrace_str(),
)
} else if e.code() == ErrorCode::ABORTED_QUERY {
(
LogType::Closed,
e.code().into(),
e.to_string(),
e.backtrace_str(),
)
} else {
(
LogType::Error,
Expand Down
123 changes: 50 additions & 73 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,19 @@ async fn query_final_handler(
query_id, query_id
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
.remove_query(&query_id, RemoveReason::Finished)
.await
{
Ok(query) => {
match http_query_manager.remove_query(&query_id, RemoveReason::Finished) {
Some(query) => {
let mut response = query.get_response_state_only().await;
if query.check_removed().is_none() && !response.state.state.is_stopped() {
query.kill(ErrorCode::ClosedQuery("closed by client")).await;
response = query.get_response_state_only().await;
}
// it is safe to set these 2 fields to None, because client now check for null/None first.
response.session = None;
response.state.affect = None;
if response.state.state == ExecuteStateKind::Running {
return Err(PoemError::from_string(
format!("query {} is still running, can not final it", query_id),
StatusCode::BAD_REQUEST,
));
}
Ok(QueryResponse::from_internal(query_id, response, true))
}
Err(reason) => Err(query_id_not_found_or_removed(
&query_id,
&ctx.node_id,
reason,
)),
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
}
}
.in_span(root)
Expand All @@ -275,20 +266,16 @@ async fn query_cancel_handler(
query_id, query_id
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.try_get_query(&query_id).await {
Ok(query) => {
query.kill("http query cancel by handler").await;
http_query_manager
.remove_query(&query_id, RemoveReason::Canceled)
.await
.ok();
match http_query_manager.remove_query(&query_id, RemoveReason::Canceled) {
Some(query) => {
if query.check_removed().is_none() {
query
.kill(ErrorCode::AbortedQuery("canceled by client"))
.await;
}
Ok(StatusCode::OK)
}
Err(reason) => Err(query_id_not_found_or_removed(
&query_id,
&ctx.node_id,
reason,
)),
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
}
}
.in_span(root)
Expand All @@ -304,16 +291,16 @@ async fn query_state_handler(

async {
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.try_get_query(&query_id).await {
Ok(query) => {
let response = query.get_response_state_only().await;
Ok(QueryResponse::from_internal(query_id, response, false))
match http_query_manager.get_query(&query_id) {
Some(query) => {
if let Some(reason) = query.check_removed() {
Err(query_id_removed(&query_id, reason))
} else {
let response = query.get_response_state_only().await;
Ok(QueryResponse::from_internal(query_id, response, false))
}
}
Err(reason) => Err(query_id_not_found_or_removed(
&query_id,
&ctx.node_id,
reason,
)),
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
}
}
.in_span(root)
Expand All @@ -330,20 +317,20 @@ async fn query_page_handler(

async {
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.try_get_query(&query_id).await {
Ok(query) => {
query.update_expire_time(true).await;
let resp = query.get_response_page(page_no).await.map_err(|err| {
poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
})?;
query.update_expire_time(false).await;
Ok(QueryResponse::from_internal(query_id, resp, false))
match http_query_manager.get_query(&query_id) {
Some(query) => {
if let Some(reason) = query.check_removed() {
Err(query_id_removed(&query_id, reason))
} else {
query.update_expire_time(true).await;
let resp = query.get_response_page(page_no).await.map_err(|err| {
poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
})?;
query.update_expire_time(false).await;
Ok(QueryResponse::from_internal(query_id, resp, false))
}
}
Err(reason) => Err(query_id_not_found_or_removed(
&query_id,
&ctx.node_id,
reason,
)),
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
}
}
.in_span(root)
Expand Down Expand Up @@ -372,19 +359,11 @@ pub(crate) async fn query_handler(
Ok(query) => {
query.update_expire_time(true).await;
// tmp workaround to tolerant old clients
let max_wait_time = std::cmp::max(1, req.pagination.wait_time_secs);
let start = std::time::Instant::now();
let resp = loop {
let resp = query
.get_response_page(0)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
if matches!(resp.state.state, ExecuteStateKind::Starting) && start.elapsed().as_secs() < max_wait_time as u64 {
continue;
}
break resp
};
let resp = query
.get_response_page(0)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
if matches!(resp.state.state, ExecuteStateKind::Failed) {
ctx.set_fail();
}
Expand Down Expand Up @@ -432,17 +411,15 @@ pub fn query_route() -> Route {
route
}

fn query_id_not_found_or_removed(
query_id: &str,
node_id: &str,
reason: Option<RemoveReason>,
) -> PoemError {
let error = match reason {
Some(reason) => reason.to_string(),
None => "not found".to_string(),
};
fn query_id_removed(query_id: &str, remove_reason: RemoveReason) -> PoemError {
PoemError::from_string(
format!("query id {query_id} {}", remove_reason),
StatusCode::BAD_REQUEST,
)
}
fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError {
PoemError::from_string(
format!("query id {query_id} {error} on {node_id}"),
format!("query id {query_id} not found on {node_id}"),
StatusCode::NOT_FOUND,
)
}
Expand Down
52 changes: 33 additions & 19 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterQueryLog;
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
use crate::servers::http::v1::query::http_query::ResponseState;
use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
use crate::sessions::AcquireQueueGuard;
use crate::sessions::QueriesQueueManager;
Expand All @@ -58,6 +59,12 @@ pub enum ExecuteStateKind {
Succeeded,
}

impl ExecuteStateKind {
pub fn is_stopped(self) -> bool {
matches!(self, Self::Succeeded | Self::Failed)
}
}

impl std::fmt::Display for ExecuteStateKind {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
Expand Down Expand Up @@ -156,6 +163,18 @@ impl ExecutorSessionState {
}

impl Executor {
pub fn get_response_state(&self) -> ResponseState {
let (exe_state, err) = self.state.extract();
ResponseState {
running_time_ms: self.get_query_duration_ms(),
progresses: self.get_progress(),
state: exe_state,
error: err,
warnings: self.get_warnings(),
affect: self.get_affect(),
schema: self.get_schema(),
}
}
pub fn get_schema(&self) -> Vec<QueryResponseField> {
match &self.state {
Starting(_) => Default::default(),
Expand Down Expand Up @@ -220,7 +239,7 @@ impl Executor {
}
}
#[async_backtrace::framed]
pub async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>, kill: bool) {
pub async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>) {
{
let guard = this.read().await;
if let Stopped(s) = &guard.state {
Expand Down Expand Up @@ -249,8 +268,10 @@ impl Executor {
)
.unwrap_or_else(|e| error!("fail to write query_log {:?}", e));
}
if reason.is_err() {
s.ctx.get_current_session().txn_mgr().lock().set_fail();
if let Err(e) = &reason {
if e.code() != ErrorCode::CLOSED_QUERY {
s.ctx.get_current_session().txn_mgr().lock().set_fail();
}
}
guard.state = Stopped(Box::new(ExecuteStopped {
stats: Default::default(),
Expand All @@ -263,18 +284,11 @@ impl Executor {
}))
}
Running(r) => {
// release session
if kill {
if let Err(error) = &reason {
r.session.force_kill_query(error.clone());
} else {
r.session.force_kill_query(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed",
));
if let Err(e) = &reason {
if e.code() != ErrorCode::CLOSED_QUERY {
r.session.txn_mgr().lock().set_fail();
}
}
if reason.is_err() {
r.session.txn_mgr().lock().set_fail();
r.session.force_kill_query(e.clone());
}

guard.state = Stopped(Box::new(ExecuteStopped {
Expand Down Expand Up @@ -353,11 +367,11 @@ impl ExecuteState {
);
match CatchUnwindFuture::create(res).await {
Ok(Err(err)) => {
Executor::stop(&executor_clone, Err(err.clone()), false).await;
Executor::stop(&executor_clone, Err(err.clone())).await;
block_sender_closer.close();
}
Err(e) => {
Executor::stop(&executor_clone, Err(e), false).await;
Executor::stop(&executor_clone, Err(e)).await;
block_sender_closer.close();
}
_ => {}
Expand Down Expand Up @@ -389,7 +403,7 @@ async fn execute(
None => {
let block = DataBlock::empty_with_schema(schema);
block_sender.send(block, 0).await;
Executor::stop(&executor, Ok(()), false).await;
Executor::stop(&executor, Ok(())).await;
block_sender.close();
}
Some(Err(err)) => {
Expand All @@ -399,7 +413,7 @@ async fn execute(
databend_common_expression::Value::Scalar(Scalar::String(err.to_string())),
);
block_sender.send(DataBlock::new(vec![data], 1), 1).await;
Executor::stop(&executor, Err(err), false).await;
Executor::stop(&executor, Err(err)).await;
block_sender.close();
}
Some(Ok(block)) => {
Expand All @@ -424,7 +438,7 @@ async fn execute(
}
};
}
Executor::stop(&executor, Ok(()), false).await;
Executor::stop(&executor, Ok(())).await;
block_sender.close();
}
}
Expand Down
Loading

0 comments on commit a883bc0

Please sign in to comment.