Skip to content


Update avro source impl
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed May 27, 2024
1 parent 9cc749d commit b989c6a
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 153 deletions.
6 changes: 4 additions & 2 deletions examples/
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path::PathBuf;

use apache_avro::AvroSchema;
use clap::Parser;
use renoir::prelude::*;
use renoir::{prelude::*, Replication};
use serde::{Deserialize, Serialize};

#[derive(Debug, Parser)]
Expand All @@ -28,7 +28,9 @@ fn main() {
let ctx = StreamContext::new(conf.clone());

let source = if let Some(input) = opts.input {
ctx.stream_avro_file(Replication::One, input)
} else {
ctx.stream_iter((0..100).map(|i| InputType {
s: format!("{i:o}"),
Expand Down
1 change: 1 addition & 0 deletions src/
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,5 @@ pub mod prelude {
#[cfg(feature = "timestamp")]
pub use super::operator::window::{EventTimeWindow, TransactionWindow};
pub use super::{BatchMode, RuntimeConfig, StreamContext};
pub use super::Replication;
254 changes: 103 additions & 151 deletions src/operator/source/
Original file line number Diff line number Diff line change
@@ -1,111 +1,108 @@
use std::fmt::Display;
use std::fs::File;
use std::io::BufReader;
use std::marker::PhantomData;
use std::io::{BufReader, Read};
use std::path::PathBuf;

use apache_avro::Reader;
use apache_avro::types::Value;
use apache_avro::{from_value, Reader};
use serde::Deserialize;

use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication};
use crate::operator::source::Source;
use crate::operator::{Data, Operator, StreamElement};
use crate::operator::{Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;
use crate::Stream;
use crate::{CoordUInt, Stream};

/// Source that reads and parses a CSV file.
/// The file is divided in chunks and is read concurrently by multiple replicas.
pub struct AvroSource<Out: Data + for<'a> Deserialize<'a>> {
/// Path of the file.
pub trait MakeReader: Send + Clone {
type Reader: Read + Send;
fn make_reader(&self, index: CoordUInt, peers: CoordUInt) -> Self::Reader;

pub struct MakeFileReader {
path: PathBuf,
/// Reader used to parse the CSV file.
reader: Option<Reader<'static, BufReader<File>>>,
/// Whether the reader has terminated its job.

impl MakeReader for MakeFileReader {
type Reader = BufReader<File>;
fn make_reader(&self, _: CoordUInt, _: CoordUInt) -> Self::Reader {
let file = File::options()
.expect("could not open file");

impl<T, R> MakeReader for T
T: Fn(CoordUInt, CoordUInt) -> R + Send + Clone,
R: Read + Send,
type Reader = R;
fn make_reader(&self, index: CoordUInt, peers: CoordUInt) -> Self::Reader {
(self)(index, peers)

pub struct AvroSource<R: MakeReader> {
replication: Replication,

make_reader: R,
reader: Option<Reader<'static, R::Reader>>,

terminated: bool,
_out: PhantomData<Out>,

impl<Out: Data + for<'a> Deserialize<'a>> Display for AvroSource<Out> {
impl<R: MakeReader> Display for AvroSource<R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AvroSource<{}>", std::any::type_name::<Out>())
write!(f, "AvroSource<{}>", std::any::type_name::<Value>())

impl<Out: Data + for<'a> Deserialize<'a>> AvroSource<Out> {
/// Create a new source that reads and parse the lines of a CSV file.
/// The file is partitioned into as many chunks as replicas, each replica has to have the
/// **same** file in the same path. It is guaranteed that each line of the file is emitted by
/// exactly one replica.
/// After creating the source it's possible to customize its behaviour using one of the
/// available methods. By default it is assumed that the delimiter is `,` and the CSV has
/// headers.
/// Each line will be deserialized into the type `Out`, so the structure of the CSV must be
/// valid for that deserialization. The [`csv`]( crate is used for
/// the parsing.
/// **Note**: the file must be readable and its size must be available. This means that only
/// regular files can be read.
/// ## Example
/// ```
/// # use renoir::{StreamContext, RuntimeConfig};
/// # use renoir::operator::source::AvroSource;
/// # use serde::{Deserialize, Serialize};
/// # let mut env = StreamContext::new_local();
/// #[derive(Clone, Deserialize, Serialize)]
/// struct Thing {
/// what: String,
/// count: u64,
/// }
/// let source = AvroSource::<Thing>::new("/datasets/huge.csv");
/// let s =;
/// ```
pub fn new<P: Into<PathBuf>>(path: P) -> Self {
impl AvroSource<MakeFileReader> {
pub fn from_file<P: Into<PathBuf> + Send + Clone>(replication: Replication, path: P) -> Self {
Self {
path: path.into(),
make_reader: MakeFileReader { path: path.into() },
reader: None,
terminated: false,
_out: PhantomData,

impl<Out: Data + for<'a> Deserialize<'a>> Source for AvroSource<Out> {
fn replication(&self) -> Replication {
impl<F: MakeReader> AvroSource<F> {
pub fn from_fn(replication: Replication, f: F) -> Self {
Self {
make_reader: f,
reader: None,
terminated: false,

impl<Out: Data + for<'a> Deserialize<'a>> Operator for AvroSource<Out> {
type Out = Out;
impl<R: MakeReader> Source for AvroSource<R> {
fn replication(&self) -> Replication {

fn setup(&mut self, _metadata: &mut ExecutionMetadata) {
// let global_id = metadata.global_id;
// let instances = metadata.replicas.len();
impl<R: MakeReader> Operator for AvroSource<R> {
type Out = Value;

let file = File::options()
.unwrap_or_else(|err| {
"AvroSource: error while opening file {:?}: {:?}",
self.path, err

let buf_reader = BufReader::new(file);
let reader = Reader::new(buf_reader).expect("failed to create avro reader");
fn setup(&mut self, metadata: &mut ExecutionMetadata) {
let global_id = metadata.global_id;
let instances = metadata.replicas.len() as CoordUInt;

self.reader = Some(reader);
self.reader = Some(
Reader::new(self.make_reader.make_reader(global_id, instances))
.expect("failed to create avro reader"),

fn next(&mut self) -> StreamElement<Out> {
fn next(&mut self) -> StreamElement<Value> {
if self.terminated {
return StreamElement::Terminate;
Expand All @@ -117,10 +114,7 @@ impl<Out: Data + for<'a> Deserialize<'a>> Operator for AvroSource<Out> {
match {
Some(Ok(el)) => {
tracing::trace!("avro Value: {el:?}");
.expect("could not deserialize from avro Value to specified type"),
Some(Err(e)) => panic!("Error while reading Aveo file: {:?}", e),
None => {
Expand All @@ -131,101 +125,59 @@ impl<Out: Data + for<'a> Deserialize<'a>> Operator for AvroSource<Out> {

fn structure(&self) -> BlockStructure {
let mut operator = OperatorStructure::new::<Out, _>("AvroSource");
let mut operator = OperatorStructure::new::<Value, _>("AvroSource");
operator.kind = OperatorKind::Source;

impl<Out: Data + for<'a> Deserialize<'a>> Clone for AvroSource<Out> {
impl<R: MakeReader + Clone> Clone for AvroSource<R> {
fn clone(&self) -> Self {
"AvroSource must be cloned before calling setup"
Self {
path: self.path.clone(),
reader: None,
terminated: false,
_out: PhantomData,
replication: self.replication.clone(),
make_reader: self.make_reader.clone(),

impl crate::StreamContext {
/// Convenience method, creates a `AvroSource` and makes a stream using `StreamContext::stream`
pub fn stream_avro<T: Data + for<'a> Deserialize<'a>>(
pub fn stream_avro_file(
replication: Replication,
path: impl Into<PathBuf>,
) -> Stream<AvroSource<T>> {
let source = AvroSource::new(path);
) -> Stream<AvroSource<MakeFileReader>> {
let source = AvroSource::from_file(replication, path.into());

pub fn stream_avro<
F: Fn(CoordUInt, CoordUInt) -> R + Send + Clone + 'static,
R: Read + Send,
replication: Replication,
f: F,
) -> Stream<AvroSource<F>> {
let source = AvroSource::from_fn(replication, f);

mod tests {
// use std::io::Write;

// use itertools::Itertools;
// use serde::{Deserialize, Serialize};
// use tempfile::NamedTempFile;

// use crate::config::RuntimeConfig;
// use crate::environment::StreamContext;
// use crate::operator::source::AvroSource;

// #[test]
// fn csv_without_headers() {
// for num_records in 0..100 {
// for terminator in &["\n", "\r\n"] {
// let file = NamedTempFile::new().unwrap();
// for i in 0..num_records {
// write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap();
// }

// let env = StreamContext::new(RuntimeConfig::local(4).unwrap());
// let source = AvroSource::<(i32, i32)>::new(file.path()).has_headers(false);
// let res =;
// env.execute_blocking();

// let mut res = res.get().unwrap();
// res.sort_unstable();
// assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec());
// }
// }
// }

// #[test]
// fn csv_with_headers() {
// #[derive(Clone, Serialize, Deserialize)]
// struct T {
// a: i32,
// b: i32,
// }

// for num_records in 0..100 {
// for terminator in &["\n", "\r\n"] {
// let file = NamedTempFile::new().unwrap();
// write!(file.as_file(), "a,b{terminator}").unwrap();
// for i in 0..num_records {
// write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap();
// }

// let env = StreamContext::new(RuntimeConfig::local(4).unwrap());
// let source = AvroSource::<T>::new(file.path());
// let res =;
// env.execute_blocking();

// let res = res
// .get()
// .unwrap()
// .into_iter()
// .map(|x| (x.a, x.b))
// .sorted()
// .collect_vec();
// assert_eq!(res, (0..num_records).map(|x| (x, x + 1)).collect_vec());
// }
// }
// }
impl<Op> Stream<Op>
Op: Operator<Out = Value> + 'static,
pub fn from_avro_value<T: for<'de> Deserialize<'de> + Send>(self) -> Stream<impl Operator<Out = T>> {|v| {
let de = from_value(&v);
de.expect("failed to deserialize")

0 comments on commit b989c6a

Please sign in to comment.