diff --git a/core/src/integrationTest/java/org/lflang/tests/RuntimeTest.java b/core/src/integrationTest/java/org/lflang/tests/RuntimeTest.java index 12c71eed7c..fbd6b580da 100644 --- a/core/src/integrationTest/java/org/lflang/tests/RuntimeTest.java +++ b/core/src/integrationTest/java/org/lflang/tests/RuntimeTest.java @@ -142,6 +142,18 @@ public void runFederatedTests() { false); } + @Test + public void runFederatedTestsWithRustRti() { + Assumptions.assumeTrue(supportsFederatedExecution(), Message.NO_FEDERATION_SUPPORT); + runTestsForTargetsWithRustRti( + Message.DESC_FEDERATED_WITH_RUST_RTI, + TestCategory.FEDERATED::equals, + Transformers::noChanges, + Configurators::noChanges, + TestLevel.EXECUTION, + false); + } + /** Run the tests for modal reactors. */ @Test public void runModalTests() { diff --git a/core/src/integrationTest/java/org/lflang/tests/runtime/CTest.java b/core/src/integrationTest/java/org/lflang/tests/runtime/CTest.java index 88c4aa89cf..180f017d67 100644 --- a/core/src/integrationTest/java/org/lflang/tests/runtime/CTest.java +++ b/core/src/integrationTest/java/org/lflang/tests/runtime/CTest.java @@ -112,6 +112,12 @@ public void runFederatedTests() { super.runFederatedTests(); } + @Test + public void runFederatedTestsWithRustRti() { + Assumptions.assumeFalse(isWindows(), Message.NO_WINDOWS_SUPPORT); + super.runFederatedTestsWithRustRti(); + } + @Test public void runModalTests() { super.runModalTests(); diff --git a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java index 0acf495fc7..792c9f89a0 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedGenerator.java +++ b/core/src/main/java/org/lflang/federated/generator/FedGenerator.java @@ -201,6 +201,97 @@ public boolean doGenerate(Resource resource, LFGeneratorContext context) throws return false; } + /** + * Produce LF code for each federate in a separate file, then invoke a target-specific code + * generator for each of those files. + * + * @param resource The resource that has the federated main reactor in it + * @param context The context in which to carry out the code generation. + * @return False if no errors have occurred, true otherwise. + */ + public boolean doGenerateForRustRTI(Resource resource, LFGeneratorContext context) + throws IOException { + if (!federatedExecutionIsSupported(resource)) return true; + cleanIfNeeded(context); + + // In a federated execution, we need keepalive to be true, + // otherwise a federate could exit simply because it hasn't received + // any messages. + KeepaliveProperty.INSTANCE.override(targetConfig, true); + + // Process command-line arguments + processCLIArguments(context); + + // Find the federated reactor + Reactor federation = FedASTUtils.findFederatedReactor(resource); + + // Make sure the RTI host is set correctly. + setRTIHost(federation); + + // Create the FederateInstance objects. + ReactorInstance main = createFederateInstances(federation, context); + + // Insert reactors that split multiports into many ports. + insertIndexers(main, resource); + + // Clear banks so that each bank member becomes a single federate. + for (Instantiation instantiation : ASTUtils.allInstantiations(federation)) { + instantiation.setWidthSpec(null); + instantiation.setWidthSpec(null); + } + + // Find all the connections between federates. + // For each connection between federates, replace it in the + // AST with an action (which inherits the delay) and three reactions. + // The action will be physical for physical connections and logical + // for logical connections. + replaceFederateConnectionsWithProxies(federation, main, resource); + + FedEmitter fedEmitter = + new FedEmitter( + fileConfig, + ASTUtils.toDefinition(mainDef.getReactorClass()), + messageReporter, + rtiConfig); + + // Generate LF code for each federate. + Map lf2lfCodeMapMap = new HashMap<>(); + for (FederateInstance federate : federates) { + lf2lfCodeMapMap.putAll(fedEmitter.generateFederate(context, federate, federates.size())); + } + + // Do not invoke target code generators if --no-compile flag is used. + if (context.getTargetConfig().get(NoCompileProperty.INSTANCE)) { + context.finish(Status.GENERATED, lf2lfCodeMapMap); + return false; + } + + // If the RTI is to be built locally, set up a build environment for it. + prepareRtiBuildEnvironment(context); + + Map codeMapMap = + compileFederates( + context, + lf2lfCodeMapMap, + subContexts -> { + createDockerFiles(context, subContexts); + generateLaunchScriptForRustRti(); + // If an error has occurred during codegen of any federate, report it. + subContexts.forEach( + c -> { + if (c.getErrorReporter().getErrorsOccurred()) { + context + .getErrorReporter() + .at(c.getFileConfig().srcFile) + .error("Failure during code generation of " + c.getFileConfig().srcFile); + } + }); + }); + + context.finish(Status.COMPILED, codeMapMap); + return false; + } + /** * Prepare a build environment for the rti alongside the generated sources of the federates. * @@ -229,6 +320,11 @@ private void generateLaunchScript() { .doGenerate(federates, rtiConfig); } + private void generateLaunchScriptForRustRti() { + new FedLauncherGenerator(this.targetConfig, this.fileConfig, this.messageReporter) + .doGenerateForRustRTI(federates, new RtiConfig()); + } + /** * Generate a Dockerfile for each federate and a docker-compose.yml for the federation. * diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index 67cc8085f0..765a3504d2 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -265,6 +265,169 @@ public void doGenerate(List federates, RtiConfig rtiConfig) { } } + /** + * Create the launcher shell scripts. This will create one or two files in the output path (bin + * directory). The first has name equal to the filename of the source file without the ".lf" + * extension. This will be a shell script that launches the RTI and the federates. If, in + * addition, either the RTI or any federate is mapped to a particular machine (anything other than + * the default "localhost" or "0.0.0.0"), then this will generate a shell script in the bin + * directory with name filename_distribute.sh that copies the relevant source files to the remote + * host and compiles them so that they are ready to execute using the launcher. + * + *

A precondition for this to work is that the user invoking this code generator can log into + * the remote host without supplying a password. Specifically, you have to have installed your + * public key (typically found in ~/.ssh/id_rsa.pub) in ~/.ssh/authorized_keys on the remote host. + * In addition, the remote host must be running an ssh service. On an Arch Linux system using + * systemd, for example, this means running: + * + *

sudo systemctl ssh.service + * + *

Enable means to always start the service at startup, whereas start means to just start it + * this once. + * + * @param federates A list of federate instances in the federation + * @param rtiConfig Can have values for 'host', 'dir', and 'user' + */ + public void doGenerateForRustRTI(List federates, RtiConfig rtiConfig) { + // NOTE: It might be good to use screen when invoking the RTI + // or federates remotely, so you can detach and the process keeps running. + // However, I was unable to get it working properly. + // What this means is that the shell that invokes the launcher + // needs to remain live for the duration of the federation. + // If that shell is killed, the federation will die. + // Hence, it is reasonable to launch the federation on a + // machine that participates in the federation, for example, + // on the machine that runs the RTI. The command I tried + // to get screen to work looks like this: + // ssh -t «target» cd «path»; screen -S «filename»_«federate.name» -L + // bin/«filename»_«federate.name» 2>&1 + // var outPath = binGenPath + StringBuilder shCode = new StringBuilder(); + StringBuilder distCode = new StringBuilder(); + shCode.append(getSetupCode()).append("\n"); + String distHeader = getDistHeader(); + String host = rtiConfig.getHost(); + String target = host; + + String user = rtiConfig.getUser(); + if (user != null) { + target = user + "@" + host; + } + + shCode.append("#### Host is ").append(host); + + // Launch the RTI in the foreground. + if (host.equals("localhost") || host.equals("0.0.0.0")) { + // FIXME: the paths below will not work on Windows + shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n"); + } else { + // Start the RTI on the remote machine - Not supported yet for Rust RTI. + } + + // Index used for storing pids of federates + int federateIndex = 0; + for (FederateInstance federate : federates) { + var buildConfig = getBuildConfig(federate, fileConfig, messageReporter); + if (federate.isRemote) { + if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); + distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n"); + shCode + .append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++)) + .append("\n"); + } else { + String executeCommand = buildConfig.localExecuteCommand(); + shCode + .append(getFedLocalLaunchCode(federate, executeCommand, federateIndex++)) + .append("\n"); + } + } + if (host.equals("localhost") || host.equals("0.0.0.0")) { + // Local PID managements + shCode.append( + "echo \"#### Bringing the RTI back to foreground so it can receive Control-C.\"" + "\n"); + shCode.append("fg %1" + "\n"); + } + // Wait for launched processes to finish + shCode + .append( + String.join( + "\n", + "echo \"RTI has exited. Wait for federates to exit.\"", + "# Wait for launched processes to finish.", + "# The errors are handled separately via trap.", + "for pid in \"${pids[@]}\"", + "do", + " wait $pid || exit $?", + "done", + "echo \"All done.\"", + "EXITED_SUCCESSFULLY=true")) + .append("\n"); + + // Create bin directory for the script. + if (!Files.exists(fileConfig.binPath)) { + try { + Files.createDirectories(fileConfig.binPath); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to create directory: " + fileConfig.binPath); + } + } + + // Write the launcher file. + File file = fileConfig.binPath.resolve(fileConfig.name).toFile(); + messageReporter.nowhere().info("Script for launching the federation: " + file); + + // Delete file previously produced, if any. + if (file.exists()) { + if (!file.delete()) + messageReporter + .nowhere() + .error("Failed to delete existing federated launch script \"" + file + "\""); + } + + FileOutputStream fOut = null; + try { + fOut = new FileOutputStream(file); + } catch (FileNotFoundException e) { + messageReporter.nowhere().error("Unable to find file: " + file); + } + if (fOut != null) { + try { + fOut.write(shCode.toString().getBytes()); + fOut.close(); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to write to file: " + file); + } + } + + if (!file.setExecutable(true, false)) { + messageReporter.nowhere().warning("Unable to make launcher script executable."); + } + + // Write the distributor file. + // Delete the file even if it does not get generated. + file = fileConfig.binPath.resolve(fileConfig.name + "_distribute.sh").toFile(); + if (file.exists()) { + if (!file.delete()) + messageReporter + .nowhere() + .error("Failed to delete existing federated distributor script \"" + file + "\""); + } + if (distCode.length() > 0) { + try { + fOut = new FileOutputStream(file); + fOut.write(distCode.toString().getBytes()); + fOut.close(); + if (!file.setExecutable(true, false)) { + messageReporter.nowhere().warning("Unable to make file executable: " + file); + } + } catch (FileNotFoundException e) { + messageReporter.nowhere().error("Unable to find file: " + file); + } catch (IOException e) { + messageReporter.nowhere().error("Unable to write to file " + file); + } + } + } + private String getSetupCode() { return String.join( "\n", @@ -377,6 +540,35 @@ private String getLaunchCode(String rtiLaunchCode) { "sleep 1"); } + private String getLaunchCodeForRustRti(String numberOfFederates) { + String launchCodeWithoutLogging = + new String("cargo run -- -i ${FEDERATION_ID} -n " + numberOfFederates + " -c init &"); + return String.join( + "\n", + "echo \"#### Launching the Rust runtime infrastructure (RTI).\"", + "# The Rust RTI is started first to allow proper boot-up", + "# before federates will try to connect.", + "# The RTI will be brought back to foreground", + "# to be responsive to user inputs after all federates", + "# are launched.", + "RUST_RTI_REMOTE_PATHS=`find ~/ -name rti_remote.rs`", + "if [ \"${RUST_RTI_REMOTE_PATHS}\" = \"\" ]; then", + " git clone https://github.com/hokeun/lf-rust-rti.git", + " cd lf-rust-rti/rust/rti", + "else", + " FIRST_RUST_RTI_REMOTE_PATH=($RUST_RTI_REMOTE_PATHS)", + " FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}", + " cd ${FIRST_RUST_RTI_PATH}; cd ../", + "fi", + launchCodeWithoutLogging, + "# Store the PID of the RTI", + "RTI=$!", + "# Wait for the RTI to boot up before", + "# starting federates (this could be done by waiting for a specific output", + "# from the RTI, but here we use sleep)", + "sleep 1"); + } + private String getRemoteLaunchCode( Object host, Object target, String logFileName, String rtiLaunchString) { return String.join( @@ -590,7 +782,7 @@ private String getFedLocalLaunchCode( private BuildConfig getBuildConfig( FederateInstance federate, FederationFileConfig fileConfig, MessageReporter messageReporter) { return switch (federate.targetConfig.target) { - case C, CCPP -> new CBuildConfig(federate, fileConfig, messageReporter); + case C, CCPP, RustRti -> new CBuildConfig(federate, fileConfig, messageReporter); case Python -> new PyBuildConfig(federate, fileConfig, messageReporter); case TS -> new TsBuildConfig(federate, fileConfig, messageReporter); case CPP, Rust -> throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/lflang/generator/LFGenerator.java b/core/src/main/java/org/lflang/generator/LFGenerator.java index f4e61e93d5..da9feed1a7 100644 --- a/core/src/main/java/org/lflang/generator/LFGenerator.java +++ b/core/src/main/java/org/lflang/generator/LFGenerator.java @@ -121,6 +121,42 @@ public void doGenerate(Resource resource, IFileSystemAccess2 fsa, IGeneratorCont } } + public void doGenerateForRustRTI( + Resource resource, IFileSystemAccess2 fsa, IGeneratorContext context) { + assert injector != null; + final LFGeneratorContext lfContext; + if (context instanceof LFGeneratorContext) { + lfContext = (LFGeneratorContext) context; + } else { + lfContext = LFGeneratorContext.lfGeneratorContextOf(resource, fsa, context); + } + + // The fastest way to generate code is to not generate any code. + if (lfContext.getMode() == LFGeneratorContext.Mode.LSP_FAST) return; + + if (FedASTUtils.findFederatedReactor(resource) != null) { + try { + FedGenerator fedGenerator = new FedGenerator(lfContext); + injector.injectMembers(fedGenerator); + generatorErrorsOccurred = fedGenerator.doGenerateForRustRTI(resource, lfContext); + } catch (IOException e) { + throw new RuntimeIOException("Error during federated code generation", e); + } + + } else { + final GeneratorBase generator = createGenerator(lfContext); + + if (generator != null) { + generator.doGenerate(resource, lfContext); + generatorErrorsOccurred = generator.errorsOccurred(); + } + } + final MessageReporter messageReporter = lfContext.getErrorReporter(); + if (messageReporter instanceof LanguageServerMessageReporter) { + ((LanguageServerMessageReporter) messageReporter).publishDiagnostics(); + } + } + /** Return true if errors occurred in the last call to doGenerate(). */ public boolean errorsOccurred() { return generatorErrorsOccurred; diff --git a/core/src/main/java/org/lflang/target/Target.java b/core/src/main/java/org/lflang/target/Target.java index a01b863303..a865eac278 100644 --- a/core/src/main/java/org/lflang/target/Target.java +++ b/core/src/main/java/org/lflang/target/Target.java @@ -460,7 +460,7 @@ public boolean isReservedIdent(String ident) { /** Return true if the target supports federated execution. */ public boolean supportsFederated() { return switch (this) { - case C, CCPP, Python, TS -> true; + case C, CCPP, Python, TS, RustRti -> true; default -> false; }; } diff --git a/core/src/testFixtures/java/org/lflang/tests/TestBase.java b/core/src/testFixtures/java/org/lflang/tests/TestBase.java index 6923149f07..bf9a8eb6d1 100644 --- a/core/src/testFixtures/java/org/lflang/tests/TestBase.java +++ b/core/src/testFixtures/java/org/lflang/tests/TestBase.java @@ -139,6 +139,7 @@ public static class Message { public static final String DESC_MULTIPORT = "Run multiport tests."; public static final String DESC_AS_FEDERATED = "Run non-federated tests in federated mode."; public static final String DESC_FEDERATED = "Run federated tests."; + public static final String DESC_FEDERATED_WITH_RUST_RTI = "Run federated tests with Rust RTI."; public static final String DESC_DOCKER = "Run docker tests."; public static final String DESC_DOCKER_FEDERATED = "Run docker federated tests."; public static final String DESC_ENCLAVE = "Run enclave tests."; @@ -196,6 +197,36 @@ protected final void runTestsAndPrintResults( } } + /** + * Run selected tests for a given target and configurator up to the specified level. + * + * @param target The target to run tests for. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected final void runTestsAndPrintResultsWithRustRti( + Target target, + Predicate selected, + TestLevel level, + Transformer transformer, + Configurator configurator, + boolean copy) { + var categories = Arrays.stream(TestCategory.values()).filter(selected).toList(); + for (var category : categories) { + System.out.println(category.getHeader()); + var tests = testRegistry.getRegisteredTests(target, category, copy); + try { + validateAndRunWithRustRti(tests, transformer, configurator, level); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + System.out.println(testRegistry.getCoverageReport(target, category)); + checkAndReportFailures(tests); + } + } + /** * Run tests in the given selection for all targets enabled in this class. * @@ -217,6 +248,28 @@ protected void runTestsForTargets( } } + /** + * Run tests in the given selection for all targets enabled in this class. + * + * @param description A string that describes the collection of tests. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected void runTestsForTargetsWithRustRti( + String description, + Predicate selected, + Transformer transformer, + Configurator configurator, + TestLevel level, + boolean copy) { + for (Target target : this.targets) { + runTestsForRustRti( + List.of(target), description, selected, transformer, configurator, level, copy); + } + } + /** * Run tests in the given selection for a subset of given targets. * @@ -241,6 +294,30 @@ protected void runTestsFor( } } + /** + * Run tests in the given selection for a subset of given targets. + * + * @param subset The subset of targets to run the selected tests for. + * @param description A string that describes the collection of tests. + * @param selected A predicate that given a test category returns whether it should be included in + * this test run or not. + * @param configurator A procedure for configuring the tests. + * @param copy Whether to work on copies of tests in the test. registry. + */ + protected void runTestsForRustRti( + List subset, + String description, + Predicate selected, + Transformer transformer, + Configurator configurator, + TestLevel level, + boolean copy) { + for (Target target : subset) { + printTestHeader(target, description); + runTestsAndPrintResultsWithRustRti(target, selected, level, transformer, configurator, copy); + } + } + /** Whether to enable threading. */ protected boolean supportsSingleThreadedExecution() { return false; @@ -496,6 +573,25 @@ private void generateCode(LFTest test) throws TestError { } } + /** + * Invoke the code generator for the given test. + * + * @param test The test to generate code for. + */ + private void generateCodeForRustRti(LFTest test) throws TestError { + if (test.getFileConfig().resource == null) { + test.getContext().finish(GeneratorResult.NOTHING); + } + try { + generator.doGenerateForRustRTI(test.getFileConfig().resource, fileAccess, test.getContext()); + } catch (Throwable e) { + throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL, e); + } + if (generator.errorsOccurred()) { + throw new TestError("Code generation unsuccessful.", Result.CODE_GEN_FAIL); + } + } + /** * Given an indexed test, execute it and label the test as failing if it did not execute, took too * long to execute, or executed but exited with an error code. @@ -712,4 +808,49 @@ private void validateAndRun( System.out.print(System.lineSeparator()); } + + /** + * Validate and run the given tests, using the specified configuratator and level. + * + *

While performing tests, this method prints a header that reaches completion once all tests + * have been run. + * + * @param tests A set of tests to run. + * @param transformer A procedure for transforming the tests. + * @param configurator A procedure for configuring the tests. + * @param level The level of testing. + * @throws IOException If initial file configuration fails + */ + private void validateAndRunWithRustRti( + Set tests, Transformer transformer, Configurator configurator, TestLevel level) + throws IOException { + var done = 1; + + System.out.println(THICK_LINE); + + for (var test : tests) { + System.out.println( + "Running: " + test.toString() + " (" + (int) (done / (float) tests.size() * 100) + "%)"); + try { + test.redirectOutputs(); + prepare(test, transformer, configurator); + validate(test); + generateCodeForRustRti(test); + if (level == TestLevel.EXECUTION) { + execute(test); + } + test.markPassed(); + } catch (TestError e) { + test.handleTestError(e); + } catch (Throwable e) { + test.handleTestError( + new TestError("Unknown exception during test execution", Result.TEST_EXCEPTION, e)); + } finally { + test.restoreOutputs(); + } + done++; + } + + System.out.print(System.lineSeparator()); + } }