Commit

add retry for lock holder
zhyass committed Nov 2, 2023
1 parent 6ebb346 commit f08af2d
Showing 8 changed files with 183 additions and 48 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/query/storages/common/locks/Cargo.toml
@@ -22,10 +22,11 @@ common-pipeline-core = { path = "../../../pipeline/core" }
common-users = { path = "../../../users" }

async-backtrace = { workspace = true }
async-channel = "1.7.1"
async-trait = "0.1.57"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
futures = "0.3.24"
futures-util = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
parking_lot = "0.12.1"
rand = "0.8.5"
2 changes: 2 additions & 0 deletions src/query/storages/common/locks/src/lib.rs
@@ -16,8 +16,10 @@ mod lock_holder;
mod lock_manager;
mod lock_metrics;
mod table_lock;
mod utils;

pub use lock_holder::LockHolder;
pub use lock_manager::LockManager;
pub use lock_metrics::*;
pub use table_lock::TableLock;
pub use utils::set_backoff;
120 changes: 114 additions & 6 deletions src/query/storages/common/locks/src/lock_holder.rs
@@ -16,7 +16,9 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use backoff::backoff::Backoff;
use common_base::base::tokio::sync::Notify;
use common_base::base::tokio::time::sleep;
use common_base::runtime::GlobalIORuntime;
@@ -26,11 +28,15 @@ use common_catalog::lock::Lock;
use common_catalog::lock::LockExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::DeleteLockRevReq;
use common_meta_app::schema::ExtendLockRevReq;
use futures::future::select;
use futures::future::Either;
use rand::thread_rng;
use rand::Rng;

use crate::set_backoff;

#[derive(Default)]
pub struct LockHolder {
shutdown_flag: AtomicBool,
@@ -60,21 +66,31 @@ impl LockHolder {
let mut rng = thread_rng();
rng.gen_range(sleep_range.clone())
};
- let sleep = Box::pin(sleep(Duration::from_millis(mills)));
- match select(notified, sleep).await {
+ let sleep_range = Box::pin(sleep(Duration::from_millis(mills)));
+ match select(notified, sleep_range).await {
Either::Left((_, _)) => {
// shutdown.
break;
}
Either::Right((_, new_notified)) => {
notified = new_notified;
- catalog
-     .extend_lock_revision(extend_table_lock_req.clone())
+ self_clone
+     .try_extend_lock(
+         catalog.clone(),
+         extend_table_lock_req.clone(),
+         Some(Duration::from_millis(expire_secs * 1000 - mills)),
+     )
.await?;
}
}
}
- catalog.delete_lock_revision(delete_table_lock_req).await?;
- Ok::<_, ErrorCode>(())

+ Self::try_delete_lock(
+     catalog,
+     delete_table_lock_req,
+     Some(Duration::from_millis(expire_secs * 1000)),
+ )
+ .await
}
});

@@ -86,3 +102,95 @@ impl LockHolder {
self.shutdown_notify.notify_waiters();
}
}

impl LockHolder {
async fn try_extend_lock(
self: &Arc<Self>,
catalog: Arc<dyn Catalog>,
req: ExtendLockRevReq,
max_retry_elapsed: Option<Duration>,
) -> Result<()> {
let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed);
let mut extend_notified = Box::pin(self.shutdown_notify.notified());
while !self.shutdown_flag.load(Ordering::SeqCst) {
match catalog.extend_lock_revision(req.clone()).await {
Ok(_) => {
break;
}
Err(e) if e.code() == ErrorCode::TABLE_LOCK_EXPIRED => {
log::error!("failed to extend the lock. cause {:?}", e);
return Err(e);
}
Err(e) => match backoff.next_backoff() {
Some(duration) => {
log::debug!(
"failed to extend the lock, tx will be retried {} ms later. table id {}, revision {}",
duration.as_millis(),
req.lock_key.get_table_id(),
req.revision,
);
let sleep_gap = Box::pin(sleep(duration));
match select(extend_notified, sleep_gap).await {
Either::Left((_, _)) => {
// shutdown.
break;
}
Either::Right((_, new_notified)) => {
extend_notified = new_notified;
}
}
}
None => {
let error_info = format!(
"failed to extend the lock after retries {} ms, aborted. cause {:?}",
Instant::now()
.duration_since(backoff.start_time)
.as_millis(),
e,
);
log::error!("{}", error_info);
return Err(ErrorCode::OCCRetryFailure(error_info));
}
},
}
}

Ok(())
}

async fn try_delete_lock(
catalog: Arc<dyn Catalog>,
req: DeleteLockRevReq,
max_retry_elapsed: Option<Duration>,
) -> Result<()> {
let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed);
loop {
match catalog.delete_lock_revision(req.clone()).await {
Ok(_) => break,
Err(e) => match backoff.next_backoff() {
Some(duration) => {
log::debug!(
"failed to delete the lock, tx will be retried {} ms later. table id {}, revision {}",
duration.as_millis(),
req.lock_key.get_table_id(),
req.revision,
);
sleep(duration).await;
}
None => {
let error_info = format!(
"failed to delete the lock after retries {} ms, aborted. cause {:?}",
Instant::now()
.duration_since(backoff.start_time)
.as_millis(),
e,
);
log::error!("{}", error_info);
return Err(ErrorCode::OCCRetryFailure(error_info));
}
},
}
}
Ok(())
}
}
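
For readers unfamiliar with the select-on-Notify pattern that start() relies on above, the following standalone sketch shows the same idea with plain tokio and futures: a periodic sleep is raced against a shutdown Notify, so the extend loop wakes up immediately once shutdown() calls notify_waiters(). This is illustrative only; the function name heartbeat and the 100 ms interval are made up, and only the select/Either/Notify mechanics mirror the committed code.

use std::sync::Arc;
use std::time::Duration;

use futures::future::select;
use futures::future::Either;
use tokio::sync::Notify;
use tokio::time::sleep;

async fn heartbeat(shutdown: Arc<Notify>) {
    let mut notified = Box::pin(shutdown.notified());
    loop {
        let tick = Box::pin(sleep(Duration::from_millis(100)));
        match select(notified, tick).await {
            // notify_waiters() fired first: stop the heartbeat.
            Either::Left((_, _)) => break,
            // The sleep elapsed first: do one round of work (extending the
            // lock revision in the real code) and keep the still-pending
            // notified future for the next iteration.
            Either::Right((_, pending)) => {
                notified = pending;
                // extend the lock here
            }
        }
    }
}
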
14 changes: 7 additions & 7 deletions src/query/storages/common/locks/src/lock_manager.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

- use async_channel::Sender;
+ use common_base::base::tokio::sync::mpsc;
use common_base::base::tokio::time::timeout;
use common_base::base::GlobalInstance;
use common_base::runtime::GlobalIORuntime;
@@ -44,18 +44,18 @@ use crate::LockHolder;

pub struct LockManager {
active_locks: Arc<RwLock<HashMap<u64, Arc<LockHolder>>>>,
- tx: Sender<u64>,
+ tx: mpsc::UnboundedSender<u64>,
}

impl LockManager {
pub fn init() -> Result<()> {
- let (tx, rx) = async_channel::unbounded();
+ let (tx, mut rx) = mpsc::unbounded_channel();
let active_locks = Arc::new(RwLock::new(HashMap::new()));
let lock_manager = Self { active_locks, tx };
GlobalIORuntime::instance().spawn({
let active_locks = lock_manager.active_locks.clone();
async move {
- while let Ok(revision) = rx.recv().await {
+ while let Some(revision) = rx.recv().await {
metrics_inc_shutdown_lock_holder_nums();
if let Some(lock) = active_locks.write().remove(&revision) {
lock.shutdown();
@@ -81,8 +81,8 @@ impl LockManager {
/// Otherwise, listen to the deletion event of the previous revision in a loop until get lock success.
///
/// NOTICE: the lock holder is not 100% reliable.
- /// E.g., there is a very small probability of a lock renewal failure.
- /// There is also a possibility of failure in deleting a lease.
+ /// E.g., there is a very small probability of a lock extend failure.
+ /// There is also a possibility of failure in deleting a lock.
#[async_backtrace::framed]
pub async fn try_lock<T: Lock + ?Sized>(
self: &Arc<Self>,
@@ -182,6 +182,6 @@ impl LockManager {

impl UnlockApi for LockManager {
fn unlock(&self, revision: u64) {
- let _ = self.tx.send_blocking(revision);
+ let _ = self.tx.send(revision);
}
}
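
A plausible reading of the channel swap above: async-channel's Sender requires send_blocking() from synchronous code, whereas tokio's UnboundedSender::send() is a plain non-async call that never blocks, which is what the synchronous UnlockApi::unlock needs. Below is a minimal, self-contained sketch of that producer/consumer shape, not part of the commit; it uses plain tokio and a made-up revision value.

use tokio::sync::mpsc;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();

    // Consumer: analogous to the spawned loop that shuts down lock holders.
    let consumer = rt.spawn(async move {
        while let Some(revision) = rx.recv().await {
            println!("shutting down lock holder, revision {revision}");
        }
    });

    // Producer: a plain synchronous call, like LockManager::unlock.
    let _ = tx.send(42);
    drop(tx); // closing the channel lets the consumer loop exit

    rt.block_on(consumer).unwrap();
}
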
51 changes: 51 additions & 0 deletions src/query/storages/common/locks/src/utils.rs
@@ -0,0 +1,51 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use backoff::ExponentialBackoff;
use backoff::ExponentialBackoffBuilder;

const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5);
const OCC_DEFAULT_BACKOFF_MAX_DELAY_MS: Duration = Duration::from_millis(20 * 1000);
const OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS: Duration = Duration::from_millis(120 * 1000);

#[inline]
pub fn set_backoff(
init_retry_delay: Option<Duration>,
max_retry_delay: Option<Duration>,
max_retry_elapsed: Option<Duration>,
) -> ExponentialBackoff {
// The initial retry delay in millisecond. By default, it is 5 ms.
let init_delay = init_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_INIT_DELAY_MS);

// The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing.
// By default, it is 20 seconds.
let max_delay = max_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_DELAY_MS);

// The maximum elapsed time after the occ starts, beyond which there will be no more retries.
// By default, it is 2 minutes
let max_elapsed = max_retry_elapsed.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS);

// TODO(xuanwo): move to backon instead.
//
// To simplify the settings, using fixed common values for randomization_factor and multiplier
ExponentialBackoffBuilder::new()
.with_initial_interval(init_delay)
.with_max_interval(max_delay)
.with_randomization_factor(0.5)
.with_multiplier(2.0)
.with_max_elapsed_time(Some(max_elapsed))
.build()
}
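
To make the knobs above concrete, here is a small sketch of how the returned ExponentialBackoff is driven through the backoff::backoff::Backoff trait, the same way try_extend_lock and try_delete_lock use it: next_backoff() yields a randomized, exponentially growing delay until max_elapsed_time is exceeded, then returns None and the caller gives up. This is not part of the commit; retry_with_backoff and do_request are invented names, and set_backoff is assumed to be in scope.

use std::time::Duration;

use backoff::backoff::Backoff;

fn retry_with_backoff<T, E>(mut do_request: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    // Start around 2 ms, double each round with +/- 50% jitter, cap single
    // delays at the default 20 s, and stop retrying after ~1 s in total.
    let mut backoff = set_backoff(
        Some(Duration::from_millis(2)),
        None,
        Some(Duration::from_secs(1)),
    );
    loop {
        match do_request() {
            Ok(v) => return Ok(v),
            Err(e) => match backoff.next_backoff() {
                // The real code awaits tokio's sleep instead of blocking.
                Some(delay) => std::thread::sleep(delay),
                None => return Err(e), // retry budget exhausted
            },
        }
    }
}
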
35 changes: 3 additions & 32 deletions src/query/storages/fuse/src/operations/commit.rs
@@ -17,8 +17,6 @@ use std::sync::Arc;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use backoff::ExponentialBackoffBuilder;
use chrono::Utc;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
@@ -41,6 +39,7 @@ use log::warn;
use opendal::Operator;
use storages_common_cache::CacheAccessor;
use storages_common_cache_manager::CachedObject;
use storages_common_locks::set_backoff;
use storages_common_table_meta::meta::Location;
use storages_common_table_meta::meta::SegmentInfo;
use storages_common_table_meta::meta::SnapshotId;
@@ -67,10 +66,6 @@ use crate::operations::common::TransformSerializeSegment;
use crate::statistics::merge_statistics;
use crate::FuseTable;

- const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5);
- const OCC_DEFAULT_BACKOFF_MAX_DELAY_MS: Duration = Duration::from_millis(20 * 1000);
- const OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS: Duration = Duration::from_millis(120 * 1000);

impl FuseTable {
#[async_backtrace::framed]
pub fn do_commit(
@@ -289,7 +284,7 @@
max_retry_elapsed: Option<Duration>,
) -> Result<()> {
let mut retries = 0;
- let mut backoff = Self::set_backoff(max_retry_elapsed);
+ let mut backoff = set_backoff(None, None, max_retry_elapsed);

let mut latest_snapshot = base_snapshot.clone();
let mut latest_table_info = &self.table_info;
@@ -344,6 +339,7 @@
self.table_info.ident
);

common_base::base::tokio::time::sleep(d).await;
latest_table_ref = self.refresh(ctx.as_ref()).await?;
let latest_fuse_table =
FuseTable::try_from_table(latest_table_ref.as_ref())?;
@@ -465,31 +461,6 @@ impl FuseTable {
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
}

- #[inline]
- pub fn set_backoff(max_retry_elapsed: Option<Duration>) -> ExponentialBackoff {
-     // The initial retry delay in millisecond. By default, it is 5 ms.
-     let init_delay = OCC_DEFAULT_BACKOFF_INIT_DELAY_MS;
-
-     // The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing.
-     // By default, it is 20 seconds.
-     let max_delay = OCC_DEFAULT_BACKOFF_MAX_DELAY_MS;
-
-     // The maximum elapsed time after the occ starts, beyond which there will be no more retries.
-     // By default, it is 2 minutes
-     let max_elapsed = max_retry_elapsed.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS);
-
-     // TODO(xuanwo): move to backon instead.
-     //
-     // To simplify the settings, using fixed common values for randomization_factor and multiplier
-     ExponentialBackoffBuilder::new()
-         .with_initial_interval(init_delay)
-         .with_max_interval(max_delay)
-         .with_randomization_factor(0.5)
-         .with_multiplier(2.0)
-         .with_max_elapsed_time(Some(max_elapsed))
-         .build()
- }

// check if there are any fuse table legacy options
pub fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) {
table_options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
@@ -34,6 +34,7 @@ use log::error;
use log::info;
use log::warn;
use opendal::Operator;
use storages_common_locks::set_backoff;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::ClusterKey;
use storages_common_table_meta::meta::SegmentInfo;
@@ -164,7 +165,7 @@ where F: SnapshotGenerator + Send + 'static

self.abort_operation = meta.abort_operation;

- self.backoff = FuseTable::set_backoff(self.max_retry_elapsed);
+ self.backoff = set_backoff(None, None, self.max_retry_elapsed);

self.snapshot_gen
.set_conflict_resolve_context(meta.conflict_resolve_context);
