Skip to content

Commit

Permalink
fix(storage): fix compactor oom 0206 (#15023)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Feb 6, 2024
1 parent 4e5a936 commit 3da8d14
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 26 deletions.
12 changes: 6 additions & 6 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ pub(crate) mod tests {
compact_task.current_epoch_time = 0;

let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx.clone(),
compact_task.clone(),
rx,
Expand Down Expand Up @@ -452,7 +452,7 @@ pub(crate) mod tests {
{
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx.clone(),
compact_task.clone(),
rx,
Expand Down Expand Up @@ -784,7 +784,7 @@ pub(crate) mod tests {

// 4. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx,
compact_task.clone(),
rx,
Expand Down Expand Up @@ -978,7 +978,7 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx,
compact_task.clone(),
rx,
Expand Down Expand Up @@ -1166,7 +1166,7 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx,
compact_task.clone(),
rx,
Expand Down Expand Up @@ -1336,7 +1336,7 @@ pub(crate) mod tests {

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx,
compact_task.clone(),
rx,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/sync_point_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub async fn compact_once(
compact_task.compaction_filter_mask = compaction_filter_flag.bits();
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (result_task, task_stats) = compact(
let ((result_task, task_stats), _) = compact(
compact_ctx,
compact_task.clone(),
rx,
Expand Down
45 changes: 31 additions & 14 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::hummock::iterator::{
Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, CompressionAlgorithm,
Expand Down Expand Up @@ -251,7 +252,10 @@ pub async fn compact(
mut shutdown_rx: Receiver<()>,
object_id_getter: Box<dyn GetObjectId>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> (CompactTask, HashMap<u32, TableStats>) {
) -> (
(CompactTask, HashMap<u32, TableStats>),
Option<MemoryTracker>,
) {
let context = compactor_context.clone();
let group_label = compact_task.compaction_group_id.to_string();
let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
Expand Down Expand Up @@ -330,7 +334,10 @@ pub async fn compact(
Err(e) => {
tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids);
let task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return (
compact_done(compact_task, context.clone(), vec![], task_status),
None,
);
}
Ok(extractor) => extractor,
};
Expand All @@ -344,7 +351,10 @@ pub async fn compact(
if !removed_tables.is_empty() {
tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
let task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return (
compact_done(compact_task, context.clone(), vec![], task_status),
None,
);
}
}

Expand Down Expand Up @@ -410,7 +420,10 @@ pub async fn compact(
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
task_status = TaskStatus::ExecuteFailed;
return compact_done(compact_task, context.clone(), vec![], task_status);
return (
compact_done(compact_task, context.clone(), vec![], task_status),
None,
);
}
}
}
Expand All @@ -426,11 +439,14 @@ pub async fn compact(
context.running_task_parallelism.load(Ordering::Relaxed),
context.max_task_parallelism.load(Ordering::Relaxed),
);
return compact_done(
compact_task,
context.clone(),
vec![],
TaskStatus::NoAvailCpuResourceCanceled,
return (
compact_done(
compact_task,
context.clone(),
vec![],
TaskStatus::NoAvailCpuResourceCanceled,
),
None,
);
}

Expand Down Expand Up @@ -488,7 +504,10 @@ pub async fn compact(
context.memory_limiter.quota()
);
task_status = TaskStatus::NoAvailMemoryResourceCanceled;
return compact_done(compact_task, context.clone(), output_ssts, task_status);
return (
compact_done(compact_task, context.clone(), output_ssts, task_status),
memory_detector,
);
}

context.compactor_metrics.compact_task_pending_num.inc();
Expand Down Expand Up @@ -546,7 +565,7 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
return (compact_task, table_stats);
return ((compact_task, table_stats), memory_detector);
}
for (split_index, _) in compact_task.splits.iter().enumerate() {
let filter = multi_filter.clone();
Expand Down Expand Up @@ -619,8 +638,6 @@ pub async fn compact(
}
}

drop(memory_detector);

if task_status != TaskStatus::Success {
for abort_handle in abort_handles {
abort_handle.abort();
Expand All @@ -641,7 +658,7 @@ pub async fn compact(
cost_time,
compact_task_to_string(&compact_task)
);
(compact_task, table_stats)
((compact_task, table_stats), memory_detector)
}

/// Fills in the compact task and tries to report the task result to meta node.
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ pub fn start_compactor(
let (tx, rx) = tokio::sync::oneshot::channel();
let task_id = compact_task.task_id;
shutdown.lock().unwrap().insert(task_id, tx);
let (compact_task, table_stats) = match sstable_object_id_manager.add_watermark_object_id(None).await
let ((compact_task, table_stats), _memory_tracker) = match sstable_object_id_manager.add_watermark_object_id(None).await
{
Ok(tracker_id) => {
let sstable_object_id_manager_clone = sstable_object_id_manager.clone();
Expand All @@ -480,7 +480,7 @@ pub fn start_compactor(
tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id");
let mut compact_task = compact_task;
compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed);
(compact_task, HashMap::default())
((compact_task, HashMap::default()),None)
}
};
shutdown.lock().unwrap().remove(&task_id);
Expand Down Expand Up @@ -669,7 +669,7 @@ pub fn start_shared_compactor(
let task_id = compact_task.task_id;
shutdown.lock().unwrap().insert(task_id, tx);

let (compact_task, table_stats) = compactor_runner::compact(
let ((compact_task, table_stats), _memory_tracker)= compactor_runner::compact(
context.clone(),
compact_task,
rx,
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,7 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
if !self.block_builder.is_empty() {
self.build_block().await?;
}
} else if is_new_user_key
&& self.block_builder.approximate_len() >= self.options.block_capacity
} else if self.block_builder.approximate_len() >= self.options.block_capacity
&& could_switch_block
{
self.build_block().await?;
Expand Down

0 comments on commit 3da8d14

Please sign in to comment.