Skip to content

Commit

Permalink
chore(query): add drop guard for impl drop trait (#14783)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 authored Feb 28, 2024
1 parent 8c13fee commit 381e731
Show file tree
Hide file tree
Showing 37 changed files with 344 additions and 204 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions src/common/base/src/base/stop_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use log::info;
use tokio::sync::broadcast;

use super::Stoppable;
use crate::runtime::drop_guard;

/// Handle a group of `Stoppable` tasks.
/// When a user press ctrl-c, it calls the `stop()` method on every task to close them.
Expand Down Expand Up @@ -129,14 +130,16 @@ impl<E: Error + Send + 'static> StopHandle<E> {

impl<E: Error + Send + 'static> Drop for StopHandle<E> {
fn drop(&mut self) {
let (tx, _rx) = broadcast::channel::<()>(16);
drop_guard(move || {
let (tx, _rx) = broadcast::channel::<()>(16);

// let every task subscribe the channel, then send a force stop signal `()`
let fut = self.stop_all(Some(tx.clone()));
// let every task subscribe the channel, then send a force stop signal `()`
let fut = self.stop_all(Some(tx.clone()));

if let Ok(fut) = fut {
let _ = tx.send(());
futures::executor::block_on(fut);
}
if let Ok(fut) = fut {
let _ = tx.send(());
futures::executor::block_on(fut);
}
})
}
}
10 changes: 7 additions & 3 deletions src/common/base/src/containers/fixed_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::fmt::Formatter;
use std::fmt::Result;
use std::mem::MaybeUninit;

use crate::runtime::drop_guard;

