Skip to content

Commit

Permalink
feat: add kill query when extend lock failure (#13980)
Browse files Browse the repository at this point in the history
* move lock to service

* force kill query if lock failure

* refresh table after lock table
  • Loading branch information
zhyass authored Dec 12, 2023
1 parent 44ed9fa commit 680a74c
Show file tree
Hide file tree
Showing 34 changed files with 192 additions and 204 deletions.
26 changes: 1 addition & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ members = [
"src/query/storages/common/cache",
"src/query/storages/common/cache_manager",
"src/query/storages/common/index",
"src/query/storages/common/locks",
"src/query/storages/common/pruner",
"src/query/storages/common/table_meta",
"src/query/storages/delta",
Expand Down
1 change: 1 addition & 0 deletions src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub use crate::metrics::cluster;
/// Metrics.
pub use crate::metrics::http;
pub use crate::metrics::interpreter;
pub use crate::metrics::lock;
pub use crate::metrics::mysql;
pub use crate::metrics::openai;
pub use crate::metrics::session;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::sync::LazyLock;

use common_meta_app::schema::LockType;
use common_metrics::register_counter;
use common_metrics::register_counter_family;
use common_metrics::Counter;
use common_metrics::Family;
use common_metrics::VecLabels;
use crate::register_counter;
use crate::register_counter_family;
use crate::Counter;
use crate::Family;
use crate::VecLabels;

const METRIC_CREATED_LOCK_NUMS: &str = "created_lock_nums";
const METRIC_ACQUIRED_LOCK_NUMS: &str = "acquired_lock_nums";
Expand All @@ -38,17 +37,17 @@ static SHUTDOWN_LOCK_HOLDER_NUMS: LazyLock<Counter> =
const LABEL_TYPE: &str = "type";
const LABEL_TABLE_ID: &str = "table_id";

pub fn record_created_lock_nums(lock_type: LockType, table_id: u64, num: u64) {
pub fn record_created_lock_nums(lock_type: String, table_id: u64, num: u64) {
let labels = &vec![
(LABEL_TYPE, lock_type.to_string()),
(LABEL_TYPE, lock_type),
(LABEL_TABLE_ID, table_id.to_string()),
];
CREATED_LOCK_NUMS.get_or_create(labels).inc_by(num);
}

pub fn record_acquired_lock_nums(lock_type: LockType, table_id: u64, num: u64) {
pub fn record_acquired_lock_nums(lock_type: String, table_id: u64, num: u64) {
let labels = &vec![
(LABEL_TYPE, lock_type.to_string()),
(LABEL_TYPE, lock_type),
(LABEL_TABLE_ID, table_id.to_string()),
];
ACQUIRED_LOCK_NUMS.get_or_create(labels).inc_by(num);
Expand Down
1 change: 1 addition & 0 deletions src/common/metrics/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod cache;
pub mod cluster;
pub mod http;
pub mod interpreter;
pub mod lock;
pub mod mysql;
pub mod openai;
pub mod session;
Expand Down
4 changes: 3 additions & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use common_storage::StorageMetrics;
use storages_common_table_meta::meta::SnapshotId;
use storages_common_table_meta::meta::TableSnapshot;

use crate::lock::Lock;
use crate::plan::DataSourceInfo;
use crate::plan::DataSourcePlan;
use crate::plan::PartStatistics;
Expand Down Expand Up @@ -304,9 +305,10 @@ pub trait Table: Sync + Send {
async fn compact_segments(
&self,
ctx: Arc<dyn TableContext>,
lock: Arc<dyn Lock>,
limit: Option<usize>,
) -> Result<()> {
let (_, _) = (ctx, limit);
let (_, _, _) = (ctx, lock, limit);

Err(ErrorCode::Unimplemented(format!(
"table {}, of engine type {}, does not support compact segments",
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ storages-common-blocks = { path = "../storages/common/blocks" }
storages-common-cache = { path = "../storages/common/cache" }
storages-common-cache-manager = { path = "../storages/common/cache_manager" }
storages-common-index = { path = "../storages/common/index" }
storages-common-locks = { path = "../storages/common/locks" }
storages-common-table-meta = { path = "../storages/common/table_meta" }
stream-handler = { path = "../ee_features/stream_handler" }
vacuum-handler = { path = "../ee_features/vacuum_handler" }
Expand All @@ -107,6 +106,7 @@ async-backtrace = { workspace = true }
async-channel = "1.7.1"
async-stream = "0.3.3"
async-trait = { workspace = true }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
base64 = "0.21.0"
bumpalo = { workspace = true }
byte-unit = "4.0.19"
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use common_tracing::GlobalLogger;
use common_users::RoleCacheManager;
use common_users::UserApiProvider;
use storages_common_cache_manager::CacheManager;
use storages_common_locks::LockManager;

use crate::api::DataExchangeManager;
use crate::auth::AuthMgr;
use crate::catalogs::DatabaseCatalog;
use crate::clusters::ClusterDiscovery;
use crate::locks::LockManager;
use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::SessionManager;

Expand Down
16 changes: 8 additions & 8 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use common_catalog::lock::Lock;
use common_catalog::plan::Filters;
use common_catalog::plan::Partitions;
use common_catalog::table::TableExt;
Expand Down Expand Up @@ -58,12 +57,12 @@ use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use futures_util::TryStreamExt;
use log::debug;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::common::create_push_down_filters;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -100,26 +99,27 @@ impl Interpreter for DeleteInterpreter {
debug!("ctx.id" = self.ctx.get_id().as_str(); "delete_interpreter_execute");

let is_distributed = !self.ctx.get_cluster().is_empty();
let catalog_name = self.plan.catalog_name.as_str();

let catalog_name = self.plan.catalog_name.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();

// refresh table.
let tbl = catalog
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;

// check mutability
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
if !support_row_id {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use common_catalog::catalog::Catalog;
use common_catalog::lock::Lock;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -49,11 +48,11 @@ use common_storages_view::view_table::VIEW_ENGINE;
use common_users::UserApiProvider;
use data_mask_feature::get_datamask_handler;
use storages_common_index::BloomIndex;
use storages_common_locks::LockManager;
use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;

use super::common::check_referenced_computed_columns;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -190,6 +189,12 @@ impl ModifyTableColumnInterpreter {
table: &Arc<dyn Table>,
field_and_comments: &[(TableField, String)],
) -> Result<PipelineBuildResult> {
// Add table lock.
let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
// refresh table.
let table = table.refresh(self.ctx.as_ref()).await?;

let schema = table.schema().as_ref().clone();
let table_info = table.get_table_info();
let mut new_schema = schema.clone();
Expand Down Expand Up @@ -271,10 +276,6 @@ impl ModifyTableColumnInterpreter {
return Ok(PipelineBuildResult::create());
}

// Add table lock.
let table_lock = LockManager::create_table_lock(table_info.clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// 1. construct sql for selecting data from old table
let mut sql = "select".to_string();
schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use common_storages_fuse::FuseTable;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -162,7 +162,7 @@ impl OptimizeTableInterpreter {

if matches!(target, CompactTarget::Segments) {
table
.compact_segments(self.ctx.clone(), self.plan.limit)
.compact_segments(self.ctx.clone(), table_lock, self.plan.limit)
.await?;
return Ok(PipelineBuildResult::create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ use common_storages_fuse::FuseTable;
use log::error;
use log::info;
use log::warn;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Statistics;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down
18 changes: 10 additions & 8 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

use common_catalog::lock::Lock;
use common_catalog::plan::Filters;
use common_catalog::plan::Partitions;
use common_catalog::table::TableExt;
Expand All @@ -42,7 +41,6 @@ use common_sql::Visibility;
use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use log::debug;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::common::check_deduplicate_label;
Expand All @@ -52,6 +50,7 @@ use crate::interpreters::common::RefreshAggIndexDesc;
use crate::interpreters::interpreter_delete::replace_subquery;
use crate::interpreters::interpreter_delete::subquery_filter;
use crate::interpreters::Interpreter;
use crate::locks::LockManager;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -88,22 +87,25 @@ impl Interpreter for UpdateInterpreter {
}

let catalog_name = self.plan.catalog.as_str();
let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();
// refresh table.

let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
let tbl = catalog
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;

// check mutability
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
if !support_row_id {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod clusters;
pub mod databases;
pub mod interpreters;
pub mod local;
pub mod locks;
pub mod metrics;
pub mod pipelines;
pub mod schedulers;
Expand Down
Loading

0 comments on commit 680a74c

Please sign in to comment.