
Commit

fix review comment
zhyass committed Nov 1, 2023
1 parent 1b2d7ea commit 6ebb346
Showing 16 changed files with 93 additions and 134 deletions.
1 change: 0 additions & 1 deletion Cargo.lock


33 changes: 18 additions & 15 deletions src/query/catalog/src/lock.rs
@@ -35,16 +35,8 @@ pub trait Lock: Sync + Send {
 
     fn get_catalog(&self) -> &str;
 
-    fn get_expire_secs(&self) -> u64;
-
     fn get_table_id(&self) -> u64;
 
-    fn get_user(&self) -> String;
-
-    fn get_node(&self) -> String;
-
-    fn get_session_id(&self) -> String;
-
     fn watch_delete_key(&self, revision: u64) -> String;
 
     async fn try_lock(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;
@@ -61,13 +53,19 @@ pub trait LockExt: Lock {
         Ok(!reply.is_empty())
     }
 
-    fn gen_create_lock_req(&self) -> CreateLockRevReq {
+    fn gen_create_lock_req(
+        &self,
+        user: String,
+        node: String,
+        session_id: String,
+        expire_secs: u64,
+    ) -> CreateLockRevReq {
         CreateLockRevReq {
             lock_key: self.gen_lock_key(),
-            user: self.get_user(),
-            node: self.get_node(),
-            session_id: self.get_session_id(),
-            expire_at: Utc::now().timestamp() as u64 + self.get_expire_secs(),
+            user,
+            node,
+            session_id,
+            expire_at: Utc::now().timestamp() as u64 + expire_secs,
         }
     }
 
@@ -84,12 +82,17 @@ pub trait LockExt: Lock {
         }
     }
 
-    fn gen_extend_lock_req(&self, revision: u64, acquire_lock: bool) -> ExtendLockRevReq {
+    fn gen_extend_lock_req(
+        &self,
+        revision: u64,
+        expire_secs: u64,
+        acquire_lock: bool,
+    ) -> ExtendLockRevReq {
         ExtendLockRevReq {
             lock_key: self.gen_lock_key(),
             revision,
             acquire_lock,
-            expire_at: Utc::now().timestamp() as u64 + self.get_expire_secs(),
+            expire_at: Utc::now().timestamp() as u64 + expire_secs,
         }
     }
 }
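
Net effect of the lock.rs change: caller identity (user, node, session) and the lease length are no longer part of the `Lock` trait; callers of `LockExt` now pass them in explicitly. A minimal sketch of the new request-building shape — `CreateLockRevReq` is simplified to a local stand-in here, with only `chrono`'s `Utc` as a real dependency, and the lock-key string is a hypothetical example:

use chrono::Utc;

// Simplified stand-in for the crate's real CreateLockRevReq.
#[allow(dead_code)]
struct CreateLockRevReq {
    lock_key: String,
    user: String,
    node: String,
    session_id: String,
    expire_at: u64,
}

// After this commit the caller supplies identity and lease length explicitly,
// instead of the trait exposing get_user()/get_node()/get_session_id()/get_expire_secs().
fn gen_create_lock_req(
    lock_key: String,
    user: String,
    node: String,
    session_id: String,
    expire_secs: u64,
) -> CreateLockRevReq {
    CreateLockRevReq {
        lock_key,
        user,
        node,
        session_id,
        // The lease expires expire_secs after creation, as in the diff.
        expire_at: Utc::now().timestamp() as u64 + expire_secs,
    }
}

fn main() {
    let req = gen_create_lock_req(
        "table_lock/42".to_string(), // hypothetical key layout
        "root".to_string(),
        "node-1".to_string(),
        "session-7".to_string(),
        10,
    );
    println!("lease expires at unix ts {}", req.expire_at);
}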
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_delete.rs
@@ -123,8 +123,7 @@ impl Interpreter for DeleteInterpreter {
         tbl.check_mutable()?;
 
         // Add table lock.
-        let table_lock =
-            LockManager::create_table_lock(self.ctx.clone(), tbl.get_table_info().clone())?;
+        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
         let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
 
         let selection = if !self.plan.subquery_desc.is_empty() {
@@ -134,7 +134,7 @@ impl Interpreter for MergeIntoInterpreter {
             .await?;
 
         // Add table lock before execution.
-        let table_lock = LockManager::create_table_lock(self.ctx.clone(), table_info)?;
+        let table_lock = LockManager::create_table_lock(table_info)?;
         let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
         build_res.main_pipeline.add_lock_guard(lock_guard);
 
@@ -271,7 +271,7 @@ impl ModifyTableColumnInterpreter {
         }
 
         // Add table lock.
-        let table_lock = LockManager::create_table_lock(self.ctx.clone(), table_info.clone())?;
+        let table_lock = LockManager::create_table_lock(table_info.clone())?;
         let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
 
         // 1. construct sql for selecting data from old table
@@ -150,7 +150,7 @@ impl OptimizeTableInterpreter {
         let table_info = table.get_table_info().clone();
 
         // check if the table is locked.
-        let table_lock = LockManager::create_table_lock(self.ctx.clone(), table_info.clone())?;
+        let table_lock = LockManager::create_table_lock(table_info.clone())?;
         if table_lock.check_lock(catalog.clone()).await? {
             return Err(ErrorCode::TableAlreadyLocked(format!(
                 "table '{}' is locked, please retry compaction later",
@@ -127,7 +127,7 @@ impl Interpreter for ReclusterTableInterpreter {
         let table_info = table.get_table_info().clone();
 
         // check if the table is locked.
-        let table_lock = LockManager::create_table_lock(self.ctx.clone(), table_info.clone())?;
+        let table_lock = LockManager::create_table_lock(table_info.clone())?;
         if table_lock.check_lock(catalog.clone()).await? {
             return Err(ErrorCode::TableAlreadyLocked(format!(
                 "table '{}' is locked, please retry recluster later",
3 changes: 1 addition & 2 deletions src/query/service/src/interpreters/interpreter_update.rs
@@ -84,8 +84,7 @@ impl Interpreter for UpdateInterpreter {
         tbl.check_mutable()?;
 
         // Add table lock.
-        let table_lock =
-            LockManager::create_table_lock(self.ctx.clone(), tbl.get_table_info().clone())?;
+        let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
         let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
 
         let selection = if !self.plan.subquery_desc.is_empty() {
1 change: 0 additions & 1 deletion src/query/storages/common/locks/Cargo.toml
@@ -27,6 +27,5 @@ async-trait = "0.1.57"
 futures = "0.3.24"
 futures-util = { workspace = true }
 lazy_static = { workspace = true }
-log = { workspace = true }
 parking_lot = "0.12.1"
 rand = "0.8.5"
3 changes: 1 addition & 2 deletions src/query/storages/common/locks/src/lib.rs
@@ -19,6 +19,5 @@ mod table_lock;
 
 pub use lock_holder::LockHolder;
 pub use lock_manager::LockManager;
-pub use lock_metrics::record_acquired_lock_nums;
-pub use lock_metrics::record_created_lock_nums;
+pub use lock_metrics::*;
 pub use table_lock::TableLock;
57 changes: 19 additions & 38 deletions src/query/storages/common/locks/src/lock_holder.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use common_base::base::tokio::sync::Notify;
-use common_base::base::tokio::task::JoinHandle;
 use common_base::base::tokio::time::sleep;
 use common_base::runtime::GlobalIORuntime;
 use common_base::runtime::TrySpawn;
@@ -32,48 +31,38 @@ use futures::future::Either;
 use rand::thread_rng;
 use rand::Rng;
 
+#[derive(Default)]
 pub struct LockHolder {
-    shutdown_flag: Arc<AtomicBool>,
-    shutdown_notify: Arc<Notify>,
-    shutdown_handler: Option<JoinHandle<Result<()>>>,
+    shutdown_flag: AtomicBool,
+    shutdown_notify: Notify,
 }
 
 impl LockHolder {
-    pub fn create() -> LockHolder {
-        LockHolder {
-            shutdown_flag: Arc::new(AtomicBool::new(false)),
-            shutdown_notify: Arc::new(Notify::new()),
-            shutdown_handler: None,
-        }
-    }
-
     #[async_backtrace::framed]
     pub async fn start<T: Lock + ?Sized>(
-        &mut self,
+        self: &Arc<Self>,
         catalog: Arc<dyn Catalog>,
         lock: &T,
         revision: u64,
+        expire_secs: u64,
     ) -> Result<()> {
-        let expire_secs = lock.get_expire_secs();
-        let sleep_range = (expire_secs * 1000 / 3)..=((expire_secs * 1000 / 3) * 2);
+        let sleep_range: std::ops::RangeInclusive<u64> =
+            (expire_secs * 1000 / 3)..=((expire_secs * 1000 / 3) * 2);
         let delete_table_lock_req = lock.gen_delete_lock_req(revision);
-        let extend_table_lock_req = lock.gen_extend_lock_req(revision, false);
+        let extend_table_lock_req = lock.gen_extend_lock_req(revision, expire_secs, false);
 
-        self.shutdown_handler = Some(GlobalIORuntime::instance().spawn({
-            let shutdown_flag = self.shutdown_flag.clone();
-            let shutdown_notify = self.shutdown_notify.clone();
+        GlobalIORuntime::instance().spawn({
+            let self_clone = self.clone();
             async move {
-                let mut notified = Box::pin(shutdown_notify.notified());
-                while !shutdown_flag.load(Ordering::Relaxed) {
+                let mut notified = Box::pin(self_clone.shutdown_notify.notified());
+                while !self_clone.shutdown_flag.load(Ordering::SeqCst) {
                     let mills = {
                         let mut rng = thread_rng();
                         rng.gen_range(sleep_range.clone())
                     };
                     let sleep = Box::pin(sleep(Duration::from_millis(mills)));
                     match select(notified, sleep).await {
                         Either::Left((_, _)) => {
-                            catalog.delete_lock_revision(delete_table_lock_req).await?;
                             break;
                         }
                         Either::Right((_, new_notified)) => {
@@ -84,24 +73,16 @@ impl LockHolder {
                         }
                     }
                 }
-                Ok(())
+                catalog.delete_lock_revision(delete_table_lock_req).await?;
+                Ok::<_, ErrorCode>(())
             }
-        }));
+        });
 
         Ok(())
     }
 
-    #[async_backtrace::framed]
-    pub async fn shutdown(&mut self) -> Result<()> {
-        if let Some(shutdown_handler) = self.shutdown_handler.take() {
-            self.shutdown_flag.store(true, Ordering::Relaxed);
-            self.shutdown_notify.notify_waiters();
-            if let Err(shutdown_failure) = shutdown_handler.await {
-                return Err(ErrorCode::TokioError(format!(
-                    "Cannot shutdown table lock heartbeat, cause {:?}",
-                    shutdown_failure
-                )));
-            }
-        }
-        Ok(())
+    pub fn shutdown(&self) {
+        self.shutdown_flag.store(true, Ordering::SeqCst);
+        self.shutdown_notify.notify_waiters();
     }
 }
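
The holder is now a plain flag/notify pair shared through an `Arc`, and the heartbeat becomes a detached task that deletes the lock revision on its own way out, so `shutdown` no longer needs to await a `JoinHandle`. A runnable sketch of the same pattern on plain tokio — `HeartbeatHolder` and the `println!` placeholders are illustrative, not this crate's API:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use futures::future::{select, Either};
use tokio::sync::Notify;
use tokio::time::sleep;

// Illustrative stand-in for LockHolder: a fire-and-forget heartbeat task
// that stops when shutdown() flips the flag and wakes the Notify.
#[derive(Default)]
struct HeartbeatHolder {
    shutdown_flag: AtomicBool,
    shutdown_notify: Notify,
}

impl HeartbeatHolder {
    fn start(self: &Arc<Self>, period: Duration) {
        let this = self.clone();
        tokio::spawn(async move {
            let mut notified = Box::pin(this.shutdown_notify.notified());
            while !this.shutdown_flag.load(Ordering::SeqCst) {
                let nap = Box::pin(sleep(period));
                match select(notified, nap).await {
                    // Woken by shutdown(): leave the loop.
                    Either::Left((_, _)) => break,
                    // Timer fired first: extend the lease, keep the Notify future.
                    Either::Right((_, still_waiting)) => {
                        println!("extend lock lease"); // stands in for extend_lock_revision
                        notified = still_waiting;
                    }
                }
            }
            // Cleanup runs exactly once, on every exit path.
            println!("delete lock revision"); // stands in for delete_lock_revision
        });
    }

    fn shutdown(&self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);
        self.shutdown_notify.notify_waiters();
    }
}

#[tokio::main]
async fn main() {
    let holder = Arc::new(HeartbeatHolder::default());
    holder.start(Duration::from_millis(100));
    sleep(Duration::from_millis(350)).await;
    holder.shutdown();
    sleep(Duration::from_millis(50)).await; // give the task time to clean up
}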
61 changes: 31 additions & 30 deletions src/query/storages/common/locks/src/lock_manager.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use async_channel::Sender;
-use common_base::base::tokio::sync::Mutex;
 use common_base::base::tokio::time::timeout;
 use common_base::base::GlobalInstance;
 use common_base::runtime::GlobalIORuntime;
@@ -36,13 +35,15 @@ use common_users::UserApiProvider;
 use futures_util::StreamExt;
 use parking_lot::RwLock;
 
+use crate::metrics_inc_shutdown_lock_holder_nums;
+use crate::metrics_inc_start_lock_holder_nums;
 use crate::record_acquired_lock_nums;
 use crate::record_created_lock_nums;
 use crate::table_lock::TableLock;
 use crate::LockHolder;
 
 pub struct LockManager {
-    active_locks: Arc<RwLock<HashMap<u64, Arc<Mutex<LockHolder>>>>>,
+    active_locks: Arc<RwLock<HashMap<u64, Arc<LockHolder>>>>,
     tx: Sender<u64>,
 }
 

Expand All @@ -55,12 +56,9 @@ impl LockManager {
let active_locks = lock_manager.active_locks.clone();
async move {
while let Ok(revision) = rx.recv().await {
let lock = active_locks.write().remove(&revision);
if let Some(lock) = lock {
let mut guard = lock.lock().await;
if let Err(cause) = guard.shutdown().await {
log::warn!("Cannot release table lock, cause {:?}", cause);
}
metrics_inc_shutdown_lock_holder_nums();
if let Some(lock) = active_locks.write().remove(&revision) {
lock.shutdown();
}
}
}
@@ -73,43 +71,43 @@ impl LockManager {
         GlobalInstance::get()
     }
 
-    pub fn create_table_lock(
-        ctx: Arc<dyn TableContext>,
-        table_info: TableInfo,
-    ) -> Result<TableLock> {
+    pub fn create_table_lock(table_info: TableInfo) -> Result<TableLock> {
         let lock_mgr = LockManager::instance();
-        let user = ctx.get_current_user()?.name;
-        let node = ctx.get_cluster().local_id.clone();
-        let session_id = ctx.get_current_session_id();
-        let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
-        Ok(TableLock::create(
-            lock_mgr,
-            table_info,
-            user,
-            node,
-            session_id,
-            expire_secs,
-        ))
+        Ok(TableLock::create(lock_mgr, table_info))
     }
 
+    /// Requesting a lock returns a globally incremental revision. All existing
+    /// revisions are listed, and if the current revision is the smallest, the
+    /// lock is acquired successfully. Otherwise, the caller listens for the
+    /// deletion event of the previous revision in a loop until the lock is acquired.
+    ///
+    /// NOTICE: the lock holder is not 100% reliable. E.g., there is a very small
+    /// probability that a lock renewal fails, or that deleting a lease fails.
     #[async_backtrace::framed]
     pub async fn try_lock<T: Lock + ?Sized>(
         self: &Arc<Self>,
         ctx: Arc<dyn TableContext>,
         lock: &T,
     ) -> Result<Option<LockGuard>> {
+        let user = ctx.get_current_user()?.name;
+        let node = ctx.get_cluster().local_id.clone();
+        let session_id = ctx.get_current_session_id();
+        let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
+
         let catalog = ctx.get_catalog(lock.get_catalog()).await?;
 
         // get a new table lock revision.
         let res = catalog
-            .create_lock_revision(lock.gen_create_lock_req())
+            .create_lock_revision(lock.gen_create_lock_req(user, node, session_id, expire_secs))
             .await?;
         let revision = res.revision;
         // metrics.
         record_created_lock_nums(lock.lock_type(), lock.get_table_id(), 1);
 
-        let mut lock_holder = LockHolder::create();
-        lock_holder.start(catalog.clone(), lock, revision).await?;
+        let lock_holder = Arc::new(LockHolder::default());
+        lock_holder
+            .start(catalog.clone(), lock, revision, expire_secs)
+            .await?;
 
         self.insert_lock(revision, lock_holder);
         let guard = LockGuard::new(self.clone(), revision);
@@ -131,7 +129,7 @@ impl LockManager {
 
         if position == 0 {
             // The lock is acquired by current session.
-            let extend_table_lock_req = lock.gen_extend_lock_req(revision, true);
+            let extend_table_lock_req = lock.gen_extend_lock_req(revision, expire_secs, true);
             catalog.extend_lock_revision(extend_table_lock_req).await?;
             // metrics.
             record_acquired_lock_nums(lock.lock_type(), lock.get_table_id(), 1);
@@ -172,10 +170,13 @@ impl LockManager {
         Ok(Some(guard))
     }
 
-    fn insert_lock(&self, revision: u64, lock_holder: LockHolder) {
+    fn insert_lock(&self, revision: u64, lock_holder: Arc<LockHolder>) {
         let mut active_locks = self.active_locks.write();
-        let prev = active_locks.insert(revision, Arc::new(Mutex::new(lock_holder)));
+        let prev = active_locks.insert(revision, lock_holder);
         assert!(prev.is_none());
+
+        // metrics.
+        metrics_inc_start_lock_holder_nums();
     }
 }

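The doc comment added to try_lock describes the acquisition rule: every contender creates a globally increasing revision, the smallest live revision owns the lock, and every other contender watches its immediate predecessor's delete event. A toy in-memory model of that rule — no meta service, no leases; `RevisionLock` is a made-up name, not this crate's API:

use std::collections::BTreeSet;

// Toy model of revision-based locking: the meta service hands out globally
// increasing revisions; the smallest live revision holds the lock.
#[derive(Default)]
struct RevisionLock {
    next_revision: u64,
    live: BTreeSet<u64>,
}

impl RevisionLock {
    // Create a new revision for a contender (stands in for create_lock_revision).
    fn create_revision(&mut self) -> u64 {
        self.next_revision += 1;
        self.live.insert(self.next_revision);
        self.next_revision
    }

    // A contender owns the lock iff its revision is the smallest live one.
    fn is_owner(&self, revision: u64) -> bool {
        self.live.first() == Some(&revision)
    }

    // The revision a contender must watch for deletion: its immediate predecessor.
    fn predecessor(&self, revision: u64) -> Option<u64> {
        self.live.range(..revision).next_back().copied()
    }

    // Release (stands in for delete_lock_revision): hands off to the next contender.
    fn release(&mut self, revision: u64) {
        self.live.remove(&revision);
    }
}

fn main() {
    let mut lock = RevisionLock::default();
    let a = lock.create_revision();
    let b = lock.create_revision();
    assert!(lock.is_owner(a) && !lock.is_owner(b));
    assert_eq!(lock.predecessor(b), Some(a)); // b watches a's delete event
    lock.release(a);
    assert!(lock.is_owner(b)); // a's deletion hands the lock to b
    println!("revision-ordered handoff works");
}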