/// A fixed-size heap structure inspired by https://github.com/rodolphito/fixed_heap
pub struct FixedHeap<T> {
high: usize,
Expand Down Expand Up @@ -401,9 +403,11 @@ unsafe impl<T: Send> Send for FixedHeap<T> {}
impl<T> Drop for FixedHeap<T> {
#[inline(always)]
fn drop(&mut self) {
for i in 0..self.high {
unsafe { self.data.get_unchecked_mut(i).assume_init_drop() };
}
drop_guard(move || {
for i in 0..self.high {
unsafe { self.data.get_unchecked_mut(i).assume_init_drop() };
}
})
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/common/base/src/runtime/catch_unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ use databend_common_exception::Result;
use futures::future::BoxFuture;
use futures::FutureExt;

pub fn drop_guard<F: FnOnce() -> R, R>(f: F) -> R {
let panicking = std::thread::panicking();
#[expect(clippy::disallowed_methods)]
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(res) => res,
Err(panic) => {
if panicking {
eprintln!("double panic");

let backtrace = std::backtrace::Backtrace::force_capture();
eprintln!("double panic {:?}", backtrace);
log::error!("double panic {:?}", backtrace);
}

std::panic::resume_unwind(panic)
}
}
}

pub fn catch_unwind<F: FnOnce() -> R, R>(f: F) -> Result<R> {
#[expect(clippy::disallowed_methods)]
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use backtrace::dump_backtrace;
pub use backtrace::get_all_tasks;
pub use backtrace::AsyncTaskItem;
pub use catch_unwind::catch_unwind;
pub use catch_unwind::drop_guard;
pub use catch_unwind::CatchUnwindFuture;
pub use global_runtime::GlobalIORuntime;
pub use global_runtime::GlobalQueryRuntime;
Expand Down
37 changes: 20 additions & 17 deletions src/common/base/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

use crate::runtime::catch_unwind::CatchUnwindFuture;
use crate::runtime::drop_guard;
use crate::runtime::memory::MemStat;
use crate::runtime::Thread;
use crate::runtime::ThreadJoinHandle;
Expand Down Expand Up @@ -323,24 +324,26 @@ pub struct Dropper {

impl Drop for Dropper {
fn drop(&mut self) {
// Send a signal to say i am dropping.
if let Some(close_sender) = self.close.take() {
if close_sender.send(()).is_ok() {
match self.join_handler.take().unwrap().join() {
Err(e) => warn!("Runtime dropper panic, {:?}", e),
Ok(true) => {
// When the runtime shutdown is blocked for more than 3 seconds,
// we will print the backtrace in the warn log, which will help us debug.
warn!(
"Runtime dropper is blocked 3 seconds, runtime name: {:?}, drop backtrace: {:?}",
self.name,
Backtrace::capture()
);
}
_ => {}
};
drop_guard(move || {
// Send a signal to say i am dropping.
if let Some(close_sender) = self.close.take() {
if close_sender.send(()).is_ok() {
match self.join_handler.take().unwrap().join() {
Err(e) => warn!("Runtime dropper panic, {:?}", e),
Ok(true) => {
// When the runtime shutdown is blocked for more than 3 seconds,
// we will print the backtrace in the warn log, which will help us debug.
warn!(
"Runtime dropper is blocked 3 seconds, runtime name: {:?}, drop backtrace: {:?}",
self.name,
Backtrace::capture()
);
}
_ => {}
};
}
}
}
})
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/common/hashtable/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::ops::DerefMut;
use std::ptr::null_mut;
use std::ptr::NonNull;

use databend_common_base::runtime::drop_guard;

/// # Safety
///
/// Any foreign type shouldn't implement this trait.
Expand Down Expand Up @@ -220,13 +222,15 @@ unsafe impl<T, const N: usize, A: Allocator + Clone> Container for StackContaine

impl<T, const N: usize, A: Allocator> Drop for StackContainer<T, N, A> {
fn drop(&mut self) {
if !self.ptr.is_null() {
unsafe {
self.allocator.deallocate(
NonNull::new(self.ptr).unwrap().cast(),
Layout::array::<T>(self.len).unwrap(),
);
drop_guard(move || {
if !self.ptr.is_null() {
unsafe {
self.allocator.deallocate(
NonNull::new(self.ptr).unwrap().cast(),
Layout::array::<T>(self.len).unwrap(),
);
}
}
}
})
}
}
16 changes: 10 additions & 6 deletions src/common/hashtable/src/table0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::alloc::Allocator;
use std::intrinsics::assume;
use std::mem::MaybeUninit;

use databend_common_base::runtime::drop_guard;

use super::container::Container;
use super::traits::EntryMutRefLike;
use super::traits::EntryRefLike;
Expand Down Expand Up @@ -343,15 +345,17 @@ where
A: Allocator + Clone,
{
fn drop(&mut self) {
if std::mem::needs_drop::<V>() && !self.dropped {
unsafe {
for entry in self.entries.as_mut() {
if !entry.is_zero() {
std::ptr::drop_in_place(entry.get_mut());
drop_guard(move || {
if std::mem::needs_drop::<V>() && !self.dropped {
unsafe {
for entry in self.entries.as_mut() {
if !entry.is_zero() {
std::ptr::drop_in_place(entry.get_mut());
}
}
}
}
}
})
}
}

Expand Down
14 changes: 9 additions & 5 deletions src/common/hashtable/src/table1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::alloc::Allocator;

use databend_common_base::runtime::drop_guard;

use super::table0::Entry;

type Ent<V> = Entry<[u8; 2], V>;
Expand Down Expand Up @@ -92,11 +94,13 @@ impl<V, A: Allocator + Clone> Table1<V, A> {

impl<V, A: Allocator + Clone> Drop for Table1<V, A> {
fn drop(&mut self) {
if std::mem::needs_drop::<V>() {
self.iter_mut().for_each(|e| unsafe {
e.val.assume_init_drop();
});
}
drop_guard(move || {
if std::mem::needs_drop::<V>() {
self.iter_mut().for_each(|e| unsafe {
e.val.assume_init_drop();
});
}
})
}
}

Expand Down
12 changes: 8 additions & 4 deletions src/common/hashtable/src/table_empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::alloc::Allocator;

use databend_common_base::runtime::drop_guard;

use super::table0::Entry;

type Ent<V> = Entry<[u8; 0], V>;
Expand Down Expand Up @@ -95,11 +97,13 @@ impl<V, A: Allocator + Clone> TableEmpty<V, A> {

impl<V, A: Allocator + Clone> Drop for TableEmpty<V, A> {
fn drop(&mut self) {
if std::mem::needs_drop::<V>() && self.has_zero {
unsafe {
self.slice[0].val.assume_init_drop();
drop_guard(move || {
if std::mem::needs_drop::<V>() && self.has_zero {
unsafe {
self.slice[0].val.assume_init_drop();
}
}
}
})
}
}

Expand Down
12 changes: 8 additions & 4 deletions src/common/hashtable/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::mem::MaybeUninit;
use std::ops::Deref;
use std::ops::DerefMut;

use databend_common_base::runtime::drop_guard;

use super::table0::Entry;
use super::traits::Keyable;

Expand Down Expand Up @@ -78,11 +80,13 @@ impl<K, V> DerefMut for ZeroEntry<K, V> {

impl<K, V> Drop for ZeroEntry<K, V> {
fn drop(&mut self) {
if let Some(e) = self.0.as_mut() {
unsafe {
e.val.assume_init_drop();
drop_guard(move || {
if let Some(e) = self.0.as_mut() {
unsafe {
e.val.assume_init_drop();
}
}
}
})
}
}

Expand Down
1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ test = false
[dependencies] # In alphabetical order
# Workspace dependencies
databend-common-arrow = { path = "../../common/arrow" }
databend-common-base = { path = "../../common/base" }
databend-common-datavalues = { path = "../datavalues" }
databend-common-exception = { path = "../../common/exception" }
databend-common-hashtable = { path = "../../common/hashtable" }
Expand Down
31 changes: 17 additions & 14 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::mem::MaybeUninit;
use std::sync::Arc;

use bumpalo::Bump;
use databend_common_base::runtime::drop_guard;

use super::payload_row::rowformat_size;
use super::payload_row::serialize_column_to_rowformat;
Expand Down Expand Up @@ -336,24 +337,26 @@ impl Payload {

impl Drop for Payload {
fn drop(&mut self) {
// drop states
if !self.state_move_out {
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
if aggr.need_manual_drop_state() {
for page in self.pages.iter() {
for row in 0..page.rows {
unsafe {
let state_place = StateAddr::new(core::ptr::read::<u64>(
self.data_ptr(page, row).add(self.state_offset) as _,
)
as usize);

aggr.drop_state(state_place.next(*addr_offset));
drop_guard(move || {
// drop states
if !self.state_move_out {
for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) {
if aggr.need_manual_drop_state() {
for page in self.pages.iter() {
for row in 0..page.rows {
unsafe {
let state_place = StateAddr::new(core::ptr::read::<u64>(
self.data_ptr(page, row).add(self.state_offset) as _,
)
as usize);

aggr.drop_state(state_place.next(*addr_offset));
}
}
}
}
}
}
}
})
}
}
19 changes: 11 additions & 8 deletions src/query/expression/src/kernels/topk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::mem;
use std::ptr;

use databend_common_arrow::arrow::bitmap::MutableBitmap;
use databend_common_base::runtime::drop_guard;

use crate::types::*;
use crate::with_number_mapped_type;
Expand Down Expand Up @@ -285,14 +286,16 @@ where F: FnMut(&T, &T) -> bool {

impl<T> Drop for InsertionHole<T> {
fn drop(&mut self) {
// SAFETY:
// we ensure src/dest point to a properly initialized value of type T
// src is valid for reads of `count * size_of::()` bytes.
// dest is valid for reads of `count * size_of::()` bytes.
// Both `src` and `dst` are properly aligned.
unsafe {
ptr::copy_nonoverlapping(self.src, self.dest, 1);
}
drop_guard(move || {
// SAFETY:
// we ensure src/dest point to a properly initialized value of type T
// src is valid for reads of `count * size_of::()` bytes.
// dest is valid for reads of `count * size_of::()` bytes.
// Both `src` and `dst` are properly aligned.
unsafe {
ptr::copy_nonoverlapping(self.src, self.dest, 1);
}
})
}
}
}
Loading

0 comments on commit 381e731

Please sign in to comment.