Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 17, 2024
1 parent 2782a9f commit e37d039
Showing 1 changed file with 10 additions and 30 deletions.
40 changes: 10 additions & 30 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,7 @@ impl PartitionBuffer {

/// Initializes active builders if necessary.
/// Returns error if memory reservation fails.
fn init_active_if_necessary(
&mut self,
mempool_time: &Time,
repart_time: &Time,
) -> Result<isize> {
fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
let mut mem_diff = 0;

if self.active.is_empty() {
Expand All @@ -261,11 +257,11 @@ impl PartitionBuffer {
.sum::<usize>();
}

let mut mempool_timer = mempool_time.timer();
let mut mempool_timer = metrics.mempool_time.timer();
self.reservation.try_grow(self.active_slots_mem_size)?;
mempool_timer.stop();

let mut repart_timer = repart_time.timer();
let mut repart_timer = metrics.repart_time.timer();
self.active = new_array_builders(&self.schema, self.batch_size);
repart_timer.stop();

Expand All @@ -280,15 +276,13 @@ impl PartitionBuffer {
columns: &[ArrayRef],
indices: &[usize],
start_index: usize,
mempool_time: &Time,
repart_time: &Time,
ipc_time: &Time,
metrics: &ShuffleRepartitionerMetrics,
) -> AppendRowStatus {
let mut mem_diff = 0;
let mut start = start_index;

// lazy init because some partition may be empty
let init = self.init_active_if_necessary(mempool_time, repart_time);
let init = self.init_active_if_necessary(metrics);
if init.is_err() {
return AppendRowStatus::StartIndex(start);
}
Expand All @@ -297,7 +291,7 @@ impl PartitionBuffer {
while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());

let mut repart_timer = repart_time.timer();
let mut repart_timer = metrics.repart_time.timer();
self.active
.iter_mut()
.zip(columns)
Expand All @@ -308,13 +302,13 @@ impl PartitionBuffer {
repart_timer.stop();

if self.num_active_rows >= self.batch_size {
let flush = self.flush(ipc_time);
let flush = self.flush(&metrics.ipc_time);
if let Err(e) = flush {
return AppendRowStatus::MemDiff(Err(e));
}
mem_diff += flush.unwrap();

let init = self.init_active_if_necessary(mempool_time, repart_time);
let init = self.init_active_if_necessary(metrics);
if init.is_err() {
return AppendRowStatus::StartIndex(end);
}
Expand Down Expand Up @@ -1051,14 +1045,7 @@ impl ShuffleRepartitioner {
// If the range of indices is not big enough, just appending the rows into
// active array builders instead of directly adding them as a record batch.
let mut start_index: usize = 0;
let mut output_ret = output.append_rows(
columns,
indices,
start_index,
&self.metrics.mempool_time,
&self.metrics.repart_time,
&self.metrics.ipc_time,
);
let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics);

loop {
match output_ret {
Expand All @@ -1078,14 +1065,7 @@ impl ShuffleRepartitioner {
mempool_timer.stop();

start_index = new_start;
output_ret = output.append_rows(
columns,
indices,
start_index,
&self.metrics.mempool_time,
&self.metrics.repart_time,
&self.metrics.ipc_time,
);
output_ret = output.append_rows(columns, indices, start_index, &self.metrics);

if let AppendRowStatus::StartIndex(new_start) = output_ret {
if new_start == start_index {
Expand Down

0 comments on commit e37d039

Please sign in to comment.