From 03f618a139fea5685c3b557927efdb887959f5c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Haye=C3=9F?= Date: Thu, 28 Jul 2022 14:33:13 +0200 Subject: [PATCH 1/8] Implement FilterBank benchmark --- Rust/Savina/src/parallelism/FilterBank.lf | 210 +++++++++--------- .../savina_parallelism_filterbank.yaml | 10 + 2 files changed, 117 insertions(+), 103 deletions(-) diff --git a/Rust/Savina/src/parallelism/FilterBank.lf b/Rust/Savina/src/parallelism/FilterBank.lf index 1e1ed0e..13612c5 100644 --- a/Rust/Savina/src/parallelism/FilterBank.lf +++ b/Rust/Savina/src/parallelism/FilterBank.lf @@ -1,6 +1,6 @@ /** * Copyright (C) 2020 TU Dresden - * + * * This benchmark is particularly interesting for LF, as it has an interesting * structure and highlights a significant advantage compared to Akka. The * benchmark implements a "filter bank". Each bank consists of a pipeline of @@ -36,7 +36,7 @@ * not needed at all in the LF implementation and makes both the "TaggedForward" * and the "Integrate" actors superflous. The combine reactor simply has a * multiport import and thus simultaneously receives values from all N banks. - * + * * @author Christian Menard * @author Hannes Klein * @author Johannes Hayeß @@ -52,23 +52,23 @@ import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; reactor Producer(numChannels: usize(8), numSimulations: usize(34816), numColumns: usize(16384)) { state num_simulations(numSimulations); - + state numMessagesSent: usize(0); - + input start: unit; output next: unit; output finished: unit; - + logical action sendNext: unit; - + reaction(start) -> sendNext {= // reset local state self.numMessagesSent = 0; - + // start execution ctx.schedule(sendNext, Asap); =} - + reaction(sendNext) -> sendNext, next, finished {= if self.numMessagesSent < self.num_simulations { ctx.set(next, ()); @@ -81,20 +81,20 @@ reactor Producer(numChannels: usize(8), numSimulations: usize(34816), numColumns } reactor Source { - state maxValue: usize(1000); + state max_value: usize(1000); state current: usize(0); - + input next: unit; output value: f64; input inFinished: unit; output outFinished: unit; - + reaction (next) -> value {= - ctx.set(value, self.current); - self.current = (self.current + 1) % self.maxValue; + ctx.set(value, self.current as f64); + self.current = (self.current + 1) % self.max_value; =} - + reaction (inFinished) -> outFinished {= ctx.set(outFinished, ()); // reset local state @@ -102,62 +102,63 @@ reactor Source { =} } -reactor Bank(bank_index: usize(0), numColumns: usize(16384), numChannels: usize(8)) { - input inValue: double; - input inFinished: void; - output outValue: double; - output outFinished: void; - - input setF: Matrix; - input setH: Matrix; - +reactor Bank(bank_index: usize(0), numColumns: usize(16384), numChannels: usize(8)) { + input inValue: f64; + input inFinished: unit; + output outValue: f64; + output outFinished: unit; + + input setF: Arc>; + input setH: Arc>; + preamble {= + use std::sync::Arc; use crate::matrix::Matrix; =} - + delay0 = new Delay(delayLength={=numColumns - 1=}); fir0 = new FirFilter(bank_index=bank_index, peekLength=numColumns); sample = new SampleFilter(sampleRate=numColumns); delay1 = new Delay(delayLength={=numColumns - 1=}); fir1 = new FirFilter(bank_index=bank_index, peekLength=numColumns); - - inFinished, delay0.outFinished, fir0.outFinished, sample.outFinished, delay1.outFinished, fir1.outFinished -> + + inFinished, delay0.outFinished, fir0.outFinished, sample.outFinished, delay1.outFinished, fir1.outFinished -> delay0.inFinished, fir0.inFinished, sample.inFinished, delay1.inFinished, fir1.inFinished, outFinished; - - inValue, delay0.outValue, fir0.outValue, sample.outValue, delay1.outValue, fir1.outValue -> - delay0.inValue, fir0.inValue, sample.inValue, delay1.inValue, fir1.inValue, outValue; - + + inValue, delay0.outValue, fir0.outValue, sample.outValue, delay1.outValue, fir1.outValue -> + delay0.inValue, fir0.inValue, sample.inValue, delay1.inValue, fir1.inValue, outValue; + setH -> fir0.setCoefficients; setF -> fir1.setCoefficients; } reactor Delay(delayLength: usize(16383)) { state delay_length(delayLength); - + state myState: Vec; state placeHolder: usize(0); - + input inValue: f64; input inFinished: unit; output outValue: f64; output outFinished: unit; - + reaction(startup) {= // one time init - self.myState = vec![0.0; self.delayLength]; + self.myState = vec![0.0; self.delay_length]; self.placeHolder = 0; =} - + reaction(inValue) -> outValue {= let result = ctx.get(inValue).unwrap(); ctx.set(outValue, self.myState[self.placeHolder]); - myState[self.placeHolder] = result; - self.placeHolder = (placeHolder + 1) % self.delay_length; + self.myState[self.placeHolder] = result; + self.placeHolder = (self.placeHolder + 1) % self.delay_length; =} - + reaction(inFinished) -> outFinished {= ctx.set(outFinished, ()); - + // reset local state self.myState = vec![0.0; self.delay_length]; self.placeHolder = 0; @@ -167,54 +168,57 @@ reactor Delay(delayLength: usize(16383)) { reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { state bank_index(bank_index); state peek_length(peekLength); - + state data: Vec; - state dataIndex: usize(0); - state dataFull: bool(false); - state coefficients: {=Rc>=}; - - input setCoefficients: Rc>; + state data_index: usize(0); + state data_full: bool(false); + state coefficients: Arc>; + + input setCoefficients: Arc>; input inValue: f64; input inFinished: unit; output outValue: f64; output outFinished: unit; - + preamble {= + use std::sync::Arc; use crate::matrix::Matrix; =} - + reaction(startup) {= // reset local state - data = vec![0.0; self.peek_length]; - self.dataIndex = 0; - self.dataFull = false; + self.data = vec![0.0; self.peek_length]; + self.data_index = 0; + self.data_full = false; =} - + reaction(setCoefficients) {= - self.coefficients = ctx.get(setCoefficients).unwrap(); + ctx.use_ref_opt(setCoefficients, |c| { + self.coefficients = c.clone(); + }); =} - + reaction(inValue) -> outValue {= - data[self.dataIndex] = ctx.get(inValue).unwrap(); - self.dataIndex += 1; - - if self.dataIndex == self.peek_length { - self.dataFull = true; - self.dataIndex = 0; + self.data[self.data_index] = ctx.get(inValue).unwrap(); + self.data_index += 1; + + if self.data_index == self.peek_length { + self.data_full = true; + self.data_index = 0; } - - if self.dataFull { - let sum = 0.0; + + if self.data_full { + let mut sum = 0.0; for (i, d) in self.data.iter().enumerate() { - sum += data * self.coefficients.get(bank_index, peek_length - i - 1); + sum += self.data[i] * self.coefficients.get(self.bank_index, self.peek_length - i - 1); } ctx.set(outValue, sum); } =} - + reaction(inFinished) -> outFinished {= - ctx.set(outFinished, 0); - + ctx.set(outFinished, ()); + // reset local state self.data = vec![0.0; self.peek_length]; self.data_index = 0; @@ -224,14 +228,14 @@ reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { reactor SampleFilter(sampleRate: usize(16384)) { state sample_rate(sampleRate); - + state samplesReceived: usize(0); - + input inValue: f64; input inFinished: unit; output outValue: f64; output outFinished: unit; - + reaction(inValue) -> outValue {= if self.samplesReceived == 0 { ctx.set(outValue, ctx.get(inValue).unwrap()); @@ -240,10 +244,10 @@ reactor SampleFilter(sampleRate: usize(16384)) { } self.samplesReceived = (self.samplesReceived + 1) % self.sample_rate; =} - + reaction(inFinished) -> outFinished {= ctx.set(outFinished, ()); - + // reset local state self.samplesReceived = 0; =} @@ -251,25 +255,25 @@ reactor SampleFilter(sampleRate: usize(16384)) { reactor Combine(numChannels: usize(8)) { state num_channels(numChannels); - + input[numChannels] inValues: f64; input[numChannels] inFinished: unit; output outValue: f64; output outFinished: unit; - + state numFinished: usize(0); - + reaction(inValues) -> outValue {= - let sum = 0; + let mut sum = 0.0; for x in inValues { - sum += ctx.get(x).unwrap(); + sum += ctx.get(&x).unwrap(); } ctx.set(outValue, sum); =} - + reaction(inFinished) -> outFinished {= for port in inFinished { - if ctx.is_present(port) { + if ctx.is_present(&port) { self.numFinished += 1; } } @@ -283,22 +287,22 @@ reactor Combine(numChannels: usize(8)) { reactor Sink(printRate: usize(100)) { state print_rate(printRate); - + state count: usize(0); - + input inValue: f64; input inFinished: unit; output outFinished: unit; - + reaction(inValue) {= let result = ctx.get(inValue).unwrap(); - - if self.count % self.print_Rate == 0 { + + if self.count % self.print_rate == 0 { info!("SinkActor: result = {}", result); } self.count += 1; =} - + reaction(inFinished) -> outFinished {= ctx.set(outFinished, ()); // reset local state @@ -311,35 +315,35 @@ main reactor (numIterations: usize(12), numSimulations: usize(34816), numColumns state num_simulations(numSimulations); state num_columns(numColumns); state num_channels(numChannels); - + preamble {= use std::sync::Arc; =} - + reaction(startup) -> banks.setF, banks.setH {= // initialize the coefficients of all FIR filters - let mut mH = Matrix(numChannels, numColumns); - let mut mF = Matrix(numChannels, numColumns); + let mut mH: Matrix = Matrix::new(self.num_channels, self.num_columns); + let mut mF: Matrix = Matrix::new(self.num_channels, self.num_columns); for j in 0..self.num_channels { for i in 0..self.num_columns { - let h = (1.0 * i * numColumns) + (1.0 * j * numChannels) + j + i + j + 1; - let f = (1.0 * i * j) + (1.0 * j * j) + j + i; + let h = ((i * self.num_columns) + (j * self.num_channels) + j + i + j + 1) as f64; + let f = ((i * j) + (j * j) + j + i) as f64; mH.set(j, i, h); mF.set(j, i, f); } } - + // convert to immutable pointers before sending. This ensures that all recipients can receive a pointer // to the same matrix and no copying is needed let mH_r = Arc::new(mH); let mF_r = Arc::new(mF); - - for (h, f) in banks__setH.into_iter().zip(&banks__setF) { + + for (h, f) in banks__setH.into_iter().zip(banks__setF.into_iter()) { ctx.set(h, Arc::clone(&mH_r)); ctx.set(f, Arc::clone(&mF_r)); } - + print_benchmark_info("FilterBankBenchmark"); print_args!( "numIterations", @@ -349,34 +353,34 @@ main reactor (numIterations: usize(12), numSimulations: usize(34816), numColumns "numColumns", self.num_columns, "numChannels", - self.num_channels, + self.num_channels ); print_system_info(); =} - runner = new BenchmarkRunner(num_iterations=numIterations); + runner = new BenchmarkRunner(num_iterations=numIterations); producer = new Producer(numSimulations=numSimulations, numChannels=numChannels, numColumns=numColumns); source = new Source(); banks = new[numChannels] Bank(numColumns=numColumns, numChannels=numChannels); combine = new Combine(numChannels=numChannels); sink = new Sink(printRate=100); - + runner.start -> producer.start; - + producer.next -> source.next; producer.finished -> source.inFinished; - + (source.value)+ -> banks.inValue; (source.outFinished)+ -> banks.inFinished; - + banks.outValue -> combine.inValues; banks.outFinished -> combine.inFinished - + combine.outValue -> sink.inValue; combine.outFinished -> sink.inFinished; - + sink.outFinished -> runner.finished; - + preamble {= use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; use crate::matrix::Matrix; diff --git a/runner/conf/benchmark/savina_parallelism_filterbank.yaml b/runner/conf/benchmark/savina_parallelism_filterbank.yaml index 2a46ffc..408ea74 100644 --- a/runner/conf/benchmark/savina_parallelism_filterbank.yaml +++ b/runner/conf/benchmark/savina_parallelism_filterbank.yaml @@ -37,3 +37,13 @@ targets: columns: ["-D", "columns="] simulations: ["-D", "time_steps="] channels: ["-D", "channels="] + lf-rust: + copy_sources: + - "${bench_path}/Rust/Savina/src/lib" + - "${bench_path}/Rust/Savina/src/parallelism" + lf_file: "parallelism/Filterbank.lf" + binary: "filter_bank" + run_args: + columns: ["--main-num-columns", ""] + simulations: ["--main-num-simulations", ""] + channels: ["--main-num-channels", ""] From 9f20d221fc98ec884b36982699de62c01e7ec24a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Haye=C3=9F?= Date: Tue, 2 Aug 2022 11:40:38 +0200 Subject: [PATCH 2/8] Add currently broken Apsp implementation Why it's broken is explained here: https://github.com/lf-lang/lingua-franca/pull/1228#issuecomment-1202254369 --- Rust/Savina/src/parallelism/Apsp.lf | 302 ++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 Rust/Savina/src/parallelism/Apsp.lf diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf new file mode 100644 index 0000000..0f974af --- /dev/null +++ b/Rust/Savina/src/parallelism/Apsp.lf @@ -0,0 +1,302 @@ +/** + * Copyright (C) 2020 TU Dresden + * + * This benchmark implements a parallel all pairs shortest path algorithm. In + * order to split the workload, the large input matrix of size graphSize x + * graphSize is split into smaller blocks of size blockSize x blockSize. Each of + * the worker reactors (ApspFloydWarshallBlock) processes one of these blocks. + * The worker reactors are organized in the same matrix pattern, replication the + * structure of the blocks within the large input matrix. Each of the workers + * operates on its local block data, and sends results to all other workers in + * the same column or in the same row. The data from the neighbors is then used + * to compute the next intermediate result and to update the local state + * accordingly. + * + * @author Christian Menard + * @author Hannes Klein + * @author Johannes Hayeß + */ + +target Rust { + build-type : Release, + cargo-features: [ "cli" ], + rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"], +}; + +import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; + +reactor ApspFloydWarshallBlock( + bank_index: usize(0), + row_index: usize(0), + graphSize: usize(300), + blockSize: usize(50), + dimension: usize(6), + verbose: bool(false) +) { + state bank_index(bank_index); + state row_index(row_index); + state graph_size(graphSize); + state block_size(blockSize); + state dimension(dimension); + state verbose(verbose); + + state num_neighbors: usize({=2 * (dimension - 1)=}); + state row_offset: usize({=row_index * blockSize=}); // row offset of the block of this reactor + state col_offset: usize({=bank_index * blockSize=}); // column offset of the block of this reactor + + state k: usize(0); // iteration counter + state reportedFinish: bool(false); + + input start: Matrix; + + input[dimension] fromRow: Arc>; + input[dimension] fromCol: Arc>; + + output toNeighbors: Arc>; + output finished: unit; + + logical action notifyNeighbors: Arc>; + + preamble {= + use std::sync::Arc; + use crate::matrix::Matrix; + + fn get_element_at( + row: usize, + col: usize, + row_ports: &ReadablePortBank>>, + col_ports: &ReadablePortBank>>, + ctx: &ReactionCtx, + block_size: usize, + row_index: usize, + bank_index: usize, + ) -> u64 { + let dest_row = row / block_size; + let dest_col = col / block_size; + let local_row = row % block_size; + let local_col = col % block_size; + + if dest_row == row_index { + ctx.use_ref_opt(&row_ports.get(dest_col), |r| { + *r.get(local_row, local_col) + }).unwrap() + } else if dest_col == bank_index { + ctx.use_ref_opt(&col_ports.get(dest_row), |c| { + *c.get(local_row, local_col) + }).unwrap() + } else { + eprintln!("Error: unexpected target location ({},{})", dest_col, dest_row); + std::process::exit(2); + } + + } + =} + + reaction(start) -> notifyNeighbors {= + // reset local state + self.k = 0; + self.reportedFinish = false; + + // start execution + let matrix = ctx.use_ref_opt(start, Clone::clone).unwrap(); + ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + =} + + reaction(notifyNeighbors) -> toNeighbors {= + //notify all neighbors + ctx.set(toNeighbors, ctx.use_ref_opt(notifyNeighbors, Clone::clone).unwrap()); + =} + + reaction(fromRow, fromCol) -> notifyNeighbors, finished {= + // do nothing if complete + if self.k == self.graph_size { + return; + } + + // perform computation + let mut matrix: Matrix = Matrix::new(self.block_size, self.block_size); + let bs = self.block_size; + let ri = self.row_index; + let bi = self.bank_index; + + for i in 0..self.block_size { + for j in 0..self.block_size { + let gi = self.row_offset + i; + let gj = self.col_offset + j; + + let result = get_element_at(gi, self.k, &fromRow, &fromCol, &ctx, bs, ri, bi) + get_element_at(self.k, gj, &fromRow, &fromCol, &ctx, bs, ri, bi); + matrix.set(i, j, result.min(get_element_at(gi, gj, &fromRow, &fromCol, &ctx, bs, ri, bi))); + } + } + + // increment iteration count + self.k += 1; + + if self.k == self.graph_size { + if self.verbose && self.bank_index == 0 && self.row_index == 0 { + // debugging and result checking + for i in 0..self.block_size { + let mut result = "".to_string(); + for j in 0..self.block_size { + result = format!("{} {}", result, matrix.get(i, j)); + } + info!("{}", result); + } + } + ctx.set(finished, ()); + } + + // send the result to all neighbors in the next iteration + ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + =} +} + +reactor ApspRow( + bank_index: usize(0), + blockSize: usize(50), + numNodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36), + verbose: bool(false) +) { + + input start: Matrix; + output[dimension] finished: unit; + + input[dimension_sq] fromCol: Matrix; + output[dimension] toCol: Matrix; + + blocks = new[dimension] ApspFloydWarshallBlock( + row_index=bank_index, + blockSize=blockSize, + graphSize=numNodes, + dimension=dimension, + verbose=verbose + ); + + // connect all blocks within the row + (blocks.toNeighbors)+ -> blocks.fromRow; + + // block output to all column neighbours + blocks.toNeighbors -> toCol; + // block input from all column neighbours + fromCol -> interleaved(blocks.fromCol); + + // broadcast the incoming matrix to all blocks + (start)+ -> blocks.start; + // collect and forward finished signals from all blocks + blocks.finished -> finished; + + preamble {= + use crate::matrix::Matrix; + =} +} + +reactor ApspMatrix( + blockSize: usize(50), + numNodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36), + verbose: bool(false) +) { + input start: Matrix; + output[dimension_sq] finished: unit; + + rows = new[dimension] ApspRow(blockSize=blockSize, numNodes=numNodes, dimension=dimension, dimension_sq=dimension_sq, verbose=verbose); + + // broadcast the incoming matrix to all rows + (start)+ -> rows.start; + // collect and forward finished signals from all blocks + rows.finished -> finished; + + (rows.toCol)+ -> rows.fromCol; + + preamble {= + use crate::matrix::Matrix; + =} +} + +main reactor ( + numIterations: usize(12), + maxEdgeWeight: usize(100), + blockSize: usize(50), + numNodes: usize(300), + verbose: bool(false) +) { + state num_iterations(numIterations); + state max_edge_weight(maxEdgeWeight); + state block_size(blockSize); + state num_nodes(numNodes); + state verbose(verbose); + + state graph_data: Matrix; + state num_blocks_finished: usize(0); + + runner = new BenchmarkRunner(num_iterations=numIterations); + matrix = new ApspMatrix( + blockSize=blockSize, + numNodes=numNodes, + dimension={=numNodes / blockSize=}, + dimension_sq={=(numNodes / blockSize)*(numNodes / blockSize)=}, + verbose=verbose + ); + + reaction(startup) {= + print_benchmark_info("ApspBenchmark"); + print_args!( + "numIterations", + self.num_iterations, + "maxEdgeWeight", + self.max_edge_weight, + "numNodes", + self.num_nodes, + "blockSize", + self.block_size + ); + print_system_info(); + + self.graph_data = generate_graph(self.num_nodes as i64, self.max_edge_weight as i64); + =} + + reaction(runner.start) -> matrix.start {= + // reset local state + self.num_blocks_finished = 0; + + // start execution + ctx.set(matrix__start, self.graph_data); + =} + + reaction (matrix.finished) -> runner.finished {= + for f in matrix__finished { + if ctx.is_present(&f) { + self.num_blocks_finished += 1; + } + } + let dimension = self.num_nodes / self.block_size; + if self.num_blocks_finished == dimension * dimension { + ctx.set(runner__finished, ()); + } + =} + + preamble {= + use crate::matrix::Matrix; + use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; + use crate::pseudo_random::PseudoRandomGenerator; + + fn generate_graph(n: i64, w: i64) -> Matrix { + let random = PseudoRandomGenerator::from(n); + let nu = n as usize; + let mut local_data: Matrix = Matrix::new(nu, nu); + + for i in 0..nu { + for j in (i+1)..nu { + let r = random.next_in_range(0..w).into() + 1; + local_data.set(i, j, r); + local_data.set(j, i, r); + } + } + + local_data + } + =} +} \ No newline at end of file From b820c24595f7efe7b3c67114f086934a1a14339b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Haye=C3=9F?= Date: Thu, 11 Aug 2022 12:25:04 +0200 Subject: [PATCH 3/8] Fix typo --- runner/conf/benchmark/savina_parallelism_filterbank.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/conf/benchmark/savina_parallelism_filterbank.yaml b/runner/conf/benchmark/savina_parallelism_filterbank.yaml index 408ea74..da97623 100644 --- a/runner/conf/benchmark/savina_parallelism_filterbank.yaml +++ b/runner/conf/benchmark/savina_parallelism_filterbank.yaml @@ -41,7 +41,7 @@ targets: copy_sources: - "${bench_path}/Rust/Savina/src/lib" - "${bench_path}/Rust/Savina/src/parallelism" - lf_file: "parallelism/Filterbank.lf" + lf_file: "parallelism/FilterBank.lf" binary: "filter_bank" run_args: columns: ["--main-num-columns", ""] From 032a805ccd7e0f23562defa0459cac462eacf627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Fri, 25 Nov 2022 15:01:49 +0100 Subject: [PATCH 4/8] Update rust Apsp There is a bug :( The start reaction is triggered by a port but the port has a missing value... Something probably went wrong during initialization but debugging this is hard. --- .github/workflows/ci.yml | 1 + Rust/Savina/src/lib/matrix.rs | 59 ++++-- Rust/Savina/src/parallelism/Apsp.lf | 194 ++++++++---------- .../benchmark/savina_parallelism_apsp.yaml | 12 +- 4 files changed, 134 insertions(+), 132 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0c2f83c..d755152 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,7 @@ jobs: uses: lf-lang/benchmarks-lingua-franca/.github/workflows/benchmark-tests.yml@main with: target: 'Rust' + compiler-ref: 'rust-childref-multi' ts-benchmark-tests: uses: lf-lang/benchmarks-lingua-franca/.github/workflows/benchmark-tests.yml@main with: diff --git a/Rust/Savina/src/lib/matrix.rs b/Rust/Savina/src/lib/matrix.rs index 51a87ad..206e258 100644 --- a/Rust/Savina/src/lib/matrix.rs +++ b/Rust/Savina/src/lib/matrix.rs @@ -23,25 +23,25 @@ * * @author Johannes Hayeß */ +#![allow(unused)] use std::ops::Add; +use std::fmt; -#[derive(Default, Debug, Clone)] +#[derive(Debug, Clone)] pub struct Matrix { data: Vec, + size_x: usize, size_y: usize, } -#[derive(Default)] -pub struct TransposedMatrix { - data: Vec, - size_x: usize, -} +pub struct TransposedMatrix(Matrix); -impl Matrix { - pub fn new(size_x: usize, size_y: usize) -> Self { +impl Matrix { + pub fn new(size_x: usize, size_y: usize) -> Self where T: Default + Clone { Matrix:: { data: vec![T::default(); size_x * size_y], + size_x, size_y, } } @@ -56,36 +56,51 @@ impl Matrix { } pub fn matrix_sum(matrices: &[Matrix]) -> Matrix -where - T: Default + Clone + Copy + Add, + where + T: Default + Clone + Copy + Add, { - let size_x = matrices[0].data.len() / matrices[0].size_y; + let size_x = matrices[0].size_x; let size_y = matrices[0].size_y; let mut result = Matrix::::new(size_x, size_y); for x in 0..size_x { for y in 0..size_y { - result.data[y * size_x + x] = matrices - .iter() - .fold(T::default(), |acc, m| acc + m.data[y * size_x + x]) + for m in matrices { + result.set(x, y, *result.get(x, y) + *m.get(x, y)) + } } } result } -impl TransposedMatrix { - pub fn new(size_x: usize, size_y: usize) -> Self { - TransposedMatrix:: { - data: vec![T::default(); size_x * size_y], - size_x, - } +impl TransposedMatrix { + pub fn new(size_x: usize, size_y: usize) -> Self where T: Default + Clone { + Self(Matrix::new(size_y, size_x)) } pub fn get(&self, x: usize, y: usize) -> &T { - &self.data[y * self.size_x + x] + self.0.get(y, x) } pub fn set(&mut self, x: usize, y: usize, value: T) { - self.data[y * self.size_x + x] = value; + self.0.set(y, x, value) + } +} + +impl fmt::Display for Matrix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for i in 0..self.size_x { + for j in 0..self.size_y { + write!(f, "{} ", self.get(i, j))?; + } + write!(f, "\n")?; + } + Ok(()) + } +} + +impl fmt::Display for TransposedMatrix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) } } diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf index 0f974af..0fb0368 100644 --- a/Rust/Savina/src/parallelism/Apsp.lf +++ b/Rust/Savina/src/parallelism/Apsp.lf @@ -2,8 +2,8 @@ * Copyright (C) 2020 TU Dresden * * This benchmark implements a parallel all pairs shortest path algorithm. In - * order to split the workload, the large input matrix of size graphSize x - * graphSize is split into smaller blocks of size blockSize x blockSize. Each of + * order to split the workload, the large input matrix of size graph_size x + * graph_size is split into smaller blocks of size block_size x block_size. Each of * the worker reactors (ApspFloydWarshallBlock) processes one of these blocks. * The worker reactors are organized in the same matrix pattern, replication the * structure of the blocks within the large input matrix. Each of the workers @@ -28,44 +28,42 @@ import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; reactor ApspFloydWarshallBlock( bank_index: usize(0), row_index: usize(0), - graphSize: usize(300), - blockSize: usize(50), - dimension: usize(6), - verbose: bool(false) + graph_size: usize(300), + block_size: usize(50), + dimension: usize(6) ) { + state bank_index(bank_index); state row_index(row_index); - state graph_size(graphSize); - state block_size(blockSize); + state graph_size(graph_size); + state block_size(block_size); state dimension(dimension); - state verbose(verbose); state num_neighbors: usize({=2 * (dimension - 1)=}); - state row_offset: usize({=row_index * blockSize=}); // row offset of the block of this reactor - state col_offset: usize({=bank_index * blockSize=}); // column offset of the block of this reactor + state row_offset: usize({=row_index * block_size=}); // row offset of the block of this reactor + state col_offset: usize({=bank_index * block_size=}); // column offset of the block of this reactor state k: usize(0); // iteration counter state reportedFinish: bool(false); input start: Matrix; - input[dimension] fromRow: Arc>; - input[dimension] fromCol: Arc>; + input[dimension] frow_row: Matrix; + input[dimension] frow_col: Matrix; - output toNeighbors: Arc>; + output toNeighbors: Matrix; output finished: unit; - logical action notifyNeighbors: Arc>; + logical action notify_neighbors: Matrix; preamble {= - use std::sync::Arc; use crate::matrix::Matrix; fn get_element_at( row: usize, col: usize, - row_ports: &ReadablePortBank>>, - col_ports: &ReadablePortBank>>, + row_ports: &Multiport>, + col_ports: &Multiport>, ctx: &ReactionCtx, block_size: usize, row_index: usize, @@ -77,37 +75,36 @@ reactor ApspFloydWarshallBlock( let local_col = col % block_size; if dest_row == row_index { - ctx.use_ref_opt(&row_ports.get(dest_col), |r| { - *r.get(local_row, local_col) - }).unwrap() + *ctx.get_ref(&row_ports[dest_col]) + .unwrap() + .get(local_row, local_col) } else if dest_col == bank_index { - ctx.use_ref_opt(&col_ports.get(dest_row), |c| { - *c.get(local_row, local_col) - }).unwrap() + *ctx.get_ref(&col_ports[dest_row]) + .unwrap() + .get(local_row, local_col) } else { - eprintln!("Error: unexpected target location ({},{})", dest_col, dest_row); - std::process::exit(2); + panic!("Error: unexpected target location ({},{})", dest_col, dest_row); } } =} - reaction(start) -> notifyNeighbors {= + reaction(start) -> notify_neighbors {= // reset local state self.k = 0; self.reportedFinish = false; // start execution - let matrix = ctx.use_ref_opt(start, Clone::clone).unwrap(); - ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + let matrix = ctx.get_ref(start).unwrap().clone(); + ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); =} - reaction(notifyNeighbors) -> toNeighbors {= + reaction(notify_neighbors) -> toNeighbors {= //notify all neighbors - ctx.set(toNeighbors, ctx.use_ref_opt(notifyNeighbors, Clone::clone).unwrap()); + ctx.set(toNeighbors, ctx.use_ref_opt(notify_neighbors, Clone::clone).unwrap()); =} - reaction(fromRow, fromCol) -> notifyNeighbors, finished {= + reaction(frow_row, frow_col) -> notify_neighbors, finished {= // do nothing if complete if self.k == self.graph_size { return; @@ -124,8 +121,9 @@ reactor ApspFloydWarshallBlock( let gi = self.row_offset + i; let gj = self.col_offset + j; - let result = get_element_at(gi, self.k, &fromRow, &fromCol, &ctx, bs, ri, bi) + get_element_at(self.k, gj, &fromRow, &fromCol, &ctx, bs, ri, bi); - matrix.set(i, j, result.min(get_element_at(gi, gj, &fromRow, &fromCol, &ctx, bs, ri, bi))); + let result = get_element_at(gi, self.k, frow_row, frow_col, &ctx, bs, ri, bi) + + get_element_at(self.k, gj, frow_row, frow_col, &ctx, bs, ri, bi); + matrix.set(i, j, result.min(get_element_at(gi, gj, frow_row, frow_col, &ctx, bs, ri, bi))); } } @@ -133,145 +131,123 @@ reactor ApspFloydWarshallBlock( self.k += 1; if self.k == self.graph_size { - if self.verbose && self.bank_index == 0 && self.row_index == 0 { - // debugging and result checking - for i in 0..self.block_size { - let mut result = "".to_string(); - for j in 0..self.block_size { - result = format!("{} {}", result, matrix.get(i, j)); - } - info!("{}", result); - } + if self.bank_index == 0 && self.row_index == 0 { + debug!("{}", matrix); } ctx.set(finished, ()); } // send the result to all neighbors in the next iteration - ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); =} } reactor ApspRow( bank_index: usize(0), - blockSize: usize(50), - numNodes: usize(300), + block_size: usize(50), + num_nodes: usize(300), dimension: usize(6), - dimension_sq: usize(36), - verbose: bool(false) + dimension_sq: usize(36) ) { + preamble {= + use crate::matrix::Matrix; + =} input start: Matrix; output[dimension] finished: unit; - input[dimension_sq] fromCol: Matrix; - output[dimension] toCol: Matrix; + input[dimension_sq] frow_col: Matrix; + output[dimension] to_col: Matrix; blocks = new[dimension] ApspFloydWarshallBlock( row_index=bank_index, - blockSize=blockSize, - graphSize=numNodes, - dimension=dimension, - verbose=verbose + block_size=block_size, + graph_size=num_nodes, + dimension=dimension ); // connect all blocks within the row - (blocks.toNeighbors)+ -> blocks.fromRow; + (blocks.toNeighbors)+ -> blocks.frow_row; // block output to all column neighbours - blocks.toNeighbors -> toCol; + blocks.toNeighbors -> to_col; // block input from all column neighbours - fromCol -> interleaved(blocks.fromCol); + frow_col -> interleaved(blocks.frow_col); // broadcast the incoming matrix to all blocks (start)+ -> blocks.start; // collect and forward finished signals from all blocks blocks.finished -> finished; - - preamble {= - use crate::matrix::Matrix; - =} } reactor ApspMatrix( - blockSize: usize(50), - numNodes: usize(300), + block_size: usize(50), + num_nodes: usize(300), dimension: usize(6), - dimension_sq: usize(36), - verbose: bool(false) + dimension_sq: usize(36) ) { + preamble {= + use crate::matrix::Matrix; + =} input start: Matrix; output[dimension_sq] finished: unit; - rows = new[dimension] ApspRow(blockSize=blockSize, numNodes=numNodes, dimension=dimension, dimension_sq=dimension_sq, verbose=verbose); + rows = new[dimension] ApspRow(block_size=block_size, num_nodes=num_nodes, dimension=dimension, dimension_sq=dimension_sq); // broadcast the incoming matrix to all rows (start)+ -> rows.start; // collect and forward finished signals from all blocks rows.finished -> finished; - (rows.toCol)+ -> rows.fromCol; - - preamble {= - use crate::matrix::Matrix; - =} + (rows.to_col)+ -> rows.frow_col; } main reactor ( - numIterations: usize(12), - maxEdgeWeight: usize(100), - blockSize: usize(50), - numNodes: usize(300), - verbose: bool(false) + num_iterations: usize(12), + max_edge_weight: usize(100), + block_size: usize(50), + num_nodes: usize(300) ) { - state num_iterations(numIterations); - state max_edge_weight(maxEdgeWeight); - state block_size(blockSize); - state num_nodes(numNodes); - state verbose(verbose); + state num_iterations(num_iterations); + state max_edge_weight(max_edge_weight); + state block_size(block_size); + state num_nodes(num_nodes); - state graph_data: Matrix; state num_blocks_finished: usize(0); - runner = new BenchmarkRunner(num_iterations=numIterations); + runner = new BenchmarkRunner(num_iterations=num_iterations); matrix = new ApspMatrix( - blockSize=blockSize, - numNodes=numNodes, - dimension={=numNodes / blockSize=}, - dimension_sq={=(numNodes / blockSize)*(numNodes / blockSize)=}, - verbose=verbose + block_size=block_size, + num_nodes=num_nodes, + dimension={=num_nodes / block_size=}, + dimension_sq={=(num_nodes / block_size)*(num_nodes / block_size)=} ); reaction(startup) {= print_benchmark_info("ApspBenchmark"); print_args!( - "numIterations", + "num_iterations", self.num_iterations, - "maxEdgeWeight", + "max_edge_weight", self.max_edge_weight, - "numNodes", + "num_nodes", self.num_nodes, - "blockSize", + "block_size", self.block_size ); print_system_info(); - - self.graph_data = generate_graph(self.num_nodes as i64, self.max_edge_weight as i64); =} reaction(runner.start) -> matrix.start {= // reset local state self.num_blocks_finished = 0; - + let graph_data = generate_graph(self.num_nodes, self.max_edge_weight); // start execution - ctx.set(matrix__start, self.graph_data); + ctx.set(matrix__start, graph_data); =} reaction (matrix.finished) -> runner.finished {= - for f in matrix__finished { - if ctx.is_present(&f) { - self.num_blocks_finished += 1; - } - } + self.num_blocks_finished += matrix__finished.iterate_set().count(); let dimension = self.num_nodes / self.block_size; if self.num_blocks_finished == dimension * dimension { ctx.set(runner__finished, ()); @@ -282,15 +258,15 @@ main reactor ( use crate::matrix::Matrix; use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; use crate::pseudo_random::PseudoRandomGenerator; + use std::os::raw::c_long; - fn generate_graph(n: i64, w: i64) -> Matrix { - let random = PseudoRandomGenerator::from(n); - let nu = n as usize; - let mut local_data: Matrix = Matrix::new(nu, nu); + fn generate_graph(n: usize, w: usize) -> Matrix { + let mut random = PseudoRandomGenerator::from(n as c_long); + let mut local_data: Matrix = Matrix::new(n, n); - for i in 0..nu { - for j in (i+1)..nu { - let r = random.next_in_range(0..w).into() + 1; + for i in 0..n { + for j in (i+1)..n { + let r = u64::from(random.next_in_range(0..w as c_long)) + 1; local_data.set(i, j, r); local_data.set(j, i, r); } @@ -299,4 +275,4 @@ main reactor ( local_data } =} -} \ No newline at end of file +} diff --git a/runner/conf/benchmark/savina_parallelism_apsp.yaml b/runner/conf/benchmark/savina_parallelism_apsp.yaml index bd21af9..6b3a383 100644 --- a/runner/conf/benchmark/savina_parallelism_apsp.yaml +++ b/runner/conf/benchmark/savina_parallelism_apsp.yaml @@ -45,4 +45,14 @@ targets: gen_args: num_workers: ["-D", "numNodes="] block_size: ["-D", "blockSize="] - max_edge_weight: ["-D", "maxEdgeWeight="] \ No newline at end of file + max_edge_weight: ["-D", "maxEdgeWeight="] + lf-rust: + copy_sources: + - "${bench_path}/Rust/Savina/src/lib" + - "${bench_path}/Rust/Savina/src/parallelism" + lf_file: "parallelism/Apsp.lf" + binary: "apsp" + run_args: + block_size: ["--main-block-size", ""] + max_edge_weight: ["--main-max-edge-weight", ""] + num_workers: ["--main-num-nodes", ""] From e98a385af5f4c62bb12948bec0b15027d854efd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Fri, 25 Nov 2022 18:01:18 +0100 Subject: [PATCH 5/8] Update filterbank --- Rust/Savina/src/lib/matrix.rs | 2 +- Rust/Savina/src/parallelism/Apsp.lf | 2 + Rust/Savina/src/parallelism/FilterBank.lf | 270 ++++++++++------------ 3 files changed, 130 insertions(+), 144 deletions(-) diff --git a/Rust/Savina/src/lib/matrix.rs b/Rust/Savina/src/lib/matrix.rs index 206e258..2210eee 100644 --- a/Rust/Savina/src/lib/matrix.rs +++ b/Rust/Savina/src/lib/matrix.rs @@ -23,7 +23,7 @@ * * @author Johannes Hayeß */ -#![allow(unused)] +#![allow(dead_code)] use std::ops::Add; use std::fmt; diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf index 0fb0368..c71099e 100644 --- a/Rust/Savina/src/parallelism/Apsp.lf +++ b/Rust/Savina/src/parallelism/Apsp.lf @@ -89,6 +89,7 @@ reactor ApspFloydWarshallBlock( } =} + // @label block_start reaction(start) -> notify_neighbors {= // reset local state self.k = 0; @@ -238,6 +239,7 @@ main reactor ( print_system_info(); =} + // @label dostart reaction(runner.start) -> matrix.start {= // reset local state self.num_blocks_finished = 0; diff --git a/Rust/Savina/src/parallelism/FilterBank.lf b/Rust/Savina/src/parallelism/FilterBank.lf index 13612c5..26cd919 100644 --- a/Rust/Savina/src/parallelism/FilterBank.lf +++ b/Rust/Savina/src/parallelism/FilterBank.lf @@ -50,8 +50,8 @@ target Rust { import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; -reactor Producer(numChannels: usize(8), numSimulations: usize(34816), numColumns: usize(16384)) { - state num_simulations(numSimulations); +reactor Producer(num_channels: usize(8), num_simulations: usize(34816), num_columns: usize(16384)) { + state num_simulations(num_simulations); state numMessagesSent: usize(0); @@ -87,98 +87,94 @@ reactor Source { input next: unit; output value: f64; - input inFinished: unit; - output outFinished: unit; + input in_finished: unit; + output out_finished: unit; reaction (next) -> value {= ctx.set(value, self.current as f64); self.current = (self.current + 1) % self.max_value; =} - reaction (inFinished) -> outFinished {= - ctx.set(outFinished, ()); + reaction (in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state self.current = 0; =} } -reactor Bank(bank_index: usize(0), numColumns: usize(16384), numChannels: usize(8)) { - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; +reactor Bank(bank_index: usize(0), num_columns: usize(16384), num_channels: usize(8), coefficients: super::filter_bank::MatrixPair({= panic!("") =})) { + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; - input setF: Arc>; - input setH: Arc>; + input set_f: Arc>; + input set_h: Arc>; preamble {= use std::sync::Arc; use crate::matrix::Matrix; =} - delay0 = new Delay(delayLength={=numColumns - 1=}); - fir0 = new FirFilter(bank_index=bank_index, peekLength=numColumns); - sample = new SampleFilter(sampleRate=numColumns); - delay1 = new Delay(delayLength={=numColumns - 1=}); - fir1 = new FirFilter(bank_index=bank_index, peekLength=numColumns); + delay0 = new Delay(delayLength={=num_columns - 1=}); + fir0 = new FirFilter(bank_index=bank_index, peekLength=num_columns, coefficients={=coefficients.0.clone()=}); + sample = new SampleFilter(sampleRate=num_columns); + delay1 = new Delay(delayLength={=num_columns - 1=}); + fir1 = new FirFilter(bank_index=bank_index, peekLength=num_columns, coefficients={=coefficients.1.clone()=}); - inFinished, delay0.outFinished, fir0.outFinished, sample.outFinished, delay1.outFinished, fir1.outFinished -> - delay0.inFinished, fir0.inFinished, sample.inFinished, delay1.inFinished, fir1.inFinished, outFinished; + in_finished, delay0.out_finished, fir0.out_finished, sample.out_finished, delay1.out_finished, fir1.out_finished -> + delay0.in_finished, fir0.in_finished, sample.in_finished, delay1.in_finished, fir1.in_finished, out_finished; - inValue, delay0.outValue, fir0.outValue, sample.outValue, delay1.outValue, fir1.outValue -> - delay0.inValue, fir0.inValue, sample.inValue, delay1.inValue, fir1.inValue, outValue; - - setH -> fir0.setCoefficients; - setF -> fir1.setCoefficients; + in_value, delay0.out_value, fir0.out_value, sample.out_value, delay1.out_value, fir1.out_value -> + delay0.in_value, fir0.in_value, sample.in_value, delay1.in_value, fir1.in_value, out_value; } reactor Delay(delayLength: usize(16383)) { state delay_length(delayLength); - state myState: Vec; - state placeHolder: usize(0); + state my_state: Vec; + state placeholder: usize(0); - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; reaction(startup) {= // one time init - self.myState = vec![0.0; self.delay_length]; - self.placeHolder = 0; + self.my_state = vec![0.0; self.delay_length]; + self.placeholder = 0; =} - reaction(inValue) -> outValue {= - let result = ctx.get(inValue).unwrap(); - ctx.set(outValue, self.myState[self.placeHolder]); - self.myState[self.placeHolder] = result; - self.placeHolder = (self.placeHolder + 1) % self.delay_length; + reaction(in_value) -> out_value {= + let result = ctx.get(in_value).unwrap(); + ctx.set(out_value, self.my_state[self.placeholder]); + self.my_state[self.placeholder] = result; + self.placeholder = (self.placeholder + 1) % self.delay_length; =} - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state - self.myState = vec![0.0; self.delay_length]; - self.placeHolder = 0; + self.my_state = vec![0.0; self.delay_length]; + self.placeholder = 0; =} } -reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { +reactor FirFilter(bank_index: usize(0), peekLength: usize(16384), coefficients: Arc>({= panic!("") =})) { state bank_index(bank_index); state peek_length(peekLength); state data: Vec; state data_index: usize(0); state data_full: bool(false); - state coefficients: Arc>; + state coefficients: Arc>(coefficients); - input setCoefficients: Arc>; - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; preamble {= use std::sync::Arc; @@ -192,14 +188,8 @@ reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { self.data_full = false; =} - reaction(setCoefficients) {= - ctx.use_ref_opt(setCoefficients, |c| { - self.coefficients = c.clone(); - }); - =} - - reaction(inValue) -> outValue {= - self.data[self.data_index] = ctx.get(inValue).unwrap(); + reaction(in_value) -> out_value {= + self.data[self.data_index] = ctx.get(in_value).unwrap(); self.data_index += 1; if self.data_index == self.peek_length { @@ -212,12 +202,12 @@ reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { for (i, d) in self.data.iter().enumerate() { sum += self.data[i] * self.coefficients.get(self.bank_index, self.peek_length - i - 1); } - ctx.set(outValue, sum); + ctx.set(out_value, sum); } =} - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state self.data = vec![0.0; self.peek_length]; @@ -229,58 +219,55 @@ reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { reactor SampleFilter(sampleRate: usize(16384)) { state sample_rate(sampleRate); - state samplesReceived: usize(0); + state samples_received: usize(0); - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; - reaction(inValue) -> outValue {= - if self.samplesReceived == 0 { - ctx.set(outValue, ctx.get(inValue).unwrap()); + reaction(in_value) -> out_value {= + if self.samples_received == 0 { + ctx.set(out_value, ctx.get(in_value).unwrap()); } else { - ctx.set(outValue, 0.0); + ctx.set(out_value, 0.0); } - self.samplesReceived = (self.samplesReceived + 1) % self.sample_rate; + self.samples_received = (self.samples_received + 1) % self.sample_rate; =} - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state - self.samplesReceived = 0; + self.samples_received = 0; =} } -reactor Combine(numChannels: usize(8)) { - state num_channels(numChannels); +reactor Combine(num_channels: usize(8)) { + state num_channels(num_channels); - input[numChannels] inValues: f64; - input[numChannels] inFinished: unit; - output outValue: f64; - output outFinished: unit; + input[num_channels] inValues: f64; + input[num_channels] in_finished: unit; + output out_value: f64; + output out_finished: unit; - state numFinished: usize(0); + state num_finished: usize(0); - reaction(inValues) -> outValue {= + reaction(inValues) -> out_value {= let mut sum = 0.0; for x in inValues { - sum += ctx.get(&x).unwrap(); + sum += ctx.get(x).unwrap(); } - ctx.set(outValue, sum); + ctx.set(out_value, sum); =} - reaction(inFinished) -> outFinished {= - for port in inFinished { - if ctx.is_present(&port) { - self.numFinished += 1; - } - } - if self.numFinished == self.num_channels { - ctx.set(outFinished, ()); + reaction(in_finished) -> out_finished {= + self.num_finished += in_finished.iterate_set().count(); + + if self.num_finished == self.num_channels { + ctx.set(out_finished, ()); // reset local state - self.numFinished = 0; + self.num_finished = 0; } =} } @@ -290,12 +277,12 @@ reactor Sink(printRate: usize(100)) { state count: usize(0); - input inValue: f64; - input inFinished: unit; - output outFinished: unit; + input in_value: f64; + input in_finished: unit; + output out_finished: unit; - reaction(inValue) {= - let result = ctx.get(inValue).unwrap(); + reaction(in_value) {= + let result = ctx.get(in_value).unwrap(); if self.count % self.print_rate == 0 { info!("SinkActor: result = {}", result); @@ -303,83 +290,80 @@ reactor Sink(printRate: usize(100)) { self.count += 1; =} - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state self.count = 0; =} } -main reactor (numIterations: usize(12), numSimulations: usize(34816), numColumns: usize(16384), numChannels: usize(8)) { - state num_iterations(numIterations); - state num_simulations(numSimulations); - state num_columns(numColumns); - state num_channels(numChannels); +main reactor (num_iterations: usize(12), num_simulations: usize(34816), num_columns: usize(16384), num_channels: usize(8)) { + state num_iterations(num_iterations); + state num_simulations(num_simulations); + state num_columns(num_columns); + state num_channels(num_channels); preamble {= use std::sync::Arc; - =} - - reaction(startup) -> banks.setF, banks.setH {= - // initialize the coefficients of all FIR filters - let mut mH: Matrix = Matrix::new(self.num_channels, self.num_columns); - let mut mF: Matrix = Matrix::new(self.num_channels, self.num_columns); - - for j in 0..self.num_channels { - for i in 0..self.num_columns { - let h = ((i * self.num_columns) + (j * self.num_channels) + j + i + j + 1) as f64; - let f = ((i * j) + (j * j) + j + i) as f64; - mH.set(j, i, h); - mF.set(j, i, f); + pub type MatrixPair = (Arc>, Arc>); + + fn create_coeffs(num_channels: usize, num_columns: usize) -> MatrixPair { + // initialize the coefficients of all FIR filters + let mut mH: Matrix = Matrix::new(num_channels, num_columns); + let mut mF: Matrix = Matrix::new(num_channels, num_columns); + + for j in 0..num_channels { + for i in 0..num_columns { + let h = ((i * num_columns) + (j * num_channels) + j + i + j + 1) as f64; + let f = ((i * j) + (j * j) + j + i) as f64; + mH.set(j, i, h); + mF.set(j, i, f); + } } + + // convert to immutable pointers before sending. This ensures that all recipients can receive a pointer + // to the same matrix and no copying is needed + (Arc::new(mH), Arc::new(mF)) } + =} - // convert to immutable pointers before sending. This ensures that all recipients can receive a pointer - // to the same matrix and no copying is needed - let mH_r = Arc::new(mH); - let mF_r = Arc::new(mF); - - for (h, f) in banks__setH.into_iter().zip(banks__setF.into_iter()) { - ctx.set(h, Arc::clone(&mH_r)); - ctx.set(f, Arc::clone(&mF_r)); - } - + reaction(startup) -> banks.set_f, banks.set_h {= print_benchmark_info("FilterBankBenchmark"); print_args!( - "numIterations", + "num_iterations", self.num_iterations, - "numSimulations", + "num_simulations", self.num_simulations, - "numColumns", + "num_columns", self.num_columns, - "numChannels", + "num_channels", self.num_channels ); print_system_info(); =} - runner = new BenchmarkRunner(num_iterations=numIterations); - producer = new Producer(numSimulations=numSimulations, numChannels=numChannels, numColumns=numColumns); + runner = new BenchmarkRunner(num_iterations=num_iterations); + producer = new Producer(num_simulations=num_simulations, num_channels=num_channels, num_columns=num_columns); source = new Source(); - banks = new[numChannels] Bank(numColumns=numColumns, numChannels=numChannels); - combine = new Combine(numChannels=numChannels); + banks = new[num_channels] Bank(num_columns=num_columns, num_channels=num_channels, coefficients={= create_coeffs(num_channels, num_columns) =}); + combine = new Combine(num_channels=num_channels); sink = new Sink(printRate=100); runner.start -> producer.start; producer.next -> source.next; - producer.finished -> source.inFinished; + producer.finished -> source.in_finished; - (source.value)+ -> banks.inValue; - (source.outFinished)+ -> banks.inFinished; + (source.value)+ -> banks.in_value; + (source.out_finished)+ -> banks.in_finished; - banks.outValue -> combine.inValues; - banks.outFinished -> combine.inFinished + banks.out_value -> combine.inValues; + banks.out_finished -> combine.in_finished - combine.outValue -> sink.inValue; - combine.outFinished -> sink.inFinished; + combine.out_value -> sink.in_value; + combine.out_finished -> sink.in_finished; - sink.outFinished -> runner.finished; + sink.out_finished -> runner.finished; preamble {= use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; From 617cba60fd3f65449c0a4c9a4dbfc4166736c464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Thu, 5 Jan 2023 14:55:31 +0100 Subject: [PATCH 6/8] REVERT ME: Remove Apsp because it fails --- Rust/Savina/src/parallelism/Apsp.lf | 280 ------------------ .../benchmark/savina_parallelism_apsp.yaml | 10 - 2 files changed, 290 deletions(-) delete mode 100644 Rust/Savina/src/parallelism/Apsp.lf diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf deleted file mode 100644 index c71099e..0000000 --- a/Rust/Savina/src/parallelism/Apsp.lf +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Copyright (C) 2020 TU Dresden - * - * This benchmark implements a parallel all pairs shortest path algorithm. In - * order to split the workload, the large input matrix of size graph_size x - * graph_size is split into smaller blocks of size block_size x block_size. Each of - * the worker reactors (ApspFloydWarshallBlock) processes one of these blocks. - * The worker reactors are organized in the same matrix pattern, replication the - * structure of the blocks within the large input matrix. Each of the workers - * operates on its local block data, and sends results to all other workers in - * the same column or in the same row. The data from the neighbors is then used - * to compute the next intermediate result and to update the local state - * accordingly. - * - * @author Christian Menard - * @author Hannes Klein - * @author Johannes Hayeß - */ - -target Rust { - build-type : Release, - cargo-features: [ "cli" ], - rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"], -}; - -import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; - -reactor ApspFloydWarshallBlock( - bank_index: usize(0), - row_index: usize(0), - graph_size: usize(300), - block_size: usize(50), - dimension: usize(6) -) { - - state bank_index(bank_index); - state row_index(row_index); - state graph_size(graph_size); - state block_size(block_size); - state dimension(dimension); - - state num_neighbors: usize({=2 * (dimension - 1)=}); - state row_offset: usize({=row_index * block_size=}); // row offset of the block of this reactor - state col_offset: usize({=bank_index * block_size=}); // column offset of the block of this reactor - - state k: usize(0); // iteration counter - state reportedFinish: bool(false); - - input start: Matrix; - - input[dimension] frow_row: Matrix; - input[dimension] frow_col: Matrix; - - output toNeighbors: Matrix; - output finished: unit; - - logical action notify_neighbors: Matrix; - - preamble {= - use crate::matrix::Matrix; - - fn get_element_at( - row: usize, - col: usize, - row_ports: &Multiport>, - col_ports: &Multiport>, - ctx: &ReactionCtx, - block_size: usize, - row_index: usize, - bank_index: usize, - ) -> u64 { - let dest_row = row / block_size; - let dest_col = col / block_size; - let local_row = row % block_size; - let local_col = col % block_size; - - if dest_row == row_index { - *ctx.get_ref(&row_ports[dest_col]) - .unwrap() - .get(local_row, local_col) - } else if dest_col == bank_index { - *ctx.get_ref(&col_ports[dest_row]) - .unwrap() - .get(local_row, local_col) - } else { - panic!("Error: unexpected target location ({},{})", dest_col, dest_row); - } - - } - =} - - // @label block_start - reaction(start) -> notify_neighbors {= - // reset local state - self.k = 0; - self.reportedFinish = false; - - // start execution - let matrix = ctx.get_ref(start).unwrap().clone(); - ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); - =} - - reaction(notify_neighbors) -> toNeighbors {= - //notify all neighbors - ctx.set(toNeighbors, ctx.use_ref_opt(notify_neighbors, Clone::clone).unwrap()); - =} - - reaction(frow_row, frow_col) -> notify_neighbors, finished {= - // do nothing if complete - if self.k == self.graph_size { - return; - } - - // perform computation - let mut matrix: Matrix = Matrix::new(self.block_size, self.block_size); - let bs = self.block_size; - let ri = self.row_index; - let bi = self.bank_index; - - for i in 0..self.block_size { - for j in 0..self.block_size { - let gi = self.row_offset + i; - let gj = self.col_offset + j; - - let result = get_element_at(gi, self.k, frow_row, frow_col, &ctx, bs, ri, bi) - + get_element_at(self.k, gj, frow_row, frow_col, &ctx, bs, ri, bi); - matrix.set(i, j, result.min(get_element_at(gi, gj, frow_row, frow_col, &ctx, bs, ri, bi))); - } - } - - // increment iteration count - self.k += 1; - - if self.k == self.graph_size { - if self.bank_index == 0 && self.row_index == 0 { - debug!("{}", matrix); - } - ctx.set(finished, ()); - } - - // send the result to all neighbors in the next iteration - ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); - =} -} - -reactor ApspRow( - bank_index: usize(0), - block_size: usize(50), - num_nodes: usize(300), - dimension: usize(6), - dimension_sq: usize(36) -) { - preamble {= - use crate::matrix::Matrix; - =} - - input start: Matrix; - output[dimension] finished: unit; - - input[dimension_sq] frow_col: Matrix; - output[dimension] to_col: Matrix; - - blocks = new[dimension] ApspFloydWarshallBlock( - row_index=bank_index, - block_size=block_size, - graph_size=num_nodes, - dimension=dimension - ); - - // connect all blocks within the row - (blocks.toNeighbors)+ -> blocks.frow_row; - - // block output to all column neighbours - blocks.toNeighbors -> to_col; - // block input from all column neighbours - frow_col -> interleaved(blocks.frow_col); - - // broadcast the incoming matrix to all blocks - (start)+ -> blocks.start; - // collect and forward finished signals from all blocks - blocks.finished -> finished; -} - -reactor ApspMatrix( - block_size: usize(50), - num_nodes: usize(300), - dimension: usize(6), - dimension_sq: usize(36) -) { - preamble {= - use crate::matrix::Matrix; - =} - input start: Matrix; - output[dimension_sq] finished: unit; - - rows = new[dimension] ApspRow(block_size=block_size, num_nodes=num_nodes, dimension=dimension, dimension_sq=dimension_sq); - - // broadcast the incoming matrix to all rows - (start)+ -> rows.start; - // collect and forward finished signals from all blocks - rows.finished -> finished; - - (rows.to_col)+ -> rows.frow_col; -} - -main reactor ( - num_iterations: usize(12), - max_edge_weight: usize(100), - block_size: usize(50), - num_nodes: usize(300) -) { - state num_iterations(num_iterations); - state max_edge_weight(max_edge_weight); - state block_size(block_size); - state num_nodes(num_nodes); - - state num_blocks_finished: usize(0); - - runner = new BenchmarkRunner(num_iterations=num_iterations); - matrix = new ApspMatrix( - block_size=block_size, - num_nodes=num_nodes, - dimension={=num_nodes / block_size=}, - dimension_sq={=(num_nodes / block_size)*(num_nodes / block_size)=} - ); - - reaction(startup) {= - print_benchmark_info("ApspBenchmark"); - print_args!( - "num_iterations", - self.num_iterations, - "max_edge_weight", - self.max_edge_weight, - "num_nodes", - self.num_nodes, - "block_size", - self.block_size - ); - print_system_info(); - =} - - // @label dostart - reaction(runner.start) -> matrix.start {= - // reset local state - self.num_blocks_finished = 0; - let graph_data = generate_graph(self.num_nodes, self.max_edge_weight); - // start execution - ctx.set(matrix__start, graph_data); - =} - - reaction (matrix.finished) -> runner.finished {= - self.num_blocks_finished += matrix__finished.iterate_set().count(); - let dimension = self.num_nodes / self.block_size; - if self.num_blocks_finished == dimension * dimension { - ctx.set(runner__finished, ()); - } - =} - - preamble {= - use crate::matrix::Matrix; - use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; - use crate::pseudo_random::PseudoRandomGenerator; - use std::os::raw::c_long; - - fn generate_graph(n: usize, w: usize) -> Matrix { - let mut random = PseudoRandomGenerator::from(n as c_long); - let mut local_data: Matrix = Matrix::new(n, n); - - for i in 0..n { - for j in (i+1)..n { - let r = u64::from(random.next_in_range(0..w as c_long)) + 1; - local_data.set(i, j, r); - local_data.set(j, i, r); - } - } - - local_data - } - =} -} diff --git a/runner/conf/benchmark/savina_parallelism_apsp.yaml b/runner/conf/benchmark/savina_parallelism_apsp.yaml index 6b3a383..e379d54 100644 --- a/runner/conf/benchmark/savina_parallelism_apsp.yaml +++ b/runner/conf/benchmark/savina_parallelism_apsp.yaml @@ -46,13 +46,3 @@ targets: num_workers: ["-D", "numNodes="] block_size: ["-D", "blockSize="] max_edge_weight: ["-D", "maxEdgeWeight="] - lf-rust: - copy_sources: - - "${bench_path}/Rust/Savina/src/lib" - - "${bench_path}/Rust/Savina/src/parallelism" - lf_file: "parallelism/Apsp.lf" - binary: "apsp" - run_args: - block_size: ["--main-block-size", ""] - max_edge_weight: ["--main-max-edge-weight", ""] - num_workers: ["--main-num-nodes", ""] From 17b6e510ad5e98549d7e5a0a7c66c276bb4955b7 Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Tue, 10 Jan 2023 08:36:30 +0100 Subject: [PATCH 7/8] Update ci.yml --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d755152..0c2f83c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,6 @@ jobs: uses: lf-lang/benchmarks-lingua-franca/.github/workflows/benchmark-tests.yml@main with: target: 'Rust' - compiler-ref: 'rust-childref-multi' ts-benchmark-tests: uses: lf-lang/benchmarks-lingua-franca/.github/workflows/benchmark-tests.yml@main with: From cdc233865da965736d0102a381bc963932f06e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Thu, 12 Jan 2023 16:16:20 +0100 Subject: [PATCH 8/8] Fix matmul --- Rust/Savina/src/lib/matrix.rs | 4 ++ Rust/Savina/src/parallelism/MatMul.lf | 69 +++++++++++++-------------- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/Rust/Savina/src/lib/matrix.rs b/Rust/Savina/src/lib/matrix.rs index 2210eee..88aca44 100644 --- a/Rust/Savina/src/lib/matrix.rs +++ b/Rust/Savina/src/lib/matrix.rs @@ -53,6 +53,10 @@ impl Matrix { pub fn set(&mut self, x: usize, y: usize, value: T) { self.data[x * self.size_y + y] = value; } + + pub fn transpose(self) -> TransposedMatrix { + TransposedMatrix(self) + } } pub fn matrix_sum(matrices: &[Matrix]) -> Matrix diff --git a/Rust/Savina/src/parallelism/MatMul.lf b/Rust/Savina/src/parallelism/MatMul.lf index 0fd957c..b64381d 100644 --- a/Rust/Savina/src/parallelism/MatMul.lf +++ b/Rust/Savina/src/parallelism/MatMul.lf @@ -44,8 +44,29 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { state num_workers(numWorkers); state data_length(dataLength); - state A: Arc>; - state B: Arc>; + preamble {= + #[derive(Clone)] + pub struct Data(pub Arc>, pub Arc>, pub Weak>>); + =} + + state A: Arc>({= { + let mut a = Matrix::::new(data_length, data_length); + for i in 0..data_length { + for j in 0..data_length { + a.set(i, j, i as f64); + } + } + Arc::new(a) + } =}); + state B: Arc>({= { + let mut a = TransposedMatrix::::new(data_length, data_length); + for i in 0..data_length { + for j in 0..data_length { + a.set(i, j, j as f64); + } + } + Arc::new(a) + } =}); state C: {= Vec>>> =} state workQueue: VecDeque; @@ -56,31 +77,10 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { input start: unit; output finished: unit; - output[numWorkers] data: {=(Arc>, Arc>, Weak>>)=}; + output[numWorkers] data: Data; output[numWorkers] doWork: WorkItem; input[numWorkers] moreWork: {=[WorkItem; 8]=}; - reaction(startup) {= - // Fill both input arrays with data - let (a, b) = { - let mut a = Matrix::::new(self.data_length, self.data_length); - let mut b = TransposedMatrix::::new(self.data_length, self.data_length); - - for i in 0..self.data_length { - for j in 0..self.data_length { - a.set(i, j, i as f64); - b.set(i, j, j as f64); - } - } - - (Arc::new(a), Arc::new(b)) - }; - - self.A = a; - self.B = b; - self.C = Vec::new(); - =} - reaction(start) -> data, next {= // reset the result matrix C for _ in 0..self.num_workers { @@ -89,7 +89,7 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { // send pointers to all 3 matrixes to the workers for (d, c) in data.into_iter().zip(&self.C) { - ctx.set(d, (Arc::clone(&self.A), Arc::clone(&self.B), Arc::downgrade(&c))); + ctx.set(d, Data(Arc::clone(&self.A), Arc::clone(&self.B), Arc::downgrade(&c))); } // produce the first work item, instructing the worker to multiply the complete matrix @@ -171,11 +171,9 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { reactor Worker(threshold: usize(16384)) { state threshold(threshold); - state A: Arc>; - state B: Arc>; - state C: Weak>>; + state data_ref: Option; - input data: {=(Arc>, Arc>, Weak>>)=}; + input data: super::manager::Data; input doWork: WorkItem; output moreWork: {=[WorkItem; 8]=}; @@ -186,11 +184,7 @@ reactor Worker(threshold: usize(16384)) { =} reaction (data) {= - ctx.use_ref_opt(data, |(a, b, c)| { - self.A = a.clone(); - self.B = b.clone(); - self.C = c.clone(); - }); + self.data_ref = ctx.use_ref_opt(data, Clone::clone); =} reaction(doWork) -> moreWork {= @@ -214,18 +208,19 @@ reactor Worker(threshold: usize(16384)) { work_queue[7] = WorkItem{srA: wi.srA + dim, scA: wi.scA + dim, srB: wi.srB + dim, scB: wi.scB + dim, srC: wi.srC + dim, scC: wi.scC + dim, numBlocks, dim}; ctx.set(moreWork, work_queue); - } else { + } else if let Some(super::manager::Data(a, b, c)) = &self.data_ref { + // otherwise we compute the result directly let end_r = wi.srC + wi.dim; let end_c = wi.scC + wi.dim; - let upgraded = self.C.upgrade().unwrap(); + let upgraded = c.upgrade().unwrap(); let mut c = upgraded.lock().unwrap(); for i in wi.srC..end_r { for j in wi.scC..end_c { for k in 0..wi.dim { - let mut v = self.A.get(i, wi.scA + k) * self.B.get(wi.srB + k, j); + let mut v = a.get(i, wi.scA + k) * b.get(wi.srB + k, j); v += c.get(i, j); c.set(i, j, v); }