From d4a0357d0d806078d8491ad8c4cd16df6e4d88ac Mon Sep 17 00:00:00 2001 From: imDema Date: Tue, 16 Apr 2024 17:34:44 +0200 Subject: [PATCH] Add fold_scan and reduce_scan, update examples args --- Cargo.lock | 97 +++++++------- Cargo.toml | 4 +- examples/car_accidents.rs | 6 +- examples/collatz.rs | 4 +- examples/connected_components.rs | 10 +- examples/kmeans.rs | 8 +- examples/pagerank.rs | 10 +- examples/pagerank_stateful.rs | 10 +- examples/pagerank_timely.rs | 4 +- examples/rolling_top_words_e2e.rs | 2 +- examples/transitive_closure.rs | 6 +- examples/triangles_fold.rs | 4 +- examples/triangles_rich_map.rs | 4 +- examples/wordcount.rs | 4 +- examples/wordcount_assoc.rs | 8 +- examples/wordcount_opt.rs | 4 +- examples/wordcount_windowed.rs | 4 +- src/lib.rs | 33 +---- src/operator/iteration/iterate.rs | 18 +-- src/operator/mod.rs | 90 +++++++++++++ src/profiler/backend.rs | 109 ---------------- src/profiler/bucket_profiler.rs | 203 ++++++++++++++++++++++++++++++ src/profiler/metrics.rs | 98 --------------- src/profiler/mod.rs | 172 +++++++++++++++---------- src/runner.rs | 47 +++---- src/scheduler.rs | 20 +-- tests/fold_scan.rs | 58 +++++++++ 27 files changed, 587 insertions(+), 450 deletions(-) delete mode 100644 src/profiler/backend.rs create mode 100644 src/profiler/bucket_profiler.rs delete mode 100644 src/profiler/metrics.rs create mode 100644 tests/fold_scan.rs diff --git a/Cargo.lock b/Cargo.lock index 321c9589..863f0f8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,9 +176,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.92" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" [[package]] name = "cfg-if" @@ -244,7 +244,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -403,9 +403,9 @@ dependencies = [ [[package]] name = "deunicode" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6e854126756c496b8c81dec88f9a706b15b875c5849d4097a3854476b9fdf94" +checksum = "322ef0094744e63628e6f0eb2295517f79276a5b342a4c2ff3042566ca181d4e" [[package]] name = "digest" @@ -419,9 +419,9 @@ dependencies = [ [[package]] name = "either" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" [[package]] name = "env_filter" @@ -546,7 +546,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -1042,9 +1042,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" dependencies = [ "unicode-ident", ] @@ -1272,14 +1272,14 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] name = "serde_json" -version = "1.0.115" +version = "1.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd" +checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" dependencies = [ "itoa", "ryu", @@ -1392,9 +1392,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.58" +version = "2.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" +checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" dependencies = [ "proc-macro2", "quote", @@ -1430,7 +1430,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -1478,7 +1478,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -1535,7 +1535,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -1667,7 +1667,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-shared", ] @@ -1689,7 +1689,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1767,7 +1767,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -1787,17 +1787,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -1808,9 +1809,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -1820,9 +1821,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -1832,9 +1833,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -1844,9 +1851,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -1856,9 +1863,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -1868,9 +1875,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -1880,9 +1887,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" @@ -1919,5 +1926,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] diff --git a/Cargo.toml b/Cargo.toml index b1bbfb67..9656dd17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ homepage = "https://github.com/deib-polimi/renoir" readme = "README.md" [features] -default = ["clap", "ssh", "timestamp"] +default = ["clap", "ssh", "timestamp",] timestamp = [] ssh = ["ssh2", "whoami", "shell-escape", "sha2", "base64"] tokio = ["dep:tokio", "futures", "tokio/net", "tokio/io-util", "tokio/time", "tokio/rt-multi-thread", "tokio/macros"] @@ -35,7 +35,7 @@ derivative = "2.2.0" # serialization serde = { version = "1.0.197", features = ["derive"] } -serde_json = "1.0.115" +serde_json = "1.0.116" bincode = "1.3.3" toml = "0.8.12" diff --git a/examples/car_accidents.rs b/examples/car_accidents.rs index d6e0485a..bd586946 100644 --- a/examples/car_accidents.rs +++ b/examples/car_accidents.rs @@ -292,14 +292,14 @@ fn print_query3( fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 2 { + if args.len() != 3 { panic!( "Usage: {} dataset share_source", std::env::args().next().unwrap() ); } - let path = &args[0]; - let share_source = &args[1]; + let path = &args[1]; + let share_source = &args[2]; let share_source = match share_source.as_str() { "true" => true, "false" => false, diff --git a/examples/collatz.rs b/examples/collatz.rs index 466cdeba..3c9094b5 100644 --- a/examples/collatz.rs +++ b/examples/collatz.rs @@ -6,10 +6,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the number of integers to check"); } - let limit: u64 = args[0].parse().unwrap(); + let limit: u64 = args[1].parse().unwrap(); let num_iter = 1000; config.spawn_remote_workers(); diff --git a/examples/connected_components.rs b/examples/connected_components.rs index 1122e73b..d0a119af 100644 --- a/examples/connected_components.rs +++ b/examples/connected_components.rs @@ -29,13 +29,13 @@ impl State { fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 4 { + if args.len() != 5 { panic!("Pass the number of iterations, number of vertices, vertex dataset and edges dataset as arguments"); } - let num_iterations: usize = args[0].parse().expect("Invalid number of iterations"); - let num_vertices: usize = args[1].parse().expect("Invalid number of vertices"); - let path_vertices = &args[2]; - let path_edges = &args[3]; + let num_iterations: usize = args[1].parse().expect("Invalid number of iterations"); + let num_vertices: usize = args[2].parse().expect("Invalid number of vertices"); + let path_vertices = &args[3]; + let path_edges = &args[4]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/kmeans.rs b/examples/kmeans.rs index 670c2a8e..33039cca 100644 --- a/examples/kmeans.rs +++ b/examples/kmeans.rs @@ -108,12 +108,12 @@ impl State { fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 3 { + if args.len() != 4 { panic!("Pass the number of centroid, the number of iterations and the dataset path as arguments"); } - let num_centroids: usize = args[0].parse().expect("Invalid number of centroids"); - let num_iters: usize = args[1].parse().expect("Invalid number of iterations"); - let path = &args[2]; + let num_centroids: usize = args[1].parse().expect("Invalid number of centroids"); + let num_iters: usize = args[2].parse().expect("Invalid number of iterations"); + let path = &args[3]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/pagerank.rs b/examples/pagerank.rs index 91e490b1..ca541b50 100644 --- a/examples/pagerank.rs +++ b/examples/pagerank.rs @@ -10,13 +10,13 @@ const DAMPENING: f64 = 0.85; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 4 { + if args.len() != 5 { panic!("Pass the number of iterations, number of pages, pages dataset and links dataset as arguments"); } - let num_iterations: usize = args[0].parse().expect("Invalid number of iterations"); - let num_pages: usize = args[1].parse().expect("Invalid number of pages"); - let path_pages = &args[2]; - let path_links = &args[3]; + let num_iterations: usize = args[1].parse().expect("Invalid number of iterations"); + let num_pages: usize = args[2].parse().expect("Invalid number of pages"); + let path_pages = &args[3]; + let path_links = &args[4]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/pagerank_stateful.rs b/examples/pagerank_stateful.rs index 9842295a..0ba7d959 100644 --- a/examples/pagerank_stateful.rs +++ b/examples/pagerank_stateful.rs @@ -16,13 +16,13 @@ const DAMPENING: f64 = 0.85; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 4 { + if args.len() != 5 { panic!("Pass the number of iterations, number of pages, pages dataset and links dataset as arguments"); } - let num_iterations: usize = args[0].parse().expect("Invalid number of iterations"); - let num_pages: usize = args[1].parse().expect("Invalid number of pages"); - let path_pages = &args[2]; - let path_links = &args[3]; + let num_iterations: usize = args[1].parse().expect("Invalid number of iterations"); + let num_pages: usize = args[2].parse().expect("Invalid number of pages"); + let path_pages = &args[3]; + let path_links = &args[4]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/pagerank_timely.rs b/examples/pagerank_timely.rs index 2ea17fba..02307ced 100644 --- a/examples/pagerank_timely.rs +++ b/examples/pagerank_timely.rs @@ -14,8 +14,8 @@ fn main() { let (config, args) = RuntimeConfig::from_args(); // command-line args: numbers of nodes and edges in the random graph. - let nodes: u64 = args[0].parse().unwrap(); - let edges: u64 = args[1].parse().unwrap(); + let nodes: u64 = args[1].parse().unwrap(); + let edges: u64 = args[2].parse().unwrap(); let max_iter: usize = 10000; config.spawn_remote_workers(); diff --git a/examples/rolling_top_words_e2e.rs b/examples/rolling_top_words_e2e.rs index ce1531e4..d6892b0c 100644 --- a/examples/rolling_top_words_e2e.rs +++ b/examples/rolling_top_words_e2e.rs @@ -111,7 +111,7 @@ fn main() { let k = 4; let (config, args) = RuntimeConfig::from_args(); - let limit: u64 = args[0].parse().expect("Invalid number of events"); + let limit: u64 = args[1].parse().expect("Invalid number of events"); config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/transitive_closure.rs b/examples/transitive_closure.rs index c7c3156c..950548bd 100644 --- a/examples/transitive_closure.rs +++ b/examples/transitive_closure.rs @@ -7,11 +7,11 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 2 { + if args.len() != 3 { panic!("Pass the number of iterations and the edges dataset as arguments"); } - let num_iterations: usize = args[0].parse().expect("Invalid number of iterations"); - let path_edges = &args[1]; + let num_iterations: usize = args[1].parse().expect("Invalid number of iterations"); + let path_edges = &args[2]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/triangles_fold.rs b/examples/triangles_fold.rs index 4da089b0..a8ae3bc7 100644 --- a/examples/triangles_fold.rs +++ b/examples/triangles_fold.rs @@ -7,10 +7,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/triangles_rich_map.rs b/examples/triangles_rich_map.rs index c9bf9afc..8b9bb1df 100644 --- a/examples/triangles_rich_map.rs +++ b/examples/triangles_rich_map.rs @@ -9,10 +9,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/wordcount.rs b/examples/wordcount.rs index 6d1fc6cd..9457600e 100644 --- a/examples/wordcount.rs +++ b/examples/wordcount.rs @@ -16,10 +16,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { tracing_subscriber::fmt::init(); let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/wordcount_assoc.rs b/examples/wordcount_assoc.rs index 87784dab..78e78954 100644 --- a/examples/wordcount_assoc.rs +++ b/examples/wordcount_assoc.rs @@ -12,10 +12,10 @@ fn main() { tracing_subscriber::fmt::init(); let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); @@ -51,10 +51,10 @@ async fn main() { tracing_subscriber::fmt::init(); let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/wordcount_opt.rs b/examples/wordcount_opt.rs index 54e35f89..bb3c3a50 100644 --- a/examples/wordcount_opt.rs +++ b/examples/wordcount_opt.rs @@ -11,10 +11,10 @@ fn main() { env_logger::init(); let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/examples/wordcount_windowed.rs b/examples/wordcount_windowed.rs index 2d380f8a..524fc499 100644 --- a/examples/wordcount_windowed.rs +++ b/examples/wordcount_windowed.rs @@ -9,10 +9,10 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { let (config, args) = RuntimeConfig::from_args(); - if args.len() != 1 { + if args.len() != 2 { panic!("Pass the dataset path as an argument"); } - let path = &args[0]; + let path = &args[1]; config.spawn_remote_workers(); let env = StreamContext::new(config); diff --git a/src/lib.rs b/src/lib.rs index 977f49ec..3cd5f079 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,11 +115,7 @@ Refer to the [examples](examples/) directory for an extended set of working exam #[macro_use] extern crate derivative; #[macro_use] -extern crate log; - -use std::ops::{Add, AddAssign}; - -use serde::{Deserialize, Serialize}; +extern crate tracing; pub use block::structure; pub use block::BatchMode; @@ -131,10 +127,6 @@ pub use operator::iteration::IterationStateHandle; pub use scheduler::ExecutionMetadata; pub use stream::{KeyedStream, Stream, WindowedStream}; -use crate::block::BlockStructure; -use crate::network::Coord; -use crate::profiler::ProfilerResult; - pub(crate) mod block; pub(crate) mod channel; pub mod config; @@ -161,26 +153,3 @@ pub mod prelude { pub use super::operator::window::{EventTimeWindow, TransactionWindow}; pub use super::{BatchMode, RuntimeConfig, StreamContext}; } - -/// Tracing information of the current execution. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub(crate) struct TracingData { - structures: Vec<(Coord, BlockStructure)>, - profilers: Vec, -} - -impl Add for TracingData { - type Output = TracingData; - - fn add(mut self, rhs: Self) -> Self::Output { - self += rhs; - self - } -} - -impl AddAssign for TracingData { - fn add_assign(&mut self, mut rhs: Self) { - self.structures.append(&mut rhs.structures); - self.profilers.append(&mut rhs.profilers); - } -} diff --git a/src/operator/iteration/iterate.rs b/src/operator/iteration/iterate.rs index d201649b..969e9064 100644 --- a/src/operator/iteration/iterate.rs +++ b/src/operator/iteration/iterate.rs @@ -360,19 +360,14 @@ where /// sorted.sort(); /// assert_eq!(sorted, vec![30, 31, 32]); /// ``` - pub fn iterate< - Body, - StateUpdate: ExchangeData + Default, - State: ExchangeData + Sync, - OperatorChain2, - >( + pub fn iterate( self, num_iterations: usize, initial_state: State, body: Body, - local_fold: impl Fn(&mut StateUpdate, Out) + Send + Clone + 'static, - global_fold: impl Fn(&mut State, StateUpdate) + Send + Clone + 'static, - loop_condition: impl Fn(&mut State) -> bool + Send + Clone + 'static, + local_fold: L, + global_fold: G, + loop_condition: C, ) -> ( Stream>, Stream>, @@ -383,6 +378,11 @@ where IterationStateHandle, ) -> Stream, OperatorChain2: Operator + 'static, + L: Fn(&mut StateUpdate, Out) + Send + Clone + 'static, + G: Fn(&mut State, StateUpdate) + Send + Clone + 'static, + C: Fn(&mut State) -> bool + Send + Clone + 'static, + StateUpdate: ExchangeData + Default, + State: ExchangeData + Sync, { // this is required because if the iteration block is not present on all the hosts, the ones // without it won't receive the state updates. diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 11232cfc..88f0500a 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -845,6 +845,96 @@ where KeyedStream(new_stream) } + pub fn fold_scan( + self, + local_fold: L, + global_fold: G, + global_init: SG, + map: F, + ) -> Stream> + where + Op::Out: ExchangeData, + L: Fn(&mut SL, Op::Out) + Send + Clone + 'static, + G: Fn(&mut SG, SL) + Send + Clone + 'static, + F: Fn(Op::Out, &SG) -> O + Send + Clone + 'static, + SL: ExchangeData + Default, + SG: ExchangeData + Sync, + O: ExchangeData, + { + #[derive(Serialize, Deserialize, Clone)] + enum TwoPass { + First(I), + Second(I), + Output(O), + } + + let (state, s) = self.map(|el| TwoPass::First(el)).iterate( + 2, + None, + |s, state| { + s.map(move |el| match el { + TwoPass::First(el) => TwoPass::Second(el), + TwoPass::Second(el) => { + TwoPass::Output((map)(el, state.get().as_ref().unwrap())) + } + TwoPass::Output(_) => unreachable!(), + }) + }, + move |local: &mut SL, el| match el { + TwoPass::First(_) => {} + TwoPass::Second(el) => local_fold(local, el), + TwoPass::Output(_) => {} + }, + move |global: &mut Option, local| { + global_fold(global.get_or_insert(global_init.clone()), local) + }, + |_| true, + ); + + state.for_each(std::mem::drop); + s.map(|t| match t { + TwoPass::First(_) | TwoPass::Second(_) => unreachable!(), + TwoPass::Output(o) => o, + }) + } + + pub fn reduce_scan( + self, + first_map: F1, + reduce: R, + second_map: F2, + ) -> Stream> + where + Op::Out: ExchangeData, + F1: Fn(Op::Out) -> S + Send + Clone + 'static, + F2: Fn(Op::Out, &S) -> O + Send + Clone + 'static, + R: Fn(S, S) -> S + Send + Clone + 'static, + S: ExchangeData + Sync, + O: ExchangeData, + { + let reduce2 = reduce.clone(); + self.fold_scan( + move |acc: &mut Option, x| { + let map = (first_map)(x); + let cur = acc.take(); + *acc = Some(match cur { + Some(cur) => (reduce)(cur, map), + None => map, + }); + }, + move |global, local| { + let cur = global.take(); + *global = match (cur, local) { + (Some(cur), Some(local)) => Some((reduce2)(cur, local)), + (Some(a), None) | (None, Some(a)) => Some(a), + (None, None) => None, + }; + }, + None, + move |x, state| (second_map)(x, state.as_ref().unwrap()), + ) + } + /// Deduplicate elements. The resulting stream will contain exactly one occurrence /// for each unique element in the input stream /// diff --git a/src/profiler/backend.rs b/src/profiler/backend.rs deleted file mode 100644 index 8eb1b429..00000000 --- a/src/profiler/backend.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::time::Instant; - -use crate::channel::UnboundedChannelSender; -use crate::network::Coord; -use crate::profiler::metrics::{ProfilerBucket, ProfilerResult}; -use crate::profiler::{get_sender, Profiler, TimePoint}; - -/// The size of a bucket, in milliseconds. -/// -/// Each bucket will contain events for up to this amount of time. -const BUCKET_RESOLUTION_MS: TimePoint = 20; - -/// A thread-local implementation of a profiler. -/// -/// This will collect the events and store them inside buckets of size `BUCKET_RESOLUTION_MS`. All -/// the events inside a bucket are merged together. -#[derive(Clone, Debug)] -pub struct ProfilerBackend { - /// The name of the current thread. - thread_name: String, - /// The start of time: all the times will be relative to this instant, ie the start of the - /// execution. - start: Instant, - /// The list of all the buckets, sorted by their start time. - buckets: Vec, - /// The sender to use to send the profiler results back to the main thread. - sender: UnboundedChannelSender, -} - -impl ProfilerBackend { - pub fn new(start: Instant) -> Self { - Self { - thread_name: std::thread::current() - .name() - .unwrap_or("unnamed") - .to_string(), - start, - buckets: vec![ProfilerBucket::new(0)], - sender: get_sender(), - } - } - - /// Get the current time relative to the start of the execution. - fn now(&self) -> TimePoint { - self.start.elapsed().as_millis() as TimePoint - } - - /// Get a reference to the current bucket, creating a new one if needed. - #[inline] - fn bucket(&mut self) -> &mut ProfilerBucket { - let now = self.now(); - // the timestamp is outside the last bucket, create a new one - if now >= self.buckets.last().unwrap().start_ms + BUCKET_RESOLUTION_MS { - let start = now - now % BUCKET_RESOLUTION_MS; - self.buckets.push(ProfilerBucket::new(start)); - } - self.buckets.last_mut().unwrap() - } -} - -impl Drop for ProfilerBackend { - fn drop(&mut self) { - self.sender - .send(ProfilerResult { - thread_name: std::mem::take(&mut self.thread_name), - buckets: std::mem::take(&mut self.buckets), - }) - .unwrap(); - } -} - -impl Profiler for ProfilerBackend { - #[inline] - fn items_in(&mut self, from: Coord, to: Coord, amount: usize) { - let entry = self.bucket().metrics.items_in.entry((from, to)); - *entry.or_default() += amount; - } - - #[inline] - fn items_out(&mut self, from: Coord, to: Coord, amount: usize) { - let entry = self.bucket().metrics.items_out.entry((from, to)); - *entry.or_default() += amount; - } - - #[inline] - fn net_bytes_in(&mut self, from: Coord, to: Coord, amount: usize) { - let entry = self.bucket().metrics.net_messages_in.entry((from, to)); - let entry = entry.or_default(); - entry.0 += 1; - entry.1 += amount; - } - - #[inline] - fn net_bytes_out(&mut self, from: Coord, to: Coord, amount: usize) { - let entry = self.bucket().metrics.net_messages_out.entry((from, to)); - let entry = entry.or_default(); - entry.0 += 1; - entry.1 += amount; - } - - #[inline] - fn iteration_boundary(&mut self, leader_block_id: usize) { - let now = self.now(); - self.bucket() - .metrics - .iteration_boundaries - .push((leader_block_id, now)) - } -} diff --git a/src/profiler/bucket_profiler.rs b/src/profiler/bucket_profiler.rs new file mode 100644 index 00000000..530d6249 --- /dev/null +++ b/src/profiler/bucket_profiler.rs @@ -0,0 +1,203 @@ +use std::time::Instant; + +use crate::network::Coord; +use crate::scheduler::BlockId; +use flume::Sender; +use std::collections::HashMap; + +use serde::ser::SerializeSeq; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::block::CoordHasherBuilder; + +use super::{get_sender, Profiler}; + +/// The size of a bucket, in milliseconds. +/// +/// Each bucket will contain events for up to this amount of time. +const BUCKET_RESOLUTION_MS: TimePoint = 50; + +/// A thread-local implementation of a profiler. +/// +/// This will collect the events and store them inside buckets of size `BUCKET_RESOLUTION_MS`. All +/// the events inside a bucket are merged together. +#[derive(Clone, Debug)] +pub struct BucketProfiler { + /// The name of the current thread. + thread_name: String, + /// The start of time: all the times will be relative to this instant, ie the start of the + /// execution. + start: Instant, + /// The list of all the buckets, sorted by their start time. + buckets: Vec, + /// The sender to use to send the profiler results back to the main thread. + sender: Sender, +} + +impl BucketProfiler { + pub fn new(start: Instant) -> Self { + Self { + thread_name: std::thread::current() + .name() + .unwrap_or("unnamed") + .to_string(), + start, + buckets: vec![MetricsBucket::new(0)], + sender: get_sender(), + } + } + + /// Get the current time relative to the start of the execution. + fn now(&self) -> TimePoint { + self.start.elapsed().as_millis() as TimePoint + } + + /// Get a reference to the current bucket, creating a new one if needed. + #[inline] + fn bucket(&mut self) -> &mut MetricsBucket { + let now = self.now(); + // the timestamp is outside the last bucket, create a new one + if now >= self.buckets.last().unwrap().start_ms + BUCKET_RESOLUTION_MS { + let start = now - now % BUCKET_RESOLUTION_MS; + self.buckets.push(MetricsBucket::new(start)); + } + self.buckets.last_mut().unwrap() + } +} + +impl Drop for BucketProfiler { + fn drop(&mut self) { + self.sender + .send(ProfilerResult { + thread_name: std::mem::take(&mut self.thread_name), + buckets: std::mem::take(&mut self.buckets), + }) + .unwrap(); + } +} + +impl Profiler for BucketProfiler { + #[inline] + fn items_in(&mut self, from: Coord, to: Coord, amount: usize) { + let entry = self.bucket().link_metrics.entry((from, to)).or_default(); + entry.items_in += amount; + } + + #[inline] + fn items_out(&mut self, from: Coord, to: Coord, amount: usize) { + let entry = self.bucket().link_metrics.entry((from, to)).or_default(); + entry.items_out += amount; + } + + #[inline] + fn net_bytes_in(&mut self, from: Coord, to: Coord, amount: usize) { + let entry = self.bucket().link_metrics.entry((from, to)).or_default(); + entry.net_messages_in += 1; + entry.bytes_in += amount; + } + + #[inline] + fn net_bytes_out(&mut self, from: Coord, to: Coord, amount: usize) { + let entry = self.bucket().link_metrics.entry((from, to)).or_default(); + entry.net_messages_out += 1; + entry.bytes_out += amount; + } + + #[inline] + fn iteration_boundary(&mut self, leader_block_id: BlockId) { + let now = self.now(); + self.bucket().iteration_metrics.push((leader_block_id, now)) + } +} + +/// A time point. +/// +/// This represents the number of milliseconds since the start of the execution. +pub type TimePoint = u32; + +/// The results of the profiler of a thread. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ProfilerResult { + /// The name of the thread that collected the data. + pub thread_name: String, + /// The list of collected buckets. + pub buckets: Vec, +} + +/// The available metrics to be collected. +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct LinkMetrics { + /// The number of items that arrived to a block. + pub items_in: usize, + /// The number of items that left from a block. + pub items_out: usize, + + /// The number of network messages that arrived to a block and their total size. + pub net_messages_in: usize, + /// The number of network messages that left from a block and their total size. + pub net_messages_out: usize, + + pub bytes_in: usize, + pub bytes_out: usize, +} + +/// A bucket with the profiler metrics. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MetricsBucket { + /// The time point of the start of the bucket. + /// + /// This bucket should contain the metrics in the time interval `[start, start+RESOLUTION)`. + pub start_ms: TimePoint, + /// The metrics of this bucket. + #[serde(serialize_with = "serialize_map", deserialize_with = "deserialize_map")] + pub link_metrics: HashMap<(Coord, Coord), LinkMetrics, CoordHasherBuilder>, + + /// The time point of the end of an iteration, with the id of the leader block that manages that + /// iteration. + pub iteration_metrics: Vec<(BlockId, TimePoint)>, +} + +impl MetricsBucket { + #[inline] + pub fn new(start_ms: TimePoint) -> Self { + Self { + start_ms, + ..Default::default() + } + } +} + +#[derive(Serialize, Deserialize)] +struct Entry { + from: Coord, + to: Coord, + value: T, +} + +/// Since JSON supports only maps with strings as key, this serialized _flattens_ those maps. +fn serialize_map( + map: &HashMap<(Coord, Coord), T, CoordHasherBuilder>, + s: S, +) -> Result { + let mut seq = s.serialize_seq(Some(map.len()))?; + for (&(from, to), value) in map.iter() { + let entry = Entry { from, to, value }; + seq.serialize_element(&entry)?; + } + seq.end() +} + +/// The inverse of `serialize_map`. +fn deserialize_map<'de, D, T>( + d: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, +{ + let as_vec: Vec> = serde::de::Deserialize::deserialize(d)?; + Ok(as_vec + .into_iter() + .map(|e| ((e.from, e.to), e.value)) + .collect()) +} diff --git a/src/profiler/metrics.rs b/src/profiler/metrics.rs deleted file mode 100644 index bb65a55b..00000000 --- a/src/profiler/metrics.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::collections::HashMap; - -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; - -use crate::network::Coord; -use crate::scheduler::BlockId; - -/// A time point. -/// -/// This represents the number of milliseconds since the start of the execution. -pub type TimePoint = u32; - -/// The results of the profiler of a thread. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ProfilerResult { - /// The name of the thread that collected the data. - pub thread_name: String, - /// The list of collected buckets. - pub buckets: Vec, -} - -/// The available metrics to be collected. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct ProfilerMetrics { - /// The number of items that arrived to a block. - #[serde(serialize_with = "serialize_map", deserialize_with = "deserialize_map")] - pub items_in: HashMap<(Coord, Coord), usize>, - /// The number of items that left from a block. - #[serde(serialize_with = "serialize_map", deserialize_with = "deserialize_map")] - pub items_out: HashMap<(Coord, Coord), usize>, - - /// The number of network messages that arrived to a block and their total size. - #[serde(serialize_with = "serialize_map", deserialize_with = "deserialize_map")] - pub net_messages_in: HashMap<(Coord, Coord), (usize, usize)>, - /// The number of network messages that left from a block and their total size. - #[serde(serialize_with = "serialize_map", deserialize_with = "deserialize_map")] - pub net_messages_out: HashMap<(Coord, Coord), (usize, usize)>, - - /// The time point of the end of an iteration, with the id of the leader block that manages that - /// iteration. - pub iteration_boundaries: Vec<(BlockId, TimePoint)>, -} - -/// A bucket with the profiler metrics. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ProfilerBucket { - /// The time point of the start of the bucket. - /// - /// This bucket should contain the metrics in the time interval `[start, start+RESOLUTION)`. - pub start_ms: TimePoint, - /// The metrics of this bucket. - pub metrics: ProfilerMetrics, -} - -impl ProfilerBucket { - #[inline] - #[allow(dead_code)] - pub fn new(start_ms: TimePoint) -> Self { - Self { - start_ms, - metrics: Default::default(), - } - } -} - -#[derive(Serialize, Deserialize)] -struct Entry { - from: Coord, - to: Coord, - value: T, -} - -/// Since JSON supports only maps with strings as key, this serialized _flattens_ those maps. -fn serialize_map( - map: &HashMap<(Coord, Coord), T>, - s: S, -) -> Result { - let mut seq = s.serialize_seq(Some(map.len()))?; - for (&(from, to), &value) in map.iter() { - let entry = Entry { from, to, value }; - seq.serialize_element(&entry)?; - } - seq.end() -} - -/// The inverse of `serialize_map`. -fn deserialize_map<'de, D, T>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, - T: Deserialize<'de>, -{ - let as_vec: Vec> = serde::de::Deserialize::deserialize(d)?; - Ok(as_vec - .into_iter() - .map(|e| ((e.from, e.to), e.value)) - .collect()) -} diff --git a/src/profiler/mod.rs b/src/profiler/mod.rs index dccff877..e1e9ba1a 100644 --- a/src/profiler/mod.rs +++ b/src/profiler/mod.rs @@ -1,14 +1,15 @@ -pub use metrics::*; +use serde::{Deserialize, Serialize}; #[cfg(feature = "profiler")] pub use with_profiler::*; #[cfg(not(feature = "profiler"))] pub use without_profiler::*; -use crate::{network::Coord, scheduler::BlockId}; +use crate::{block::BlockStructure, network::Coord, scheduler::BlockId}; #[cfg(feature = "profiler")] -mod backend; -mod metrics; +mod bucket_profiler; + +pub const TRACING_PREFIX: &str = "__renoir_TRACING_DATA__"; /// The available profiling metrics. /// @@ -27,72 +28,61 @@ pub trait Profiler { fn iteration_boundary(&mut self, leader_block_id: BlockId); } -/// The implementation of the profiler when the `profiler` feature is enabled. -#[cfg(feature = "profiler")] -mod with_profiler { - use once_cell::sync::Lazy; - use std::cell::UnsafeCell; - use std::sync::Mutex; - use std::time::Instant; - - use crate::profiler::backend::ProfilerBackend; - use crate::profiler::metrics::ProfilerResult; - use flume::{Receiver, Sender}; - - /// The sender and receiver pair of the current profilers. - /// - /// These are options since they can be consumed. - static CHANNEL: Lazy, Option)>> = - Lazy::new(|| { - let (sender, receiver) = flume::unbounded(); - Mutex::new((Some(sender), Some(receiver))) - }); - - /// The sender and receiver pair of the current profilers. - /// - /// These are options since they can be consumed. - static START_TIME: Lazy = Lazy::new(|| Instant::now()); - - thread_local! { - /// The actual profiler for the current thread, if the `profiler` feature is enabled. - static PROFILER: UnsafeCell = UnsafeCell::new(ProfilerBackend::new(*START_TIME)); - } - - /// The type of the channel sender with the `ProfilerResult`s. - type ProfilerSender = Sender; - /// The type of the channel receiver with the `ProfilerResult`s. - type ProfilerReceiver = Receiver; +/// Tracing information of the current execution. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub(crate) struct TracingData { + pub structures: Vec<(Coord, BlockStructure)>, + pub profilers: Vec, +} - /// Get the sender for sending the profiler results. - pub(crate) fn get_sender() -> ProfilerSender { - let channels = CHANNEL.lock().unwrap(); - channels.0.clone().expect("Profiler sender already dropped") +// impl Add for TracingData { +// type Output = TracingData; + +// fn add(mut self, rhs: Self) -> Self::Output { +// self += rhs; +// self +// } +// } + +// impl AddAssign for TracingData { +// fn add_assign(&mut self, mut rhs: Self) { +// self.structures.append(&mut rhs.structures); +// self.profilers.append(&mut rhs.profilers); +// } +// } + +pub fn log_trace(structures: Vec<(Coord, BlockStructure)>, profilers: Vec) { + if !cfg!(feature = "profiler") { + return; } - /// Get the current profiler. - pub fn get_profiler() -> &'static mut ProfilerBackend { - PROFILER.with(|t| unsafe { &mut *t.get() }) - } - - /// Wait for all the threads that used the profiler to exit, collect all their data and reset - /// the profiler. - pub fn wait_profiler() -> Vec { - let mut channels = CHANNEL.lock().unwrap(); - let profiler_receiver = channels.1.take().expect("Profiler receiver already taken"); - - // allow the following loop to exit when all the senders are dropped - channels.0.take().expect("Profiler sender already dropped"); + use std::io::Write as _; + let data = TracingData { + structures, + profilers, + }; + + let mut stderr = std::io::stderr().lock(); + writeln!( + stderr, + "__renoir_TRACING_DATA__{}", + serde_json::to_string(&data).unwrap() + ) + .unwrap(); +} - let mut results = vec![]; - while let Ok(profiler_res) = profiler_receiver.recv() { - results.push(profiler_res); +#[inline] +pub fn try_parse_trace(s: &str) -> Option { + if let Some(s) = s.strip_prefix(TRACING_PREFIX) { + match serde_json::from_str::(s) { + Ok(trace) => Some(trace), + Err(e) => { + tracing::error!("Corrupted tracing data ({e}) `{s}`"); + None + } } - - let (sender, receiver) = flume::unbounded(); - channels.0 = Some(sender); - channels.1 = Some(receiver); - - results + } else { + None } } @@ -114,6 +104,9 @@ mod without_profiler { #[derive(Debug, Clone, Copy, Default)] pub struct NoOpProfiler; + #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] + pub struct ProfilerResult; + thread_local! { static PROFILER: UnsafeCell = const { UnsafeCell::new(NoOpProfiler) }; } @@ -141,3 +134,52 @@ mod without_profiler { Default::default() } } + +/// The implementation of the profiler when the `profiler` feature is enabled. +#[cfg(feature = "profiler")] +mod with_profiler { + use once_cell::sync::Lazy; + use std::cell::UnsafeCell; + use std::time::Instant; + + use super::bucket_profiler::BucketProfiler; + use flume::{Receiver, Sender}; + + pub use super::bucket_profiler::ProfilerResult; + + /// The sender and receiver pair of the current profilers. + /// + /// These are options since they can be consumed. + static CHANNEL: Lazy<(ProfilerSender, ProfilerReceiver)> = Lazy::new(|| flume::unbounded()); + + /// The sender and receiver pair of the current profilers. + /// + /// These are options since they can be consumed. + static START_TIME: Lazy = Lazy::new(|| Instant::now()); + + thread_local! { + /// The actual profiler for the current thread, if the `profiler` feature is enabled. + static PROFILER: UnsafeCell = UnsafeCell::new(BucketProfiler::new(*START_TIME)); + } + + /// The type of the channel sender with the `ProfilerResult`s. + type ProfilerSender = Sender; + /// The type of the channel receiver with the `ProfilerResult`s. + type ProfilerReceiver = Receiver; + + /// Get the sender for sending the profiler results. + pub(crate) fn get_sender() -> ProfilerSender { + CHANNEL.0.clone() + } + + /// Get the current profiler. + pub fn get_profiler() -> &'static mut BucketProfiler { + PROFILER.with(|t| unsafe { &mut *t.get() }) + } + + /// Wait for all the threads that used the profiler to exit, collect all their data and reset + /// the profiler. + pub fn wait_profiler() -> Vec { + CHANNEL.1.drain().collect() + } +} diff --git a/src/runner.rs b/src/runner.rs index b9439227..1d5220ec 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -17,8 +17,9 @@ use ssh2::Session; use crate::config::CONFIG_ENV_VAR; use crate::config::HOST_ID_ENV_VAR; use crate::config::{HostConfig, RemoteConfig}; +use crate::profiler::try_parse_trace; +use crate::profiler::TracingData; use crate::scheduler::HostId; -use crate::TracingData; /// Size of the buffer usedahash to send the executable file via SCP. pub(crate) const SCP_BUFFER_SIZE: usize = 512 * 1024; @@ -96,8 +97,9 @@ pub(crate) fn spawn_remote_workers(config: RemoteConfig) { max_execution_time = max_execution_time.max(result.execution_time); max_sync_time = max_sync_time.max(result.sync_time); exit_code_or |= result.exit_code; - if let Some(data) = result.tracing { - tracing_data += data; + if let Some(mut data) = result.tracing { + tracing_data.structures.append(&mut data.structures); + tracing_data.profilers.append(&mut data.profilers); } } if let Some(path) = config.tracing_dir { @@ -232,33 +234,18 @@ fn remote_worker( let mut tracing_data = None; - std::thread::scope(|s| { - s.spawn(|| { - for l in stdout_reader.lines() { - println!( - "{}|{}", - host_id, - l.unwrap_or_else(|e| format!("ERROR: {e}")) - ); - } - }); - s.spawn(|| { - // copy to stderr the output of the remote process - for line in stderr_reader.lines().map_while(Result::ok) { - if let Some(pos) = line.find("__renoir_TRACING_DATA__") { - let json_data = &line[(pos + "__renoir_TRACING_DATA__ ".len())..]; - match serde_json::from_str(json_data) { - Ok(data) => tracing_data = Some(data), - Err(err) => { - error!("Corrupted tracing data from host {}: {:?}", host_id, err); - } - } - } else { - eprintln!("{host_id}|{line}"); - } - } - }); - }); + for line in stdout_reader.lines().map_while(Result::ok) { + println!("{host_id}|{line}"); + } + + // copy to stderr the output of the remote process + for line in stderr_reader.lines().map_while(Result::ok) { + if let Some(trace) = try_parse_trace(&line) { + tracing_data = Some(trace); + } else { + eprintln!("{host_id}|{line}"); + } + } channel.wait_close().unwrap(); let exit_code = channel.exit_status().unwrap(); diff --git a/src/scheduler.rs b/src/scheduler.rs index ee34158a..a5c2f0da 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,10 +7,9 @@ use crate::block::{BatchMode, Block, BlockStructure, JobGraphGenerator, Replicat use crate::config::{LocalConfig, RemoteConfig, RuntimeConfig}; use crate::network::{Coord, NetworkTopology}; use crate::operator::Operator; -use crate::profiler::{wait_profiler, ProfilerResult}; +use crate::profiler::{log_trace, wait_profiler}; use crate::worker::spawn_worker; use crate::CoordUInt; -use crate::TracingData; /// Identifier of a block in the job graph. pub type BlockId = CoordUInt; @@ -210,7 +209,7 @@ impl Scheduler { join_result.expect("Could not join worker threads"); - Self::log_tracing_data(block_structures, wait_profiler()); + log_trace(block_structures, wait_profiler()); } /// Start the computation returning the list of handles used to join the workers. @@ -248,7 +247,7 @@ impl Scheduler { }) ); join_result.expect("Could not join worker threads"); - Self::log_tracing_data(block_structures, wait_profiler()); + log_trace(block_structures, wait_profiler()); }); } #[cfg(not(feature = "tokio"))] @@ -261,7 +260,7 @@ impl Scheduler { self.network.stop_and_wait(); let profiler_results = wait_profiler(); - Self::log_tracing_data(block_structures, profiler_results); + log_trace(block_structures, profiler_results); } } @@ -298,17 +297,6 @@ impl Scheduler { } } - fn log_tracing_data(structures: Vec<(Coord, BlockStructure)>, profilers: Vec) { - let data = TracingData { - structures, - profilers, - }; - log::trace!( - "__renoir_TRACING_DATA__ {}", - serde_json::to_string(&data).unwrap() - ); - } - fn log_topology(&self) { let mut topology = "job graph:".to_string(); for (block_id, block) in self.block_info.iter() { diff --git a/tests/fold_scan.rs b/tests/fold_scan.rs new file mode 100644 index 00000000..c9c7c2d8 --- /dev/null +++ b/tests/fold_scan.rs @@ -0,0 +1,58 @@ +use utils::TestHelper; + +mod utils; + +#[test] +fn fold_scan() { + TestHelper::local_remote_env(|ctx| { + let res = ctx + .stream_par_iter(200..210) + .fold_scan( + |acc: &mut usize, _| { + *acc += 1; + }, + |global, local| { + *global += local; + }, + 0, + |x, cnt| (x, *cnt), + ) + .collect_vec(); + + ctx.execute_blocking(); + if let Some(mut res) = res.get() { + res.sort(); + assert_eq!( + (200..210) + .map(|x| (x, (200..210).len())) + .collect::>(), + res + ); + } + }); +} + +#[test] +fn reduce_scan() { + TestHelper::local_remote_env(|ctx| { + let res = ctx + .stream_par_iter(200..210i32) + .reduce_scan( + |x| (x, 1usize), + |a, b| (a.0 + b.0, a.1 + b.1), + |x, &(sum, cnt)| (x, sum, cnt), + ) + .collect_vec(); + + ctx.execute_blocking(); + if let Some(mut res) = res.get() { + res.sort(); + assert_eq!( + (200..210i32) + .map(|x| (x, (200..210i32).sum::(), (200..210).len())) + .collect::>(), + res + ); + } + }); +}