From 6472ea85b1593fa1295480f45bd7ec6752fac62a Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 18 Dec 2023 22:45:10 +0800 Subject: [PATCH] fix(query): fix panic when drop uncompression buffer --- .../fuse/src/io/read/block/decompressor.rs | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/block/decompressor.rs b/src/query/storages/fuse/src/io/read/block/decompressor.rs index 1c12e5ec9bc1..968daaedfc30 100644 --- a/src/query/storages/fuse/src/io/read/block/decompressor.rs +++ b/src/query/storages/fuse/src/io/read/block/decompressor.rs @@ -44,22 +44,26 @@ impl UncompressedBuffer { }) } - pub fn clear(&self) { - if self.used.fetch_add(1, Ordering::SeqCst) != 0 { - self.used.fetch_sub(1, Ordering::SeqCst); + pub fn clear(self: &Arc) { + let guard = self.borrow_mut(); + + if !guard.is_unique_borrow_mut() { panic!( "UncompressedBuffer cannot be accessed between multiple threads at the same time." ); } drop(std::mem::take(self.buffer_mut())); - self.used.fetch_sub(1, Ordering::SeqCst); } #[allow(clippy::mut_from_ref)] pub(in crate::io::read::block::decompressor) fn buffer_mut(&self) -> &mut Vec { unsafe { &mut *self.buffer.get() } } + + pub(in crate::io::read::block::decompressor) fn borrow_mut(self: &Arc) -> UsedGuard { + UsedGuard::create(self) + } } pub struct BuffedBasicDecompressor>> { @@ -90,8 +94,9 @@ where I: Iterator> fn advance(&mut self) -> Result<(), Error> { if let Some(page) = self.current.as_mut() { if self.was_decompressed { - if self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0 { - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); + let guard = self.uncompressed_buffer.borrow_mut(); + + if !guard.is_unique_borrow_mut() { return Err(Error::FeatureNotSupported(String::from( "UncompressedBuffer cannot be accessed between multiple threads at the same time.", ))); @@ -104,16 +109,15 @@ where I: Iterator> *borrow_buffer = std::mem::take(page.buffer_mut()); } } - - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); } } self.current = match self.iter.next() { None => None, Some(page) => { - if self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0 { - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); + let guard = self.uncompressed_buffer.borrow_mut(); + + if !guard.is_unique_borrow_mut() { return Err(Error::FeatureNotSupported(String::from( "UncompressedBuffer cannot be accessed between multiple threads at the same time.", ))); @@ -126,8 +130,6 @@ where I: Iterator> decompress(page, self.uncompressed_buffer.buffer_mut())? }; - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); - Some(decompress_page) } }; @@ -149,10 +151,9 @@ where I: Iterator> impl>> Drop for BuffedBasicDecompressor { fn drop(&mut self) { if let Some(page) = self.current.as_mut() { - if !std::thread::panicking() - && self.uncompressed_buffer.used.fetch_add(1, Ordering::SeqCst) != 0 - { - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); + let guard = self.uncompressed_buffer.borrow_mut(); + + if !std::thread::panicking() && !guard.is_unique_borrow_mut() { panic!( "UncompressedBuffer cannot be accessed between multiple threads at the same time." ); @@ -165,8 +166,30 @@ impl>> Drop for BuffedBasicDeco *borrow_buffer = std::mem::take(page.buffer_mut()); } } + } + } +} - self.uncompressed_buffer.used.fetch_sub(1, Ordering::SeqCst); +struct UsedGuard { + unique_mut: bool, + inner: Arc, +} + +impl UsedGuard { + pub fn create(inner: &Arc) -> UsedGuard { + let used = inner.used.fetch_add(1, Ordering::SeqCst); + UsedGuard { + unique_mut: used == 0, + inner: inner.clone(), } } + pub fn is_unique_borrow_mut(&self) -> bool { + self.unique_mut + } +} + +impl Drop for UsedGuard { + fn drop(&mut self) { + self.inner.used.fetch_sub(1, Ordering::SeqCst); + } }