Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Add warnings in the query response #13918

Merged
merged 7 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub trait TableContext: Send + Sync {
fn get_current_catalog(&self) -> String;
fn check_aborting(&self) -> Result<()>;
fn get_error(&self) -> Option<ErrorCode>;
fn push_warning(&self, warning: String);
fn get_current_database(&self) -> String;
fn get_current_user(&self) -> Result<UserInfo>;
fn get_current_role(&self) -> Option<RoleInfo>;
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub struct QueryResponse {
pub error: Option<QueryError>,
pub stats: QueryStats,
pub affect: Option<QueryAffect>,
pub warnings: Vec<String>,
pub stats_uri: Option<String>,
// just call it after client not use it anymore, not care about the server-side behavior
pub final_uri: Option<String>,
Expand Down Expand Up @@ -186,6 +187,7 @@ impl QueryResponse {
session: r.session,
stats,
affect: state.affect,
warnings: r.state.warnings,
id: id.clone(),
next_uri,
stats_uri: Some(make_state_uri(&id)),
Expand All @@ -208,6 +210,7 @@ impl QueryResponse {
data: vec![],
schema: vec![],
session_id: None,
warnings: vec![],
node_id: "".to_string(),
session: None,
next_uri: None,
Expand Down
11 changes: 11 additions & 0 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub struct ExecuteStopped {
pub reason: Result<()>,
pub session_state: ExecutorSessionState,
pub query_duration_ms: i64,
pub warnings: Vec<String>,
}

pub struct Executor {
Expand Down Expand Up @@ -158,6 +159,14 @@ impl Executor {
}
}

pub fn get_warnings(&self) -> Vec<String> {
match &self.state {
Starting(_) => vec![],
Running(r) => r.ctx.pop_warnings(),
Stopped(r) => r.warnings.clone(),
}
}

pub fn get_session_state(&self) -> ExecutorSessionState {
match &self.state {
Starting(r) => ExecutorSessionState::new(r.ctx.get_current_session()),
Expand Down Expand Up @@ -212,6 +221,7 @@ impl Executor {
reason,
session_state: ExecutorSessionState::new(s.ctx.get_current_session()),
query_duration_ms: s.ctx.get_query_duration_ms(),
warnings: s.ctx.pop_warnings(),
affect: Default::default(),
}))
}
Expand All @@ -232,6 +242,7 @@ impl Executor {
reason,
session_state: ExecutorSessionState::new(r.ctx.get_current_session()),
query_duration_ms: r.ctx.get_query_duration_ms(),
warnings: r.ctx.pop_warnings(),
affect: r.ctx.get_affect(),
}))
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub struct ResponseState {
pub state: ExecuteStateKind,
pub affect: Option<QueryAffect>,
pub error: Option<ErrorCode>,
pub warnings: Vec<String>,
}

pub struct HttpQueryResponseInternal {
Expand Down Expand Up @@ -363,6 +364,7 @@ impl HttpQuery {
session_state: ExecutorSessionState::new(ctx_clone.get_current_session()),
query_duration_ms: ctx_clone.get_query_duration_ms(),
affect: ctx_clone.get_affect(),
warnings: ctx_clone.pop_warnings(),
};
info!(
"{}: http query change state to Stopped, fail to start {:?}",
Expand Down Expand Up @@ -438,6 +440,7 @@ impl HttpQuery {
progresses: state.get_progress(),
state: exe_state,
error: err,
warnings: state.get_warnings(),
affect: state.get_affect(),
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ impl QueryContext {
self.shared.get_affect()
}

pub fn pop_warnings(&self) -> Vec<String> {
self.shared.pop_warnings()
}

pub fn get_data_metrics(&self) -> StorageMetrics {
self.shared.get_data_metrics()
}
Expand Down Expand Up @@ -498,6 +502,10 @@ impl TableContext for QueryContext {
self.shared.get_error()
}

fn push_warning(&self, warn: String) {
self.shared.push_warning(warn)
}

fn get_current_database(&self) -> String {
self.shared.get_current_database()
}
Expand Down
15 changes: 15 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct QueryContextShared {
/// result_progress for metrics of result datablocks (uncompressed)
pub(in crate::sessions) result_progress: Arc<Progress>,
pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
pub(in crate::sessions) warnings: Arc<Mutex<Vec<String>>>,
pub(in crate::sessions) session: Arc<Session>,
pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
Expand Down Expand Up @@ -122,6 +123,7 @@ impl QueryContextShared {
result_progress: Arc::new(Progress::create()),
write_progress: Arc::new(Progress::create()),
error: Arc::new(Mutex::new(None)),
warnings: Arc::new(Mutex::new(vec![])),
runtime: Arc::new(RwLock::new(None)),
running_query: Arc::new(RwLock::new(None)),
running_query_kind: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -158,6 +160,18 @@ impl QueryContextShared {
(*guard).clone()
}

pub fn push_warning(&self, warn: String) {
let mut guard = self.warnings.lock();
(*guard).push(warn);
}

pub fn pop_warnings(&self) -> Vec<String> {
let mut guard = self.warnings.lock();
let warnings = (*guard).clone();
(*guard).clear();
warnings
}

pub fn set_on_error_map(&self, map: Arc<DashMap<String, HashMap<u16, InputError>>>) {
let mut guard = self.on_error_map.write();
*guard = Some(map);
Expand All @@ -170,6 +184,7 @@ impl QueryContextShared {
pub fn get_on_error_mode(&self) -> Option<OnErrorMode> {
self.on_error_mode.read().clone()
}

pub fn set_on_error_mode(&self, mode: OnErrorMode) {
let mut guard = self.on_error_mode.write();
*guard = Some(mode);
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ impl TableContext for CtxDelegation {
todo!()
}

fn push_warning(&self, _warn: String) {
todo!()
}

fn get_current_database(&self) -> String {
self.ctx.get_current_database()
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ impl TableContext for CtxDelegation {
todo!()
}

fn push_warning(&self, _warn: String) {
todo!()
}

fn get_current_database(&self) -> String {
self.ctx.get_current_database()
}
Expand Down
9 changes: 6 additions & 3 deletions src/query/storages/system/src/tables_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableIdent;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
use log::warn;

use crate::table::AsyncOneBlockSystemTable;
use crate::table::AsyncSystemTable;
Expand Down Expand Up @@ -137,7 +136,7 @@ where TablesTable<T>: HistoryAware
if let Ok(database) = ctl.get_database(tenant.as_str(), db.as_str()).await {
dbs.push(database);
}
// TODO(liyz): return the warnings if get_database() failed.
ctx.push_warning(format!("get database failed: {}", db))
}
}
}
Expand All @@ -164,7 +163,11 @@ where TablesTable<T>: HistoryAware
// - iceberg database
// - others
// TODO(liyz): return the warnings in the HTTP query protocol.
warn!("list tables failed on db: {}: {}", db.name(), err);
ctx.push_warning(format!(
"list tables failed on db {}: {}",
db.name(),
err
));
continue;
}
};
Expand Down
Loading