Skip to content

Commit

Permalink
feat(storage): improve block memory usage (#15024)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Feb 23, 2024
1 parent e223b9f commit 41e723b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 37 deletions.
9 changes: 2 additions & 7 deletions src/storage/src/hummock/file_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,8 @@ mod tests {
builder.add_for_test(construct_full_key_struct(0, b"k3", 3), b"v03");
builder.add_for_test(construct_full_key_struct(0, b"k4", 4), b"v04");

Box::new(
Block::decode(
builder.build().to_vec().into(),
builder.uncompressed_block_size(),
)
.unwrap(),
)
let uncompress = builder.uncompressed_block_size();
Box::new(Block::decode(builder.build().to_vec().into(), uncompress).unwrap())
}

fn sstable_for_test() -> Sstable {
Expand Down
79 changes: 53 additions & 26 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,20 +215,20 @@ impl Block {
let mut decoder = lz4::Decoder::new(compressed_data.reader())
.map_err(HummockError::decode_error)?;
let mut decoded = Vec::with_capacity(uncompressed_capacity);
decoder
let read_size = decoder
.read_to_end(&mut decoded)
.map_err(HummockError::decode_error)?;
debug_assert_eq!(decoded.capacity(), uncompressed_capacity);
assert_eq!(read_size, uncompressed_capacity);
Bytes::from(decoded)
}
CompressionAlgorithm::Zstd => {
let mut decoder = zstd::Decoder::new(compressed_data.reader())
.map_err(HummockError::decode_error)?;
let mut decoded = Vec::with_capacity(uncompressed_capacity);
decoder
let read_size = decoder
.read_to_end(&mut decoded)
.map_err(HummockError::decode_error)?;
debug_assert_eq!(decoded.capacity(), uncompressed_capacity);
assert_eq!(read_size, uncompressed_capacity);
Bytes::from(decoded)
}
};
Expand Down Expand Up @@ -445,6 +445,8 @@ impl Default for BlockBuilderOptions {
pub struct BlockBuilder {
/// Write buffer.
buf: BytesMut,
/// Compress buffer
compress_buf: BytesMut,
/// Entry interval between restart points.
restart_count: usize,
/// Restart points.
Expand All @@ -465,8 +467,9 @@ pub struct BlockBuilder {
impl BlockBuilder {
pub fn new(options: BlockBuilderOptions) -> Self {
Self {
// add more space to avoid re-allocate space.
buf: BytesMut::with_capacity(options.capacity + 256),
// add more space to avoid re-allocate space. (for restart_points and restart_points_type_index)
buf: BytesMut::with_capacity(Self::buf_reserve_size(&options)),
compress_buf: BytesMut::default(),
restart_count: options.restart_interval,
restart_points: Vec::with_capacity(
options.capacity / DEFAULT_ENTRY_SIZE / options.restart_interval + 1,
Expand Down Expand Up @@ -664,22 +667,35 @@ impl BlockBuilder {
);

self.buf.put_u32_le(self.table_id.unwrap());
if self.compression_algorithm != CompressionAlgorithm::None {
self.buf = Self::compress(&self.buf[..], self.compression_algorithm);
}
let result_buf = if self.compression_algorithm != CompressionAlgorithm::None {
self.compress_buf.clear();
self.compress_buf = Self::compress(
&self.buf[..],
self.compression_algorithm,
std::mem::take(&mut self.compress_buf),
);

&mut self.compress_buf
} else {
&mut self.buf
};

self.compression_algorithm.encode(&mut self.buf);
let checksum = xxhash64_checksum(&self.buf);
self.buf.put_u64_le(checksum);
self.compression_algorithm.encode(result_buf);
let checksum = xxhash64_checksum(result_buf);
result_buf.put_u64_le(checksum);
assert!(
self.buf.len() < (u32::MAX) as usize,
result_buf.len() < (u32::MAX) as usize,
"buf_len {} entry_count {} table {:?}",
self.buf.len(),
result_buf.len(),
self.entry_count,
self.table_id
);

self.buf.as_ref()
if self.compression_algorithm != CompressionAlgorithm::None {
self.compress_buf.as_ref()
} else {
self.buf.as_ref()
}
}

pub fn compress_block(
Expand All @@ -693,21 +709,29 @@ impl BlockBuilder {
let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
let compressed_data = &buf[..buf.len() - 9];
assert_eq!(compression, CompressionAlgorithm::None);
let mut writer = Self::compress(compressed_data, target_compression);
let mut compress_writer = Self::compress(
compressed_data,
target_compression,
BytesMut::with_capacity(buf.len()),
);

target_compression.encode(&mut writer);
let checksum = xxhash64_checksum(&writer);
writer.put_u64_le(checksum);
Ok(writer.freeze())
target_compression.encode(&mut compress_writer);
let checksum = xxhash64_checksum(&compress_writer);
compress_writer.put_u64_le(checksum);
Ok(compress_writer.freeze())
}

pub fn compress(buf: &[u8], compression_algorithm: CompressionAlgorithm) -> BytesMut {
pub fn compress(
buf: &[u8],
compression_algorithm: CompressionAlgorithm,
compress_writer: BytesMut,
) -> BytesMut {
match compression_algorithm {
CompressionAlgorithm::None => unreachable!(),
CompressionAlgorithm::Lz4 => {
let mut encoder = lz4::EncoderBuilder::new()
.level(4)
.build(BytesMut::with_capacity(buf.len()).writer())
.build(compress_writer.writer())
.map_err(HummockError::encode_error)
.unwrap();
encoder
Expand All @@ -719,10 +743,9 @@ impl BlockBuilder {
writer.into_inner()
}
CompressionAlgorithm::Zstd => {
let mut encoder =
zstd::Encoder::new(BytesMut::with_capacity(buf.len()).writer(), 4)
.map_err(HummockError::encode_error)
.unwrap();
let mut encoder = zstd::Encoder::new(compress_writer.writer(), 4)
.map_err(HummockError::encode_error)
.unwrap();
encoder
.write_all(buf)
.map_err(HummockError::encode_error)
Expand Down Expand Up @@ -762,6 +785,10 @@ impl BlockBuilder {
pub fn table_id(&self) -> Option<u32> {
self.table_id
}

/// Initial capacity to reserve for the builder's write buffer.
///
/// Returns the configured block capacity plus fixed headroom so that the
/// trailing restart points and restart-point type index appended at build
/// time do not force a reallocation.
fn buf_reserve_size(option: &BlockBuilderOptions) -> usize {
    // Extra bytes beyond the data capacity for trailer metadata
    // (restart points + restart-point type index).
    const TRAILER_HEADROOM: usize = 1024 + 256;
    option.capacity + TRAILER_HEADROOM
}
}

#[cfg(test)]
Expand Down
20 changes: 16 additions & 4 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
self.add(full_key, value).await
}

/// only for test
/// Approximate size in bytes of the block currently being assembled,
/// as reported by the underlying block builder.
pub fn current_block_size(&self) -> usize {
    let builder = &self.block_builder;
    builder.approximate_len()
}
Expand Down Expand Up @@ -344,6 +343,12 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
|| !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
let table_id = full_key.user_key.table_id.table_id();
let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
let current_block_size = self.current_block_size();
let is_block_full = current_block_size >= self.options.block_capacity
|| (current_block_size > self.options.block_capacity / 4 * 3
&& current_block_size + self.raw_value.len() + self.raw_key.len()
> self.options.block_capacity);

if is_new_table {
assert!(
could_switch_block,
Expand All @@ -356,9 +361,7 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
if !self.block_builder.is_empty() {
self.build_block().await?;
}
} else if self.block_builder.approximate_len() >= self.options.block_capacity
&& could_switch_block
{
} else if is_block_full && could_switch_block {
self.build_block().await?;
}
self.last_table_stats.total_key_count += 1;
Expand Down Expand Up @@ -704,6 +707,15 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
data_len, block_meta.offset
)
});

if data_len as usize > self.options.capacity * 2 {
tracing::warn!(
"WARN unexpected block size {} table {:?}",
data_len,
self.block_builder.table_id()
);
}

self.block_builder.clear();
Ok(())
}
Expand Down

0 comments on commit 41e723b

Please sign in to comment.