diff --git a/core/environment.c b/core/environment.c
index 4523c4721..d2d56a593 100644
--- a/core/environment.c
+++ b/core/environment.c
@@ -1,32 +1,11 @@
/**
* @file
- * @author Erling R. Jellum (erling.r.jellum@ntnu.no)
+ * @author Erling R. Jellum
+ * @copyright (c) 2023-2024, The Norwegian University of Science and Technology.
+ * License: BSD 2-clause
*
- * @section LICENSE
- * Copyright (c) 2023, The Norwegian University of Science and Technology.
- *
- * Redistribution and use in source and binary forms, with or without modification,
- * are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
- * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
- * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
- * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * @section DESCRIPTION Functions intitializing and freeing memory for environments.
- * See environment.h for docs.
+ * This file defines functions intitializing and freeing memory for environments.
+ * See environment.h for docs.
*/
#include "environment.h"
diff --git a/core/federated/federate.c b/core/federated/federate.c
index 38d69f167..498687d07 100644
--- a/core/federated/federate.c
+++ b/core/federated/federate.c
@@ -2611,14 +2611,18 @@ void lf_set_federation_id(const char* fid) { federation_metadata.federation_id =
void lf_spawn_staa_thread() { lf_thread_create(&_fed.staaSetter, update_ports_from_staa_offsets, NULL); }
#endif // FEDERATED_DECENTRALIZED
-void lf_stall_advance_level_federation(environment_t* env, size_t level) {
- LF_PRINT_DEBUG("Acquiring the environment mutex.");
- LF_MUTEX_LOCK(&env->mutex);
- LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance);
+void lf_stall_advance_level_federation_locked(size_t level) {
+ LF_PRINT_DEBUG("Waiting for MLAA %d to exceed level %zu.", max_level_allowed_to_advance, level);
while (((int)level) >= max_level_allowed_to_advance) {
lf_cond_wait(&lf_port_status_changed);
};
- LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, level);
+ LF_PRINT_DEBUG("Exiting wait with MLAA %d and level %zu.", max_level_allowed_to_advance, level);
+}
+
+void lf_stall_advance_level_federation(environment_t* env, size_t level) {
+ LF_PRINT_DEBUG("Acquiring the environment mutex.");
+ LF_MUTEX_LOCK(&env->mutex);
+ lf_stall_advance_level_federation_locked(level);
LF_MUTEX_UNLOCK(&env->mutex);
}
diff --git a/core/reactor.c b/core/reactor.c
index a3bc64260..abc4f41d1 100644
--- a/core/reactor.c
+++ b/core/reactor.c
@@ -286,12 +286,6 @@ void lf_request_stop(void) {
lf_set_stop_tag(env, new_stop_tag);
}
-/**
- * Return false.
- * @param reaction The reaction.
- */
-bool _lf_is_blocked_by_executing_reaction(void) { return false; }
-
/**
* The main loop of the LF program.
*
diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c
index ba81a9dc6..392f49c31 100644
--- a/core/threaded/reactor_threaded.c
+++ b/core/threaded/reactor_threaded.c
@@ -1,8 +1,8 @@
/**
* @file
- * @author Edward A. Lee (eal@berkeley.edu)
- * @author{Marten Lohstroh }
- * @author{Soroush Bateni }
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
+ * @author Soroush Bateni
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca.
@@ -850,19 +850,13 @@ void _lf_worker_invoke_reaction(environment_t* env, int worker_number, reaction_
reaction->is_STP_violated = false;
}
-void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) {
-#ifdef FEDERATED
- lf_stall_advance_level_federation(env, *next_reaction_level);
-#else
- (void)env;
-#endif
- if (*next_reaction_level < SIZE_MAX)
- *next_reaction_level += 1;
-}
-
/**
- * The main looping logic of each LF worker thread.
- * This function assumes the caller holds the mutex lock.
+ * @brief The main looping logic of each LF worker thread.
+ *
+ * This function returns when the scheduler's lf_sched_get_ready_reaction()
+ * implementation returns NULL, indicating that there are no more reactions to execute.
+ *
+ * This function assumes the caller does not hold the mutex lock on the environment.
*
* @param env Environment within which we are executing.
* @param worker_number The number assigned to this worker thread
@@ -882,10 +876,9 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
while ((current_reaction_to_execute = lf_sched_get_ready_reaction(env->scheduler, worker_number)) != NULL) {
// Got a reaction that is ready to run.
LF_PRINT_DEBUG("Worker %d: Got from scheduler reaction %s: "
- "level: %lld, is input reaction: %d, chain ID: %llu, and deadline " PRINTF_TIME ".",
+ "level: %lld, is input reaction: %d, and deadline " PRINTF_TIME ".",
worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index),
- current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id,
- current_reaction_to_execute->deadline);
+ current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline);
bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute);
diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c
index d590adecb..e77257209 100644
--- a/core/threaded/scheduler_GEDF_NP.c
+++ b/core/threaded/scheduler_GEDF_NP.c
@@ -1,12 +1,20 @@
/**
* @file
- * @author{Soroush Bateni }
- * @author{Edward A. Lee }
- * @author{Marten Lohstroh }
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief Global Earliest Deadline First (GEDF) non-preemptive scheduler for the
* threaded runtime of the C target of Lingua Franca.
+ *
+ * At each tag, this scheduler prioritizes reactions with the smallest (inferred) deadline.
+ * An inferred deadline for reaction _R_ is either an explicitly declared deadline or the declared deadline of
+ * a reaction that depends on _R_. This scheduler is non-preemptive, meaning that once a worker thread starts
+ * executing a reaction, it will execute that reaction to completion. The underlying thread scheduler, of
+ * course, could preempt the execution in favor of some other worker thread.
+ * This scheduler does not take into account execution times of reactions.
+ * Moreover, it does not prioritize reactions across distinct tags.
*/
#include "lf_types.h"
@@ -25,153 +33,79 @@
#include "scheduler_instance.h"
#include "scheduler_sync_tag_advance.h"
#include "scheduler.h"
-#include "lf_semaphore.h"
#include "tracepoint.h"
#include "util.h"
-/////////////////// Scheduler Private API /////////////////////////
-/**
- * @brief Insert 'reaction' into scheduler->triggered_reactions
- * at the appropriate level.
- *
- * @param reaction The reaction to insert.
- */
-static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction_t* reaction) {
- size_t reaction_level = LF_LEVEL(reaction->index);
- LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level);
- LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]);
- LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level);
- pqueue_insert(((pqueue_t**)scheduler->triggered_reactions)[reaction_level], (void*)reaction);
- LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]);
-}
+#ifdef FEDERATED
+#include "federate.h"
+#endif
-/**
- * @brief Distribute any reaction that is ready to execute to idle worker
- * thread(s).
- *
- * @return Number of reactions that were successfully distributed to worker
- * threads.
- */
-int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
- pqueue_t* tmp_queue = NULL;
- // Note: All the threads are idle, which means that they are done inserting
- // reactions. Therefore, the reaction queues can be accessed without locking
- // a mutex.
-
- while (scheduler->next_reaction_level <= scheduler->max_reaction_level) {
- LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level);
- try_advance_level(scheduler->env, &scheduler->next_reaction_level);
-
- tmp_queue = ((pqueue_t**)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1];
- size_t reactions_to_execute = pqueue_size(tmp_queue);
-
- if (reactions_to_execute) {
- scheduler->executing_reactions = tmp_queue;
- return reactions_to_execute;
- }
- }
-
- return 0;
-}
+// Data specific to the GEDF scheduler.
+typedef struct custom_scheduler_data_t {
+ pqueue_t* reaction_q;
+ lf_cond_t reaction_q_changed;
+ size_t current_level;
+ bool solo_holds_mutex; // Indicates sole thread holds the mutex.
+} custom_scheduler_data_t;
-/**
- * @brief If there is work to be done, notify workers individually.
- *
- * This assumes that the caller is not holding any thread mutexes.
- */
-void _lf_sched_notify_workers(lf_scheduler_t* scheduler) {
- // Note: All threads are idle. Therefore, there is no need to lock the mutex
- // while accessing the executing queue (which is pointing to one of the
- // reaction queues).
- size_t workers_to_awaken =
- LF_MIN(scheduler->number_of_idle_workers, pqueue_size((pqueue_t*)scheduler->executing_reactions));
- LF_PRINT_DEBUG("Scheduler: Notifying %zu workers.", workers_to_awaken);
- scheduler->number_of_idle_workers -= workers_to_awaken;
- LF_PRINT_DEBUG("Scheduler: New number of idle workers: %zu.", scheduler->number_of_idle_workers);
- if (workers_to_awaken > 1) {
- // Notify all the workers except the worker thread that has called this
- // function.
- lf_semaphore_release(scheduler->semaphore, (workers_to_awaken - 1));
- }
-}
+/////////////////// Scheduler Private API /////////////////////////
/**
- * @brief Signal all worker threads that it is time to stop.
- *
+ * @brief Mark the calling thread idle and wait for notification of change to the reaction queue.
+ * @param scheduler The scheduler.
+ * @param worker_number The number of the worker thread.
*/
-void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
- scheduler->should_stop = true;
- lf_semaphore_release(scheduler->semaphore, (scheduler->number_of_workers - 1));
+inline static void wait_for_reaction_queue_updates(lf_scheduler_t* scheduler, int worker_number) {
+ scheduler->number_of_idle_workers++;
+ tracepoint_worker_wait_starts(scheduler->env, worker_number);
+ LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed);
+ tracepoint_worker_wait_ends(scheduler->env, worker_number);
+ scheduler->number_of_idle_workers--;
}
/**
- * @brief Advance tag or distribute reactions to worker threads.
- *
- * Advance tag if there are no reactions on the reaction queue. If
- * there are such reactions, distribute them to worker threads.
- *
- * This function assumes the caller does not hold the 'mutex' lock.
+ * @brief Assuming this is the last worker to go idle, advance the tag.
+ * @param scheduler The scheduler.
+ * @return Non-zero if the stop tag has been reached.
*/
-void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
- environment_t* env = scheduler->env;
-
- // Executing queue must be empty when this is called.
- assert(pqueue_size((pqueue_t*)scheduler->executing_reactions) == 0);
-
- // Loop until it's time to stop or work has been distributed
- while (true) {
- if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) {
- scheduler->next_reaction_level = 0;
- LF_MUTEX_LOCK(&env->mutex);
- // Nothing more happening at this tag.
- LF_PRINT_DEBUG("Scheduler: Advancing tag.");
- // This worker thread will take charge of advancing tag.
- if (_lf_sched_advance_tag_locked(scheduler)) {
- LF_PRINT_DEBUG("Scheduler: Reached stop tag.");
- _lf_sched_signal_stop(scheduler);
- LF_MUTEX_UNLOCK(&env->mutex);
- break;
- }
- LF_MUTEX_UNLOCK(&env->mutex);
- }
-
- if (_lf_sched_distribute_ready_reactions(scheduler) > 0) {
- _lf_sched_notify_workers(scheduler);
- break;
- }
+static int advance_tag(lf_scheduler_t* scheduler) {
+ // Set a flag in the scheduler that the lock is held by the sole executing thread.
+ // This prevents acquiring the mutex in lf_scheduler_trigger_reaction.
+ scheduler->custom_data->solo_holds_mutex = true;
+ if (_lf_sched_advance_tag_locked(scheduler)) {
+ LF_PRINT_DEBUG("Scheduler: Reached stop tag.");
+ scheduler->should_stop = true;
+ scheduler->custom_data->solo_holds_mutex = false;
+ // Notify all threads that the stop tag has been reached.
+ LF_COND_BROADCAST(&scheduler->custom_data->reaction_q_changed);
+ return 1;
}
+ scheduler->custom_data->solo_holds_mutex = false;
+ // Reset the level to 0.
+ scheduler->custom_data->current_level = 0;
+#ifdef FEDERATED
+ // In case there are blocking network input reactions at this level, stall.
+ lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level);
+#endif
+ return 0;
}
/**
- * @brief Wait until the scheduler assigns work.
- *
- * If the calling worker thread is the last to become idle, it will call on the
- * scheduler to distribute work. Otherwise, it will wait on
- * 'scheduler->semaphore'.
- *
- * @param worker_number The worker number of the worker thread asking for work
- * to be assigned to it.
+ * @brief Assuming all other workers are idle, advance to the next level.
+ * @param scheduler The scheduler.
*/
-void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
- // Increment the number of idle workers by 1 and check if this is the last
- // worker thread to become idle.
- if (((size_t)lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1)) ==
- scheduler->number_of_workers) {
- // Last thread to go idle
- LF_PRINT_DEBUG("Scheduler: Worker %zu is the last idle thread.", worker_number);
- // Call on the scheduler to distribute work or advance tag.
- _lf_scheduler_try_advance_tag_and_distribute(scheduler);
- } else {
- // Not the last thread to become idle.
- // Wait for work to be released.
- LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling "
- "semaphore.",
- worker_number);
- lf_semaphore_acquire(scheduler->semaphore);
- LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number);
+static void advance_level(lf_scheduler_t* scheduler) {
+ if (++scheduler->custom_data->current_level > scheduler->max_reaction_level) {
+ // Since the reaction queue is not empty, we must be cycling back to level 0 due to deadlines
+ // having been given precedence over levels. Reset the current level to 1.
+ scheduler->custom_data->current_level = 0;
}
+ LF_PRINT_DEBUG("Scheduler: Advancing to next reaction level %zu.", scheduler->custom_data->current_level);
+#ifdef FEDERATED
+ // In case there are blocking network input reactions at this level, stall.
+ lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level);
+#endif
}
-
///////////////////// Scheduler Init and Destroy API /////////////////////////
/**
* @brief Initialize the scheduler.
@@ -195,26 +129,17 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
}
lf_scheduler_t* scheduler = env->scheduler;
- scheduler->triggered_reactions = calloc((scheduler->max_reaction_level + 1), sizeof(pqueue_t*));
-
- scheduler->array_of_mutexes = (lf_mutex_t*)calloc((scheduler->max_reaction_level + 1), sizeof(lf_mutex_t));
+ scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t));
+ // Initialize the reaction queue.
size_t queue_size = INITIAL_REACT_QUEUE_SIZE;
- for (size_t i = 0; i <= scheduler->max_reaction_level; i++) {
- if (params != NULL) {
- if (params->num_reactions_per_level != NULL) {
- queue_size = params->num_reactions_per_level[i];
- }
- }
- // Initialize the reaction queues
- ((pqueue_t**)scheduler->triggered_reactions)[i] =
- pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position,
- reaction_matches, print_reaction);
- // Initialize the mutexes for the reaction queues
- LF_MUTEX_INIT(&scheduler->array_of_mutexes[i]);
- }
+ scheduler->custom_data->reaction_q =
+ pqueue_init(queue_size, in_reverse_order, get_reaction_index, get_reaction_position, set_reaction_position,
+ reaction_matches, print_reaction);
+
+ LF_COND_INIT(&scheduler->custom_data->reaction_q_changed, &env->mutex);
- scheduler->executing_reactions = ((pqueue_t**)scheduler->triggered_reactions)[0];
+ scheduler->custom_data->current_level = 0;
}
/**
@@ -223,91 +148,118 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
* This must be called when the scheduler is no longer needed.
*/
void lf_sched_free(lf_scheduler_t* scheduler) {
- // for (size_t j = 0; j <= scheduler->max_reaction_level; j++) {
- // pqueue_free(scheduler->triggered_reactions[j]);
- // FIXME: This is causing weird memory errors.
- // }
- pqueue_free((pqueue_t*)scheduler->executing_reactions);
- lf_semaphore_destroy(scheduler->semaphore);
+ pqueue_free((pqueue_t*)scheduler->custom_data->reaction_q);
+ free(scheduler->custom_data);
}
///////////////////// Scheduler Worker API (public) /////////////////////////
-/**
- * @brief Ask the scheduler for one more reaction.
- *
- * This function blocks until it can return a ready reaction for worker thread
- * 'worker_number' or it is time for the worker thread to stop and exit (where a
- * NULL value would be returned).
- *
- * @param worker_number
- * @return reaction_t* A reaction for the worker to execute. NULL if the calling
- * worker thread should exit.
- */
+
reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
- // Iterate until the stop_tag is reached or reaction queue is empty
- while (!scheduler->should_stop) {
- // Need to lock the mutex for the current level
- size_t current_level = scheduler->next_reaction_level - 1;
- LF_PRINT_DEBUG("Scheduler: Worker %d trying to lock the mutex for level %zu.", worker_number, current_level);
- LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]);
- LF_PRINT_DEBUG("Scheduler: Worker %d locked the mutex for level %zu.", worker_number, current_level);
- reaction_t* reaction_to_return = (reaction_t*)pqueue_pop((pqueue_t*)scheduler->executing_reactions);
- LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[current_level]);
+ // Need to lock the environment mutex.
+ LF_PRINT_DEBUG("Scheduler: Worker %d locking environment mutex.", worker_number);
+ LF_MUTEX_LOCK(&scheduler->env->mutex);
+ LF_PRINT_DEBUG("Scheduler: Worker %d locked environment mutex.", worker_number);
+ // Iterate until the stop_tag is reached or the event queue is empty.
+ while (!scheduler->should_stop) {
+ reaction_t* reaction_to_return = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q);
if (reaction_to_return != NULL) {
- // Got a reaction
- return reaction_to_return;
- }
+ // Found a reaction. Check the level. Notice that because of deadlines, the current level
+ // may advance to the maximum and then back down to 0.
+ if (LF_LEVEL(reaction_to_return->index) == scheduler->custom_data->current_level) {
+ // Found a reaction at the current level.
+ LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %zu.", worker_number,
+ scheduler->custom_data->current_level);
+ // Remove the reaction from the queue.
+ pqueue_pop(scheduler->custom_data->reaction_q);
- LF_PRINT_DEBUG("Worker %d is out of ready reactions.", worker_number);
+ // If there is another reaction at the current level and an idle thread, then
+ // notify an idle thread.
+ reaction_t* next_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q);
+ if (next_reaction != NULL && LF_LEVEL(next_reaction->index) == scheduler->custom_data->current_level &&
+ scheduler->number_of_idle_workers > 0) {
+ // Notify an idle thread. Note that we could do a broadcast here, but it's probably not
+ // a good idea because all workers awakened need to acquire the same mutex to examine the
+ // reaction queue. Only one of them will acquire the mutex, and that worker can check whether
+ // there are further reactions on the same level that warrant waking another worker thread.
+ // So we opt to wake one other worker here rather than broadcasting.
+ LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed);
+ }
+ LF_MUTEX_UNLOCK(&scheduler->env->mutex);
+ return reaction_to_return;
+ } else {
+ // Found a reaction at a level other than the current level.
+ LF_PRINT_DEBUG("Scheduler: Worker %d found a reaction at level %lld. Current level is %zu", worker_number,
+ LF_LEVEL(reaction_to_return->index), scheduler->custom_data->current_level);
+ // We need to wait to advance to the next level or get a new reaction at the current level.
+ if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) {
+ // All other workers are idle. Advance to the next level.
+ advance_level(scheduler);
+ } else {
+ // Some workers are still working on reactions on the current level.
+ // Wait for them to finish.
+ wait_for_reaction_queue_updates(scheduler, worker_number);
+ }
+ }
+ } else {
+ // The reaction queue is empty.
+ LF_PRINT_DEBUG("Worker %d finds nothing on the reaction queue.", worker_number);
- // Ask the scheduler for more work and wait
- tracepoint_worker_wait_starts(scheduler->env, worker_number);
- _lf_sched_wait_for_work(scheduler, worker_number);
- tracepoint_worker_wait_ends(scheduler->env, worker_number);
+ // If all other workers are idle, then we are done with this tag.
+ if (scheduler->number_of_idle_workers == scheduler->number_of_workers - 1) {
+ // Last thread to go idle
+ LF_PRINT_DEBUG("Scheduler: Worker %d is advancing the tag.", worker_number);
+ if (advance_tag(scheduler)) {
+ // Stop tag has been reached.
+ break;
+ }
+ } else {
+ // Some other workers are still working on reactions on the current level.
+ // Wait for them to finish.
+ wait_for_reaction_queue_updates(scheduler, worker_number);
+ }
+ }
}
// It's time for the worker thread to stop and exit.
+ LF_MUTEX_UNLOCK(&scheduler->env->mutex);
return NULL;
}
-/**
- * @brief Inform the scheduler that worker thread 'worker_number' is done
- * executing the 'done_reaction'.
- *
- * @param worker_number The worker number for the worker thread that has
- * finished executing 'done_reaction'.
- * @param done_reaction The reaction that is done.
- */
void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) {
- (void)worker_number;
+ (void)worker_number; // Suppress unused parameter warning.
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
}
-/**
- * @brief Inform the scheduler that worker thread 'worker_number' would like to
- * trigger 'reaction' at the current tag.
- *
- * If a worker number is not available (e.g., this function is not called by a
- * worker thread), -1 should be passed as the 'worker_number'.
- *
- * The scheduler will ensure that the same reaction is not triggered twice in
- * the same tag.
- *
- * @param reaction The reaction to trigger at the current tag.
- * @param worker_number The ID of the worker that is making this call. 0 should
- * be used if there is only one worker (e.g., when the program is using the
- * single-threaded C runtime). -1 is used for an anonymous call in a context where a
- * worker number does not make sense (e.g., the caller is not a worker thread).
- */
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
- (void)worker_number;
+ (void)worker_number; // Suppress unused parameter warning.
if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) {
return;
}
LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index));
- _lf_sched_insert_reaction(scheduler, reaction);
+
+ // Mutex not needed when pulling from the event queue.
+ if (!scheduler->custom_data->solo_holds_mutex) {
+ LF_PRINT_DEBUG("Scheduler: Locking mutex for environment.");
+ LF_MUTEX_LOCK(&scheduler->env->mutex);
+ LF_PRINT_DEBUG("Scheduler: Locked mutex for environment.");
+ }
+ pqueue_insert(scheduler->custom_data->reaction_q, (void*)reaction);
+ if (!scheduler->custom_data->solo_holds_mutex) {
+ // If this is called from a reaction execution, then the triggered reaction
+ // has one level higher than the current level. No need to notify idle threads.
+ // But in federated execution, it could be called because of message arrival.
+ // Also, in modal models, reset and startup reactions may be triggered.
+#if defined(FEDERATED) || (defined(MODAL) && !defined(LF_SINGLE_THREADED))
+ reaction_t* triggered_reaction = (reaction_t*)pqueue_peek(scheduler->custom_data->reaction_q);
+ if (LF_LEVEL(triggered_reaction->index) == scheduler->custom_data->current_level) {
+ LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed);
+ }
+#endif // FEDERATED || MODAL
+
+ LF_MUTEX_UNLOCK(&scheduler->env->mutex);
+ }
}
#endif // SCHEDULER == SCHED_GEDF_NP
diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c
index 01b510477..7edd41a81 100644
--- a/core/threaded/scheduler_NP.c
+++ b/core/threaded/scheduler_NP.c
@@ -1,8 +1,8 @@
/**
* @file
- * @author{Soroush Bateni }
- * @author{Edward A. Lee }
- * @author{Marten Lohstroh }
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief Non-preemptive scheduler for the threaded runtime of the C target of Lingua Franca.
@@ -27,10 +27,26 @@
#include "util.h"
#include "reactor_threaded.h"
+#ifdef FEDERATED
+#include "federate.h"
+#endif
+
+// Data specific to the NP scheduler.
+typedef struct custom_scheduler_data_t {
+ reaction_t** executing_reactions;
+ lf_mutex_t* array_of_mutexes;
+ reaction_t*** triggered_reactions;
+ volatile size_t next_reaction_level;
+ lf_semaphore_t* semaphore; // Signal the maximum number of worker threads that should
+ // be executing work at the same time. Initially 0.
+ // For example, if the scheduler releases the semaphore with a count of 4,
+ // no more than 4 worker threads should wake up to process reactions.
+} custom_scheduler_data_t;
+
/////////////////// Scheduler Private API /////////////////////////
+
/**
- * @brief Insert 'reaction' into
- * scheduler->triggered_reactions at the appropriate level.
+ * @brief Insert 'reaction' into scheduler->triggered_reactions at the appropriate level.
*
* @param reaction The reaction to insert.
*/
@@ -39,19 +55,19 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
#ifdef FEDERATED
// Lock the mutex if federated because a federate can insert reactions with
// a level equal to the current level.
- size_t current_level = scheduler->next_reaction_level - 1;
+ size_t current_level = scheduler->custom_data->next_reaction_level - 1;
// There is a race condition here where
- // `scheduler->next_reaction_level` can change after it is
+ // `scheduler->custom_data->next_reaction_level` can change after it is
// cached here. In that case, if the cached value is equal to
// `reaction_level`, the cost will be an additional unnecessary mutex lock,
// but no logic error. If the cached value is not equal to `reaction_level`,
// it can never become `reaction_level` because the scheduler will only
- // change the `scheduler->next_reaction_level` if it can
+ // change the `scheduler->custom_data->next_reaction_level` if it can
// ensure that all worker threads are idle, and thus, none are triggering
// reactions (and therefore calling this function).
if (reaction_level == current_level) {
LF_PRINT_DEBUG("Scheduler: Trying to lock the mutex for level %zu.", reaction_level);
- LF_MUTEX_LOCK(&scheduler->array_of_mutexes[reaction_level]);
+ LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[reaction_level]);
LF_PRINT_DEBUG("Scheduler: Locked the mutex for level %zu.", reaction_level);
}
// The level index for the current level can sometimes become negative. Set
@@ -65,11 +81,11 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
assert(reaction_q_level_index >= 0);
LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level,
reaction_q_level_index);
- ((reaction_t***)scheduler->triggered_reactions)[reaction_level][reaction_q_level_index] = reaction;
+ ((reaction_t***)scheduler->custom_data->triggered_reactions)[reaction_level][reaction_q_level_index] = reaction;
LF_PRINT_DEBUG("Scheduler: Index for level %zu is at %d.", reaction_level, reaction_q_level_index);
#ifdef FEDERATED
if (reaction_level == current_level) {
- LF_MUTEX_UNLOCK(&scheduler->array_of_mutexes[reaction_level]);
+ LF_MUTEX_UNLOCK(&scheduler->custom_data->array_of_mutexes[reaction_level]);
}
#endif
}
@@ -80,20 +96,22 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
*
* @return 1 if any reaction is ready. 0 otherwise.
*/
-int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
+static int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
// Note: All the threads are idle, which means that they are done inserting
// reactions. Therefore, the reaction vectors can be accessed without
// locking a mutex.
- while (scheduler->next_reaction_level <= scheduler->max_reaction_level) {
- LF_PRINT_DEBUG("Waiting with curr_reaction_level %zu.", scheduler->next_reaction_level);
- try_advance_level(scheduler->env, &scheduler->next_reaction_level);
+ while (scheduler->custom_data->next_reaction_level <= scheduler->max_reaction_level) {
+#ifdef FEDERATED
+ lf_stall_advance_level_federation(scheduler->env, scheduler->custom_data->next_reaction_level);
+#endif
+ scheduler->custom_data->executing_reactions =
+ scheduler->custom_data->triggered_reactions[scheduler->custom_data->next_reaction_level];
+ LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->custom_data->next_reaction_level,
+ (void*)((reaction_t**)scheduler->custom_data->executing_reactions)[0]);
- scheduler->executing_reactions =
- (void*)((reaction_t***)scheduler->triggered_reactions)[scheduler->next_reaction_level - 1];
+ scheduler->custom_data->next_reaction_level++;
- LF_PRINT_DEBUG("Start of rxn queue at %zu is %p", scheduler->next_reaction_level - 1,
- (void*)((reaction_t**)scheduler->executing_reactions)[0]);
- if (((reaction_t**)scheduler->executing_reactions)[0] != NULL) {
+ if (scheduler->custom_data->executing_reactions[0] != NULL) {
// There is at least one reaction to execute
return 1;
}
@@ -107,13 +125,13 @@ int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
*
* This assumes that the caller is not holding any thread mutexes.
*/
-void _lf_sched_notify_workers(lf_scheduler_t* scheduler) {
+static void _lf_sched_notify_workers(lf_scheduler_t* scheduler) {
// Calculate the number of workers that we need to wake up, which is the
// number of reactions enabled at this level.
// Note: All threads are idle. Therefore, there is no need to lock the mutex while accessing the index for the
// current level.
- size_t workers_to_awaken =
- LF_MIN(scheduler->number_of_idle_workers, (size_t)(scheduler->indexes[scheduler->next_reaction_level - 1]));
+ size_t workers_to_awaken = LF_MIN(scheduler->number_of_idle_workers,
+ (size_t)(scheduler->indexes[scheduler->custom_data->next_reaction_level - 1]));
LF_PRINT_DEBUG("Scheduler: Notifying %zu workers.", workers_to_awaken);
scheduler->number_of_idle_workers -= workers_to_awaken;
@@ -122,7 +140,7 @@ void _lf_sched_notify_workers(lf_scheduler_t* scheduler) {
if (workers_to_awaken > 1) {
// Notify all the workers except the worker thread that has called this
// function.
- lf_semaphore_release(scheduler->semaphore, (workers_to_awaken - 1));
+ lf_semaphore_release(scheduler->custom_data->semaphore, (workers_to_awaken - 1));
}
}
@@ -130,9 +148,9 @@ void _lf_sched_notify_workers(lf_scheduler_t* scheduler) {
* @brief Signal all worker threads that it is time to stop.
*
*/
-void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
+static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
scheduler->should_stop = true;
- lf_semaphore_release(scheduler->semaphore, (scheduler->number_of_workers - 1));
+ lf_semaphore_release(scheduler->custom_data->semaphore, (scheduler->number_of_workers - 1));
}
/**
@@ -143,15 +161,15 @@ void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
*
* This function assumes the caller does not hold the 'mutex' lock.
*/
-void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
+static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
// Reset the index
environment_t* env = scheduler->env;
- scheduler->indexes[scheduler->next_reaction_level - 1] = 0;
+ scheduler->indexes[scheduler->custom_data->next_reaction_level - 1] = 0;
// Loop until it's time to stop or work has been distributed
while (true) {
- if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) {
- scheduler->next_reaction_level = 0;
+ if (scheduler->custom_data->next_reaction_level == (scheduler->max_reaction_level + 1)) {
+ scheduler->custom_data->next_reaction_level = 0;
LF_MUTEX_LOCK(&env->mutex);
// Nothing more happening at this tag.
LF_PRINT_DEBUG("Scheduler: Advancing tag.");
@@ -177,12 +195,12 @@ void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
*
* If the calling worker thread is the last to become idle, it will call on the
* scheduler to distribute work. Otherwise, it will wait on
- * 'scheduler->semaphore'.
+ * 'scheduler->custom_data->semaphore'.
*
* @param worker_number The worker number of the worker thread asking for work
* to be assigned to it.
*/
-void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
+static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
// Increment the number of idle workers by 1 and check if this is the last
// worker thread to become idle.
if (lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) {
@@ -192,15 +210,14 @@ void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
_lf_scheduler_try_advance_tag_and_distribute(scheduler);
} else {
// Not the last thread to become idle. Wait for work to be released.
- LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling "
- "semaphore.",
- worker_number);
- lf_semaphore_acquire(scheduler->semaphore);
+ LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling semaphore.", worker_number);
+ lf_semaphore_acquire(scheduler->custom_data->semaphore);
LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number);
}
}
///////////////////// Scheduler Init and Destroy API /////////////////////////
+
/**
* @brief Initialize the scheduler.
*
@@ -233,9 +250,17 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
LF_PRINT_DEBUG("Scheduler: Max reaction level: %zu", env->scheduler->max_reaction_level);
- env->scheduler->triggered_reactions = calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**));
+ env->scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t));
+
+ env->scheduler->custom_data->triggered_reactions =
+ (reaction_t***)calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**));
+
+ env->scheduler->custom_data->array_of_mutexes =
+ (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t));
- env->scheduler->array_of_mutexes = (lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t));
+ env->scheduler->custom_data->semaphore = lf_semaphore_new(0);
+
+ env->scheduler->custom_data->next_reaction_level = 1;
env->scheduler->indexes = (volatile int*)calloc((env->scheduler->max_reaction_level + 1), sizeof(volatile int));
@@ -247,15 +272,14 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
}
}
// Initialize the reaction vectors
- ((reaction_t***)env->scheduler->triggered_reactions)[i] = (reaction_t**)calloc(queue_size, sizeof(reaction_t*));
+ env->scheduler->custom_data->triggered_reactions[i] = (reaction_t**)calloc(queue_size, sizeof(reaction_t*));
LF_PRINT_DEBUG("Scheduler: Initialized vector of reactions for level %zu with size %zu", i, queue_size);
// Initialize the mutexes for the reaction vectors
- LF_MUTEX_INIT(&env->scheduler->array_of_mutexes[i]);
+ LF_MUTEX_INIT(&env->scheduler->custom_data->array_of_mutexes[i]);
}
-
- env->scheduler->executing_reactions = (void*)((reaction_t***)env->scheduler->triggered_reactions)[0];
+ env->scheduler->custom_data->executing_reactions = env->scheduler->custom_data->triggered_reactions[0];
}
/**
@@ -264,14 +288,15 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
* This must be called when the scheduler is no longer needed.
*/
void lf_sched_free(lf_scheduler_t* scheduler) {
- if (scheduler->triggered_reactions) {
+ if (scheduler->custom_data->triggered_reactions) {
for (size_t j = 0; j <= scheduler->max_reaction_level; j++) {
- free(((reaction_t***)scheduler->triggered_reactions)[j]);
+ free(scheduler->custom_data->triggered_reactions[j]);
}
- free(scheduler->triggered_reactions);
+ free(scheduler->custom_data->triggered_reactions);
}
-
- lf_semaphore_destroy(scheduler->semaphore);
+ free(scheduler->custom_data->array_of_mutexes);
+ lf_semaphore_destroy(scheduler->custom_data->semaphore);
+ free(scheduler->custom_data);
}
///////////////////// Scheduler Worker API (public) /////////////////////////
@@ -290,23 +315,23 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
// Iterate until the stop tag is reached or reaction vectors are empty
while (!scheduler->should_stop) {
// Calculate the current level of reactions to execute
- size_t current_level = scheduler->next_reaction_level - 1;
+ size_t current_level = scheduler->custom_data->next_reaction_level - 1;
reaction_t* reaction_to_return = NULL;
#ifdef FEDERATED
// Need to lock the mutex because federate.c could trigger reactions at
// the current level (if there is a causality loop)
- LF_MUTEX_LOCK(&scheduler->array_of_mutexes[current_level]);
+ LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[current_level]);
#endif
int current_level_q_index = lf_atomic_add_fetch32((int32_t*)&scheduler->indexes[current_level], -1);
if (current_level_q_index >= 0) {
LF_PRINT_DEBUG("Scheduler: Worker %d popping reaction with level %zu, index "
"for level: %d.",
worker_number, current_level, current_level_q_index);
- reaction_to_return = ((reaction_t**)scheduler->executing_reactions)[current_level_q_index];
- ((reaction_t**)scheduler->executing_reactions)[current_level_q_index] = NULL;
+ reaction_to_return = scheduler->custom_data->executing_reactions[current_level_q_index];
+ scheduler->custom_data->executing_reactions[current_level_q_index] = NULL;
}
#ifdef FEDERATED
- lf_mutex_unlock(&scheduler->array_of_mutexes[current_level]);
+ lf_mutex_unlock(&scheduler->custom_data->array_of_mutexes[current_level]);
#endif
if (reaction_to_return != NULL) {
diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c
index 4b0843028..d2db00658 100644
--- a/core/threaded/scheduler_adaptive.c
+++ b/core/threaded/scheduler_adaptive.c
@@ -1,6 +1,6 @@
/**
* @file
- * @author{Peter Donovan }
+ * @author Peter Donovan
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief This is a non-priority-driven scheduler. See scheduler.h for documentation.
@@ -21,12 +21,14 @@
#include "environment.h"
#include "util.h"
+#ifdef FEDERATED
+#include "federate.h"
+#endif
+
#ifndef MAX_REACTION_LEVEL
#define MAX_REACTION_LEVEL INITIAL_REACT_QUEUE_SIZE
#endif
-void try_advance_level(environment_t* env, volatile size_t* next_reaction_level);
-
/////////////////// Forward declarations /////////////////////////
extern bool fast;
static void worker_states_lock(lf_scheduler_t* scheduler, size_t worker);
@@ -435,7 +437,10 @@ static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) {
return;
}
} else {
- try_advance_level(scheduler->env, &worker_assignments->current_level);
+#ifdef FEDERATED
+ lf_stall_advance_level_federation(scheduler->env, worker_assignments->current_level);
+#endif
+ worker_assignments->current_level++;
set_level(scheduler, worker_assignments->current_level);
}
size_t total_num_reactions = get_num_reactions(scheduler);
@@ -684,7 +689,6 @@ void lf_sched_free(lf_scheduler_t* scheduler) {
worker_assignments_free(scheduler);
data_collection_free(scheduler);
free(scheduler->custom_data);
- lf_semaphore_destroy(scheduler->semaphore);
}
///////////////////////// Scheduler Worker API ///////////////////////////////
diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c
index 5487ead65..8146327bf 100644
--- a/core/threaded/scheduler_instance.c
+++ b/core/threaded/scheduler_instance.c
@@ -1,3 +1,14 @@
+/**
+ * @file
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley.
+ * License: BSD 2-clause
+ * @brief Common scheduler functions.
+ *
+ * This file defines functions that are common across multiple schedulers.
+ */
+
#include
#include "scheduler_instance.h"
#include "environment.h"
@@ -32,9 +43,7 @@ bool init_sched_instance(environment_t* env, lf_scheduler_t** instance, size_t n
}
}
- (*instance)->semaphore = lf_semaphore_new(0);
(*instance)->number_of_workers = number_of_workers;
- (*instance)->next_reaction_level = 1;
(*instance)->should_stop = false;
(*instance)->env = env;
diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c
index 1b0556ba1..cc91c88f0 100644
--- a/core/threaded/scheduler_sync_tag_advance.c
+++ b/core/threaded/scheduler_sync_tag_advance.c
@@ -1,63 +1,29 @@
-#if !defined(LF_SINGLE_THREADED)
-/*************
-Copyright (c) 2022, The University of Texas at Dallas.
-Copyright (c) 2022, The University of California at Berkeley.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
-THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-***************/
-
/**
- * @file scheduler_sync_tag_advance.c
- * @author Soroush Bateni (soroush@utdallas.edu)
- * @author Edward A. Lee
- * @author Marten Lohstroh
+ * @file
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
* @brief API used to advance tag globally.
- *
- * @copyright Copyright (c) 2022, The University of Texas at Dallas.
- * @copyright Copyright (c) 2022, The University of California at Berkeley.
+ * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas
+ * License: BSD 2-clause
*/
+#if !defined(LF_SINGLE_THREADED)
+
#include "scheduler_sync_tag_advance.h"
#include "rti_local.h"
#include "environment.h"
#include "tracepoint.h"
#include "util.h"
-/////////////////// External Functions /////////////////////////
-/**
- * Placeholder for function that will advance tag and initially fill the
- * reaction queue.
- *
- * This does not acquire the mutex lock. It assumes the lock is already held.
- */
+// Forward declaration of function defined in reactor_threaded.h
+void _lf_next_locked(struct environment_t* env);
/**
* @brief Indicator that execution of at least one tag has completed.
*/
static bool _latest_tag_completed = false;
-/**
- * Return true if the worker should stop now; false otherwise.
- * This function assumes the caller holds the mutex lock.
- */
bool should_stop_locked(lf_scheduler_t* sched) {
// If this is not the very first step, check against the stop tag to see whether this is the last step.
if (_latest_tag_completed) {
@@ -70,14 +36,6 @@ bool should_stop_locked(lf_scheduler_t* sched) {
return false;
}
-/**
- * Advance tag. This will also pop events for the newly acquired tag and put
- * the triggered reactions on the '_lf_sched_vector_of_reaction_qs'.
- *
- * This function assumes the caller holds the 'mutex' lock.
- *
- * @return should_exit True if the worker thread should exit. False otherwise.
- */
bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched) {
environment_t* env = sched->env;
logical_tag_complete(env->current_tag);
diff --git a/core/utils/pqueue.c b/core/utils/pqueue.c
index b2bf05090..65f6dd1d9 100644
--- a/core/utils/pqueue.c
+++ b/core/utils/pqueue.c
@@ -35,5 +35,5 @@ void set_reaction_position(void* reaction, size_t pos) { ((reaction_t*)reaction)
void print_reaction(void* reaction) {
reaction_t* r = (reaction_t*)reaction;
- LF_PRINT_DEBUG("%s: chain_id: %llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, reaction);
+ LF_PRINT_DEBUG("%s: index: %llx, reaction: %p", r->name, r->index, reaction);
}
diff --git a/include/core/environment.h b/include/core/environment.h
index a776dee95..038f97e4e 100644
--- a/include/core/environment.h
+++ b/include/core/environment.h
@@ -1,31 +1,11 @@
/**
* @file
- * @author Erling R. Jellum (erling.r.jellum@ntnu.no)
+ * @author Erling R. Jellum
+ * @copyright (c) 2023, The Norwegian University of Science and Technology.
+ * License: BSD 2-clause
+ * @brief API for the environment data structure.
*
- * @section LICENSE
- * Copyright (c) 2023, The Norwegian University of Science and Technology.
- *
- * Redistribution and use in source and binary forms, with or without modification,
- * are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
- * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
- * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
- * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * @section DESCRIPTION API for creating and destroying environments. An environment is the
+ * This is an API for creating and destroying environments. An environment is the
* "context" within which the reactors are executed. The environment contains data structures
* which are shared among the reactors such as priority queues, the current logical tag,
* the worker scheduler, and a lot of meta data. Each reactor stores a pointer to its
diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h
index 1c1028c23..230f3e277 100644
--- a/include/core/federated/federate.h
+++ b/include/core/federated/federate.h
@@ -496,6 +496,12 @@ void lf_spawn_staa_thread(void);
*/
void lf_stall_advance_level_federation(environment_t* env, size_t level);
+/**
+ * @brief Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock.
+ * @param level The level to which we would like to advance.
+ */
+void lf_stall_advance_level_federation_locked(size_t level);
+
/**
* @brief Synchronize the start with other federates via the RTI.
*
diff --git a/include/core/lf_types.h b/include/core/lf_types.h
index 75a61e405..a3a103041 100644
--- a/include/core/lf_types.h
+++ b/include/core/lf_types.h
@@ -157,9 +157,7 @@ struct reaction_t {
void* self; // Pointer to a struct with the reactor's state. INSTANCE.
int number; // The number of the reaction in the reactor (0 is the first reaction).
index_t index; // Inverse priority determined by dependency analysis. INSTANCE.
- // Binary encoding of the branches that this reaction has upstream in the dependency graph. INSTANCE.
- unsigned long long chain_id;
- size_t pos; // Current position in the priority queue. RUNTIME.
+ size_t pos; // Current position in the priority queue. RUNTIME.
reaction_t*
last_enabling_reaction; // The last enabling reaction, or NULL if there is none. Used for optimization. INSTANCE.
size_t num_outputs; // Number of outputs that may possibly be produced by this function. COMMON.
diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h
index 96de7ac49..0d58f7431 100644
--- a/include/core/threaded/reactor_threaded.h
+++ b/include/core/threaded/reactor_threaded.h
@@ -1,8 +1,8 @@
/**
* @file
- * @author Edward A. Lee (eal@berkeley.edu)
- * @author{Marten Lohstroh }
- * @author{Soroush Bateni }
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
+ * @author Soroush Bateni
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca.
@@ -12,16 +12,6 @@
#include "lf_types.h"
-/**
- * @brief Advance to the next level.
- * For federated runtimes, this function should
- * stall the advance until we know that we can safely execute the next level
- * given knowledge about upstream network port statuses.
- * @param env The environment.
- * @param next_reaction_level The place to store the next reaction level.
- */
-void try_advance_level(environment_t* env, volatile size_t* next_reaction_level);
-
/**
* Enqueue port absent reactions that will send a PORT_ABSENT
* message to downstream federates if a given network output port is not present.
diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h
index ea9f008c2..f49f0bc54 100644
--- a/include/core/threaded/scheduler.h
+++ b/include/core/threaded/scheduler.h
@@ -1,38 +1,13 @@
-/*************
-Copyright (c) 2022, The University of Texas at Dallas.
-Copyright (c) 2022, The University of California at Berkeley.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
-THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-***************/
-
/**
- * @file scheduler.h
- * @author Soroush Bateni
+ * @file
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley.
+ * License: BSD 2-clause
* @brief Scheduler API for the threaded C runtime.
*
* A scheduler for the threaded runtime of reactor-c should provide an
* implementation for functions that are defined in this header file.
- *
- * @copyright Copyright (c) 2022, The University of Texas at Dallas.
- * @copyright Copyright (c) 2022, The University of California at Berkeley.
*/
#ifndef LF_SCHEDULER_H
@@ -40,13 +15,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "lf_types.h"
#include "scheduler_instance.h"
-/**
- * @brief Default value that is assumed to be the maximum reaction level in the
- * program.
- *
- * Can be overriden by passing the appropriate `parameters` argument to
- * `lf_sched_init`.
- */
/**
* @brief Initialize the scheduler.
@@ -76,6 +44,7 @@ void lf_sched_free(lf_scheduler_t* scheduler);
* This function blocks until it can return a ready reaction for worker thread
* 'worker_number' or it is time for the worker thread to stop and exit (where a
* NULL value would be returned).
+ * This function assumes that the environment mutex is not locked.
*
* @param scheduler The scheduler
* @param worker_number For the calling worker thread.
diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h
index f664066e6..df55a86be 100644
--- a/include/core/threaded/scheduler_instance.h
+++ b/include/core/threaded/scheduler_instance.h
@@ -1,38 +1,12 @@
-/*************
-Copyright (c) 2022, The University of Texas at Dallas. Copyright (c) 2022, The
-University of California at Berkeley.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-***************/
-
/**
- * @file scheduler_params.h
- * @author Soroush Bateni
- * @brief Scheduler parameters.
- *
- * Meant for book-keeping in the threaded schedulers in the reactor C runtime.
+ * @file
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @copyright (c) 2022-2024, The University of Texas at Dallas and The University of California at Berkeley.
+ * License: BSD 2-clause
+ * @brief Common scheduler parameters.
*
- * @copyright Copyright (c) 2022, The University of Texas at Dallas.
- * @copyright Copyright (c) 2022, The University of California at Berkeley.
+ * This file defines data types and functions that are common across multiple schedulers.
*/
#ifndef LF_SCHEDULER_PARAMS_H
@@ -42,8 +16,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define NUMBER_OF_WORKERS 1
#endif // NUMBER_OF_WORKERS
-#include "lf_semaphore.h"
#include
+#include // for size_t
#define DEFAULT_MAX_REACTION_LEVEL 100
@@ -65,38 +39,11 @@ typedef struct lf_scheduler_t {
*/
size_t max_reaction_level;
- /**
- * @brief Used by the scheduler to signal the maximum number of worker
- * threads that should be executing work at the same time.
- *
- * Initially, the count is set to 0. Maximum value of count should be
- * `number_of_workers`.
- *
- * For example, if the scheduler releases the semaphore with a count of 4,
- * no more than 4 worker threads should wake up to process reactions.
- *
- * FIXME: specific comment
- */
- lf_semaphore_t* semaphore;
-
/**
* @brief Indicate whether the program should stop
*/
volatile bool should_stop;
- /**
- * @brief Hold triggered reactions.
- */
- void* triggered_reactions;
-
- /**
- * @brief An array of mutexes.
- *
- * Can be used to avoid race conditions. Schedulers are allowed to
- * initialize as many mutexes as they deem fit.
- */
- lf_mutex_t* array_of_mutexes;
-
/**
* @brief An array of atomic indexes.
*
@@ -105,11 +52,6 @@ typedef struct lf_scheduler_t {
*/
volatile int* indexes;
- /**
- * @brief Hold currently executing reactions.
- */
- void* executing_reactions;
-
/**
* @brief Hold reactions temporarily.
*/
@@ -126,11 +68,6 @@ typedef struct lf_scheduler_t {
*/
volatile size_t number_of_idle_workers;
- /**
- * @brief The next level of reactions to execute.
- */
- volatile size_t next_reaction_level;
-
// Pointer to an optional custom data structure that each scheduler can define.
// The type is forward declared here and must be declared again in the scheduler source file
// Is not touched by `init_sched_instance` and must be initialized by each scheduler that needs it
diff --git a/include/core/threaded/scheduler_sync_tag_advance.h b/include/core/threaded/scheduler_sync_tag_advance.h
index 3de92e540..309fffd1e 100644
--- a/include/core/threaded/scheduler_sync_tag_advance.h
+++ b/include/core/threaded/scheduler_sync_tag_advance.h
@@ -1,27 +1,12 @@
-/*************
-Copyright (c) 2022, The University of Texas at Dallas.
-Copyright (c) 2022, The University of California at Berkeley.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
-THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-***************/
+/**
+ * @file
+ * @author Soroush Bateni
+ * @author Edward A. Lee
+ * @author Marten Lohstroh
+ * @brief API used to advance tag globally.
+ * @copyright (c) 2020-2024, The University of California at Berkeley and The University of Texas at Dallas
+ * License: BSD 2-clause
+ */
#ifndef SCHEDULER_SYNC_TAG_ADVANCE_H
#define SCHEDULER_SYNC_TAG_ADVANCE_H
@@ -31,8 +16,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "tag.h"
#include "scheduler_instance.h"
-/////////////////// External Functions /////////////////////////
-void _lf_next_locked(struct environment_t* env);
/**
* Placeholder for code-generated function that will, in a federated
* execution, be used to coordinate the advancement of tag. It will notify
@@ -42,7 +25,25 @@ void _lf_next_locked(struct environment_t* env);
* @param tag_to_send The tag to send.
*/
void logical_tag_complete(tag_t tag_to_send);
+
+/**
+ * @brief Return true if the worker should stop now; false otherwise.
+ *
+ * This function assumes the caller holds the mutex lock.
+ * @param sched The scheduler instance to check.
+ */
bool should_stop_locked(lf_scheduler_t* sched);
+
+/**
+ * @brief Advance the tag to the next tag on the event queue
+ *
+ * This will also pop events for the newly acquired tag and trigger
+ * the enabled reactions using the scheduler.
+ *
+ * This function assumes the caller holds the environment mutex lock.
+ * @param sched The scheduler instance to check.
+ * @return True if the worker thread should exit. False otherwise.
+ */
bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched);
#endif // LF_C11_THREADS_SUPPORT_H
diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt
index 1f7391f92..1a93378e4 100644
--- a/lingua-franca-ref.txt
+++ b/lingua-franca-ref.txt
@@ -1 +1 @@
-master
+gedf
diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h
index 63b2d1c77..9611870cc 100644
--- a/low_level_platform/api/low_level_platform.h
+++ b/low_level_platform/api/low_level_platform.h
@@ -234,7 +234,7 @@ int lf_cond_signal(lf_cond_t* cond);
/**
* Wait for condition variable "cond" to be signaled or broadcast.
- * "mutex" is assumed to be locked before.
+ * The cond->mutex is assumed to be locked when this is called.
*
* @return 0 on success, platform-specific error number otherwise.
*/