Skip to content

Commit

Permalink
Save
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 27, 2024
1 parent 41affcc commit 0cfdab2
Showing 1 changed file with 37 additions and 43 deletions.
80 changes: 37 additions & 43 deletions native/core/src/execution/shuffle/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,47 +161,22 @@ impl BatchReader {

let mut arrays = Vec::with_capacity(schema.fields().len());
for i in 0..schema.fields().len() {
// read data buffer length
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let buffer_len = usize::from_le_bytes(length);
self.offset += 8;

// read data buffer
// println!("reading data buffer with {buffer_len} bytes");
let offset_segment_start = self.offset + buffer_len;
let buffer = Buffer::from(&self.input[self.offset..offset_segment_start]);
self.offset += buffer_len;
let buffer = self.read_data_buffer();

match schema.field(i).data_type() {
DataType::Int32 => {
// create array
let data_buffer = ScalarBuffer::<i32>::from(buffer);

// read null buffer
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let null_buffer_length = usize::from_le_bytes(length);
self.offset += 8;
let null_buffer = if null_buffer_length != 0 {
let null_buffer =
&self.input[self.offset..self.offset + null_buffer_length];
Some(NullBuffer::new(BooleanBuffer::new(
Buffer::from(null_buffer),
0,
null_buffer.len() * 8,
)))
} else {
None
};
self.offset += null_buffer_length;
let null_buffer = self.read_null_buffer();

let array: ArrayRef = Arc::new(Int32Array::try_new(data_buffer, null_buffer)?);
arrays.push(array);
}
DataType::Utf8 => {
// read offset buffer length
length.copy_from_slice(
&self.input[offset_segment_start..offset_segment_start + 8],
);
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let offset_buffer_len = usize::from_le_bytes(length);
self.offset += 8;

Expand All @@ -214,21 +189,7 @@ impl BatchReader {
let offset_buffer = OffsetBuffer::new(scalar_buffer);

// read null buffer
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let null_buffer_length = usize::from_le_bytes(length);
self.offset += 8;
let null_buffer = if null_buffer_length != 0 {
let null_buffer =
&self.input[self.offset..self.offset + null_buffer_length];
Some(NullBuffer::new(BooleanBuffer::new(
Buffer::from(null_buffer),
0,
null_buffer.len() * 8,
)))
} else {
None
};
self.offset += null_buffer_length;
let null_buffer = self.read_null_buffer();

// create array
let array: ArrayRef =
Expand All @@ -241,6 +202,39 @@ impl BatchReader {

Ok(RecordBatch::try_new(schema, arrays).unwrap())
}

fn read_data_buffer(&mut self) -> Buffer {
// read data buffer length
let mut length = [0; 8];
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let buffer_len = usize::from_le_bytes(length);
self.offset += 8;

// read data buffer
// println!("reading data buffer with {buffer_len} bytes");
let buffer = Buffer::from(&self.input[self.offset..self.offset + buffer_len]);
self.offset += buffer_len;
buffer
}

fn read_null_buffer(&mut self) -> Option<NullBuffer> {
let mut length = [0; 8];
length.copy_from_slice(&self.input[self.offset..self.offset + 8]);
let null_buffer_length = usize::from_le_bytes(length);
self.offset += 8;
let null_buffer = if null_buffer_length != 0 {
let null_buffer = &self.input[self.offset..self.offset + null_buffer_length];
Some(NullBuffer::new(BooleanBuffer::new(
Buffer::from(null_buffer),
0,
null_buffer.len() * 8,
)))
} else {
None
};
self.offset += null_buffer_length;
null_buffer
}
}

#[cfg(test)]
Expand Down

0 comments on commit 0cfdab2

Please sign in to comment.