diff --git a/.typos.toml b/.typos.toml
index bea89668ba41..210cdbeee146 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -11,6 +11,7 @@
 "INOUT" = "INOUT"
 "ser" = "ser"
 "Ser" = "Ser"
+"flate" = "flate"

 [files]
 extend-exclude = [
diff --git a/Cargo.lock b/Cargo.lock
index 3fa2f49f21eb..de1a66dce545 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -672,21 +672,21 @@ dependencies = [
 ]

 [[package]]
-name = "async-compression-issue-150-workaround"
-version = "0.3.15-issue-150"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a1c3bfa97a4c664de1effcffe5e3345eb6df10dd4f8ff7f3c12167cc3031690"
+name = "async-compression"
+version = "0.4.3"
+source = "git+https://github.com/youngsofun/async-compression?rev=1568ceafd#1568ceafdcb590a0c28226ee9e4cebf39e26e46c"
 dependencies = [
  "brotli",
  "bzip2",
+ "deflate64",
  "flate2",
  "futures-core",
  "futures-io",
  "memchr",
  "pin-project-lite",
  "xz2",
- "zstd 0.11.2+zstd.1.5.2",
- "zstd-safe 5.0.2+zstd.1.5.2",
+ "zstd 0.12.3+zstd.1.5.2",
+ "zstd-safe 6.0.4+zstd.1.5.4",
 ]

 [[package]]
@@ -1859,15 +1859,16 @@ dependencies = [
 name = "common-compress"
 version = "0.1.0"
 dependencies = [
- "async-compression-issue-150-workaround",
+ "async-compression 0.4.3",
+ "brotli",
  "bytes",
+ "common-exception",
  "env_logger",
  "futures",
  "log",
  "pin-project",
  "rand 0.8.5",
  "serde",
- "sha2",
  "tokio",
 ]

@@ -4036,6 +4037,12 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "647605a6345d5e89c3950a36a638c56478af9b414c55c6f2477c73b115f9acde"

+[[package]]
+name = "deflate64"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30dc5bb425a582de72bb57130320aac133893ea85f6151f79bd9aa9067114f60"
+
 [[package]]
 name = "der"
 version = "0.6.1"
@@ -8937,7 +8944,7 @@ version = "1.3.57"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f0d92c532a37a9e98c0e9a0411e6852b8acccf9ec07d5e6e450b01cbf947d90b"
 dependencies = [
- "async-compression",
+ "async-compression 0.4.1",
  "async-trait",
  "bytes",
  "futures-util",
diff --git a/src/common/compress/Cargo.toml b/src/common/compress/Cargo.toml
index 2295a6817d04..ee2249fff5d7 100644
--- a/src/common/compress/Cargo.toml
+++ b/src/common/compress/Cargo.toml
@@ -8,11 +8,13 @@ edition = { workspace = true }

 [dependencies]
 # Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved.
-async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
+async-compression = { git = "https://github.com/youngsofun/async-compression", rev = "1568ceafd", features = [
     "futures-io",
     "all-algorithms",
 ] }
+brotli = { version = "3.3.0", features = ["std"] }
 bytes = { workspace = true }
+common-exception = { path = "../exception" }
 futures = "0.3"
 log = { workspace = true }
 pin-project = "1"
@@ -21,5 +23,4 @@ serde = { workspace = true }

 [dev-dependencies]
 env_logger = "0.10"
 rand = "0.8"
-sha2 = "0.10"
 tokio = { version = "1", features = ["rt", "macros"] }
diff --git a/src/common/compress/src/compress_algorithms.rs b/src/common/compress/src/compress_algorithms.rs
new file mode 100644
index 000000000000..8175e7de4c00
--- /dev/null
+++ b/src/common/compress/src/compress_algorithms.rs
@@ -0,0 +1,90 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::path::PathBuf;
+
+use serde::Deserialize;
+use serde::Serialize;
+
+/// CompressAlgorithm represents all compression algorithms that OpenDAL supports.
+#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
+pub enum CompressAlgorithm {
+    /// [Brotli](https://github.com/google/brotli) compression format.
+    Brotli,
+    /// [bzip2](http://sourceware.org/bzip2/) compression format.
+    Bz2,
+    /// [Deflate](https://datatracker.ietf.org/doc/html/rfc1951) Compressed Data Format.
+    ///
+    /// Similar to [`CompressAlgorithm::Gzip`] and [`CompressAlgorithm::Zlib`].
+    Deflate,
+    /// [Gzip](https://datatracker.ietf.org/doc/html/rfc1952) compression format.
+    ///
+    /// Similar to [`CompressAlgorithm::Deflate`] and [`CompressAlgorithm::Zlib`].
+    Gzip,
+    /// [LZMA](https://www.7-zip.org/sdk.html) compression format.
+    Lzma,
+    /// [Xz](https://tukaani.org/xz/) compression format, the successor of [`CompressAlgorithm::Lzma`].
+    Xz,
+    /// [Zlib](https://datatracker.ietf.org/doc/html/rfc1950) compression format.
+    ///
+    /// Similar to [`CompressAlgorithm::Deflate`] and [`CompressAlgorithm::Gzip`].
+    Zlib,
+    /// [Zstd](https://github.com/facebook/zstd) compression algorithm.
+    Zstd,
+}
+
+impl CompressAlgorithm {
+    /// Get the file extension of this compression algorithm.
+    pub fn extension(&self) -> &str {
+        match self {
+            CompressAlgorithm::Brotli => "br",
+            CompressAlgorithm::Bz2 => "bz2",
+            CompressAlgorithm::Deflate => "deflate",
+            CompressAlgorithm::Gzip => "gz",
+            CompressAlgorithm::Lzma => "lzma",
+            CompressAlgorithm::Xz => "xz",
+            CompressAlgorithm::Zlib => "zl",
+            CompressAlgorithm::Zstd => "zstd",
+        }
+    }
+
+    /// Create CompressAlgorithm from a file extension.
+    ///
+    /// If the file extension is not supported, `None` will be returned instead.
+    pub fn from_extension(ext: &str) -> Option<CompressAlgorithm> {
+        match ext {
+            "br" => Some(CompressAlgorithm::Brotli),
+            "bz2" => Some(CompressAlgorithm::Bz2),
+            "deflate" => Some(CompressAlgorithm::Deflate),
+            "gz" => Some(CompressAlgorithm::Gzip),
+            "lzma" => Some(CompressAlgorithm::Lzma),
+            "xz" => Some(CompressAlgorithm::Xz),
+            "zl" => Some(CompressAlgorithm::Zlib),
+            "zstd" | "zst" => Some(CompressAlgorithm::Zstd),
+            _ => None,
+        }
+    }
+
+    /// Create CompressAlgorithm from a file path.
+    ///
+    /// If the extension in the file path is not supported, `None` will be returned instead.
+    pub fn from_path(path: &str) -> Option<CompressAlgorithm> {
+        let ext = PathBuf::from(path)
+            .extension()
+            .map(|s| s.to_string_lossy())?
+            .to_string();
+
+        CompressAlgorithm::from_extension(&ext)
+    }
+}
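Note (not part of the patch): a minimal sketch of how the relocated CompressAlgorithm API is driven. The function name `detect` and the sample paths are illustrative only.

    use common_compress::CompressAlgorithm;

    fn detect(path: &str) {
        // from_path() maps the trailing extension back to an algorithm;
        // unknown extensions yield None.
        match CompressAlgorithm::from_path(path) {
            Some(algo) => println!("{path}: decompress as {algo:?} (.{})", algo.extension()),
            None => println!("{path}: no compression detected"),
        }
    }

    fn main() {
        detect("t.csv.gz"); // Some(Gzip)
        detect("t.csv.zst"); // Some(Zstd): "zst" is accepted as an alias of "zstd"
        detect("t.csv"); // None
    }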
diff --git a/src/common/compress/src/compress.rs b/src/common/compress/src/decode.rs
similarity index 79%
rename from src/common/compress/src/compress.rs
rename to src/common/compress/src/decode.rs
index 41b3281c6aea..e099a9d5da1a 100644
--- a/src/common/compress/src/compress.rs
+++ b/src/common/compress/src/decode.rs
@@ -13,7 +13,6 @@
 // limitations under the License.

 use std::io::Result;
-use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;
@@ -30,101 +29,15 @@ use async_compression::codec::ZstdDecoder;
 use async_compression::util::PartialBuffer;
 use bytes::Buf;
 use bytes::BytesMut;
+use common_exception::ErrorCode;
 use futures::io::BufReader;
 use futures::ready;
 use futures::AsyncBufRead;
 use futures::AsyncRead;
 use log::trace;
 use pin_project::pin_project;
-use serde::Deserialize;
-use serde::Serialize;
-
-/// CompressAlgorithm represents all compress algorithm that OpenDAL supports.
-#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
-pub enum CompressAlgorithm {
-    /// [Brotli](https://github.com/google/brotli) compression format.
-    Brotli,
-    /// [bzip2](http://sourceware.org/bzip2/) compression format.
-    Bz2,
-    /// [Deflate](https://datatracker.ietf.org/doc/html/rfc1951) Compressed Data Format.
-    ///
-    /// Similar to [`CompressAlgorithm::Gzip`] and [`CompressAlgorithm::Zlib`]
-    Deflate,
-    /// [Gzip](https://datatracker.ietf.org/doc/html/rfc1952) compress format.
-    ///
-    /// Similar to [`CompressAlgorithm::Deflate`] and [`CompressAlgorithm::Zlib`]
-    Gzip,
-    /// [LZMA](https://www.7-zip.org/sdk.html) compress format.
-    Lzma,
-    /// [Xz](https://tukaani.org/xz/) compress format, the successor of [`CompressAlgorithm::Lzma`].
-    Xz,
-    /// [Zlib](https://datatracker.ietf.org/doc/html/rfc1950) compress format.
-    ///
-    /// Similar to [`CompressAlgorithm::Deflate`] and [`CompressAlgorithm::Gzip`]
-    Zlib,
-    /// [Zstd](https://github.com/facebook/zstd) compression algorithm
-    Zstd,
-}
-
-impl CompressAlgorithm {
-    /// Get the file extension of this compress algorithm.
-    pub fn extension(&self) -> &str {
-        match self {
-            CompressAlgorithm::Brotli => "br",
-            CompressAlgorithm::Bz2 => "bz2",
-            CompressAlgorithm::Deflate => "deflate",
-            CompressAlgorithm::Gzip => "gz",
-            CompressAlgorithm::Lzma => "lzma",
-            CompressAlgorithm::Xz => "xz",
-            CompressAlgorithm::Zlib => "zl",
-            CompressAlgorithm::Zstd => "zstd",
-        }
-    }
-
-    /// Create CompressAlgorithm from file extension.
-    ///
-    /// If the file extension is not supported, `None` will be return instead.
-    pub fn from_extension(ext: &str) -> Option<CompressAlgorithm> {
-        match ext {
-            "br" => Some(CompressAlgorithm::Brotli),
-            "bz2" => Some(CompressAlgorithm::Bz2),
-            "deflate" => Some(CompressAlgorithm::Deflate),
-            "gz" => Some(CompressAlgorithm::Gzip),
-            "lzma" => Some(CompressAlgorithm::Lzma),
-            "xz" => Some(CompressAlgorithm::Xz),
-            "zl" => Some(CompressAlgorithm::Zlib),
-            "zstd" | "zst" => Some(CompressAlgorithm::Zstd),
-            _ => None,
-        }
-    }
-
-    /// Create CompressAlgorithm from file path.
-    ///
-    /// If the extension in file path is not supported, `None` will be return instead.
-    pub fn from_path(path: &str) -> Option<CompressAlgorithm> {
-        let ext = PathBuf::from(path)
-            .extension()
-            .map(|s| s.to_string_lossy())?
-            .to_string();
-
-        CompressAlgorithm::from_extension(&ext)
-    }
-}
-
-impl From<CompressAlgorithm> for DecompressCodec {
-    fn from(v: CompressAlgorithm) -> Self {
-        match v {
-            CompressAlgorithm::Brotli => DecompressCodec::Brotli(Box::new(BrotliDecoder::new())),
-            CompressAlgorithm::Bz2 => DecompressCodec::Bz2(BzDecoder::new()),
-            CompressAlgorithm::Deflate => DecompressCodec::Deflate(DeflateDecoder::new()),
-            CompressAlgorithm::Gzip => DecompressCodec::Gzip(GzipDecoder::new()),
-            CompressAlgorithm::Lzma => DecompressCodec::Lzma(LzmaDecoder::new()),
-            CompressAlgorithm::Xz => DecompressCodec::Xz(XzDecoder::new()),
-            CompressAlgorithm::Zlib => DecompressCodec::Zlib(ZlibDecoder::new()),
-            CompressAlgorithm::Zstd => DecompressCodec::Zstd(ZstdDecoder::new()),
-        }
-    }
-}
+use crate::CompressAlgorithm;

 #[derive(Debug)]
 pub enum DecompressCodec {
@@ -149,6 +62,21 @@ pub enum DecompressCodec {
     Zstd(ZstdDecoder),
 }

+impl From<CompressAlgorithm> for DecompressCodec {
+    fn from(v: CompressAlgorithm) -> Self {
+        match v {
+            CompressAlgorithm::Brotli => DecompressCodec::Brotli(Box::new(BrotliDecoder::new())),
+            CompressAlgorithm::Bz2 => DecompressCodec::Bz2(BzDecoder::new()),
+            CompressAlgorithm::Deflate => DecompressCodec::Deflate(DeflateDecoder::new()),
+            CompressAlgorithm::Gzip => DecompressCodec::Gzip(GzipDecoder::new()),
+            CompressAlgorithm::Lzma => DecompressCodec::Lzma(LzmaDecoder::new()),
+            CompressAlgorithm::Xz => DecompressCodec::Xz(XzDecoder::new()),
+            CompressAlgorithm::Zlib => DecompressCodec::Zlib(ZlibDecoder::new()),
+            CompressAlgorithm::Zstd => DecompressCodec::Zstd(ZstdDecoder::new()),
+        }
+    }
+}
+
 impl Decode for DecompressCodec {
     fn reinit(&mut self) -> Result<()> {
         match self {
@@ -324,7 +252,10 @@ impl DecompressDecoder
     /// Finish a decompress press, flushing remaining data into output.
     /// Return the data that has been written.
     pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {
-        debug_assert_eq!(self.state, DecompressState::Flushing);
+        debug_assert!(matches!(
+            self.state,
+            DecompressState::Flushing | DecompressState::Reading
+        ));

         let mut output = PartialBuffer::new(output);
         let done = self.decoder.finish(&mut output)?;
@@ -409,9 +340,56 @@ impl AsyncRead for DecompressReader
     }
 }

+impl DecompressDecoder {
+    pub fn decompress_all(&mut self, compressed: &[u8]) -> common_exception::Result<Vec<u8>> {
+        let main = self.decompress_batch(compressed)?;
+        let tail = self.decompress_batch(&[])?;
+        if tail.is_empty() {
+            Ok(main)
+        } else {
+            let mut all = main;
+            all.extend_from_slice(&tail);
+            Ok(all)
+        }
+    }
+
+    // The decoding must be finished by calling this once more with an empty input.
+    pub fn decompress_batch(&mut self, compressed: &[u8]) -> common_exception::Result<Vec<u8>> {
+        let mut decompress_bufs = vec![];
+        let mut filled = false;
+        loop {
+            match self.state() {
+                DecompressState::Reading => {
+                    if filled {
+                        break;
+                    }
+                    self.fill(compressed);
+                    filled = true;
+                }
+                DecompressState::Decoding => {
+                    let mut decompress_buf = vec![0u8; 4096];
+                    let written = self.decode(&mut decompress_buf[..]).map_err(|e| {
+                        ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
+                    })?;
+                    decompress_buf.truncate(written);
+                    decompress_bufs.push(decompress_buf);
+                }
+                DecompressState::Flushing => {
+                    let mut decompress_buf = vec![0u8; 4096];
+                    let written = self.finish(&mut decompress_buf).map_err(|e| {
+                        ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
+                    })?;
+                    decompress_buf.truncate(written);
+                    decompress_bufs.push(decompress_buf);
+                }
+                DecompressState::Done => break,
+            }
+        }
+        Ok(decompress_bufs.concat())
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use std::cmp::min;
     use std::env;
     use std::fs;
     use std::io::Result;
@@ -421,10 +399,43 @@ mod tests {
     use futures::io::Cursor;
     use futures::AsyncReadExt;
     use rand::prelude::*;
-    use sha2::Digest;
-    use sha2::Sha256;

     use super::*;
+    use crate::encode::CompressCodec;
+
+    fn decode_with_buffer(
+        cr: &mut DecompressDecoder,
+        compressed_content: &[u8],
+        output_buffer_size: usize,
+        input_batch_size: usize,
+    ) -> Result<Vec<u8>> {
+        let mut result = Vec::with_capacity(compressed_content.len());
+        let mut buf = vec![0; output_buffer_size];
+        let mut read = 0;
+        loop {
+            match cr.state {
+                DecompressState::Reading => {
+                    // Should not break when read == compressed_content.len(): the decoder needs an empty fill to finish.
+                    let size = compressed_content.len().min(read + input_batch_size);
+                    let n = cr.fill(&compressed_content[read..size]);
+                    read += n;
+                }
+                DecompressState::Decoding => {
+                    let n = cr.decode(&mut buf)?;
+                    result.extend_from_slice(&buf[..n])
+                }
+                DecompressState::Flushing => {
+                    let n = cr.finish(&mut buf)?;
+                    result.extend_from_slice(&buf[..n])
+                }
+                DecompressState::Done => {
+                    break;
+                }
+            }
+        }
+        assert_eq!(cr.state, DecompressState::Done);
+        Ok(result)
+    }

     #[tokio::test]
     async fn test_decompress_bytes_zlib() -> Result<()> {
@@ -474,7 +485,7 @@
         let _ = env_logger::try_init();

         let mut rng = ThreadRng::default();
-        let size = rng.gen_range(1..16 * 1024 * 1024);
+        let size = rng.gen_range(1..16 * 1024);
         let mut content = vec![0; size];
         rng.fill_bytes(&mut content);

         let mut e = ZlibEncoder::new(Cursor::new(content.clone()));
         let mut compressed_content = vec![];
         e.read_to_end(&mut compressed_content).await?;

-        let mut cr = DecompressDecoder::new(CompressAlgorithm::Zlib);
-
-        let mut result = Vec::with_capacity(content.len());
-        let mut buf = vec![0; 1024 * 1024];
-        let mut read = 0;
-        loop {
-            match cr.state {
-                DecompressState::Reading => {
-                    if read == compressed_content.len() {
-                        break;
-                    }
-
-                    // Simulate read in 4k.
-                    let size = min(read + 4 * 1024 * 1024, compressed_content.len());
-                    let n = cr.fill(&compressed_content[read..size]);
-                    read += n;
-                }
-                DecompressState::Decoding => {
-                    let n = cr.decode(&mut buf)?;
-                    result.extend_from_slice(&buf[..n])
-                }
-                DecompressState::Flushing => {
-                    let n = cr.finish(&mut buf)?;
-                    result.extend_from_slice(&buf[..n])
-                }
-                DecompressState::Done => {
-                    break;
-                }
-            }
+        for input_batch_size in [4 * 1024, size] {
+            let mut cr = DecompressDecoder::new(CompressAlgorithm::Zlib);
+            let result = decode_with_buffer(&mut cr, &compressed_content, 1024, input_batch_size)?;
+            assert_eq!(result, content);
         }
-
-        assert_eq!(result.len(), content.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&result)),
-            format!("{:x}", Sha256::digest(&content))
-        );
-
         Ok(())
     }

@@ -527,7 +507,7 @@
         let _ = env_logger::try_init();

         let mut rng = ThreadRng::default();
-        let size = rng.gen_range(1..16 * 1024 * 1024);
+        let size = rng.gen_range(1..16 * 1024);
         let mut content = vec![0; size];
         rng.fill_bytes(&mut content);

@@ -535,43 +515,11 @@
-        let mut cr = DecompressDecoder::new(CompressAlgorithm::Gzip);
-
-        let mut result = Vec::with_capacity(content.len());
-        let mut buf = vec![0; 1024 * 1024];
-        let mut read = 0;
-        loop {
-            match cr.state {
-                DecompressState::Reading => {
-                    if read == compressed_content.len() {
-                        break;
-                    }
-
-                    // Simulate read in 4k.
-                    let size = min(read + 4 * 1024 * 1024, compressed_content.len());
-                    let n = cr.fill(&compressed_content[read..size]);
-                    read += n;
-                }
-                DecompressState::Decoding => {
-                    let n = cr.decode(&mut buf)?;
-                    result.extend_from_slice(&buf[..n])
-                }
-                DecompressState::Flushing => {
-                    let n = cr.finish(&mut buf)?;
-                    result.extend_from_slice(&buf[..n])
-                }
-                DecompressState::Done => {
-                    break;
-                }
-            }
+        for input_batch_size in [4 * 1024, size] {
+            let mut cr = DecompressDecoder::new(CompressAlgorithm::Gzip);
+            let result = decode_with_buffer(&mut cr, &compressed_content, 1024, input_batch_size)?;
+            assert_eq!(result, content);
         }
-
-        assert_eq!(result.len(), content.len());
-        assert_eq!(
-            format!("{:x}", Sha256::digest(&result)),
-            format!("{:x}", Sha256::digest(&content))
-        );
-
         Ok(())
     }

@@ -580,21 +528,28 @@
         let _ = env_logger::try_init();

         let mut rng = ThreadRng::default();
-        let mut content = vec![0; 16 * 1024 * 1024];
+        let mut content = vec![0; 16000];
         rng.fill_bytes(&mut content);

         let mut e = ZlibEncoder::new(Cursor::new(content.clone()));
         let mut compressed_content = vec![];
         e.read_to_end(&mut compressed_content).await?;

+        let mut encoder = CompressCodec::from(CompressAlgorithm::Zlib);
+        let compressed = encoder.compress_all(&content).unwrap();
+        assert_eq!(compressed_content, compressed);
+
         let mut cr = DecompressReader::new(Cursor::new(compressed_content), CompressAlgorithm::Zlib);

         let mut result = vec![];
         cr.read_to_end(&mut result).await?;
-
         assert_eq!(result, content);

+        let mut decoder = DecompressDecoder::new(CompressAlgorithm::Zlib);
+        let decompressed = decoder.decompress_all(&compressed).unwrap();
+        assert_eq!(result, decompressed);
+
         Ok(())
     }
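Note (not part of the patch): the one-shot decode path added above can be driven like this. `read_compressed` is a hypothetical caller; the error type is what decompress_all already returns.

    use common_compress::CompressAlgorithm;
    use common_compress::DecompressDecoder;

    fn read_compressed(path: &str, data: &[u8]) -> common_exception::Result<Vec<u8>> {
        // Pick the codec from the file extension, e.g. "sample.csv.gz" => Gzip.
        let algo = CompressAlgorithm::from_path(path).expect("unsupported extension");
        // decompress_all() feeds the whole buffer, then a final empty batch
        // so the decoder can flush its remaining state.
        let mut decoder = DecompressDecoder::new(algo);
        decoder.decompress_all(data)
    }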
diff --git a/src/common/compress/src/encode.rs b/src/common/compress/src/encode.rs
new file mode 100644
index 000000000000..cdefecf4e5e3
--- /dev/null
+++ b/src/common/compress/src/encode.rs
@@ -0,0 +1,218 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::Result;
+
+use async_compression::codec::BrotliEncoder;
+use async_compression::codec::BzEncoder;
+use async_compression::codec::DeflateEncoder;
+use async_compression::codec::Encode;
+use async_compression::codec::GzipEncoder;
+use async_compression::codec::LzmaEncoder;
+use async_compression::codec::XzEncoder;
+use async_compression::codec::ZlibEncoder;
+use async_compression::codec::ZstdEncoder;
+use async_compression::util::PartialBuffer;
+use async_compression::Level;
+use brotli::enc::backward_references::BrotliEncoderParams;
+use common_exception::ErrorCode;
+
+use crate::CompressAlgorithm;
+
+#[derive(Debug)]
+pub enum CompressCodec {
+    /// Encoder for [`CompressAlgorithm::Brotli`]
+    ///
+    /// BrotliEncoder is quite large (2592 bytes), so it is boxed
+    /// to reduce the total size of the enum.
+    Brotli(Box<BrotliEncoder>),
+    /// Encoder for [`CompressAlgorithm::Bz2`]
+    Bz2(BzEncoder),
+    /// Encoder for [`CompressAlgorithm::Deflate`]
+    Deflate(DeflateEncoder),
+    /// Encoder for [`CompressAlgorithm::Gzip`]
+    Gzip(GzipEncoder),
+    /// Encoder for [`CompressAlgorithm::Lzma`]
+    Lzma(LzmaEncoder),
+    /// Encoder for [`CompressAlgorithm::Xz`]
+    Xz(XzEncoder),
+    /// Encoder for [`CompressAlgorithm::Zlib`]
+    Zlib(ZlibEncoder),
+    /// Encoder for [`CompressAlgorithm::Zstd`]
+    Zstd(ZstdEncoder),
+}
+
+impl From<CompressAlgorithm> for CompressCodec {
+    fn from(v: CompressAlgorithm) -> Self {
+        match v {
+            CompressAlgorithm::Brotli => {
+                CompressCodec::Brotli(Box::new(BrotliEncoder::new(BrotliEncoderParams::default())))
+            }
+            CompressAlgorithm::Bz2 => {
+                CompressCodec::Bz2(BzEncoder::new(Level::Default.into_bzip2(), 0))
+            }
+            CompressAlgorithm::Deflate => {
+                CompressCodec::Deflate(DeflateEncoder::new(Level::Default.into_flate2()))
+            }
+            CompressAlgorithm::Gzip => {
+                CompressCodec::Gzip(GzipEncoder::new(Level::Default.into_flate2()))
+            }
+            CompressAlgorithm::Lzma => {
+                CompressCodec::Lzma(LzmaEncoder::new(Level::Default.into_xz2()))
+            }
+            CompressAlgorithm::Xz => CompressCodec::Xz(XzEncoder::new(Level::Default.into_xz2())),
+            CompressAlgorithm::Zlib => {
+                CompressCodec::Zlib(ZlibEncoder::new(Level::Default.into_flate2()))
+            }
+            CompressAlgorithm::Zstd => {
+                CompressCodec::Zstd(ZstdEncoder::new(Level::Default.into_zstd()))
+            }
+        }
+    }
+}
+
+impl Encode for CompressCodec {
+    fn encode(
+        &mut self,
+        input: &mut PartialBuffer<impl AsRef<[u8]>>,
+        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
+    ) -> Result<()> {
+        match self {
+            CompressCodec::Brotli(v) => v.encode(input, output),
+            CompressCodec::Bz2(v) => v.encode(input, output),
+            CompressCodec::Deflate(v) => v.encode(input, output),
+            CompressCodec::Gzip(v) => v.encode(input, output),
+            CompressCodec::Lzma(v) => v.encode(input, output),
+            CompressCodec::Xz(v) => v.encode(input, output),
+            CompressCodec::Zlib(v) => v.encode(input, output),
+            CompressCodec::Zstd(v) => v.encode(input, output),
+        }
+    }
+
+    fn flush(
+        &mut self,
+        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
+    ) -> Result<bool> {
+        match self {
+            CompressCodec::Brotli(v) => v.flush(output),
+            CompressCodec::Bz2(v) => v.flush(output),
+            CompressCodec::Deflate(v) => v.flush(output),
+            CompressCodec::Gzip(v) => v.flush(output),
+            CompressCodec::Lzma(v) => v.flush(output),
+            CompressCodec::Xz(v) => v.flush(output),
+            CompressCodec::Zlib(v) => v.flush(output),
+            CompressCodec::Zstd(v) => v.flush(output),
+        }
+    }
+
+    fn finish(
+        &mut self,
+        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
+    ) -> Result<bool> {
+        match self {
+            CompressCodec::Brotli(v) => v.finish(output),
+            CompressCodec::Bz2(v) => v.finish(output),
+            CompressCodec::Deflate(v) => v.finish(output),
+            CompressCodec::Gzip(v) => v.finish(output),
+            CompressCodec::Lzma(v) => v.finish(output),
+            CompressCodec::Xz(v) => v.finish(output),
+            CompressCodec::Zlib(v) => v.finish(output),
+            CompressCodec::Zstd(v) => v.finish(output),
+        }
+    }
+}
+
+impl CompressCodec {
+    #[allow(unused)]
+    pub fn compress_all(&mut self, to_compress: &[u8]) -> common_exception::Result<Vec<u8>> {
+        let mut compress_bufs = vec![];
+        let mut input = PartialBuffer::new(to_compress);
+        let buf_size = to_compress.len().min(4096);
+
+        loop {
+            let mut output = PartialBuffer::new(vec![0u8; buf_size]);
+            self.encode(&mut input, &mut output).map_err(|e| {
+                ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
+            })?;
+            let written = output.written().len();
+            if written > 0 {
+                let mut output = output.into_inner();
+                output.truncate(written);
+                compress_bufs.push(output);
+            }
+            if input.unwritten().is_empty() {
+                break;
+            }
+        }
+
+        loop {
+            let mut output = PartialBuffer::new(vec![0u8; buf_size]);
+            let finished = self.finish(&mut output).map_err(|e| {
+                ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
+            })?;
+            let written = output.written().len();
+            if written > 0 {
+                let mut output = output.into_inner();
+                output.truncate(written);
+                compress_bufs.push(output);
+            }
+            if finished {
+                break;
+            }
+        }
+        Ok(compress_bufs.concat())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::prelude::*;
+
+    use super::*;
+    use crate::DecompressDecoder;
+
+    #[tokio::test]
+    async fn test_compress_decompress() -> common_exception::Result<()> {
+        let _ = env_logger::try_init();
+
+        let mut rng = ThreadRng::default();
+        let size = rng.gen_range(1..16 * 1024);
+        let mut content = vec![0; size];
+        rng.fill_bytes(&mut content);
+
+        for algo in [
+            CompressAlgorithm::Zlib,
+            CompressAlgorithm::Gzip,
+            CompressAlgorithm::Bz2,
+            CompressAlgorithm::Zstd,
+            CompressAlgorithm::Deflate,
+            CompressAlgorithm::Xz,
+            CompressAlgorithm::Lzma,
+        ] {
+            let mut encoder = CompressCodec::from(algo);
+            let compressed = encoder.compress_all(&content)?;
+            let mut decoder = DecompressDecoder::new(algo);
+            let decompressed = decoder.decompress_all(&compressed)?;
+            assert_eq!(
+                decompressed,
+                content,
+                "failed to round-trip {algo:?}: {} {} {}",
+                size,
+                compressed.len(),
+                decompressed.len()
+            );
+        }

+        Ok(())
+    }
+}
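Note (not part of the patch): a round-trip sketch of the new encoder, mirroring the test above. CompressCodec is not re-exported from lib.rs in this patch, so using it outside the crate as written here assumes an added `pub use encode::CompressCodec;`.

    use common_compress::CompressAlgorithm;
    use common_compress::CompressCodec; // assumed re-export, see note above
    use common_compress::DecompressDecoder;

    fn round_trip(data: &[u8]) -> common_exception::Result<()> {
        let algo = CompressAlgorithm::Zstd;
        // compress_all() loops encode() until the input is drained, then loops
        // finish() until the encoder reports completion.
        let mut encoder = CompressCodec::from(algo);
        let compressed = encoder.compress_all(data)?;
        // And back again through the one-shot decode path.
        let mut decoder = DecompressDecoder::new(algo);
        assert_eq!(decoder.decompress_all(&compressed)?, data);
        Ok(())
    }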
diff --git a/src/common/compress/src/lib.rs b/src/common/compress/src/lib.rs
index a92440cbb864..e5fcd97ce815 100644
--- a/src/common/compress/src/lib.rs
+++ b/src/common/compress/src/lib.rs
@@ -16,9 +16,12 @@

 //! This mod provides compress support for BytesWrite and decompress support for BytesRead.

-mod compress;
-pub use compress::CompressAlgorithm;
-pub use compress::DecompressCodec;
-pub use compress::DecompressDecoder;
-pub use compress::DecompressReader;
-pub use compress::DecompressState;
+mod compress_algorithms;
+mod decode;
+mod encode;
+
+pub use compress_algorithms::CompressAlgorithm;
+pub use decode::DecompressCodec;
+pub use decode::DecompressDecoder;
+pub use decode::DecompressReader;
+pub use decode::DecompressState;
diff --git a/src/query/pipeline/sources/src/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/input_formats/input_format_text.rs
index 32e2c01fc5fb..cf5bbce4d468 100644
--- a/src/query/pipeline/sources/src/input_formats/input_format_text.rs
+++ b/src/query/pipeline/sources/src/input_formats/input_format_text.rs
@@ -36,7 +36,6 @@ use common_settings::Settings;
 use common_storage::FileStatus;
 use common_storage::StageFileInfo;
 use log::debug;
-use log::warn;
 use opendal::Operator;

 use crate::input_formats::input_pipeline::AligningStateTrait;
@@ -518,16 +517,26 @@ impl AligningStateTrait for AligningStateMaybeCompressed
     fn align(&mut self, read_batch: Option<Vec<u8>>) -> Result<Vec<RowBatch>> {
         let row_batches = if let Some(data) = read_batch {
             let buf = if let Some(decoder) = self.decompressor.as_mut() {
-                decompress(decoder, &data)?
+                decoder.decompress_batch(&data)?
             } else {
                 data
             };
             self.state.align(&buf)?
         } else {
-            if let Some(decoder) = &self.decompressor {
+            if let Some(decoder) = self.decompressor.as_mut() {
                 let state = decoder.state();
-                if !matches!(state, DecompressState::Done | DecompressState::Reading) {
-                    warn!("decompressor end with state {:?}", state)
+                if !matches!(state, DecompressState::Done) {
+                    let data = decoder.decompress_batch(&[])?;
+                    if !data.is_empty() {
+                        self.state.align(&data)?;
+                    }
                 }
+                let state = decoder.state();
+                if !matches!(state, DecompressState::Done) {
+                    return Err(ErrorCode::BadBytes(format!(
+                        "decompressor state is {:?} after decompressing all data",
+                        state
+                    )));
+                }
             }
             self.state.align_flush()?
@@ -655,37 +663,3 @@ impl BlockBuilderTrait for BlockBuilder
         }
     }
 }
-
-fn decompress(decoder: &mut DecompressDecoder, compressed: &[u8]) -> Result<Vec<u8>> {
-    let mut decompress_bufs = vec![];
-    let mut amt = 0;
-    loop {
-        match decoder.state() {
-            DecompressState::Reading => {
-                if amt == compressed.len() {
-                    break;
-                }
-                let read = decoder.fill(&compressed[amt..]);
-                amt += read;
-            }
-            DecompressState::Decoding => {
-                let mut decompress_buf = vec![0u8; 4096];
-                let written = decoder.decode(&mut decompress_buf[..]).map_err(|e| {
-                    ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
-                })?;
-                decompress_buf.truncate(written);
-                decompress_bufs.push(decompress_buf);
-            }
-            DecompressState::Flushing => {
-                let mut decompress_buf = vec![0u8; 4096];
-                let written = decoder.finish(&mut decompress_buf).map_err(|e| {
-                    ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
-                })?;
-                decompress_buf.truncate(written);
-                decompress_bufs.push(decompress_buf);
-            }
-            DecompressState::Done => break,
-        }
-    }
-    Ok(decompress_bufs.concat())
-}
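Note (not part of the patch): the streaming contract that the rewritten align() relies on, in miniature. `drive` is a hypothetical caller: decompress_batch() is called once per incoming buffer and once more with an empty slice at end-of-stream, after which the decoder must have reached Done.

    use common_compress::CompressAlgorithm;
    use common_compress::DecompressDecoder;
    use common_compress::DecompressState;

    fn drive(chunks: &[&[u8]]) -> common_exception::Result<Vec<u8>> {
        let mut decoder = DecompressDecoder::new(CompressAlgorithm::Gzip);
        let mut out = Vec::new();
        for chunk in chunks {
            // Each call decodes as much of this batch as possible.
            out.extend_from_slice(&decoder.decompress_batch(chunk)?);
        }
        // End of stream: flush with an empty batch, then verify completion,
        // as align() does before erroring with BadBytes.
        out.extend_from_slice(&decoder.decompress_batch(&[])?);
        assert!(matches!(decoder.state(), DecompressState::Done));
        Ok(out)
    }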