diff --git a/examples/avro_rw.rs b/examples/avro_rw.rs index 7e3f33a..f63fe7c 100644 --- a/examples/avro_rw.rs +++ b/examples/avro_rw.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use apache_avro::AvroSchema; use clap::Parser; -use renoir::prelude::*; +use renoir::{prelude::*, Replication}; use serde::{Deserialize, Serialize}; #[derive(Debug, Parser)] @@ -28,7 +28,9 @@ fn main() { let ctx = StreamContext::new(conf.clone()); let source = if let Some(input) = opts.input { - ctx.stream_avro(input).into_boxed() + ctx.stream_avro_file(Replication::One, input) + .from_avro_value::() + .into_boxed() } else { ctx.stream_iter((0..100).map(|i| InputType { s: format!("{i:o}"), diff --git a/src/lib.rs b/src/lib.rs index 3cd5f07..aaa62e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,4 +152,5 @@ pub mod prelude { #[cfg(feature = "timestamp")] pub use super::operator::window::{EventTimeWindow, TransactionWindow}; pub use super::{BatchMode, RuntimeConfig, StreamContext}; + pub use super::Replication; } diff --git a/src/operator/source/avro.rs b/src/operator/source/avro.rs index 63eb48d..1d476db 100644 --- a/src/operator/source/avro.rs +++ b/src/operator/source/avro.rs @@ -1,111 +1,108 @@ use std::fmt::Display; use std::fs::File; -use std::io::BufReader; -use std::marker::PhantomData; +use std::io::{BufReader, Read}; use std::path::PathBuf; -use apache_avro::Reader; +use apache_avro::types::Value; +use apache_avro::{from_value, Reader}; use serde::Deserialize; use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication}; use crate::operator::source::Source; -use crate::operator::{Data, Operator, StreamElement}; +use crate::operator::{Operator, StreamElement}; use crate::scheduler::ExecutionMetadata; -use crate::Stream; +use crate::{CoordUInt, Stream}; -/// Source that reads and parses a CSV file. -/// -/// The file is divided in chunks and is read concurrently by multiple replicas. -pub struct AvroSource Deserialize<'a>> { - /// Path of the file. +pub trait MakeReader: Send + Clone { + type Reader: Read + Send; + fn make_reader(&self, index: CoordUInt, peers: CoordUInt) -> Self::Reader; +} + +#[derive(Clone)] +pub struct MakeFileReader { path: PathBuf, - /// Reader used to parse the CSV file. - reader: Option>>, - /// Whether the reader has terminated its job. +} + +impl MakeReader for MakeFileReader { + type Reader = BufReader; + fn make_reader(&self, _: CoordUInt, _: CoordUInt) -> Self::Reader { + let file = File::options() + .read(true) + .write(false) + .open(&self.path) + .expect("could not open file"); + BufReader::new(file) + } +} + +impl MakeReader for T +where + T: Fn(CoordUInt, CoordUInt) -> R + Send + Clone, + R: Read + Send, +{ + type Reader = R; + fn make_reader(&self, index: CoordUInt, peers: CoordUInt) -> Self::Reader { + (self)(index, peers) + } +} + +pub struct AvroSource { + replication: Replication, + + make_reader: R, + reader: Option>, + terminated: bool, - _out: PhantomData, } -impl Deserialize<'a>> Display for AvroSource { +impl Display for AvroSource { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "AvroSource<{}>", std::any::type_name::()) + write!(f, "AvroSource<{}>", std::any::type_name::()) } } -impl Deserialize<'a>> AvroSource { - /// Create a new source that reads and parse the lines of a CSV file. - /// - /// The file is partitioned into as many chunks as replicas, each replica has to have the - /// **same** file in the same path. It is guaranteed that each line of the file is emitted by - /// exactly one replica. - /// - /// After creating the source it's possible to customize its behaviour using one of the - /// available methods. By default it is assumed that the delimiter is `,` and the CSV has - /// headers. - /// - /// Each line will be deserialized into the type `Out`, so the structure of the CSV must be - /// valid for that deserialization. The [`csv`](https://crates.io/crates/csv) crate is used for - /// the parsing. - /// - /// **Note**: the file must be readable and its size must be available. This means that only - /// regular files can be read. - /// - /// ## Example - /// - /// ``` - /// # use renoir::{StreamContext, RuntimeConfig}; - /// # use renoir::operator::source::AvroSource; - /// # use serde::{Deserialize, Serialize}; - /// # let mut env = StreamContext::new_local(); - /// #[derive(Clone, Deserialize, Serialize)] - /// struct Thing { - /// what: String, - /// count: u64, - /// } - /// let source = AvroSource::::new("/datasets/huge.csv"); - /// let s = env.stream(source); - /// ``` - pub fn new>(path: P) -> Self { +impl AvroSource { + pub fn from_file + Send + Clone>(replication: Replication, path: P) -> Self { Self { - path: path.into(), + replication, + make_reader: MakeFileReader { path: path.into() }, reader: None, terminated: false, - _out: PhantomData, } } } -impl Deserialize<'a>> Source for AvroSource { - fn replication(&self) -> Replication { - Replication::One +impl AvroSource { + pub fn from_fn(replication: Replication, f: F) -> Self { + Self { + replication, + make_reader: f, + reader: None, + terminated: false, + } } } -impl Deserialize<'a>> Operator for AvroSource { - type Out = Out; +impl Source for AvroSource { + fn replication(&self) -> Replication { + self.replication + } +} - fn setup(&mut self, _metadata: &mut ExecutionMetadata) { - // let global_id = metadata.global_id; - // let instances = metadata.replicas.len(); +impl Operator for AvroSource { + type Out = Value; - let file = File::options() - .read(true) - .write(false) - .open(&self.path) - .unwrap_or_else(|err| { - panic!( - "AvroSource: error while opening file {:?}: {:?}", - self.path, err - ) - }); - - let buf_reader = BufReader::new(file); - let reader = Reader::new(buf_reader).expect("failed to create avro reader"); + fn setup(&mut self, metadata: &mut ExecutionMetadata) { + let global_id = metadata.global_id; + let instances = metadata.replicas.len() as CoordUInt; - self.reader = Some(reader); + self.reader = Some( + Reader::new(self.make_reader.make_reader(global_id, instances)) + .expect("failed to create avro reader"), + ); } - fn next(&mut self) -> StreamElement { + fn next(&mut self) -> StreamElement { if self.terminated { return StreamElement::Terminate; } @@ -117,10 +114,7 @@ impl Deserialize<'a>> Operator for AvroSource { match reader.next() { Some(Ok(el)) => { tracing::trace!("avro Value: {el:?}"); - StreamElement::Item( - apache_avro::from_value(&el) - .expect("could not deserialize from avro Value to specified type"), - ) + StreamElement::Item(el) } Some(Err(e)) => panic!("Error while reading Aveo file: {:?}", e), None => { @@ -131,101 +125,59 @@ impl Deserialize<'a>> Operator for AvroSource { } fn structure(&self) -> BlockStructure { - let mut operator = OperatorStructure::new::("AvroSource"); + let mut operator = OperatorStructure::new::("AvroSource"); operator.kind = OperatorKind::Source; BlockStructure::default().add_operator(operator) } } -impl Deserialize<'a>> Clone for AvroSource { +impl Clone for AvroSource { fn clone(&self) -> Self { assert!( self.reader.is_none(), "AvroSource must be cloned before calling setup" ); Self { - path: self.path.clone(), reader: None, terminated: false, - _out: PhantomData, + replication: self.replication.clone(), + make_reader: self.make_reader.clone(), } } } impl crate::StreamContext { /// Convenience method, creates a `AvroSource` and makes a stream using `StreamContext::stream` - pub fn stream_avro Deserialize<'a>>( + pub fn stream_avro_file( &self, + replication: Replication, path: impl Into, - ) -> Stream> { - let source = AvroSource::new(path); + ) -> Stream> { + let source = AvroSource::from_file(replication, path.into()); + self.stream(source) + } + + pub fn stream_avro< + F: Fn(CoordUInt, CoordUInt) -> R + Send + Clone + 'static, + R: Read + Send, + >( + &self, + replication: Replication, + f: F, + ) -> Stream> { + let source = AvroSource::from_fn(replication, f); self.stream(source) } } -#[cfg(test)] -mod tests { - // use std::io::Write; - - // use itertools::Itertools; - // use serde::{Deserialize, Serialize}; - // use tempfile::NamedTempFile; - - // use crate::config::RuntimeConfig; - // use crate::environment::StreamContext; - // use crate::operator::source::AvroSource; - - // #[test] - // fn csv_without_headers() { - // for num_records in 0..100 { - // for terminator in &["\n", "\r\n"] { - // let file = NamedTempFile::new().unwrap(); - // for i in 0..num_records { - // write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); - // } - - // let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); - // let source = AvroSource::<(i32, i32)>::new(file.path()).has_headers(false); - // let res = env.stream(source).shuffle().collect_vec(); - // env.execute_blocking(); - - // let mut res = res.get().unwrap(); - // res.sort_unstable(); - // assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec()); - // } - // } - // } - - // #[test] - // fn csv_with_headers() { - // #[derive(Clone, Serialize, Deserialize)] - // struct T { - // a: i32, - // b: i32, - // } - - // for num_records in 0..100 { - // for terminator in &["\n", "\r\n"] { - // let file = NamedTempFile::new().unwrap(); - // write!(file.as_file(), "a,b{terminator}").unwrap(); - // for i in 0..num_records { - // write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); - // } - - // let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); - // let source = AvroSource::::new(file.path()); - // let res = env.stream(source).shuffle().collect_vec(); - // env.execute_blocking(); - - // let res = res - // .get() - // .unwrap() - // .into_iter() - // .map(|x| (x.a, x.b)) - // .sorted() - // .collect_vec(); - // assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec()); - // } - // } - // } +impl Stream +where + Op: Operator + 'static, +{ + pub fn from_avro_value Deserialize<'de> + Send>(self) -> Stream> { + self.map(|v| { + let de = from_value(&v); + de.expect("failed to deserialize") + }) + } }