From c2166059d1ca868f2a2d380e74329f58b762d809 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 11 Feb 2024 00:50:27 -0700 Subject: [PATCH] Enabled C federated tests with Rust RTI - At first, thirty-five C federated tests are evaluated with Rust RTI --- .../java/org/lflang/tests/RunSingleTest.java | 3 + .../lflang/tests/SimplifiedRuntimeTest.java | 41 ++++ .../org/lflang/tests/runtime/RustRtiTest.java | 62 ++++++ .../federated/generator/FedGenerator.java | 96 +++++++++ .../launcher/FedLauncherGenerator.java | 194 +++++++++++++++++- .../org/lflang/generator/LFGenerator.java | 39 +++- .../generator/docker/DockerGenerator.java | 2 +- .../main/java/org/lflang/target/Target.java | 15 +- core/src/main/resources/lib/c/reactor-c | 2 +- .../java/org/lflang/tests/TestBase.java | 141 +++++++++++++ test/RustRti/.gitignore | 1 + test/RustRti/src/federated/Absent.lf | 46 +++++ .../src/federated/BroadcastFeedback.lf | 33 +++ .../BroadcastFeedbackWithHierarchy.lf | 40 ++++ test/RustRti/src/federated/DistributedBank.lf | 24 +++ .../federated/DistributedBankToMultiport.lf | 33 +++ .../src/federated/DistributedDoublePort.lf | 52 +++++ .../src/federated/DistributedInterleaved.lf | 44 ++++ .../src/federated/DistributedLoopedAction.lf | 62 ++++++ .../src/federated/DistributedMultiport.lf | 48 +++++ .../federated/DistributedMultiportToBank.lf | 41 ++++ .../federated/DistributedMultiportToken.lf | 46 +++++ .../src/federated/DistributedNetworkOrder.lf | 75 +++++++ .../DistributedPhysicalActionUpstream.lf | 60 ++++++ .../src/federated/DistributedStopZero.lf | 84 ++++++++ .../federated/EnclaveFederatedRequestStop.lf | 39 ++++ .../src/federated/FederatedFilePkgReader.lf | 57 +++++ .../src/federated/FederatedFileReader.lf | 66 ++++++ test/RustRti/src/federated/FeedbackDelay.lf | 85 ++++++++ test/RustRti/src/federated/FeedbackDelay3.lf | 41 ++++ test/RustRti/src/federated/FeedbackDelay5.lf | 57 +++++ .../src/federated/FeedbackDelaySimple.lf | 41 ++++ .../RustRti/src/federated/HelloDistributed.lf | 56 +++++ .../src/federated/InheritanceFederated.lf | 23 +++ .../federated/LoopDistributedCentralized.lf | 48 +++++ .../federated/LoopDistributedCentralized2.lf | 75 +++++++ ...oopDistributedCentralizedPhysicalAction.lf | 74 +++++++ .../LoopDistributedCentralizedPrecedence.lf | 56 +++++ ...stributedCentralizedPrecedenceHierarchy.lf | 73 +++++++ .../src/federated/ParallelDestinations.lf | 23 +++ test/RustRti/src/federated/ParallelSources.lf | 24 +++ .../src/federated/ParallelSourcesMultiport.lf | 34 +++ test/RustRti/src/federated/SimpleFederated.lf | 17 ++ .../src/federated/SpuriousDependency.lf | 63 ++++++ test/RustRti/src/federated/StopAtShutdown.lf | 45 ++++ .../src/federated/TopLevelArtifacts.lf | 44 ++++ test/RustRti/src/lib/Count.lf | 11 + test/RustRti/src/lib/FileLevelPreamble.lf | 12 ++ test/RustRti/src/lib/FileReader.txt | 1 + test/RustRti/src/lib/GenDelay.lf | 21 ++ test/RustRti/src/lib/Imported.lf | 14 ++ test/RustRti/src/lib/ImportedAgain.lf | 15 ++ test/RustRti/src/lib/ImportedComposition.lf | 22 ++ test/RustRti/src/lib/InternalDelay.lf | 15 ++ test/RustRti/src/lib/LoopedActionSender.lf | 36 ++++ test/RustRti/src/lib/PassThrough.lf | 11 + test/RustRti/src/lib/Test.lf | 15 ++ test/RustRti/src/lib/TestCount.lf | 34 +++ test/RustRti/src/lib/TestCountMultiport.lf | 41 ++++ 59 files changed, 2565 insertions(+), 8 deletions(-) create mode 100644 core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java create mode 100644 core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java create mode 100644 test/RustRti/.gitignore create mode 100644 test/RustRti/src/federated/Absent.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedback.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf create mode 100644 test/RustRti/src/federated/DistributedBank.lf create mode 100644 test/RustRti/src/federated/DistributedBankToMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedDoublePort.lf create mode 100644 test/RustRti/src/federated/DistributedInterleaved.lf create mode 100644 test/RustRti/src/federated/DistributedLoopedAction.lf create mode 100644 test/RustRti/src/federated/DistributedMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToBank.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToken.lf create mode 100644 test/RustRti/src/federated/DistributedNetworkOrder.lf create mode 100644 test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf create mode 100644 test/RustRti/src/federated/DistributedStopZero.lf create mode 100644 test/RustRti/src/federated/EnclaveFederatedRequestStop.lf create mode 100644 test/RustRti/src/federated/FederatedFilePkgReader.lf create mode 100644 test/RustRti/src/federated/FederatedFileReader.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay3.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay5.lf create mode 100644 test/RustRti/src/federated/FeedbackDelaySimple.lf create mode 100644 test/RustRti/src/federated/HelloDistributed.lf create mode 100644 test/RustRti/src/federated/InheritanceFederated.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralized.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralized2.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf create mode 100644 test/RustRti/src/federated/ParallelDestinations.lf create mode 100644 test/RustRti/src/federated/ParallelSources.lf create mode 100644 test/RustRti/src/federated/ParallelSourcesMultiport.lf create mode 100644 test/RustRti/src/federated/SimpleFederated.lf create mode 100644 test/RustRti/src/federated/SpuriousDependency.lf create mode 100644 test/RustRti/src/federated/StopAtShutdown.lf create mode 100644 test/RustRti/src/federated/TopLevelArtifacts.lf create mode 100644 test/RustRti/src/lib/Count.lf create mode 100644 test/RustRti/src/lib/FileLevelPreamble.lf create mode 100644 test/RustRti/src/lib/FileReader.txt create mode 100644 test/RustRti/src/lib/GenDelay.lf create mode 100644 test/RustRti/src/lib/Imported.lf create mode 100644 test/RustRti/src/lib/ImportedAgain.lf create mode 100644 test/RustRti/src/lib/ImportedComposition.lf create mode 100644 test/RustRti/src/lib/InternalDelay.lf create mode 100644 test/RustRti/src/lib/LoopedActionSender.lf create mode 100644 test/RustRti/src/lib/PassThrough.lf create mode 100644 test/RustRti/src/lib/Test.lf create mode 100644 test/RustRti/src/lib/TestCount.lf create mode 100644 test/RustRti/src/lib/TestCountMultiport.lf diff --git a/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java b/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java index 0200f98183..cbab653e3b 100644 --- a/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java +++ b/core/src/integrationTest/java/org/lflang/tests/RunSingleTest.java @@ -37,6 +37,7 @@ import org.lflang.tests.runtime.CTest; import org.lflang.tests.runtime.CppTest; import org.lflang.tests.runtime.PythonTest; +import org.lflang.tests.runtime.RustRtiTest; import org.lflang.tests.runtime.RustTest; import org.lflang.tests.runtime.TypeScriptTest; @@ -90,6 +91,8 @@ private static Class getTestInstance(Target target) { return PythonTest.class; case Rust: return RustTest.class; + case RustRti: + return RustRtiTest.class; default: throw new IllegalArgumentException(); } diff --git a/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java b/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java new file mode 100644 index 0000000000..05ce13845d --- /dev/null +++ b/core/src/integrationTest/java/org/lflang/tests/SimplifiedRuntimeTest.java @@ -0,0 +1,41 @@ +package org.lflang.tests; + +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.lflang.target.Target; +import org.lflang.tests.TestRegistry.TestCategory; + +/** + * A collection of JUnit tests to perform on a given set of targets. + * + * @author Marten Lohstroh + * @author Chanhee Lee + */ +public abstract class SimplifiedRuntimeTest extends TestBase { + + /** + * Construct a test instance that runs tests for a single target. + * + * @param target The target to run tests for. + */ + protected SimplifiedRuntimeTest(Target target) { + super(target); + } + + /** Whether to enable {@link #runFederatedTests()}. */ + protected boolean supportsFederatedExecution() { + return false; + } + + @Test + public void runFederatedTestsWithRustRti() { + Assumptions.assumeTrue(supportsFederatedExecution(), Message.NO_FEDERATION_SUPPORT); + runTestsForTargetsWithRustRti( + Message.DESC_FEDERATED_WITH_RUST_RTI, + TestCategory.FEDERATED::equals, + Transformers::noChanges, + Configurators::noChanges, + TestLevel.EXECUTION, + false); + } +} diff --git a/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java b/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java new file mode 100644 index 0000000000..90d31a42bd --- /dev/null +++ b/core/src/integrationTest/java/org/lflang/tests/runtime/RustRtiTest.java @@ -0,0 +1,62 @@ +/************* + * Copyright (c) 2019-2024, The University of California at Berkeley. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ***************/ +package org.lflang.tests.runtime; + +import org.junit.jupiter.api.Test; +import org.lflang.target.Target; +import org.lflang.tests.SimplifiedRuntimeTest; + +/** + * Collection of tests for the C target with Rust RTI. + * + *

Tests that are implemented in the base class are still overridden so that each test can be + * easily invoked individually from IDEs with JUnit support like Eclipse and IntelliJ. This is + * typically done by right-clicking on the name of the test method and then clicking "Run".* + * + * @author Marten Lohstroh + * @author Chanhee Lee + */ +public class RustRtiTest extends SimplifiedRuntimeTest { + + public RustRtiTest() { + super(Target.RustRti); + } + + @Override + protected boolean supportsSingleThreadedExecution() { + return true; + } + + @Override + protected boolean supportsFederatedExecution() { + return true; + } + + @Test + @Override + public void runFederatedTestsWithRustRti() { + super.runFederatedTestsWithRustRti(); + } +} diff --git a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java index 0acf495fc7..792c9f89a0 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java +++ b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java @@ -201,6 +201,97 @@ public boolean doGenerate(Resource resource, LFGeneratorContext context) throws return false; } + /** + * Produce LF code for each federate in a separate file, then invoke a target-specific code + * generator for each of those files. + * + * @param resource The resource that has the federated main reactor in it + * @param context The context in which to carry out the code generation. + * @return False if no errors have occurred, true otherwise. + */ + public boolean doGenerateForRustRTI(Resource resource, LFGeneratorContext context) + throws IOException { + if (!federatedExecutionIsSupported(resource)) return true; + cleanIfNeeded(context); + + // In a federated execution, we need keepalive to be true, + // otherwise a federate could exit simply because it hasn't received + // any messages. + KeepaliveProperty.INSTANCE.override(targetConfig, true); + + // Process command-line arguments + processCLIArguments(context); + + // Find the federated reactor + Reactor federation = FedASTUtils.findFederatedReactor(resource); + + // Make sure the RTI host is set correctly. + setRTIHost(federation); + + // Create the FederateInstance objects. + ReactorInstance main = createFederateInstances(federation, context); + + // Insert reactors that split multiports into many ports. + insertIndexers(main, resource); + + // Clear banks so that each bank member becomes a single federate. + for (Instantiation instantiation : ASTUtils.allInstantiations(federation)) { + instantiation.setWidthSpec(null); + instantiation.setWidthSpec(null); + } + + // Find all the connections between federates. + // For each connection between federates, replace it in the + // AST with an action (which inherits the delay) and three reactions. + // The action will be physical for physical connections and logical + // for logical connections. + replaceFederateConnectionsWithProxies(federation, main, resource); + + FedEmitter fedEmitter = + new FedEmitter( + fileConfig, + ASTUtils.toDefinition(mainDef.getReactorClass()), + messageReporter, + rtiConfig); + + // Generate LF code for each federate. + Map lf2lfCodeMapMap = new HashMap<>(); + for (FederateInstance federate : federates) { + lf2lfCodeMapMap.putAll(fedEmitter.generateFederate(context, federate, federates.size())); + } + + // Do not invoke target code generators if --no-compile flag is used. + if (context.getTargetConfig().get(NoCompileProperty.INSTANCE)) { + context.finish(Status.GENERATED, lf2lfCodeMapMap); + return false; + } + + // If the RTI is to be built locally, set up a build environment for it. + prepareRtiBuildEnvironment(context); + + Map codeMapMap = + compileFederates( + context, + lf2lfCodeMapMap, + subContexts -> { + createDockerFiles(context, subContexts); + generateLaunchScriptForRustRti(); + // If an error has occurred during codegen of any federate, report it. + subContexts.forEach( + c -> { + if (c.getErrorReporter().getErrorsOccurred()) { + context + .getErrorReporter() + .at(c.getFileConfig().srcFile) + .error("Failure during code generation of " + c.getFileConfig().srcFile); + } + }); + }); + + context.finish(Status.COMPILED, codeMapMap); + return false; + } + /** * Prepare a build environment for the rti alongside the generated sources of the federates. * @@ -229,6 +320,11 @@ private void generateLaunchScript() { .doGenerate(federates, rtiConfig); } + private void generateLaunchScriptForRustRti() { + new FedLauncherGenerator(this.targetConfig, this.fileConfig, this.messageReporter) + .doGenerateForRustRTI(federates, new RtiConfig()); + } + /** * Generate a Dockerfile for each federate and a docker-compose.yml for the federation. * diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index 67cc8085f0..765a3504d2 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -265,6 +265,169 @@ public void doGenerate(List federates, RtiConfig rtiConfig) { } } + /** + * Create the launcher shell scripts. This will create one or two files in the output path (bin + * directory). The first has name equal to the filename of the source file without the ".lf" + * extension. This will be a shell script that launches the RTI and the federates. If, in + * addition, either the RTI or any federate is mapped to a particular machine (anything other than + * the default "localhost" or "0.0.0.0"), then this will generate a shell script in the bin + * directory with name filename_distribute.sh that copies the relevant source files to the remote + * host and compiles them so that they are ready to execute using the launcher. + * + *

A precondition for this to work is that the user invoking this code generator can log into + * the remote host without supplying a password. Specifically, you have to have installed your + * public key (typically found in ~/.ssh/id_rsa.pub) in ~/.ssh/authorized_keys on the remote host. + * In addition, the remote host must be running an ssh service. On an Arch Linux system using + * systemd, for example, this means running: + * + *

sudo systemctl ssh.service + * + *

Enable means to always start the service at startup, whereas start means to just start it + * this once. + * + * @param federates A list of federate instances in the federation + * @param rtiConfig Can have values for 'host', 'dir', and 'user' + */ + public void doGenerateForRustRTI(List federates, RtiConfig rtiConfig) { + // NOTE: It might be good to use screen when invoking the RTI + // or federates remotely, so you can detach and the process keeps running. + // However, I was unable to get it working properly. + // What this means is that the shell that invokes the launcher + // needs to remain live for the duration of the federation. + // If that shell is killed, the federation will die. + // Hence, it is reasonable to launch the federation on a + // machine that participates in the federation, for example, + // on the machine that runs the RTI. The command I tried + // to get screen to work looks like this: + // ssh -t «target» cd «path»; screen -S «filename»_«federate.name» -L + // bin/«filename»_«federate.name» 2>&1 + // var outPath = binGenPath + StringBuilder shCode = new StringBuilder(); + StringBuilder distCode = new StringBuilder(); + shCode.append(getSetupCode()).append("\n"); + String distHeader = getDistHeader(); + String host = rtiConfig.getHost(); + String target = host; + + String user = rtiConfig.getUser(); + if (user != null) { + target = user + "@" + host; + } + + shCode.append("#### Host is ").append(host); + + // Launch the RTI in the foreground. + if (host.equals("localhost") || host.equals("0.0.0.0")) { + // FIXME: the paths below will not work on Windows + shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n"); + } else { + // Start the RTI on the remote machine - Not supported yet for Rust RTI. + } + + // Index used for storing pids of federates + int federateIndex = 0; + for (FederateInstance federate : federates) { + var buildConfig = getBuildConfig(federate, fileConfig, messageReporter); + if (federate.isRemote) { + if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); + distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n"); + shCode + .append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++)) + .append("\n"); + } else { + String executeCommand = buildConfig.localExecuteCommand(); + shCode + .append(getFedLocalLaunchCode(federate, executeCommand, federateIndex++)) + .append("\n"); + } + } + if (host.equals("localhost") || host.equals("0.0.0.0")) { + // Local PID managements + shCode.append( + "echo \"#### Bringing the RTI back to foreground so it can receive Control-C.\"" + "\n"); + shCode.append("fg %1" + "\n"); + } + // Wait for launched processes to finish + shCode + .append( + String.join( + "\n", + "echo \"RTI has exited. Wait for federates to exit.\"", + "# Wait for launched processes to finish.", + "# The errors are handled separately via trap.", + "for pid in \"${pids[@]}\"", + "do", + " wait $pid || exit $?", + "done", + "echo \"All done.\"", + "EXITED_SUCCESSFULLY=true")) + .append("\n"); + + // Create bin directory for the script. + if (!Files.exists(fileConfig.binPath)) { + try { + Files.createDirectories(fileConfig.binPath); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to create directory: " + fileConfig.binPath); + } + } + + // Write the launcher file. + File file = fileConfig.binPath.resolve(fileConfig.name).toFile(); + messageReporter.nowhere().info("Script for launching the federation: " + file); + + // Delete file previously produced, if any. + if (file.exists()) { + if (!file.delete()) + messageReporter + .nowhere() + .error("Failed to delete existing federated launch script \"" + file + "\""); + } + + FileOutputStream fOut = null; + try { + fOut = new FileOutputStream(file); + } catch (FileNotFoundException e) { + messageReporter.nowhere().error("Unable to find file: " + file); + } + if (fOut != null) { + try { + fOut.write(shCode.toString().getBytes()); + fOut.close(); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to write to file: " + file); + } + } + + if (!file.setExecutable(true, false)) { + messageReporter.nowhere().warning("Unable to make launcher script executable."); + } + + // Write the distributor file. + // Delete the file even if it does not get generated. + file = fileConfig.binPath.resolve(fileConfig.name + "_distribute.sh").toFile(); + if (file.exists()) { + if (!file.delete()) + messageReporter + .nowhere() + .error("Failed to delete existing federated distributor script \"" + file + "\""); + } + if (distCode.length() > 0) { + try { + fOut = new FileOutputStream(file); + fOut.write(distCode.toString().getBytes()); + fOut.close(); + if (!file.setExecutable(true, false)) { + messageReporter.nowhere().warning("Unable to make file executable: " + file); + } + } catch (FileNotFoundException e) { + messageReporter.nowhere().error("Unable to find file: " + file); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to write to file " + file); + } + } + } + private String getSetupCode() { return String.join( "\n", @@ -377,6 +540,35 @@ private String getLaunchCode(String rtiLaunchCode) { "sleep 1"); } + private String getLaunchCodeForRustRti(String numberOfFederates) { + String launchCodeWithoutLogging = + new String("cargo run -- -i ${FEDERATION_ID} -n " + numberOfFederates + " -c init &"); + return String.join( + "\n", + "echo \"#### Launching the Rust runtime infrastructure (RTI).\"", + "# The Rust RTI is started first to allow proper boot-up", + "# before federates will try to connect.", + "# The RTI will be brought back to foreground", + "# to be responsive to user inputs after all federates", + "# are launched.", + "RUST_RTI_REMOTE_PATHS=`find ~/ -name rti_remote.rs`", + "if [ \"${RUST_RTI_REMOTE_PATHS}\" = \"\" ]; then", + " git clone https://github.com/hokeun/lf-rust-rti.git", + " cd lf-rust-rti/rust/rti", + "else", + " FIRST_RUST_RTI_REMOTE_PATH=($RUST_RTI_REMOTE_PATHS)", + " FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}", + " cd ${FIRST_RUST_RTI_PATH}; cd ../", + "fi", + launchCodeWithoutLogging, + "# Store the PID of the RTI", + "RTI=$!", + "# Wait for the RTI to boot up before", + "# starting federates (this could be done by waiting for a specific output", + "# from the RTI, but here we use sleep)", + "sleep 1"); + } + private String getRemoteLaunchCode( Object host, Object target, String logFileName, String rtiLaunchString) { return String.join( @@ -590,7 +782,7 @@ private String getFedLocalLaunchCode( private BuildConfig getBuildConfig( FederateInstance federate, FederationFileConfig fileConfig, MessageReporter messageReporter) { return switch (federate.targetConfig.target) { - case C, CCPP -> new CBuildConfig(federate, fileConfig, messageReporter); + case C, CCPP, RustRti -> new CBuildConfig(federate, fileConfig, messageReporter); case Python -> new PyBuildConfig(federate, fileConfig, messageReporter); case TS -> new TsBuildConfig(federate, fileConfig, messageReporter); case CPP, Rust -> throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/lflang/generator/LFGenerator.java b/core/src/main/java/org/lflang/generator/LFGenerator.java index f4e61e93d5..305b08f063 100644 --- a/core/src/main/java/org/lflang/generator/LFGenerator.java +++ b/core/src/main/java/org/lflang/generator/LFGenerator.java @@ -56,7 +56,7 @@ public static FileConfig createFileConfig( } return switch (target) { - case CCPP, C -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin); + case CCPP, C, RustRti -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin); case Python -> new PyFileConfig(resource, srcGenBasePath, useHierarchicalBin); case CPP -> new CppFileConfig(resource, srcGenBasePath, useHierarchicalBin); case Rust -> new RustFileConfig(resource, srcGenBasePath, useHierarchicalBin); @@ -82,6 +82,7 @@ private GeneratorBase createGenerator(LFGeneratorContext context) { case CPP -> new CppGenerator(context, scopeProvider); case TS -> new TSGenerator(context); case Rust -> new RustGenerator(context, scopeProvider); + case RustRti -> new CGenerator(context, true); }; } @@ -121,6 +122,42 @@ public void doGenerate(Resource resource, IFileSystemAccess2 fsa, IGeneratorCont } } + public void doGenerateForRustRTI( + Resource resource, IFileSystemAccess2 fsa, IGeneratorContext context) { + assert injector != null; + final LFGeneratorContext lfContext; + if (context instanceof LFGeneratorContext) { + lfContext = (LFGeneratorContext) context; + } else { + lfContext = LFGeneratorContext.lfGeneratorContextOf(resource, fsa, context); + } + + // The fastest way to generate code is to not generate any code. + if (lfContext.getMode() == LFGeneratorContext.Mode.LSP_FAST) return; + + if (FedASTUtils.findFederatedReactor(resource) != null) { + try { + FedGenerator fedGenerator = new FedGenerator(lfContext); + injector.injectMembers(fedGenerator); + generatorErrorsOccurred = fedGenerator.doGenerateForRustRTI(resource, lfContext); + } catch (IOException e) { + throw new RuntimeIOException("Error during federated code generation", e); + } + + } else { + final GeneratorBase generator = createGenerator(lfContext); + + if (generator != null) { + generator.doGenerate(resource, lfContext); + generatorErrorsOccurred = generator.errorsOccurred(); + } + } + final MessageReporter messageReporter = lfContext.getErrorReporter(); + if (messageReporter instanceof LanguageServerMessageReporter) { + ((LanguageServerMessageReporter) messageReporter).publishDiagnostics(); + } + } + /** Return true if errors occurred in the last call to doGenerate(). */ public boolean errorsOccurred() { return generatorErrorsOccurred; diff --git a/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java b/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java index 203c5f94d7..25bcfef6d2 100644 --- a/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java +++ b/core/src/main/java/org/lflang/generator/docker/DockerGenerator.java @@ -69,7 +69,7 @@ public static DockerGenerator dockerGeneratorFactory(LFGeneratorContext context) case C, CCPP -> new CDockerGenerator(context); case TS -> new TSDockerGenerator(context); case Python -> new PythonDockerGenerator(context); - case CPP, Rust -> throw new IllegalArgumentException( + case CPP, Rust, RustRti -> throw new IllegalArgumentException( "No Docker support for " + target + " yet."); }; } diff --git a/core/src/main/java/org/lflang/target/Target.java b/core/src/main/java/org/lflang/target/Target.java index a01b863303..109d8b211b 100644 --- a/core/src/main/java/org/lflang/target/Target.java +++ b/core/src/main/java/org/lflang/target/Target.java @@ -383,6 +383,13 @@ public enum Target { // In our Rust implementation, the only reserved keywords // are those that are a valid expression. Others may be escaped // with the syntax r#keyword. + Arrays.asList("self", "true", "false")), + RustRti( + "RustRti", + true, + // In our Rust implementation, the only reserved keywords + // are those that are a valid expression. Others may be escaped + // with the syntax r#keyword. Arrays.asList("self", "true", "false")); /** String representation of this target. */ @@ -460,7 +467,7 @@ public boolean isReservedIdent(String ident) { /** Return true if the target supports federated execution. */ public boolean supportsFederated() { return switch (this) { - case C, CCPP, Python, TS -> true; + case C, CCPP, Python, TS, RustRti -> true; default -> false; }; } @@ -476,7 +483,7 @@ public boolean supportsInheritance() { /** Return true if the target supports multiports and banks of reactors. */ public boolean supportsMultiports() { return switch (this) { - case C, CCPP, CPP, Python, Rust, TS -> true; + case C, CCPP, CPP, Python, Rust, TS, RustRti -> true; default -> false; }; } @@ -501,7 +508,7 @@ public boolean supportsReactionDeclarations() { public boolean buildsUsingDocker() { return switch (this) { case TS -> false; - case C, CCPP, CPP, Python, Rust -> true; + case C, CCPP, CPP, Python, Rust, RustRti -> true; }; } @@ -639,7 +646,7 @@ public void initialize(TargetConfig config) { SingleThreadedProperty.INSTANCE, TracingProperty.INSTANCE, WorkersProperty.INSTANCE); - case Rust -> config.register( + case Rust, RustRti -> config.register( BuildTypeProperty.INSTANCE, CargoDependenciesProperty.INSTANCE, CargoFeaturesProperty.INSTANCE, diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c index 188fb3c983..26eb3465c5 160000 --- a/core/src/main/resources/lib/c/reactor-c +++ b/core/src/main/resources/lib/c/reactor-c @@ -1 +1 @@ -Subproject commit 188fb3c983aac9ea7bcafa4e5172daaa5f8a883d +Subproject commit 26eb3465c5ca425a03ded2e042a7b6b35cf0181f diff --git a/core/src/testFixtures/java/org/lflang/tests/TestBase.java b/core/src/testFixtures/java/org/lflang/tests/TestBase.java index 6923149f07..bf9a8eb6d1 100644 --- a/core/src/testFixtures/java/org/lflang/tests/TestBase.java +++ b/core/src/testFixtures/java/org/lflang/tests/TestBase.java @@ -139,6 +139,7 @@ public static class Message { public static final String DESC_MULTIPORT = "Run multiport tests."; public static final String DESC_AS_FEDERATED = "Run non-federated tests in federated mode."; public static final String DESC_FEDERATED = "Run federated tests."; + public static final String DESC_FEDERATED_WITH_RUST_RTI = "Run federated tests with Rust RTI."; public static final String DESC_DOCKER = "Run docker tests."; public static final String DESC_DOCKER_FEDERATED = "Run docker federated tests."; public static final String DESC_ENCLAVE = "Run enclave tests."; @@ -196,6 +197,36 @@ protected final void runTestsAndPrintResults( } } + /** + * Run selected tests for a given target and configurator up to the specified level. + * + * @param target The target to run tests for. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected final void runTestsAndPrintResultsWithRustRti( + Target target, + Predicate selected, + TestLevel level, + Transformer transformer, + Configurator configurator, + boolean copy) { + var categories = Arrays.stream(TestCategory.values()).filter(selected).toList(); + for (var category : categories) { + System.out.println(category.getHeader()); + var tests = testRegistry.getRegisteredTests(target, category, copy); + try { + validateAndRunWithRustRti(tests, transformer, configurator, level); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + System.out.println(testRegistry.getCoverageReport(target, category)); + checkAndReportFailures(tests); + } + } + /** * Run tests in the given selection for all targets enabled in this class. * @@ -217,6 +248,28 @@ protected void runTestsForTargets( } } + /** + * Run tests in the given selection for all targets enabled in this class. + * + * @param description A string that describes the collection of tests. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected void runTestsForTargetsWithRustRti( + String description, + Predicate selected, + Transformer transformer, + Configurator configurator, + TestLevel level, + boolean copy) { + for (Target target : this.targets) { + runTestsForRustRti( + List.of(target), description, selected, transformer, configurator, level, copy); + } + } + /** * Run tests in the given selection for a subset of given targets. * @@ -241,6 +294,30 @@ protected void runTestsFor( } } + /** + * Run tests in the given selection for a subset of given targets. + * + * @param subset The subset of targets to run the selected tests for. + * @param description A string that describes the collection of tests. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected void runTestsForRustRti( + List subset, + String description, + Predicate selected, + Transformer transformer, + Configurator configurator, + TestLevel level, + boolean copy) { + for (Target target : subset) { + printTestHeader(target, description); + runTestsAndPrintResultsWithRustRti(target, selected, level, transformer, configurator, copy); + } + } + /** Whether to enable threading. */ protected boolean supportsSingleThreadedExecution() { return false; @@ -496,6 +573,25 @@ private void generateCode(LFTest test) throws TestError { } } + /** + * Invoke the code generator for the given test. + * + * @param test The test to generate code for. + */ + private void generateCodeForRustRti(LFTest test) throws TestError { + if (test.getFileConfig().resource == null) { + test.getContext().finish(GeneratorResult.NOTHING); + } + try { + generator.doGenerateForRustRTI(test.getFileConfig().resource, fileAccess, test.getContext()); + } catch (Throwable e) { + throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL, e); + } + if (generator.errorsOccurred()) { + throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL); + } + } + /** * Given an indexed test, execute it and label the test as failing if it did not execute, took too * long to execute, or executed but exited with an error code. @@ -712,4 +808,49 @@ private void validateAndRun( System.out.print(System.lineSeparator()); } + + /** + * Validate and run the given tests, using the specified configuratator and level. + * + *

While performing tests, this method prints a header that reaches completion once all tests + * have been run. + * + * @param tests A set of tests to run. + * @param transformer A procedure for transforming the tests. + * @param configurator A procedure for configuring the tests. + * @param level The level of testing. + * @throws IOException If initial file configuration fails + */ + private void validateAndRunWithRustRti( + Set tests, Transformer transformer, Configurator configurator, TestLevel level) + throws IOException { + var done = 1; + + System.out.println(THICK_LINE); + + for (var test : tests) { + System.out.println( + "Running: " + test.toString() + " (" + (int) (done / (float) tests.size() * 100) + "%)"); + try { + test.redirectOutputs(); + prepare(test, transformer, configurator); + validate(test); + generateCodeForRustRti(test); + if (level == TestLevel.EXECUTION) { + execute(test); + } + test.markPassed(); + } catch (TestError e) { + test.handleTestError(e); + } catch (Throwable e) { + test.handleTestError( + new TestError("Unknown exception during test execution", Result.TEST_EXCEPTION, e)); + } finally { + test.restoreOutputs(); + } + done++; + } + + System.out.print(System.lineSeparator()); + } } diff --git a/test/RustRti/.gitignore b/test/RustRti/.gitignore new file mode 100644 index 0000000000..08f514ebc5 --- /dev/null +++ b/test/RustRti/.gitignore @@ -0,0 +1 @@ +include/ diff --git a/test/RustRti/src/federated/Absent.lf b/test/RustRti/src/federated/Absent.lf new file mode 100644 index 0000000000..7130210cf3 --- /dev/null +++ b/test/RustRti/src/federated/Absent.lf @@ -0,0 +1,46 @@ +target C { + tracing: true, + timeout: 100 ms +} + +reactor Sender { + output out1: int + output out2: int + timer t(0, 20 ms) + state c: int = 1 + + reaction(t) -> out1, out2 {= + if (self->c % 2 != 0) { + lf_set(out1, self->c); + } else { + lf_set(out2, self->c); + } + self->c++; + =} +} + +reactor Receiver { + input in1: int + input in2: int + + reaction(in1) {= + lf_print("Received %d on in1", in1->value); + if (in1->value % 2 == 0) { + lf_print_error_and_exit("********* Expected an odd integer!"); + } + =} + + reaction(in2) {= + lf_print("Received %d on in2", in2->value); + if (in2->value % 2 != 0) { + lf_print_error_and_exit("********* Expected an even integer!"); + } + =} +} + +federated reactor(d: time = 1 ms) { + s = new Sender() + r = new Receiver() + s.out1 -> r.in1 + s.out2 -> r.in2 +} diff --git a/test/RustRti/src/federated/BroadcastFeedback.lf b/test/RustRti/src/federated/BroadcastFeedback.lf new file mode 100644 index 0000000000..66a93c275b --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedback.lf @@ -0,0 +1,33 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + reaction(startup) -> out {= + lf_set(out, 42); + =} + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf new file mode 100644 index 0000000000..114e42cfd7 --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf @@ -0,0 +1,40 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + r = new Receiver() + in -> r.in + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Receiver { + input[2] in: int + state received: bool = false + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/DistributedBank.lf b/test/RustRti/src/federated/DistributedBank.lf new file mode 100644 index 0000000000..65a6f871c2 --- /dev/null +++ b/test/RustRti/src/federated/DistributedBank.lf @@ -0,0 +1,24 @@ +// Check bank of federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Node(bank_index: int = 0) { + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) {= + lf_print("Hello world %d.", self->count++); + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("Timer reactions did not execute."); + } + =} +} + +federated reactor DistributedBank { + n = new[2] Node() +} diff --git a/test/RustRti/src/federated/DistributedBankToMultiport.lf b/test/RustRti/src/federated/DistributedBankToMultiport.lf new file mode 100644 index 0000000000..d73b0959fd --- /dev/null +++ b/test/RustRti/src/federated/DistributedBankToMultiport.lf @@ -0,0 +1,33 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +import Count from "../lib/Count.lf" + +reactor Destination { + input[2] in: int + state count: int = 1 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + lf_print("Received %d.", in[i]->value); + if (self->count != in[i]->value) { + lf_print_error_and_exit("Expected %d.", self->count); + } + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new[2] Count() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedDoublePort.lf b/test/RustRti/src/federated/DistributedDoublePort.lf new file mode 100644 index 0000000000..ec0a6d0b1d --- /dev/null +++ b/test/RustRti/src/federated/DistributedDoublePort.lf @@ -0,0 +1,52 @@ +/** + * Test the case for when two upstream federates send messages to a downstream federate on two + * different ports. One message should carry a microstep delay relative to the other message. + * + * @author Soroush Bateni + */ +target C { + timeout: 900 msec, + coordination: centralized +} + +import Count from "../lib/Count.lf" + +reactor CountMicrostep { + state count: int = 1 + output out: int + logical action act: int + timer t(0, 1 sec) + + reaction(t) -> act {= + lf_schedule_int(act, 0, self->count++); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +reactor Print { + input in: int + input in2: int + + reaction(in, in2) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("At tag " PRINTF_TAG ", received in = %d and in2 = %d.", elapsed_time, lf_tag().microstep, in->value, in2->value); + if (in->is_present && in2->is_present) { + lf_print_error_and_exit("ERROR: invalid logical simultaneity."); + } + =} + + reaction(shutdown) {= + lf_print("SUCCESS: messages were at least one microstep apart."); + =} +} + +federated reactor DistributedDoublePort { + c = new Count() + cm = new CountMicrostep() + p = new Print() + c.out -> p.in // Indicating a 'logical' connection. + cm.out -> p.in2 +} diff --git a/test/RustRti/src/federated/DistributedInterleaved.lf b/test/RustRti/src/federated/DistributedInterleaved.lf new file mode 100644 index 0000000000..dc212daf17 --- /dev/null +++ b/test/RustRti/src/federated/DistributedInterleaved.lf @@ -0,0 +1,44 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output[4] out: int + timer t(offset, period) + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination { + input[2] in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in[0]->value); + lf_print("Received %d.", in[1]->value); + // Because the connection is interleaved, the difference between the + // two inputs should be 2, not 1. + if (in[1]->value - in[0]->value != 2) { + lf_print_error_and_exit("Expected a difference of two."); + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new Count() + d = new[2] Destination() + s.out -> interleaved(d.in) +} diff --git a/test/RustRti/src/federated/DistributedLoopedAction.lf b/test/RustRti/src/federated/DistributedLoopedAction.lf new file mode 100644 index 0000000000..88418f84d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedLoopedAction.lf @@ -0,0 +1,62 @@ +/** + * Test a sender-receiver network system that relies on microsteps being taken into account. + * + * @author Soroush Bateni + */ +target C { + logging: LOG, + timeout: 1 sec +} + +import Sender from "../lib/LoopedActionSender.lf" + +reactor Receiver(take_a_break_after: int = 10, break_interval: time = 400 msec) { + input in: int + state received_messages: int = 0 + state total_received_messages: int = 0 + state breaks: int = 0 + timer t(0, 10 msec) // This will impact the performance + + // but forces the logical time to advance Comment this line for a more sensible log output. + reaction(in) {= + lf_print("At tag " PRINTF_TAG " received value %d.", + lf_time_logical_elapsed(), + lf_tag().microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + lf_print_error("Expected %d", self->received_messages - 1); + // exit(1); + } + if (lf_time_logical_elapsed() != self->breaks * self->break_interval) { + lf_print_error("Received messages at an incorrect time: " PRINTF_TIME, lf_time_logical_elapsed()); + // exit(2); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks != 3 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + lf_print_error_and_exit("Did not receive enough messages."); + } + printf("SUCCESS: Successfully received all messages from the sender.\n"); + =} +} + +federated reactor DistributedLoopedAction { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedMultiport.lf b/test/RustRti/src/federated/DistributedMultiport.lf new file mode 100644 index 0000000000..44a04c4654 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiport.lf @@ -0,0 +1,48 @@ +// Check multiport connections between federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source(width: int = 2) { + output[width] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination(width: int = 3) { + input[width] in: int + state count: int = 0 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + tag_t now = lf_tag(); + lf_print("Received %d at channel %d at tag " PRINTF_TAG, in[i]->value, i, + now.time - lf_time_start(), now.microstep + ); + if (in[i]->value != self->count++) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + } + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiport { + s = new Source(width=4) + d = new Destination(width=4) + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToBank.lf b/test/RustRti/src/federated/DistributedMultiportToBank.lf new file mode 100644 index 0000000000..d8171de51e --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToBank.lf @@ -0,0 +1,41 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 1 sec +} + +reactor Source { + output[2] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count); + } + self->count++; + =} +} + +reactor Destination { + input in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in->value); + if (self->count++ != in->value) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiportToBank { + s = new Source() + d = new[2] Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToken.lf b/test/RustRti/src/federated/DistributedMultiportToken.lf new file mode 100644 index 0000000000..547fe651d9 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToken.lf @@ -0,0 +1,46 @@ +// Check multiport connections between federates where the message is carried by a Token (in this +// case, with an array of char). +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source { + output[4] out: char* + timer t(0, 200 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + // With NULL, 0 arguments, snprintf tells us how many bytes are needed. + // Add one for the null terminator. + int length = snprintf(NULL, 0, "Hello %d", self->count) + 1; + // Dynamically allocate memory for the output. + SET_NEW_ARRAY(out[i], length); + // Populate the output string and increment the count. + snprintf(out[i]->value, length, "Hello %d", self->count++); + lf_print("MessageGenerator: At time " PRINTF_TIME ", send message: %s.", + lf_time_logical_elapsed(), + out[i]->value + ); + } + =} +} + +reactor Destination { + input[4] in: char* + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + lf_print("Received %s.", in[i]->value); + } + } + =} +} + +federated reactor DistributedMultiportToken { + s = new Source() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedNetworkOrder.lf b/test/RustRti/src/federated/DistributedNetworkOrder.lf new file mode 100644 index 0000000000..b1413c11b1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedNetworkOrder.lf @@ -0,0 +1,75 @@ +/** + * This is a test for send_timed_message, which is an internal API. + * + * This test sends a second message at time 5 msec that has the same intended tag as a message that + * it had previously sent at time 0 msec. This results in a warning, but the message microstep is + * incremented and correctly received one microstep later. + * + * @author Soroush Bateni + */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo // Release with debug info +} + +preamble {= + #ifdef __cplusplus + extern "C" { + #endif + #include "federate.h" + #ifdef __cplusplus + } + #endif +=} + +reactor Sender { + output out: int + timer t(0, 1 msec) + + reaction(t) -> out {= + int payload = 1; + if (lf_time_logical_elapsed() == 0LL) { + lf_send_tagged_message(self->base.environment, MSEC(10), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } else if (lf_time_logical_elapsed() == MSEC(5)) { + payload = 2; + lf_send_tagged_message(self->base.environment, MSEC(5), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } + =} +} + +reactor Receiver { + input in: int + state success: int = 0 + + reaction(in) {= + tag_t current_tag = lf_tag(); + if (current_tag.time == (lf_time_start() + MSEC(10))) { + if (current_tag.microstep == 0 && in->value == 1) { + self->success++; + } else if (current_tag.microstep == 1 && in->value == 2) { + self->success++; + } + } + printf("Received %d at tag " PRINTF_TAG ".\n", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + =} + + reaction(shutdown) {= + if (self->success != 2) { + fprintf(stderr, "ERROR: Failed to receive messages.\n"); + exit(1); + } + printf("SUCCESS.\n"); + =} +} + +federated reactor DistributedNetworkOrder { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf new file mode 100644 index 0000000000..3a85c9b3d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf @@ -0,0 +1,60 @@ +/** + * Test that a rapidly produced physical action in an upstream federate can be properly handled in + * federated execution. + */ +target C { + timeout: 10 secs, + coordination-options: { + advance-message-interval: 30 msec + } +} + +import PassThrough from "../lib/PassThrough.lf" +import TestCount from "../lib/TestCount.lf" + +preamble {= + extern int _counter; + void callback(void *a); + void* take_time(void* a); +=} + +reactor WithPhysicalAction { + preamble {= + int _counter = 1; + void callback(void *a) { + lf_schedule_int(a, 0, _counter++); + } + // Simulate time passing before a callback occurs. + void* take_time(void* a) { + while (_counter < 15) { + instant_t sleep_time = MSEC(10); + lf_sleep(sleep_time); + callback(a); + } + return NULL; + } + =} + + output out: int + state thread_id: lf_thread_t = 0 + physical action act(0): int + + reaction(startup) -> act {= + // start new thread, provide callback + lf_thread_create(&self->thread_id, &take_time, act); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +federated reactor { + a = new WithPhysicalAction() + m1 = new PassThrough() + m2 = new PassThrough() + test = new TestCount(num_inputs=14) + a.out -> m1.in + m1.out -> m2.in + m2.out -> test.in +} diff --git a/test/RustRti/src/federated/DistributedStopZero.lf b/test/RustRti/src/federated/DistributedStopZero.lf new file mode 100644 index 0000000000..876bd6f7f4 --- /dev/null +++ b/test/RustRti/src/federated/DistributedStopZero.lf @@ -0,0 +1,84 @@ +/** + * Test for lf_request_stop() in federated execution with centralized coordination at tag (0,0). + * + * @author Soroush Bateni + */ +target C + +reactor Sender { + output out: int + timer t(0, 1 usec) + + reaction(t) -> out {= + printf("Sending 42 at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_set(out, 42); + + tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u }; + if (lf_tag_compare(lf_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + =} + + reaction(shutdown) {= + if (lf_time_logical_elapsed() != USEC(0) || + lf_tag().microstep != 1) { + fprintf(stderr, "ERROR: Sender failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +reactor Receiver { + input in: int + + reaction(in) {= + printf("Received %d at " PRINTF_TAG ".\n", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u }; + if (lf_tag_compare(lf_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (0, 0) on the + // receiver. + if (lf_time_logical_elapsed() != USEC(0) || + lf_tag().microstep != 1) { + fprintf(stderr, "ERROR: Receiver failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +federated reactor { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf new file mode 100644 index 0000000000..0bed9e03d1 --- /dev/null +++ b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf @@ -0,0 +1,39 @@ +/** + * Test that enclaves within federates all stop at the time requested by the first enclave to + * request a stop. Note that the test has no timeout because any finite timeout can, in theory, + * cause the test to fail. The first federate to request a stop does no at 50 ms, so the program + * should terminate quickly if all goes well. + */ +target C + +reactor Stop( + // Zero value here means "don't stop". + stop_time: time = 0) { + preamble {= + #include "platform.h" // Defines PRINTF_TIME + =} + timer t(stop_time) + + reaction(t) {= + if (self->stop_time > 0) lf_request_stop(); + =} + + reaction(shutdown) {= + lf_print("Stopped at tag (" PRINTF_TIME ", %d)", lf_time_logical_elapsed(), lf_tag().microstep); + if (lf_time_logical_elapsed() != 50000000LL || lf_tag().microstep != 1) { + lf_print_error_and_exit("Expected stop tag to be (50ms, 1)."); + } + =} +} + +reactor Fed(least_stop_time: time = 0) { + @enclave + s1 = new Stop() + @enclave + s2 = new Stop(stop_time=least_stop_time) +} + +federated reactor { + f1 = new Fed() + f2 = new Fed(least_stop_time = 50 ms) +} diff --git a/test/RustRti/src/federated/FederatedFilePkgReader.lf b/test/RustRti/src/federated/FederatedFilePkgReader.lf new file mode 100644 index 0000000000..cf79291acf --- /dev/null +++ b/test/RustRti/src/federated/FederatedFilePkgReader.lf @@ -0,0 +1,57 @@ +/** Test reading a file at a location relative to the source file. */ +target C { + timeout: 0 s +} + +reactor Source { + output out: char* // Use char*, not string, so memory is freed. + + reaction(startup) -> out {= + char* file_path = + LF_PACKAGE_DIRECTORY + LF_FILE_SEPARATOR "src" + LF_FILE_SEPARATOR "lib" + LF_FILE_SEPARATOR "FileReader.txt"; + + FILE* file = fopen(file_path, "rb"); + if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path); + + // Determine the file size + fseek(file, 0, SEEK_END); + long file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + // Allocate memory for the buffer + char* buffer = (char *) malloc(file_size + 1); + if (buffer == NULL) lf_print_error_and_exit("Out of memory."); + + // Read the file into the buffer + fread(buffer, file_size, 1, file); + buffer[file_size] = '\0'; + fclose(file); + + // For federated version, have to use lf_set_array so array size is know + // to the serializer. + lf_set_array(out, buffer, file_size + 1); + =} +} + +reactor Check { + preamble {= + #include + =} + input in: char* + + reaction(in) {= + printf("Received: %s\n", in->value); + if (strcmp("Hello World", in->value) != 0) { + lf_print_error_and_exit("Expected 'Hello World'"); + } + =} +} + +federated reactor { + s = new Source() + c = new Check() + s.out -> c.in +} diff --git a/test/RustRti/src/federated/FederatedFileReader.lf b/test/RustRti/src/federated/FederatedFileReader.lf new file mode 100644 index 0000000000..617d34c3c8 --- /dev/null +++ b/test/RustRti/src/federated/FederatedFileReader.lf @@ -0,0 +1,66 @@ +/** Test reading a file at a location relative to the source file. */ +target C { + logging: DEBUG, + timeout: 0 s +} + +reactor Source { + output out: char* // Use char*, not string, so memory is freed. + + reaction(startup) -> out {= + char* file_path = + LF_SOURCE_DIRECTORY + LF_FILE_SEPARATOR ".." + LF_FILE_SEPARATOR "lib" + LF_FILE_SEPARATOR "FileReader.txt"; + + FILE* file = fopen(file_path, "rb"); + if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path); + + // Determine the file size + fseek(file, 0, SEEK_END); + long file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + // Allocate memory for the buffer + char* buffer = (char *) malloc(file_size + 1); + if (buffer == NULL) lf_print_error_and_exit("Out of memory."); + + // Read the file into the buffer + fread(buffer, file_size, 1, file); + buffer[file_size] = '\0'; + fclose(file); + + // For federated version, have to use lf_set_array so array size is know + // to the serializer. + lf_set_array(out, buffer, file_size + 1); + =} +} + +reactor Check { + preamble {= + #include + =} + input in: char* + state received: bool = false + + reaction(in) {= + printf("Received: %s\n", in->value); + self->received = true; + if (strcmp("Hello World", in->value) != 0) { + lf_print_error_and_exit("Expected 'Hello World'"); + } + =} + + reaction(shutdown) {= + if (!self->received) { + lf_print_error_and_exit("No input received."); + } + =} +} + +federated reactor { + s = new Source() + c = new Check() + s.out -> c.in +} diff --git a/test/RustRti/src/federated/FeedbackDelay.lf b/test/RustRti/src/federated/FeedbackDelay.lf new file mode 100644 index 0000000000..88b15945b2 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay.lf @@ -0,0 +1,85 @@ +/** + * This test has two coupled cycles. In this variant, one is a zero-delay cycle (ZDC) and the other + * is not, having a microstep delay. In this variant, the microstep delay is on a connection + * entering the ZDC. + */ +target C { + timeout: 1 sec +} + +reactor PhysicalPlant { + input control: double + output sensor: double + timer t(0, 100 ms) + state last_sensor_time: time = 0 + state previous_sensor_time: time = 0 + state count: int = 0 + + reaction(t) -> sensor {= + lf_set(sensor, 42); + self->previous_sensor_time = self->last_sensor_time; + self->last_sensor_time = lf_time_physical(); + =} + + reaction(control) {= + self->count++; + lf_print("Control input: %f", control->value); + instant_t control_time = lf_time_physical(); + lf_print("Latency: " PRINTF_TIME ".", control_time - self->previous_sensor_time); + lf_print("Logical time: " PRINTF_TIME ".", lf_time_logical_elapsed()); + =} + + reaction(shutdown) {= + if (self->count != 10) { + lf_print_error_and_exit("Received only %d inputs.", self->count); + } + =} +} + +reactor Controller { + input sensor: double + output control: double + + state latest_control: double = 0.0 + state first: bool = true + + output request_for_planning: double + input planning: double + + reaction(planning) {= + self->latest_control = planning->value; + tag_t now = lf_tag(); + lf_print("Controller received planning value %f at tag " PRINTF_TAG, + self->latest_control, now.time - lf_time_start(), now.microstep + ); + =} + + reaction(sensor) -> control, request_for_planning {= + if (!self->first) { + lf_set(control, self->latest_control); + } + self->first = false; + lf_set(request_for_planning, sensor->value); + =} +} + +reactor Planner { + input request: double + output response: double + + reaction(request) -> response {= + lf_sleep(MSEC(10)); + lf_set(response, request->value); + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning after 0 + c.control -> p.control +} diff --git a/test/RustRti/src/federated/FeedbackDelay3.lf b/test/RustRti/src/federated/FeedbackDelay3.lf new file mode 100644 index 0000000000..e7d47d9340 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay3.lf @@ -0,0 +1,41 @@ +/** This test has two coupled cycles. In this variant, both are a zero-delay cycles (ZDC). */ +target C { + timeout: 1 sec, + tracing: true +} + +import PhysicalPlant, Planner from "FeedbackDelay.lf" + +reactor Controller { + input sensor: double + output control: double + + state latest_control: double = 0.0 + state first: bool = true + + output request_for_planning: double + input planning: double + + reaction(sensor) -> control, request_for_planning {= + if (!self->first) { + lf_set(control, self->latest_control); + } + self->first = false; + lf_set(request_for_planning, sensor->value); + =} + + reaction(planning) {= + self->latest_control = planning->value; + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning + c.control -> p.control +} diff --git a/test/RustRti/src/federated/FeedbackDelay5.lf b/test/RustRti/src/federated/FeedbackDelay5.lf new file mode 100644 index 0000000000..cd7edcd051 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay5.lf @@ -0,0 +1,57 @@ +/** + * This test has two coupled cycles. In this variant, both are zero-delay cycles (ZDC), but one of + * the cycles has two superposed cycles, one of which is zero delay and the other of which is not. + */ +target C { + timeout: 900 ms +} + +import PhysicalPlant from "FeedbackDelay.lf" + +reactor Controller { + input in: double + input sensor: double + output control: double + + state latest_control: double = 0.0 + + output request_for_planning: double + input planning: double + + reaction(in, planning) {= + self->latest_control = planning->value; + =} + + reaction(sensor) -> control, request_for_planning {= + lf_set(control, self->latest_control); + lf_set(request_for_planning, sensor->value); + =} +} + +reactor Planner { + input request: double + output response: double + output out: double + timer t(0, 100 ms) + + reaction(t) -> out {= + lf_set(out, 0); + =} + + reaction(request) -> response {= + lf_sleep(MSEC(10)); + lf_set(response, request->value); + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning after 0 + c.control -> p.control + pl.out -> c.in +} diff --git a/test/RustRti/src/federated/FeedbackDelaySimple.lf b/test/RustRti/src/federated/FeedbackDelaySimple.lf new file mode 100644 index 0000000000..655fbe0762 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelaySimple.lf @@ -0,0 +1,41 @@ +target C { + timeout: 1 sec +} + +reactor Loop { + input in: int + output out: int + timer t(0, 100 msec) + state count: int = 1 + + reaction(in) {= + lf_print("Received %d.", in->value); + if (in->value != self->count) { + lf_print_error_and_exit( + "Expected %d. Got %d.", + self->count, + in->value + ); + } + self->count++; + =} + + reaction(t) -> out {= + lf_set(out, self->count); + =} + + reaction(shutdown) {= + if (self->count != 11) { + lf_print_error_and_exit( + "Expected 11 messages. Got %d.", + self->count + ); + } + =} +} + +federated reactor { + l = new Loop() + + l.out -> l.in after 0 +} diff --git a/test/RustRti/src/federated/HelloDistributed.lf b/test/RustRti/src/federated/HelloDistributed.lf new file mode 100644 index 0000000000..fc7c10da3c --- /dev/null +++ b/test/RustRti/src/federated/HelloDistributed.lf @@ -0,0 +1,56 @@ +/** + * Test a particularly simple form of a distributed deterministic system where a federation that + * receives timestamped messages has only those messages as triggers. Therefore, no additional + * coordination of the advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ +target C + +preamble {= + #include +=} + +reactor Source { + output out: string + + reaction(startup) -> out {= + lf_print("Sending 'Hello World!' message from source federate."); + lf_set(out, "Hello World!"); + lf_request_stop(); + =} +} + +reactor Destination { + input in: string + state received: bool = false + + reaction(startup) {= + lf_print("Destination started."); + =} + + reaction(in) {= + lf_print("At logical time " PRINTF_TIME ", destination received: %s", lf_time_logical_elapsed(), in->value); + if (strcmp(in->value, "Hello World!") != 0) { + fprintf(stderr, "ERROR: Expected to receive 'Hello World!'\n"); + exit(1); + } + self->received = true; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (!self->received) { + lf_print_error_and_exit("Destination did not receive the message."); + } + =} +} + +federated reactor HelloDistributed at localhost { + s = new Source() // Reactor s is in federate Source + d = new Destination() // Reactor d is in federate Destination + s.out -> d.in // This version preserves the timestamp. + + reaction(startup) {= + lf_print("Printing something in top-level federated reactor."); + =} +} diff --git a/test/RustRti/src/federated/InheritanceFederated.lf b/test/RustRti/src/federated/InheritanceFederated.lf new file mode 100644 index 0000000000..90098b29bb --- /dev/null +++ b/test/RustRti/src/federated/InheritanceFederated.lf @@ -0,0 +1,23 @@ +// Test for inheritance in a federated program. +// Compilation without errors is success. +// Based on https://github.com/lf-lang/lingua-franca/issues/1733. +target C { + timeout: 1 ms +} + +reactor A { + reaction(startup) {= + printf("Hello\n"); + =} +} + +reactor B { + a = new A() +} + +reactor C extends B { +} + +federated reactor { + c = new C() +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralized.lf b/test/RustRti/src/federated/LoopDistributedCentralized.lf new file mode 100644 index 0000000000..968ac2784e --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralized.lf @@ -0,0 +1,48 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 4 sec, + logging: DEBUG +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor LoopDistributedCentralized(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralized2.lf b/test/RustRti/src/federated/LoopDistributedCentralized2.lf new file mode 100644 index 0000000000..25de5873e2 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralized2.lf @@ -0,0 +1,75 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 4 sec +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +reactor Looper2(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper2(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf new file mode 100644 index 0000000000..ac783f07cc --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf @@ -0,0 +1,74 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec, + logging: warn +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + preamble {= + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + return NULL; + } + =} + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf new file mode 100644 index 0000000000..51a10faac2 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf @@ -0,0 +1,56 @@ +/** + * This tests that the precedence order of reaction invocation is kept when a feedback loop is + * present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + state count: int = 0 + state received_count: int = 0 + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + self->received_count = self->count; + =} + + reaction(t) {= + if (self->received_count != self->count) { + lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + lf_print_error_and_exit("Failed to receive all six expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf new file mode 100644 index 0000000000..82adfca699 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf @@ -0,0 +1,73 @@ +/** + * This tests that the precedence order of reaction invocation is kept in the hierarchy of reactors + * when a feedback loop is present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec +} + +reactor Contained(incr: int = 1) { + timer t(0, 1 sec) + input in: int + state count: int = 0 + state received_count: int = 0 + + reaction(t) {= + self->count += self->incr; + =} + + reaction(in) {= + self->received_count = self->count; + =} + + reaction(t) {= + if (self->received_count != self->count) { + lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + state count: int = 0 + timer t(0, 1 sec) + + c = new Contained(incr=incr) + in -> c.in + + reaction(t) -> out {= + lf_print("Sending network output %d", self->count); + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + lf_print_error_and_exit("Failed to receive all six expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/ParallelDestinations.lf b/test/RustRti/src/federated/ParallelDestinations.lf new file mode 100644 index 0000000000..a4a4c026db --- /dev/null +++ b/test/RustRti/src/federated/ParallelDestinations.lf @@ -0,0 +1,23 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out: int + c1 = new Count() + c2 = new Count() + + c1.out, c2.out -> out +} + +federated reactor { + s = new Source() + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + + s.out -> t1.in, t2.in +} diff --git a/test/RustRti/src/federated/ParallelSources.lf b/test/RustRti/src/federated/ParallelSources.lf new file mode 100644 index 0000000000..0bedc87d68 --- /dev/null +++ b/test/RustRti/src/federated/ParallelSources.lf @@ -0,0 +1,24 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Destination { + input[2] in: int + + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + + in -> t1.in, t2.in +} + +federated reactor { + c1 = new Count() + c2 = new Count() + d = new Destination() + + c1.out, c2.out -> d.in +} diff --git a/test/RustRti/src/federated/ParallelSourcesMultiport.lf b/test/RustRti/src/federated/ParallelSourcesMultiport.lf new file mode 100644 index 0000000000..026c223463 --- /dev/null +++ b/test/RustRti/src/federated/ParallelSourcesMultiport.lf @@ -0,0 +1,34 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out: int + c1 = new Count() + c2 = new Count() + + c1.out, c2.out -> out +} + +reactor Destination1 { + input[3] in: int + + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + t3 = new TestCount(num_inputs=3) + + in -> t1.in, t2.in, t3.in +} + +federated reactor { + s1 = new Source() + s2 = new Source() + d1 = new Destination1() + t4 = new TestCount(num_inputs=3) + + s1.out, s2.out -> d1.in, t4.in +} diff --git a/test/RustRti/src/federated/SimpleFederated.lf b/test/RustRti/src/federated/SimpleFederated.lf new file mode 100644 index 0000000000..cb6a798f8b --- /dev/null +++ b/test/RustRti/src/federated/SimpleFederated.lf @@ -0,0 +1,17 @@ +target C { + timeout: 2 secs, + build-type: RelWithDebInfo +} + +reactor Fed { + input in: int + output out: int +} + +federated reactor { + fed1 = new Fed() + fed2 = new Fed() + + fed1.out -> fed2.in + fed2.out -> fed1.in +} diff --git a/test/RustRti/src/federated/SpuriousDependency.lf b/test/RustRti/src/federated/SpuriousDependency.lf new file mode 100644 index 0000000000..b810d5288f --- /dev/null +++ b/test/RustRti/src/federated/SpuriousDependency.lf @@ -0,0 +1,63 @@ +/** + * This checks that a federated program does not deadlock when it is ambiguous, given the structure + * of a federate, whether it is permissible to require certain network sender/receiver reactions to + * precede others in the execution of a given tag. + */ +target C { + timeout: 1 sec +} + +reactor Passthrough(id: int = 0) { + input in: int + output out: int + + reaction(in) -> out {= + lf_print("Hello from passthrough %d", self->id); + lf_set(out, in->value); + =} +} + +reactor Twisty { + input in0: int + input in1: int + output out0: int + output out1: int + p0 = new Passthrough(id=0) + p1 = new Passthrough(id=1) + in0 -> p0.in + p0.out -> out0 + in1 -> p1.in + p1.out -> out1 +} + +reactor Check { + input in: int + + state count: int = 0 + + reaction(in) {= + lf_print("count is now %d", ++self->count); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 1) { + lf_print_error_and_exit("Failed to receive expected input."); + } + =} +} + +federated reactor { + t0 = new Twisty() + t1 = new Twisty() + check = new Check() + t0.out1 -> t1.in0 + t1.out1 -> t0.in0 + state count: int = 0 + + t1.out0 -> check.in + + reaction(startup) -> t0.in1 {= + lf_set(t0.in1, 0); + =} +} diff --git a/test/RustRti/src/federated/StopAtShutdown.lf b/test/RustRti/src/federated/StopAtShutdown.lf new file mode 100644 index 0000000000..2fad7db3d0 --- /dev/null +++ b/test/RustRti/src/federated/StopAtShutdown.lf @@ -0,0 +1,45 @@ +/** + * Check that lf_request_stop() doesn't cause any issues at the shutdown tag. + * + * Original bug discovered by Steven Wong + * + * @author Steven Wong + */ +target C { + timeout: 2 sec +} + +reactor A { + input in: int + + reaction(startup) {= + lf_print("Hello World!"); + =} + + reaction(in) {= + lf_print("Got it"); + =} + + reaction(shutdown) {= + lf_request_stop(); + =} +} + +reactor B { + output out: int + timer t(1 sec) + + reaction(t) -> out {= + lf_set(out, 1); + =} + + reaction(shutdown) {= + lf_request_stop(); + =} +} + +federated reactor { + a = new A() + b = new B() + b.out -> a.in +} diff --git a/test/RustRti/src/federated/TopLevelArtifacts.lf b/test/RustRti/src/federated/TopLevelArtifacts.lf new file mode 100644 index 0000000000..d73ea35967 --- /dev/null +++ b/test/RustRti/src/federated/TopLevelArtifacts.lf @@ -0,0 +1,44 @@ +/** + * Test whether top-level reactions, actions, and ports are handled appropriately. + * + * Currently, these artifacts are replicated on all federates. + * + * @note This just tests for the correctness of the code generation. These top-level artifacts might + * be disallowed in the future. + */ +target C { + timeout: 1 msec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +federated reactor { + state successes: int = 0 + timer t(0, 1 sec) + logical action act(0) + + c = new Count() + tc = new TestCount() + c.out -> tc.in + + reaction(startup) {= + self->successes++; + =} + + reaction(t) -> act {= + self->successes++; + lf_schedule(act, 0); + =} + + reaction(act) {= + self->successes++; + =} + + reaction(shutdown) {= + if (self->successes != 3) { + lf_print_error_and_exit("Failed to properly execute top-level reactions"); + } + lf_print("SUCCESS!"); + =} +} diff --git a/test/RustRti/src/lib/Count.lf b/test/RustRti/src/lib/Count.lf new file mode 100644 index 0000000000..ee3953b021 --- /dev/null +++ b/test/RustRti/src/lib/Count.lf @@ -0,0 +1,11 @@ +target C + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output out: int + timer t(offset, period) + + reaction(t) -> out {= + lf_set(out, self->count++); + =} +} diff --git a/test/RustRti/src/lib/FileLevelPreamble.lf b/test/RustRti/src/lib/FileLevelPreamble.lf new file mode 100644 index 0000000000..11067d5e63 --- /dev/null +++ b/test/RustRti/src/lib/FileLevelPreamble.lf @@ -0,0 +1,12 @@ +/** Test for ensuring that file-level preambles are inherited when a file is imported. */ +target C + +preamble {= + #define FOO 2 +=} + +reactor FileLevelPreamble { + reaction(startup) {= + printf("FOO: %d\n", FOO); + =} +} diff --git a/test/RustRti/src/lib/FileReader.txt b/test/RustRti/src/lib/FileReader.txt new file mode 100644 index 0000000000..5e1c309dae --- /dev/null +++ b/test/RustRti/src/lib/FileReader.txt @@ -0,0 +1 @@ +Hello World \ No newline at end of file diff --git a/test/RustRti/src/lib/GenDelay.lf b/test/RustRti/src/lib/GenDelay.lf new file mode 100644 index 0000000000..8f21c3de1b --- /dev/null +++ b/test/RustRti/src/lib/GenDelay.lf @@ -0,0 +1,21 @@ +target C + +preamble {= + typedef int message_t; +=} + +reactor Source { + output out: message_t + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Sink { + input in: message_t + + reaction(in) {= + lf_print("Received %d at time %lld", in->value, lf_time_logical_elapsed()); + =} +} diff --git a/test/RustRti/src/lib/Imported.lf b/test/RustRti/src/lib/Imported.lf new file mode 100644 index 0000000000..85d0a2b493 --- /dev/null +++ b/test/RustRti/src/lib/Imported.lf @@ -0,0 +1,14 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +import ImportedAgain from "./ImportedAgain.lf" + +reactor Imported { + input x: int + a = new ImportedAgain() + + reaction(x) -> a.x {= + lf_set(a.x, x->value); + =} +} diff --git a/test/RustRti/src/lib/ImportedAgain.lf b/test/RustRti/src/lib/ImportedAgain.lf new file mode 100644 index 0000000000..6870526b95 --- /dev/null +++ b/test/RustRti/src/lib/ImportedAgain.lf @@ -0,0 +1,15 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor ImportedAgain { + input x: int + + reaction(x) {= + printf("Received: %d.\n", x->value); + if (x->value != 42) { + printf("ERROR: Expected input to be 42. Got: %d.\n", x->value); + exit(1); + } + =} +} diff --git a/test/RustRti/src/lib/ImportedComposition.lf b/test/RustRti/src/lib/ImportedComposition.lf new file mode 100644 index 0000000000..e5524f3d22 --- /dev/null +++ b/test/RustRti/src/lib/ImportedComposition.lf @@ -0,0 +1,22 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor Gain { + input x: int + output y: int + + reaction(x) -> y {= + lf_set(y, x->value * 2); + =} +} + +reactor ImportedComposition { + input x: int + output y: int + g1 = new Gain() + g2 = new Gain() + x -> g1.x after 10 msec + g1.y -> g2.x after 30 msec + g2.y -> y after 15 msec +} diff --git a/test/RustRti/src/lib/InternalDelay.lf b/test/RustRti/src/lib/InternalDelay.lf new file mode 100644 index 0000000000..fb7124a4ec --- /dev/null +++ b/test/RustRti/src/lib/InternalDelay.lf @@ -0,0 +1,15 @@ +target C + +reactor InternalDelay(delay: time = 10 msec) { + input in: int + output out: int + logical action d: int + + reaction(in) -> d {= + lf_schedule_int(d, self->delay, in->value); + =} + + reaction(d) -> out {= + lf_set(out, d->value); + =} +} diff --git a/test/RustRti/src/lib/LoopedActionSender.lf b/test/RustRti/src/lib/LoopedActionSender.lf new file mode 100644 index 0000000000..e9ea36f40a --- /dev/null +++ b/test/RustRti/src/lib/LoopedActionSender.lf @@ -0,0 +1,36 @@ +/** + * A sender reactor that outputs integers in superdense time. + * + * @author Soroush Bateni + */ +target C + +/** + * @param take_a_break_after: Indicates how many messages are sent in consecutive superdense time + * @param break_interval: Determines how long the reactor should take a break after sending + * take_a_break_after messages. + */ +reactor Sender(take_a_break_after: int = 10, break_interval: time = 400 msec) { + output out: int + logical action act + state sent_messages: int = 0 + + reaction(startup, act) -> act, out {= + // Send a message on out + /* printf("At tag (%lld, %u) sending value %d.\n", + lf_time_logical_elapsed(), + lf_tag().microstep, + self->sent_messages + ); */ + lf_set(out, self->sent_messages); + lf_print("Sender sent %d.", self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + lf_schedule(act, 0); + } else { + // Take a break + self->sent_messages=0; + lf_schedule(act, self->break_interval); + } + =} +} diff --git a/test/RustRti/src/lib/PassThrough.lf b/test/RustRti/src/lib/PassThrough.lf new file mode 100644 index 0000000000..389905489a --- /dev/null +++ b/test/RustRti/src/lib/PassThrough.lf @@ -0,0 +1,11 @@ +/** Forward the integer input on `in` to the output port `out`. */ +target C + +reactor PassThrough { + input in: int + output out: int + + reaction(in) -> out {= + lf_set(out, in->value); + =} +} diff --git a/test/RustRti/src/lib/Test.lf b/test/RustRti/src/lib/Test.lf new file mode 100644 index 0000000000..69e4f79b2c --- /dev/null +++ b/test/RustRti/src/lib/Test.lf @@ -0,0 +1,15 @@ +target C + +reactor TestDouble(expected: double[] = {1.0, 1.0, 1.0, 1.0}) { + input in: double + state count: int = 0 + + reaction(in) {= + printf("Received: %f\n", in->value); + if (in->value != self->expected[self->count]) { + printf("ERROR: Expected %f.\n", self->expected[self->count]); + exit(1); + } + self->count++; + =} +} diff --git a/test/RustRti/src/lib/TestCount.lf b/test/RustRti/src/lib/TestCount.lf new file mode 100644 index 0000000000..e4fbb82b02 --- /dev/null +++ b/test/RustRti/src/lib/TestCount.lf @@ -0,0 +1,34 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected. Default is 1. + */ +target C + +reactor TestCount(start: int = 1, stride: int = 1, num_inputs: int = 1) { + state count: int = start + state inputs_received: int = 0 + input in: int + + reaction(in) {= + lf_print("Received %d.", in->value); + if (in->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +} diff --git a/test/RustRti/src/lib/TestCountMultiport.lf b/test/RustRti/src/lib/TestCountMultiport.lf new file mode 100644 index 0000000000..a0b0db294d --- /dev/null +++ b/test/RustRti/src/lib/TestCountMultiport.lf @@ -0,0 +1,41 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. This version has + * a multiport input, and each input is expected to be present and incremented over the previous + * input. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected on each channel. Default is 1. + */ +target C + +reactor TestCountMultiport(start: int = 1, stride: int = 1, num_inputs: int = 1, width: int = 2) { + state count: int = start + state inputs_received: int = 0 + input[width] in: int + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (!in[i]->is_present) { + lf_print_error_and_exit("No input on channel %d.", i); + } + lf_print("Received %d on channel %d.", in[i]->value, i); + if (in[i]->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + } + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but only got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +}