diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 3f8b79cfd..fcc8c51f6 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -243,11 +243,7 @@ impl PartitionBuffer { /// Initializes active builders if necessary. /// Returns error if memory reservation fails. - fn init_active_if_necessary( - &mut self, - mempool_time: &Time, - repart_time: &Time, - ) -> Result { + fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result { let mut mem_diff = 0; if self.active.is_empty() { @@ -261,11 +257,11 @@ impl PartitionBuffer { .sum::(); } - let mut mempool_timer = mempool_time.timer(); + let mut mempool_timer = metrics.mempool_time.timer(); self.reservation.try_grow(self.active_slots_mem_size)?; mempool_timer.stop(); - let mut repart_timer = repart_time.timer(); + let mut repart_timer = metrics.repart_time.timer(); self.active = new_array_builders(&self.schema, self.batch_size); repart_timer.stop(); @@ -280,15 +276,13 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, - mempool_time: &Time, - repart_time: &Time, - ipc_time: &Time, + metrics: &ShuffleRepartitionerMetrics, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; // lazy init because some partition may be empty - let init = self.init_active_if_necessary(mempool_time, repart_time); + let init = self.init_active_if_necessary(metrics); if init.is_err() { return AppendRowStatus::StartIndex(start); } @@ -297,7 +291,7 @@ impl PartitionBuffer { while start < indices.len() { let end = (start + self.batch_size).min(indices.len()); - let mut repart_timer = repart_time.timer(); + let mut repart_timer = metrics.repart_time.timer(); self.active .iter_mut() .zip(columns) @@ -308,13 +302,13 @@ impl PartitionBuffer { repart_timer.stop(); if self.num_active_rows >= self.batch_size { - let flush = self.flush(ipc_time); + let flush = self.flush(&metrics.ipc_time); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } mem_diff += flush.unwrap(); - let init = self.init_active_if_necessary(mempool_time, repart_time); + let init = self.init_active_if_necessary(metrics); if init.is_err() { return AppendRowStatus::StartIndex(end); } @@ -1051,14 +1045,7 @@ impl ShuffleRepartitioner { // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = output.append_rows( - columns, - indices, - start_index, - &self.metrics.mempool_time, - &self.metrics.repart_time, - &self.metrics.ipc_time, - ); + let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics); loop { match output_ret { @@ -1078,14 +1065,7 @@ impl ShuffleRepartitioner { mempool_timer.stop(); start_index = new_start; - output_ret = output.append_rows( - columns, - indices, - start_index, - &self.metrics.mempool_time, - &self.metrics.repart_time, - &self.metrics.ipc_time, - ); + output_ret = output.append_rows(columns, indices, start_index, &self.metrics); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index {