Add concurrency to the partition writing routine #628

Open · wants to merge 1 commit into main
155 changes: 99 additions & 56 deletions src/context/delta.rs
@@ -7,6 +7,7 @@ use bytes::BytesMut;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::parquet::basic::{Compression, ZstdLevel};
use datafusion::parquet::format::FileMetaData;
use datafusion::{
arrow::datatypes::{Schema, SchemaRef},
datasource::TableProvider,
@@ -28,6 +29,7 @@ use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectStore;
use std::fs::File;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::{NamedTempFile, TempPath};

@@ -48,11 +50,8 @@ const PARTITION_FILE_BUFFER_SIZE: usize = 128 * 1024;
// This denotes the threshold size for an individual multipart request payload prior to upload.
// It dictates the memory usage, as we'll need to keep each part in memory until sent.
const PARTITION_FILE_MIN_PART_SIZE: usize = 5 * 1024 * 1024;
// Controls how many multipart upload tasks we let run in parallel; this is in part dictated by the
// fact that object store concurrently uploads parts for each of our tasks. That concurrency in
// turn is hard coded to 8 (https://github.com/apache/arrow-rs/blob/master/object_store/src/aws/mod.rs#L145)
// meaning that with 2 partition upload tasks x 8 part upload tasks x 5MB we have 80MB of memory usage
const PARTITION_FILE_UPLOAD_MAX_CONCURRENCY: usize = 2;
// Controls how many partition write/multipart upload tasks we let run in parallel at any one time.
const PARTITION_FILE_WRITE_MAX_CONCURRENCY: usize = 3;

#[cfg(test)]
fn get_uuid() -> Uuid {
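
For illustration, here are the task counts the new rule produces for a few hints. This is a standalone sketch; `write_task_count` is a hypothetical helper that mirrors the `max_concurrency` selection introduced further down in this diff:

```rust
// Standalone sketch: how the write-task count follows from the optional file
// count hint. `write_task_count` is a hypothetical helper mirroring the
// `max_concurrency` match in `plan_to_object_store`.
const PARTITION_FILE_WRITE_MAX_CONCURRENCY: usize = 3;

fn write_task_count(min_file_count_hint: Option<usize>) -> usize {
    match min_file_count_hint {
        // No/zero file hint passed, go with sequential writing
        None | Some(0) => 1,
        // Otherwise use the hint, capped at the configured maximum
        Some(hint) => hint.min(PARTITION_FILE_WRITE_MAX_CONCURRENCY),
    }
}

fn main() {
    assert_eq!(write_task_count(None), 1); // sequential fallback
    assert_eq!(write_task_count(Some(2)), 2); // hint below the cap
    assert_eq!(write_task_count(Some(8)), 3); // capped at the maximum
}
```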
Expand Down Expand Up @@ -92,75 +91,116 @@ fn temp_partition_file_writer(
}

/// Execute a plan and upload the results to object storage as Parquet files, indexing them.
/// Partially taken from DataFusion's plan_to_parquet with some additions (file stats, using a DiskManager)
/// We try to adhere to a couple of invariants here, namely:
/// - no partition will have more than `max_partition_size` rows
/// - there will be at least `min_file_count_hint` partition files and possibly more (but not less)
pub async fn plan_to_object_store(
state: &SessionState,
plan: &Arc<dyn ExecutionPlan>,
store: Arc<dyn ObjectStore>,
local_data_dir: Option<String>,
max_partition_size: u32,
min_file_count_hint: Option<usize>,
) -> Result<Vec<Add>> {
let mut current_partition_size = 0;
let (mut current_partition_file_path, mut writer) =
temp_partition_file_writer(plan.schema())?;
let mut partition_file_paths = vec![current_partition_file_path];
let mut partition_metadata = vec![];
let mut tasks = vec![];
let max_concurrency = match min_file_count_hint {
// No/zero file hint passed, go with sequential writing
None | Some(0) => 1,
// Some file hint provided; use that or `PARTITION_FILE_WRITE_MAX_CONCURRENCY` tasks to
// re-chunk DataFusion partitions if the anticipated file count is larger
Some(min_file_count) => min_file_count.min(PARTITION_FILE_WRITE_MAX_CONCURRENCY),
};

// Pointer to the next partition to stream from
let partition = Arc::new(AtomicUsize::new(0));

// Iterate over Datafusion partitions and re-chunk them, since we want to enforce a pre-defined
let input_partitions = plan.output_partitioning().partition_count();

// Iterate over DataFusion partitions and re-chunk them, since we want to enforce a pre-defined
// partition size limit, which is not guaranteed by DF.
info!("Persisting data into temporary partition objects on disk");
for i in 0..plan.output_partitioning().partition_count() {
info!("Persisting data into temporary partition objects on disk in {max_concurrency} tasks from {input_partitions} input partitions");
let mut tasks = vec![];
for _ in 0..max_concurrency {
let task_partition = Arc::clone(&partition);

let input = plan.clone();
let task_ctx = Arc::new(TaskContext::from(state));
let mut stream = plan.execute(i, task_ctx)?;

while let Some(batch) = stream.next().await {
let mut batch = batch?;

let mut leftover_partition_capacity =
(max_partition_size - current_partition_size) as usize;

while batch.num_rows() > leftover_partition_capacity {
if leftover_partition_capacity > 0 {
// Fill up the remaining capacity in the slice
writer
.write(&batch.slice(0, leftover_partition_capacity))
.map_err(DataFusionError::from)?;
// Trim away the part that made it to the current partition
batch = batch.slice(
leftover_partition_capacity,
batch.num_rows() - leftover_partition_capacity,
);
}
let handle: tokio::task::JoinHandle<Result<Vec<(TempPath, FileMetaData)>>> =
tokio::task::spawn(async move {
let mut current_partition_size = 0;
let (mut current_partition_file_path, mut writer) =
temp_partition_file_writer(input.schema())?;
let mut partition_file_paths = vec![current_partition_file_path];
let mut partition_metadata = vec![];

let mut next_partition = task_partition.fetch_add(1, Ordering::SeqCst);
while next_partition < input_partitions {
let mut stream = input.execute(next_partition, task_ctx.clone())?;
while let Some(batch) = stream.next().await {
let mut batch = batch?;

let mut leftover_partition_capacity =
(max_partition_size - current_partition_size) as usize;

while batch.num_rows() > leftover_partition_capacity {
if leftover_partition_capacity > 0 {
// Fill up the remaining capacity in the slice
writer
.write(&batch.slice(0, leftover_partition_capacity))
.map_err(DataFusionError::from)?;
// Trim away the part that made it to the current partition
batch = batch.slice(
leftover_partition_capacity,
batch.num_rows() - leftover_partition_capacity,
);
}

// Roll-over into the next partition: close partition writer, reset partition size
// counter and open new temp file + writer.
let file_metadata = writer.close().map_err(DataFusionError::from)?;
partition_metadata.push(file_metadata);
// Roll-over into the next partition: close partition writer, reset partition size
// counter and open new temp file + writer.
let file_metadata =
writer.close().map_err(DataFusionError::from)?;
partition_metadata.push(file_metadata);

current_partition_size = 0;
leftover_partition_capacity = max_partition_size as usize;
current_partition_size = 0;
leftover_partition_capacity = max_partition_size as usize;

(current_partition_file_path, writer) =
temp_partition_file_writer(plan.schema())?;
partition_file_paths.push(current_partition_file_path);
}
(current_partition_file_path, writer) =
temp_partition_file_writer(input.schema())?;
partition_file_paths.push(current_partition_file_path);
}

current_partition_size += batch.num_rows() as u32;
writer.write(&batch).map_err(DataFusionError::from)?;
}
current_partition_size += batch.num_rows() as u32;
writer.write(&batch).map_err(DataFusionError::from)?;
}

// Roll-over into the next partition
next_partition = task_partition.fetch_add(1, Ordering::SeqCst);
}
let file_metadata = writer.close().map_err(DataFusionError::from)?;
partition_metadata.push(file_metadata);

Ok(partition_file_paths
.into_iter()
.zip(partition_metadata)
.collect())
});
tasks.push(handle);
}
let file_metadata = writer.close().map_err(DataFusionError::from)?;
partition_metadata.push(file_metadata);

// Merge all the partition handles/metadata
let partition_data: Vec<(TempPath, FileMetaData)> = futures::future::join_all(tasks)
.await
.into_iter()
.map(|x| x.unwrap_or_else(|e| Err(DataFusionError::External(Box::new(e)))))
.collect::<Result<Vec<Vec<(TempPath, FileMetaData)>>>>()?
.into_iter()
.flatten()
.collect();

info!("Starting upload of partition objects");
let partitions_uuid = get_uuid();

let sem = Arc::new(Semaphore::new(PARTITION_FILE_UPLOAD_MAX_CONCURRENCY));
for (part, (partition_file_path, metadata)) in partition_file_paths
.into_iter()
.zip(partition_metadata)
.enumerate()
let mut tasks = vec![];
let sem = Arc::new(Semaphore::new(PARTITION_FILE_WRITE_MAX_CONCURRENCY));
for (part, (partition_file_path, metadata)) in partition_data.into_iter().enumerate()
{
let permit = Arc::clone(&sem).acquire_owned().await.ok();

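
To make the core concurrency change in the hunk above easier to follow, here is a minimal standalone sketch of the same work-claiming pattern, assuming `tokio` and `futures` as dependencies (plan execution and Parquet writing are elided): a fixed number of tasks share an `AtomicUsize`, each task keeps claiming the next input partition via `fetch_add` until all partitions are taken, and the per-task results are then merged with `join_all`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let input_partitions = 10_usize;
    let max_concurrency = 3_usize;
    // Shared pointer to the next input partition that needs processing
    let next_partition = Arc::new(AtomicUsize::new(0));

    let mut tasks = vec![];
    for task_id in 0..max_concurrency {
        let next_partition = Arc::clone(&next_partition);
        tasks.push(tokio::spawn(async move {
            let mut processed = vec![];
            // Claim input partitions until the counter runs past the last one
            let mut partition = next_partition.fetch_add(1, Ordering::SeqCst);
            while partition < input_partitions {
                // ... execute the plan for `partition` and write temp Parquet files here ...
                processed.push((task_id, partition));
                partition = next_partition.fetch_add(1, Ordering::SeqCst);
            }
            processed
        }));
    }

    // Merge the per-task results, surfacing any task panic as an error
    let all_partitions: Vec<(usize, usize)> = futures::future::join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .expect("a writer task panicked")
        .into_iter()
        .flatten()
        .collect();

    // Every input partition was processed by exactly one task
    assert_eq!(all_partitions.len(), input_partitions);
}
```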
@@ -412,6 +452,7 @@ impl SeafowlContext {
table_log_store.object_store(),
local_table_dir,
self.config.misc.max_partition_size,
None,
)
.await?;

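
The upload loop at the end of the main hunk above keeps its existing `Semaphore`-based throttling, now keyed off `PARTITION_FILE_WRITE_MAX_CONCURRENCY`. A minimal sketch of that permit pattern, assuming `tokio` (the actual object store upload is elided):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    const MAX_CONCURRENT_UPLOADS: usize = 3;
    let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_UPLOADS));

    let mut tasks = vec![];
    for part in 0..10 {
        // Blocks here until one of the permits frees up, capping in-flight uploads
        let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            // ... upload the temp file for partition `part` to object storage here ...
            println!("uploading partition {part}");
            drop(permit); // release the slot for the next upload
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }
}
```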
@@ -554,6 +595,7 @@ mod tests {
object_store.clone(),
local_table_dir,
2,
None,
)
.await
.unwrap();
@@ -707,6 +749,7 @@ mod tests {
object_store,
None,
max_partition_size,
None,
)
.await
.unwrap();
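
For reference, the re-chunking invariant enforced in the main hunk above (no partition file exceeds `max_partition_size` rows) boils down to slicing each incoming `RecordBatch`. A simplified sketch using the `arrow` crate directly; the hypothetical `rechunk` helper omits carrying leftover capacity across batches and the temp-file roll-over that the real routine performs:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Hypothetical helper: split a batch into chunks of at most `max_partition_size` rows
fn rechunk(batch: &RecordBatch, max_partition_size: usize) -> Vec<RecordBatch> {
    let mut chunks = vec![];
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = max_partition_size.min(batch.num_rows() - offset);
        chunks.push(batch.slice(offset, len));
        offset += len;
    }
    chunks
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()))],
    )
    .unwrap();

    // 10 rows with a 4-row partition limit -> chunks of 4, 4 and 2 rows
    let row_counts: Vec<_> = rechunk(&batch, 4).iter().map(|c| c.num_rows()).collect();
    assert_eq!(row_counts, vec![4, 4, 2]);
}
```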
2 changes: 2 additions & 0 deletions src/context/physical.rs
@@ -315,6 +315,7 @@ impl SeafowlContext {
object_store,
local_table_dir,
self.config.misc.max_partition_size,
None,
)
.await?;

@@ -427,6 +428,7 @@ impl SeafowlContext {
object_store,
local_table_dir,
self.config.misc.max_partition_size,
None,
)
.await?;

5 changes: 5 additions & 0 deletions src/frontend/flight/sync/writer.rs
@@ -454,6 +454,10 @@ impl SeafowlDataSyncWriter {
})
.collect::<Vec<_>>();

// Guesstimate the number of new partition files that will be produced
let max_size = self.context.config.misc.max_partition_size as usize;
let min_file_count_hint = files.len() + (entry.rows + max_size - 1) / max_size;

// Create a special Delta table provider that will only hit the above partition files
let base_scan = Arc::new(
DeltaTableProvider::try_new(
@@ -505,6 +509,7 @@ impl SeafowlDataSyncWriter {
log_store.object_store(),
local_data_dir,
self.context.config.misc.max_partition_size,
Some(min_file_count_hint),
)
.await?;

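
The `min_file_count_hint` guesstimate above is just the number of existing partition files plus a ceiling division of the incoming row count by `max_partition_size`. A small sketch with made-up numbers (the helper name is hypothetical):

```rust
// Hypothetical helper mirroring the guesstimate: existing files plus
// ceil(rows / max_partition_size) new partition files.
fn min_file_count_hint(existing_files: usize, rows: usize, max_partition_size: usize) -> usize {
    existing_files + (rows + max_partition_size - 1) / max_partition_size
}

fn main() {
    // e.g. 2 files already pending, 2,500,000 new rows, 1,048,576 rows per partition
    // -> 2 + ceil(2.38) = 5 files expected, so up to 3 write tasks will be used
    assert_eq!(min_file_count_hint(2, 2_500_000, 1_048_576), 5);
}
```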