Skip to content

Commit

Permalink
fix(query): fix panic when drop uncompression buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Dec 18, 2023
1 parent b32b39c commit 6472ea8
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions src/query/storages/fuse/src/io/read/block/decompressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,26 @@ impl UncompressedBuffer {
})
}

pub fn clear(&self) {
if self.used.fetch_add(1, Ordering::SeqCst) != 0 {
self.used.fetch_sub(1, Ordering::SeqCst);
pub fn clear(self: &Arc<Self>) {
let guard = self.borrow_mut();

if !guard.is_unique_borrow_mut() {
panic!(
"UncompressedBuffer cannot be accessed between multiple threads at the same time."
);
}

drop(std::mem::take(self.buffer_mut()));
self.used.fetch_sub(1, Ordering::SeqCst);
}

#[allow(clippy::mut_from_ref)]
pub(in crate::io::read::block::decompressor) fn buffer_mut(&self) -> &mut Vec<u8> {
unsafe { &mut *self.buffer.get() }
}

pub(in crate::io::read::block::decompressor) fn borrow_mut(self: &Arc<Self>) -> UsedGuard {
UsedGuard::create(self)
}
}

pub struct BuffedBasicDecompressor<I: Iterator<Item = Result<CompressedPage, Error>>> {
Expand Down Expand Up @@ -90,8 +94,9 @@ where I: Iterator<Item = Result<CompressedPage, Error>>
fn advance(&mut self) -> Result<(), Error> {
if let Some(page) = self.current.as_mut() {
if self.was_decompressed {
if self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0 {
self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);
let guard = self.uncompressed_buffer.borrow_mut();

if !guard.is_unique_borrow_mut() {
return Err(Error::FeatureNotSupported(String::from(
"UncompressedBuffer cannot be accessed between multiple threads at the same time.",
)));
Expand All @@ -104,16 +109,15 @@ where I: Iterator<Item = Result<CompressedPage, Error>>
*borrow_buffer = std::mem::take(page.buffer_mut());
}
}

self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);
}
}

self.current = match self.iter.next() {
None => None,
Some(page) => {
if self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0 {
self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);
let guard = self.uncompressed_buffer.borrow_mut();

if !guard.is_unique_borrow_mut() {
return Err(Error::FeatureNotSupported(String::from(
"UncompressedBuffer cannot be accessed between multiple threads at the same time.",
)));
Expand All @@ -126,8 +130,6 @@ where I: Iterator<Item = Result<CompressedPage, Error>>
decompress(page, self.uncompressed_buffer.buffer_mut())?
};

self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);

Some(decompress_page)
}
};
Expand All @@ -149,10 +151,9 @@ where I: Iterator<Item = Result<CompressedPage, Error>>
impl<I: Iterator<Item = Result<CompressedPage, Error>>> Drop for BuffedBasicDecompressor<I> {
fn drop(&mut self) {
if let Some(page) = self.current.as_mut() {
if !std::thread::panicking()
&& self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0
{
self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);
let guard = self.uncompressed_buffer.borrow_mut();

if !std::thread::panicking() && !guard.is_unique_borrow_mut() {
panic!(
"UncompressedBuffer cannot be accessed between multiple threads at the same time."
);
Expand All @@ -165,8 +166,30 @@ impl<I: Iterator<Item = Result<CompressedPage, Error>>> Drop for BuffedBasicDeco
*borrow_buffer = std::mem::take(page.buffer_mut());
}
}
}
}
}

self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst);
struct UsedGuard {
unique_mut: bool,
inner: Arc<UncompressedBuffer>,
}

impl UsedGuard {
pub fn create(inner: &Arc<UncompressedBuffer>) -> UsedGuard {
let used = inner.used.fetch_add(1, Ordering::SeqCst);
UsedGuard {
unique_mut: used == 0,
inner: inner.clone(),
}
}
pub fn is_unique_borrow_mut(&self) -> bool {
self.unique_mut
}
}

impl Drop for UsedGuard {
fn drop(&mut self) {
self.inner.used.fetch_sub(1, Ordering::SeqCst);
}
}

0 comments on commit 6472ea8

Please sign in to comment.