Skip to content

Commit

Permalink
encapsulate main.rs logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tomharmon committed Feb 27, 2020
1 parent dc125c6 commit a334a7e
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 219 deletions.
265 changes: 173 additions & 92 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
//! represented as a `Vec<Column>`
use crate::parsers::parse_line_with_schema;
use crate::schema::DataType;
use crate::schema::{infer_schema, DataType};
use std::fmt;
use std::io::BufRead;
use std::io::{prelude::*, SeekFrom};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::thread;

/// Represents a column in the DataFrame
#[derive(PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -37,6 +38,16 @@ pub enum Data {
Null,
}

/// Represents a DataFrame which contains
/// [columnar](crate::dataframe::Column) data and its schema.
///
/// Built either empty via `init` or by parsing a `SoR` file via `from_file`.
pub struct DataFrame {
/// The schema of this DataFrame: one [DataType](crate::schema::DataType)
/// per column, in column order.
pub schema: Vec<DataType>,
/// The [columnar](crate::dataframe::Column) data of this DataFrame,
/// one `Column` per schema entry.
pub data: Vec<Column>,
}

/// Print the Data of a Data cell.
/// The number for Ints and floats
/// 0 for false
Expand All @@ -56,31 +67,10 @@ impl fmt::Display for Data {
}
}

