Skip to content

Commit

Permalink
Skip sending LTCs if a network input reaction has been scheduled at t…
Browse files Browse the repository at this point in the history
…he current tag
  • Loading branch information
byeonggiljun committed Apr 23, 2024
1 parent 4ebd09e commit 1241f87
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 24 deletions.
2 changes: 2 additions & 0 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
env->_lf_intended_tag_fields = NULL;
env->_lf_intended_tag_fields_size = 0;
}
#elif FEDERATED_CENTRALIZED
env->need_to_send_LTC = false;
#else
(void)env;
(void)num_is_present_fields;
Expand Down
3 changes: 1 addition & 2 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,7 @@ void send_downstream_next_event_tag_if_needed(scheduling_node_t* node, uint16_t
DNET = NEVER_TAG;
}

if (lf_tag_compare(node->last_DNET, DNET) != 0 && lf_tag_compare(node->completed, DNET) < 0 &&
lf_tag_compare(node->next_event, DNET) <= 0) {
if (lf_tag_compare(node->last_DNET, DNET) != 0 && lf_tag_compare(node->next_event, DNET) <= 0) {
send_downstream_next_event_tag(node, DNET);
}
}
Expand Down
21 changes: 4 additions & 17 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ federate_instance_t _fed = {.socket_TCP_RTI = -1,
.is_last_TAG_provisional = false,
.has_upstream = false,
.has_downstream = false,
.last_skipped_LTC = {.time = NEVER, .microstep = 0u},
.last_DNET = {.time = NEVER, .microstep = 0u},
.received_stop_request_from_rti = false,
.last_sent_LTC = {.time = NEVER, .microstep = 0u},
Expand Down Expand Up @@ -1451,12 +1450,6 @@ static void handle_downstream_next_event_tag() {
LF_PRINT_LOG("Received Downstream Next Event Tag (DNET): " PRINTF_TAG ".", DNET.time - start_time, DNET.microstep);

_fed.last_DNET = DNET;

if (lf_tag_compare(_fed.last_skipped_LTC, NEVER_TAG) != 0 &&
lf_tag_compare(_fed.last_skipped_LTC, _fed.last_DNET) >= 0) {
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, _fed.last_skipped_LTC);
_fed.last_skipped_LTC = NEVER_TAG;
}
}

/**
Expand Down Expand Up @@ -2230,22 +2223,16 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
}

void lf_latest_tag_complete(tag_t tag_to_send) {
environment_t* env;
_lf_get_environments(&env);
int compare_with_last_LTC = lf_tag_compare(_fed.last_sent_LTC, tag_to_send);
int compare_with_last_DNET = lf_tag_compare(_fed.last_DNET, tag_to_send);
if (compare_with_last_LTC >= 0) {
return;
}
if (compare_with_last_DNET > 0) {
LF_PRINT_LOG("Skipping Latest Tag Complete (LTC) " PRINTF_TAG " .", tag_to_send.time - start_time,
tag_to_send.microstep);
_fed.last_skipped_LTC = tag_to_send;
if (compare_with_last_LTC >= 0 || !env->need_to_send_LTC) {
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
_fed.last_skipped_LTC = NEVER_TAG;
}

parse_rti_code_t lf_parse_rti_addr(const char* rti_addr) {
Expand Down Expand Up @@ -2391,7 +2378,7 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// This if statement does not fall through but rather returns.
// NET is not bounded by physical time or has no downstream federates.
// Normal case.
if (lf_tag_compare(_fed.last_DNET, tag) <= 0 || lf_tag_compare(_fed.last_TAG, tag) < 0) {
if (lf_tag_compare(_fed.last_DNET, tag) < 0 || (_fed.has_upstream && lf_tag_compare(_fed.last_TAG, tag) < 0)) {
send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag);
_fed.last_sent_NET = tag;
LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", tag.time - start_time, tag.microstep);
Expand Down
2 changes: 2 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ void _lf_start_time_step(environment_t* env) {
}
#endif // FEDERATED_DECENTRALIZED

env->need_to_send_LTC = false;

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
7 changes: 7 additions & 0 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,13 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index),
current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id,
current_reaction_to_execute->deadline);
#ifdef FEDERATED
printf("At tag " PRINTF_TAG ", is input reaction = %d.\n", env->current_tag.time - start_time,
env->current_tag.microstep, current_reaction_to_execute->is_an_input_reaction);
if (current_reaction_to_execute->is_an_input_reaction) {
env->need_to_send_LTC = true;
}
#endif // FEDERATED

bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute);

Expand Down
1 change: 1 addition & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ typedef struct environment_t {
lf_cond_t global_tag_barrier_requestors_reached_zero;
#endif // LF_SINGLE_THREADED
#if defined(FEDERATED)
bool need_to_send_LTC;
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
#endif // FEDERATED
Expand Down
5 changes: 0 additions & 5 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ typedef struct federate_instance_t {
*/
bool has_downstream;

/**
*
*/
tag_t last_skipped_LTC;

/**
*
*/
Expand Down

0 comments on commit 1241f87

Please sign in to comment.