From 1241f879f5fc27dea9cb8433f451c04533223109 Mon Sep 17 00:00:00 2001 From: Byeonggil Jun Date: Tue, 23 Apr 2024 13:58:09 -0700 Subject: [PATCH] Skip sending LTCs if a network input reaction has been scheduled at the current tag --- core/environment.c | 2 ++ core/federated/RTI/rti_common.c | 3 +-- core/federated/federate.c | 21 ++++----------------- core/reactor_common.c | 2 ++ core/threaded/reactor_threaded.c | 7 +++++++ include/core/environment.h | 1 + include/core/federated/federate.h | 5 ----- 7 files changed, 17 insertions(+), 24 deletions(-) diff --git a/core/environment.c b/core/environment.c index 4523c4721..2fd3072d7 100644 --- a/core/environment.c +++ b/core/environment.c @@ -147,6 +147,8 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi env->_lf_intended_tag_fields = NULL; env->_lf_intended_tag_fields_size = 0; } +#elif FEDERATED_CENTRALIZED + env->need_to_send_LTC = false; #else (void)env; (void)num_is_present_fields; diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index dfd37ac36..0480e9dff 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -516,8 +516,7 @@ void send_downstream_next_event_tag_if_needed(scheduling_node_t* node, uint16_t DNET = NEVER_TAG; } - if (lf_tag_compare(node->last_DNET, DNET) != 0 && lf_tag_compare(node->completed, DNET) < 0 && - lf_tag_compare(node->next_event, DNET) <= 0) { + if (lf_tag_compare(node->last_DNET, DNET) != 0 && lf_tag_compare(node->next_event, DNET) <= 0) { send_downstream_next_event_tag(node, DNET); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index a96f98598..985f14de2 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -87,7 +87,6 @@ federate_instance_t _fed = {.socket_TCP_RTI = -1, .is_last_TAG_provisional = false, .has_upstream = false, .has_downstream = false, - .last_skipped_LTC = {.time = NEVER, .microstep = 0u}, .last_DNET = {.time = NEVER, .microstep = 0u}, .received_stop_request_from_rti = false, .last_sent_LTC = {.time = NEVER, .microstep = 0u}, @@ -1451,12 +1450,6 @@ static void handle_downstream_next_event_tag() { LF_PRINT_LOG("Received Downstream Next Event Tag (DNET): " PRINTF_TAG ".", DNET.time - start_time, DNET.microstep); _fed.last_DNET = DNET; - - if (lf_tag_compare(_fed.last_skipped_LTC, NEVER_TAG) != 0 && - lf_tag_compare(_fed.last_skipped_LTC, _fed.last_DNET) >= 0) { - send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, _fed.last_skipped_LTC); - _fed.last_skipped_LTC = NEVER_TAG; - } } /** @@ -2230,22 +2223,16 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { } void lf_latest_tag_complete(tag_t tag_to_send) { + environment_t* env; + _lf_get_environments(&env); int compare_with_last_LTC = lf_tag_compare(_fed.last_sent_LTC, tag_to_send); - int compare_with_last_DNET = lf_tag_compare(_fed.last_DNET, tag_to_send); - if (compare_with_last_LTC >= 0) { - return; - } - if (compare_with_last_DNET > 0) { - LF_PRINT_LOG("Skipping Latest Tag Complete (LTC) " PRINTF_TAG " .", tag_to_send.time - start_time, - tag_to_send.microstep); - _fed.last_skipped_LTC = tag_to_send; + if (compare_with_last_LTC >= 0 || !env->need_to_send_LTC) { return; } LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time, tag_to_send.microstep); send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send); _fed.last_sent_LTC = tag_to_send; - _fed.last_skipped_LTC = NEVER_TAG; } parse_rti_code_t lf_parse_rti_addr(const char* rti_addr) { @@ -2391,7 +2378,7 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) // This if statement does not fall through but rather returns. // NET is not bounded by physical time or has no downstream federates. // Normal case. - if (lf_tag_compare(_fed.last_DNET, tag) <= 0 || lf_tag_compare(_fed.last_TAG, tag) < 0) { + if (lf_tag_compare(_fed.last_DNET, tag) < 0 || (_fed.has_upstream && lf_tag_compare(_fed.last_TAG, tag) < 0)) { send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag); _fed.last_sent_NET = tag; LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", tag.time - start_time, tag.microstep); diff --git a/core/reactor_common.c b/core/reactor_common.c index 33e5582f5..54de1a906 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -210,6 +210,8 @@ void _lf_start_time_step(environment_t* env) { } #endif // FEDERATED_DECENTRALIZED + env->need_to_send_LTC = false; + // Reset absent fields on network ports because // their status is unknown lf_reset_status_fields_on_input_port_triggers(); diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 57f888fc2..4ad2579db 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -886,6 +886,13 @@ void _lf_worker_do_work(environment_t* env, int worker_number) { worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, current_reaction_to_execute->deadline); +#ifdef FEDERATED + printf("At tag " PRINTF_TAG ", is input reaction = %d.\n", env->current_tag.time - start_time, + env->current_tag.microstep, current_reaction_to_execute->is_an_input_reaction); + if (current_reaction_to_execute->is_an_input_reaction) { + env->need_to_send_LTC = true; + } +#endif // FEDERATED bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); diff --git a/include/core/environment.h b/include/core/environment.h index a776dee95..2e6b99e1f 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -105,6 +105,7 @@ typedef struct environment_t { lf_cond_t global_tag_barrier_requestors_reached_zero; #endif // LF_SINGLE_THREADED #if defined(FEDERATED) + bool need_to_send_LTC; tag_t** _lf_intended_tag_fields; int _lf_intended_tag_fields_size; #endif // FEDERATED diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index fd7b23a3d..339fa1e69 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -142,11 +142,6 @@ typedef struct federate_instance_t { */ bool has_downstream; - /** - * - */ - tag_t last_skipped_LTC; - /** * */