/// This defines different functions that can be called on a DataFrame
pub trait DataFrame {
/// Implementations for a [crate::DataFrame::DataFrame]
impl DataFrame {
/// Generate an empty DataFrame for the given schema
fn init(schema: &Vec<DataType>) -> Self;

/// Reads a file (even one too large to fit into memory) according to the given
/// `schema` and `options` and turns it into a columnar dataframe.
///
/// This is the top level function for using `SoRer` and the one you should be
/// using unless you are trying to extend `SoRer`. There are many intricate
/// facets to using `SoRer` so you *must* RTFM [here](../index.html)
fn from_file<T: BufRead + Seek>(
schema: Vec<DataType>,
reader: &mut T,
from: u64,
len: u64,
) -> Self;

/// Get the (i,j) element from the DataFrame
fn get(&self, i: u64, j: u64) -> Data;
}

/// Implements the DataFrame where DataFrame is a Vec<Column>
impl DataFrame for Vec<Column> {
fn init(schema: &Vec<DataType>) -> Self {
pub fn init(schema: &Vec<DataType>) -> Self {
let mut result = Vec::with_capacity(schema.len() + 1);
for t in schema {
match t {
Expand All @@ -90,75 +80,110 @@ impl DataFrame for Vec<Column> {
DataType::String => result.push(Column::String(Vec::new())),
}
}
result
DataFrame {
data: result,
schema: schema.clone(),
}
}

fn from_file<T>(
schema: Vec<DataType>,
reader: &mut T,
from: u64,
len: u64,
) -> Self
where
T: BufRead + Seek,
{
reader.seek(SeekFrom::Start(from)).unwrap();
let mut buffer = Vec::new();
/// Reads a file (even one too large to fit into memory) according to the given
/// `schema` and `options` and turns it into a columnar dataframe.
///
/// This is the top level function for using `SoRer` and the one you should be
/// using unless you are trying to extend `SoRer`. There are many intricate
/// facets to using `SoRer` so you *must* RTFM [here](../index.html)
pub fn from_file(file_path: String, from: u64, len: u64) -> Self {
// infer the schema
let f: File = File::open(file_path.clone()).unwrap();
let reader = BufReader::new(f);
let schema = infer_schema(reader);

let mut so_far = if from != 0 {
// throw away the first line
let l1_len = reader.read_until(b'\n', &mut buffer).unwrap();
buffer.clear();
l1_len as u64
// number of threads to use
let num_threads = 8;

// the total number of bytes to read
let num_chars = if len == std::u64::MAX {
(std::fs::metadata(file_path.clone()).unwrap().len() - from) as f64
} else {
0
len as f64
};
// each thread will parse this many characters +- some number
let step = (num_chars / num_threads as f64).ceil() as u64;

let mut parsed_data: Vec<Column> = DataFrame::init(&schema);
// setup the work array with the from / len for each thread
// each element in the work array is a tuple of (starting index, number of bytes for this thread)
let f: File = File::open(file_path.clone()).unwrap();
let mut reader = BufReader::new(f);
let mut work: Vec<(u64, u64)> = Vec::with_capacity(num_threads + 1);

loop {
let line_len = reader.read_until(b'\n', &mut buffer).unwrap();
so_far += line_len as u64;
if line_len == 0 || so_far >= len {
break;
}
// add the first one separately since we want to access the previous thread's
// work when in the loop. Since the work of the first thread will call
// `read_file(schema, 0, step)` it will not throw away the first line
// since from is 0 and will throw away the last line since step > 0
work.push((from, step));

// parse line with schema and place into the columnar vec here
match parse_line_with_schema(&buffer[..], &schema) {
None => {
buffer.clear();
continue;
}
Some(data) => {
let iter = data.iter().zip(parsed_data.iter_mut());
for (d, col) in iter {
match (d, col) {
(Data::Bool(b), Column::Bool(c)) => {
c.push(Some(*b))
}
(Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
(Data::Float(f), Column::Float(c)) => {
c.push(Some(*f))
}
(Data::String(s), Column::String(c)) => {
c.push(Some(s.clone()))
}
(Data::Null, Column::Bool(c)) => c.push(None),
(Data::Null, Column::Int(c)) => c.push(None),
(Data::Null, Column::Float(c)) => c.push(None),
(Data::Null, Column::String(c)) => c.push(None),
_ => panic!("Parser Failed"),
}
}
let mut so_far = from;
let mut buffer = Vec::new();

// This loop finds the byte offset for the start of a line
// by adding the length of the last line that a previous thread would've
// thrown away. The work gets added to the following thread so that
// each thread starts at a full line and reads only until the end of a line
for i in 1..num_threads {
so_far += step;
// advance the reader to this thread's starting index then
// find the next newline character
reader.seek(SeekFrom::Start(so_far)).unwrap();
reader.read_until(b'\n', &mut buffer).unwrap();
work.push((so_far, step));

// Since the previous thread throws away the last line, add the length
// of the last line of prev thread to the work of this thread so that
// we read all lines.
work.get_mut(i - 1).unwrap().1 += buffer.len() as u64 + 1;
buffer.clear();
}

// initialize the threads with their own BufReader
let mut threads = Vec::new();
for w in work {
let new_schema = schema.clone();
let f: File = File::open(file_path.clone()).unwrap();
let mut r = BufReader::new(f);
// spawn the thread and give it a closure which calls `from_file`
// to parse the data into columnar format.
threads.push(thread::spawn(move || {
read_chunk(new_schema, &mut r, w.0, w.1)
}));
}

// initialize the resulting columnar data frame
let mut parsed_data: Vec<Column> = DataFrame::init(&schema).data;
// let all the threads finish then combine the parsed data into the
// columnar data frame
for t in threads {
let mut x: Vec<Column> = t.join().unwrap();
let iter = parsed_data.iter_mut().zip(x.iter_mut());
for (complete, partial) in iter {
match (complete, partial) {
(Column::Bool(c1), Column::Bool(c2)) => c1.append(c2),
(Column::Int(c1), Column::Int(c2)) => c1.append(c2),
(Column::Float(c1), Column::Float(c2)) => c1.append(c2),
(Column::String(c1), Column::String(c2)) => c1.append(c2),
_ => panic!("Unexpected result from thread"),
}
}
buffer.clear();
}
parsed_data

DataFrame {
data: parsed_data,
schema,
}
}

fn get(&self, i: u64, j: u64) -> Data {
match &self[i as usize] {
/// Get the (i,j) element from the DataFrame
pub fn get(&self, i: u64, j: u64) -> Data {
match &self.data[i as usize] {
Column::Bool(b) => {
if let Some(val) = &b[j as usize] {
Data::Bool(*val)
Expand Down Expand Up @@ -191,6 +216,66 @@ impl DataFrame for Vec<Column> {
}
}

/// Parses one chunk of a `SoR` file into columnar data.
///
/// Seeks `reader` to byte offset `from` and parses newline-delimited rows
/// against `schema` until roughly `len` bytes have been consumed (or EOF).
///
/// Chunk-boundary contract (cooperates with the work-splitting in
/// `from_file`):
/// - If `from != 0`, the first (likely partial) line is read and discarded;
///   its length still counts toward `len`.
/// - The line that crosses the `len` boundary is discarded (the loop breaks
///   before parsing it) — the caller extends the previous chunk's `len` so
///   that line is parsed exactly once by the preceding chunk.
///
/// Rows that fail schema validation are skipped silently.
///
/// # Panics
/// Panics if seeking/reading fails, or if the parser yields a `Data` variant
/// that does not match its `Column` type ("Parser Failed").
fn read_chunk<T>(
schema: Vec<DataType>,
reader: &mut T,
from: u64,
len: u64,
) -> Vec<Column>
where
T: BufRead + Seek,
{
reader.seek(SeekFrom::Start(from)).unwrap();
let mut buffer = Vec::new();

// `so_far` tracks bytes consumed toward `len`, including any discarded
// partial first line.
let mut so_far = if from != 0 {
// throw away the first line
let l1_len = reader.read_until(b'\n', &mut buffer).unwrap();
buffer.clear();
l1_len as u64
} else {
0
};

// Empty columns matching `schema`; only the `data` part is needed here.
let mut parsed_data = DataFrame::init(&schema).data;

loop {
let line_len = reader.read_until(b'\n', &mut buffer).unwrap();
so_far += line_len as u64;
// Stop at EOF (0 bytes read) or once the budget is exhausted.
// NOTE: the boundary-crossing line is intentionally NOT parsed here;
// the caller accounts for it in the previous chunk's length.
if line_len == 0 || so_far >= len {
break;
}

// parse line with schema and place into the columnar vec here
match parse_line_with_schema(&buffer[..], &schema) {
None => {
// Invalid row: skip it and keep going.
buffer.clear();
continue;
}
Some(data) => {
// Pair each parsed cell with its destination column.
let iter = data.iter().zip(parsed_data.iter_mut());
for (d, col) in iter {
match (d, col) {
(Data::Bool(b), Column::Bool(c)) => c.push(Some(*b)),
(Data::Int(i), Column::Int(c)) => c.push(Some(*i)),
(Data::Float(f), Column::Float(c)) => c.push(Some(*f)),
(Data::String(s), Column::String(c)) => {
c.push(Some(s.clone()))
}
// Missing cells become None in the matching column.
(Data::Null, Column::Bool(c)) => c.push(None),
(Data::Null, Column::Int(c)) => c.push(None),
(Data::Null, Column::Float(c)) => c.push(None),
(Data::Null, Column::String(c)) => c.push(None),
_ => panic!("Parser Failed"),
}
}
}
}
// Reuse the line buffer across iterations (one allocation, not per line).
buffer.clear();
}
parsed_data
}

#[cfg(test)]
mod tests {

Expand All @@ -212,33 +297,29 @@ mod tests {
// Simple case: first and last lines are not discarded
let mut input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>");
let parsed1: Vec<Column> =
DataFrame::from_file(schema.clone(), &mut input, 0, 26);
read_chunk(schema.clone(), &mut input, 0, 26);
assert_eq!(parsed1, expected.clone());

// last line is discarded
let mut larger_input = Cursor::new(b"<1><1>\n<a><0>\n<1.2><>\n<no><1>");
let parsed2: Vec<Column> =
DataFrame::from_file(schema.clone(), &mut larger_input, 0, 27);
read_chunk(schema.clone(), &mut larger_input, 0, 27);
assert_eq!(parsed2, expected.clone());

// first line is discarded
let mut input_skipped_l1 =
Cursor::new(b"<b><1>\n<1><1>\n<a><0>\n<1.2><>");
let parsed3: Vec<Column> =
DataFrame::from_file(schema.clone(), &mut input_skipped_l1, 3, 26);
read_chunk(schema.clone(), &mut input_skipped_l1, 3, 26);
assert_eq!(parsed3, expected.clone());

// Invalid line is discarded
// Note since parsed lines with schema is correctly tested we do not
// need to test every possible way a line can be invalid here
let mut input_with_invalid =
Cursor::new(b"<1><1>\n<a><0>\n<c><1.2>\n<1.2><>");
let parsed4: Vec<Column> = DataFrame::from_file(
schema.clone(),
&mut input_with_invalid,
0,
32,
);
let parsed4: Vec<Column> =
read_chunk(schema.clone(), &mut input_with_invalid, 0, 32);
assert_eq!(parsed4, expected.clone());
}
}
Loading

0 comments on commit a334a7e

Please sign in to comment.