Skip to content

Commit

Permalink
use global io
Browse files (browse the repository at this point in the history)
  • Loading branch information
zhyass committed Sep 27, 2023
1 parent 0f285ee commit b850274
Showing 1 changed file with 37 additions and 54 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,9 +18,8 @@ use std::sync::Arc;
use std::time::Instant;
use std::vec;

use common_base::base::tokio::sync::OwnedSemaphorePermit;
use common_base::base::tokio::sync::Semaphore;
use common_base::runtime::Runtime;
use common_base::runtime::GlobalIORuntime;
use common_base::runtime::TrySpawn;
use common_catalog::plan::PartInfoPtr;
use common_catalog::plan::Partitions;
Expand All @@ -36,6 +35,7 @@ use storages_common_table_meta::meta::Statistics;

use crate::io::SegmentsIO;
use crate::metrics::metrics_inc_compact_block_build_task_milliseconds;
use crate::operations::acquire_task_permit;
use crate::operations::common::BlockMetaIndex;
use crate::operations::mutation::compact::compact_part::CompactExtraInfo;
use crate::operations::mutation::compact::compact_part::CompactLazyPartInfo;
Expand Down Expand Up @@ -176,7 +176,8 @@ impl BlockCompactMutator {
));

let cluster = self.ctx.get_cluster();
let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() {
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
let column_ids = self
.compact_params
.base_snapshot
Expand Down Expand Up @@ -217,19 +218,14 @@ impl BlockCompactMutator {
mut lazy_parts: Vec<CompactLazyPartInfo>,
) -> Result<Vec<PartInfoPtr>> {
let start = Instant::now();

let max_threads = ctx.get_settings().get_max_threads()? as usize;
let max_concurrency = std::cmp::max(max_threads * 2, 10);

let runtime = Arc::new(Runtime::with_worker_threads(
max_threads,
Some("build-compact-worker".to_owned()),
)?);
let semaphore = Arc::new(Semaphore::new(max_concurrency));
let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(max_concurrency));

let mut remain = lazy_parts.len() % max_threads;
let batch_size = lazy_parts.len() / max_threads;
let mut works = Vec::with_capacity(max_threads);

while !lazy_parts.is_empty() {
let gap_size = std::cmp::min(1, remain);
let batch_size = batch_size + gap_size;
Expand All @@ -239,40 +235,31 @@ impl BlockCompactMutator {
let semaphore = semaphore.clone();

let batch = lazy_parts.drain(0..batch_size).collect::<Vec<_>>();
works.push(runtime.spawn(async_backtrace::location!().frame({
let runtime = runtime.clone();
async move {
let mut res = vec![];
for lazy_part in batch {
let mut builder =
CompactTaskBuilder::new(column_ids.clone(), cluster_key_id, thresholds);
let parts = builder
.build_tasks(
lazy_part.segment_indices,
lazy_part.compact_segments,
runtime.clone(),
semaphore.clone(),
)
.await?;
res.extend(parts);
}
Ok::<_, ErrorCode>(res)
works.push(async move {
let mut res = vec![];
for lazy_part in batch {
let mut builder =
CompactTaskBuilder::new(column_ids.clone(), cluster_key_id, thresholds);
let parts = builder
.build_tasks(
lazy_part.segment_indices,
lazy_part.compact_segments,
semaphore.clone(),
)
.await?;
res.extend(parts);
}
})));
Ok::<_, ErrorCode>(res)
});
}

match futures::future::try_join_all(works).await {
Err(e) => Err(ErrorCode::StorageOther(format!(
"build compact tasks failure, {}",
e
))),
Ok(workers) => {
let mut parts = vec![];
for worker in workers {
let res = worker?;
parts.extend(res);
}

Ok(res) => {
let parts = res.into_iter().flatten().collect::<Vec<_>>();
// Status.
{
let elapsed_time = start.elapsed().as_millis() as u64;
Expand Down Expand Up @@ -485,7 +472,6 @@ impl CompactTaskBuilder {
&mut self,
segment_indices: Vec<usize>,
compact_segments: Vec<Arc<CompactSegmentInfo>>,
runtime: Arc<Runtime>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<PartInfoPtr>> {
let mut block_idx = 0;
Expand All @@ -494,24 +480,21 @@ impl CompactTaskBuilder {
let mut unchanged_blocks = Vec::new();
let mut removed_segment_summary = Statistics::default();

let mut iter = compact_segments.into_iter().rev();
let tasks = std::iter::from_fn(|| {
iter.next().map(|v| {
Box::new(move |permit: OwnedSemaphorePermit| {
Box::pin(async move {
let _permit = permit;
let blocks = v.block_metas()?;
Ok::<_, ErrorCode>((blocks, v.summary.clone()))
})
})
})
});

let join_handlers = runtime
.try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks)
.await?;
let runtime = GlobalIORuntime::instance();
let mut handlers = Vec::with_capacity(compact_segments.len());
for segment in compact_segments.into_iter().rev() {
let permit = acquire_task_permit(semaphore.clone()).await?;
let handler = runtime.spawn(async_backtrace::location!().frame({
async move {
let blocks = segment.block_metas()?;
drop(permit);
Ok::<_, ErrorCode>((blocks, segment.summary.clone()))
}
}));
handlers.push(handler);
}

let joint = futures::future::try_join_all(join_handlers)
let joint = futures::future::try_join_all(handlers)
.await
.map_err(|e| ErrorCode::StorageOther(format!("deserialize failure, {}", e)))?;

Expand Down

0 comments on commit b850274

Please sign in to comment.