Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downstream next event tag (DNET), a new signal for more efficient centralized federated execution #349

Open
wants to merge 54 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
98a569d
Prepare for adding a new message type, Downstream Next Event Tag (DNET)
byeonggiljun Jan 28, 2024
af3d083
Rename upstream and downstream to immediate_upstreams and immediate_d…
byeonggiljun Feb 5, 2024
2373e9f
Use a matrix instead of multiple arrays to store minimum delays of ev…
byeonggiljun Feb 5, 2024
86f3d50
Merge branch 'main' into rti-DNET
byeonggiljun Feb 5, 2024
6b37e36
Save IDs of all downstreams for faster access to the min_dleays matrix
byeonggiljun Feb 7, 2024
eea1edf
Add a function for subtracting tags
byeonggiljun Feb 7, 2024
b4900f3
Calculate and send DNET messages
byeonggiljun Feb 8, 2024
83fafe2
Merge branch 'main' into rti-DNET
byeonggiljun Feb 8, 2024
42fe08f
Remove unnecessary LTC and NET messages by DNET
byeonggiljun Feb 8, 2024
d3e462c
Sends last skipped DNETs when sending T_MSGs
byeonggiljun Feb 10, 2024
3d4653c
Merge branch 'main' into rti-DNET
byeonggiljun Feb 13, 2024
4f2eb91
Merge branch 'main' into rti-DNET
byeonggiljun Feb 13, 2024
9967d9a
Do not send DNET to upstreams of ZDC
byeonggiljun Feb 13, 2024
400d138
Disable the RTI unit tests until it reflects the changes of the RTI
byeonggiljun Feb 13, 2024
1719951
Send NET regardless of DNET if it needs TAG from the RTI
byeonggiljun Feb 13, 2024
a6e2b5d
Using ID of the federate which sends NET to calculate DNET
byeonggiljun Feb 14, 2024
8664909
Move DNET calculation function from tag.h to rti_common.h
byeonggiljun Feb 19, 2024
ccc9b8b
Fix the calculation of DNET candidates
byeonggiljun Feb 19, 2024
e9ae464
Re-enable the RTI unit tests
byeonggiljun Feb 21, 2024
24e1e3b
Merge branch 'main' into rti-DNET
byeonggiljun Feb 25, 2024
800b570
Merge main into branch rti-DNET
byeonggiljun Mar 15, 2024
1c2777d
Merge branch main into branch 'rti-DNET'
byeonggiljun Apr 22, 2024
4ebd09e
Fix a trace point for DNET
byeonggiljun Apr 23, 2024
9a9d120
Skip sending LTCs if a network input reaction has been scheduled at t…
byeonggiljun Apr 23, 2024
9b742af
Maintain a variable for storing the last skipped NET signal
byeonggiljun Apr 27, 2024
a7601ae
Compute TAG values using EIMT
byeonggiljun May 8, 2024
09001d3
Merge branch 'main' into rti-DNET
byeonggiljun May 9, 2024
43ede39
Merge branch main into 'rti-DNET'
byeonggiljun Jun 18, 2024
99aa14f
Revert changes relating to LTC signals
byeonggiljun Jun 18, 2024
6275943
Minor fix
byeonggiljun Jun 18, 2024
013aa0c
Resolve FIXMEs
byeonggiljun Jun 18, 2024
b99ace2
Minor Fix
byeonggiljun Jun 19, 2024
51d6ae0
Merge branch 'main' into rti-DNET
byeonggiljun Jun 25, 2024
34df67b
Merge branch 'main' into rti-DNET
byeonggiljun Jul 2, 2024
6e15213
Merge branch 'main' into rti-DENT
byeonggiljun Aug 9, 2024
925985b
Add an option for turning on and off DNET signal
byeonggiljun Aug 13, 2024
fc970bb
Send a necessary NET even the federate can advance its tag
byeonggiljun Aug 14, 2024
a2ab5bc
Add some comments and remove a FIXME
byeonggiljun Aug 14, 2024
5d52945
Minor refactoring
byeonggiljun Aug 15, 2024
5e09b52
Update comments.
byeonggiljun Aug 15, 2024
530b0cd
Apply suggestions from code review
byeonggiljun Aug 23, 2024
2857ab8
Move function `lf_tag_latest_earlier` to tag.c
byeonggiljun Aug 23, 2024
4507043
Invalidate every node's delay information in `invalidate_min_delays`
byeonggiljun Aug 24, 2024
896e152
Apply suggestions from @edwardalee and minor fix
byeonggiljun Aug 25, 2024
0fab3b6
Run Clang format
byeonggiljun Aug 25, 2024
2bfd833
Merge branch 'main' into rti-DNET
byeonggiljun Aug 30, 2024
da80702
Merge branch 'main' into rti-DNET
edwardalee Oct 13, 2024
05280d0
Merge branch 'main' into rti-DNET
byeonggiljun Oct 17, 2024
a757105
Remove the fields all_upstream and all_downstream
byeonggiljun Oct 18, 2024
adc0a1d
Clang format
byeonggiljun Oct 18, 2024
c2f5316
Turn on DNET signals by default and make the option to turn off the f…
byeonggiljun Oct 19, 2024
2b7cae5
Merge branch 'main' into rti-DNET
byeonggiljun Oct 22, 2024
7f81c7c
Exclude the target node itself when computing DNET
byeonggiljun Oct 28, 2024
2f839ac
Merge branch 'main' into rti-DNET
byeonggiljun Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n");
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -d, --dnet Turn on DNET signals for reducing network messages.\n");
byeonggiljun marked this conversation as resolved.
Show resolved Hide resolved

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
Expand Down Expand Up @@ -263,6 +264,8 @@ int process_args(int argc, const char* argv[]) {
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet") == 0) {
rti.base.dnet_enabled = true;
} else if (strcmp(argv[i], " ") == 0) {
// Tolerate spaces
continue;
Expand Down Expand Up @@ -316,9 +319,14 @@ int main(int argc, const char* argv[]) {
assert(rti.base.number_of_scheduling_nodes < UINT16_MAX);

// Allocate memory for the federates
rti.base.scheduling_nodes =
(scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
int n = rti.base.number_of_scheduling_nodes;
rti.base.scheduling_nodes = (scheduling_node_t**)calloc(n, sizeof(scheduling_node_t*));
// Allocate memory for the array of min_delays.
rti.base.min_delays = (tag_t*)calloc((n * n), sizeof(tag_t));
edwardalee marked this conversation as resolved.
Show resolved Hide resolved
for (uint16_t i = 0; i < n; i++) {
for (uint16_t j = 0; j < n; j++) {
rti.base.min_delays[i + j * n] = FOREVER_TAG;
}
federate_info_t* fed_info = (federate_info_t*)calloc(1, sizeof(federate_info_t));
initialize_federate(fed_info, i);
rti.base.scheduling_nodes[i] = (scheduling_node_t*)fed_info;
Expand Down
252 changes: 205 additions & 47 deletions core/federated/RTI/rti_common.c

Large diffs are not rendered by default.

103 changes: 78 additions & 25 deletions core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,20 @@ typedef struct scheduling_node_t {
tag_t last_granted; // The maximum TAG that has been granted so far (or NEVER if none granted)
tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted)
tag_t next_event; // Most recent NET received from the scheduling node (or NEVER if none received).
tag_t last_DNET; // Most recent DNET.
scheduling_node_state_t state; // State of the scheduling node.
uint16_t* upstream; // Array of upstream scheduling node ids.
interval_t* upstream_delay; // Minimum delay on connections from upstream scheduling nodes.
// Here, NEVER encodes no delay. 0LL is a microstep delay.
int num_upstream; // Size of the array of upstream scheduling nodes and delays.
uint16_t* downstream; // Array of downstream scheduling node ids.
int num_downstream; // Size of the array of downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
size_t num_min_delays; // Size of min_delays array.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
uint16_t* immediate_upstreams; // Array of immediate upstream scheduling node ids.
interval_t* immediate_upstream_delays; // Minimum delay on connections from immdediate upstream scheduling nodes.
// Here, NEVER encodes no delay. 0LL is a microstep delay.
uint16_t num_immediate_upstreams; // Size of the array of immediate upstream scheduling nodes and delays.
uint16_t* immediate_downstreams; // Array of immediate downstream scheduling node ids.
uint16_t num_immediate_downstreams; // Size of the array of immediate downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
uint16_t* all_upstreams; // Array of all upstream scheduling node ids.
uint16_t num_all_upstreams; // Size of the array of all upstream scheduling nodes and delays.
uint16_t* all_downstreams; // Array of all downstream scheduling node ids.
uint16_t num_all_downstreams; // Size of the array of all downstream scheduling nodes.
byeonggiljun marked this conversation as resolved.
Show resolved Hide resolved
int flags; // One of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
} scheduling_node_t;

/**
Expand All @@ -76,7 +79,13 @@ typedef struct rti_common_t {
scheduling_node_t** scheduling_nodes;

// Number of scheduling nodes
int32_t number_of_scheduling_nodes;
uint16_t number_of_scheduling_nodes;

// Matrix of minimum delays between pairs of nodes.
// Rows represent upstream nodes and Columns represent downstream nodes.
// FOREVER_TAG means there is no path, and ZERO_TAG means there is no delay.
// This could be NULL if the matrix is not being used, so accesses should test for NULL first.
tag_t* min_delays;

// RTI's decided stop tag for the scheduling nodes
tag_t max_stop_tag;
Expand All @@ -87,6 +96,9 @@ typedef struct rti_common_t {
// Boolean indicating that tracing is enabled.
bool tracing_enabled;

// Boolean indicating that DNET is enabled.
bool dnet_enabled;

// The RTI mutex for making thread-safe access to the shared state.
lf_mutex_t* mutex;
} rti_common_t;
Expand Down Expand Up @@ -246,34 +258,75 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e);
tag_t eimt_strict(scheduling_node_t* e);

/**
* Return true if the node is in a zero-delay cycle.
* For the given scheduling node (enclave or federate), if necessary, update the `min_delays`,
* `all_upstreams`, `num_all_upstreams`, and the fields that indicate cycles. These fields will be
* updated only if they have not been previously updated or if invalidate_min_delays
* has been called since they were last updated.
* @param node The node.
*/
bool is_in_zero_delay_cycle(scheduling_node_t* node);
void update_min_delays_upstream(scheduling_node_t* node);

/**
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* For the given scheduling node (enclave or federate), if necessary, update the `all_downstreams` and
* `num_all_downstreams` fields. These fields will be updated only if they have not been previously updated
* or if invalidate_min_delays has been called since they were last updated.
* @param node The node.
*/
bool is_in_cycle(scheduling_node_t* node);
void update_all_downstreams(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), if necessary, update the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles. These fields will be
* updated only if they have not been previously updated or if invalidate_min_delays_upstream
* has been called since they were last updated.
* Find the tag g that is the latest tag that satisfies lf_tag_add(g, minimum_delay) < next_event_tag.
* This function behaves like the tag subtraction, next_event_tag - minimum_delay.
* minimum_delay cannot be NEVER.
*
* This function is called in function downstream_next_event_tag.
* @param next_event_tag The next event tag of a downstream node.
* @param minimum_delay The minimum delay between the target upstream node and the downstream node.
*/
tag_t get_dnet_candidate(tag_t next_event_tag, tag_t minimum_delay);

/**
* @brief Determine whether the specified scheduling node is needed to receive a downstream next event tag (DNET),
* and, if so, return the details.
*
* This function is called upon receiving a NET from one of the specified node's downstream nodes.
*
* This function calculates the minimum tag M over
* all downstream scheduling nodes of the most recent NET from that node minus the "after delay" (see function
* get_dnet_candidate). If M is earlier than the startup tag, then set the result as the NEVER_TAG.
*
* @param node The target node that may receive a new DNET.
* @param node_sending_new_net_id The ID of the node that sends a new NET. If this node's new NET does not
* change the DNET value, we can exit this function immediately. If it does, we have to look up the target node's
* downstream federates to compute the exact new DNET value.
* @return If needed, return the tag value. Otherwise, return the NEVER_TAG.
*/
tag_t downstream_next_event_tag(scheduling_node_t* node, uint16_t node_sending_new_net_id);

/**
* Notify a downstream next event tag (DNET) signal to the specified scheduling node.
* @param e The target node.
* @param tag The downstream next event tag for e.
*/
void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag);

/**
* Return true if the node is in a zero-delay cycle.
* @param node The node.
*/
void update_min_delays_upstream(scheduling_node_t* node);
bool is_in_zero_delay_cycle(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), invalidate the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles.
* This should be called whenever the structure of the connections upstream of the
* given node have changed.
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* @param node The node.
*/
void invalidate_min_delays_upstream(scheduling_node_t* node);
bool is_in_cycle(scheduling_node_t* node);

/**
* Invalidate the `min_delays`, `num_min_delays`, and the fields that indicate cycles
* of all nodes. This should be called whenever the structure of the connections have changed.
*/
void invalidate_min_delays();

/**
* Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.
Expand Down
12 changes: 8 additions & 4 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ void initialize_local_rti(environment_t* envs, int num_envs) {
rti_local->base.scheduling_nodes[i] = (scheduling_node_t*)enclave_info;

// Encode the connection topology into the enclave_info object.
enclave_info->base.num_downstream = lf_get_downstream_of(i, &enclave_info->base.downstream);
enclave_info->base.num_upstream = lf_get_upstream_of(i, &enclave_info->base.upstream);
lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay);
enclave_info->base.num_immediate_downstreams = _lf_get_downstream_of(i, &enclave_info->base.immediate_downstreams);
enclave_info->base.num_immediate_upstreams = _lf_get_upstream_of(i, &enclave_info->base.immediate_upstreams);
_lf_get_upstream_delay_of(i, &enclave_info->base.immediate_upstream_delays);

enclave_info->base.state = GRANTED;
}
Expand Down Expand Up @@ -112,7 +112,7 @@ tag_t rti_next_event_tag_locked(enclave_info_t* e, tag_t next_event_tag) {
}

// If this enclave has no upstream, then we give a TAG till forever straight away.
if (e->base.num_upstream == 0) {
if (e->base.num_immediate_upstreams == 0) {
LF_PRINT_LOG("RTI: enclave %u has no upstream. Giving it a to the NET", e->base.id);
e->base.last_granted = next_event_tag;
}
Expand Down Expand Up @@ -191,6 +191,10 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ", e->id, tag.time - lf_time_start(), tag.microstep);
}

void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
// Nothing to do here.
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
// Nothing to do here.
}
Expand Down
92 changes: 60 additions & 32 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// Note that this is transitive.
// NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
// It's only needed for federates, which is why this is implemented here.
for (int j = 0; j < e->num_upstream; j++) {
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]];
for (int j = 0; j < e->num_immediate_upstreams; j++) {
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->immediate_upstreams[j]];

// Ignore this federate if it has resigned.
if (upstream->state == NOT_CONNECTED)
Expand All @@ -272,6 +272,35 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
}
}

void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
if (e->state == NOT_CONNECTED) {
return;
}
// Need to make sure that the destination federate's thread has already
// sent the starting MSG_TYPE_TIMESTAMP message.
while (e->state == PENDING) {
// Need to wait here.
lf_cond_wait(&sent_start_time);
}
size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
unsigned char buffer[message_length];
buffer[0] = MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG;
encode_int64(tag.time, &(buffer[1]));
encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)]));

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_DNET, e->id, &tag);
}
if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) {
lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
e->last_DNET = tag;
LF_PRINT_LOG("RTI sent to federate %d the Downstream Next Event Tag (DNET) " PRINTF_TAG ".", e->id,
tag.time - start_time, tag.microstep);
}
}

void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag) {
federate_info_t* fed = GET_FED_INFO(federate_id);
tag_t min_in_transit_tag = pqueue_tag_peek_tag(fed->in_transit_message_tags);
Expand Down Expand Up @@ -1325,31 +1354,29 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
} else {
federate_info_t* fed = GET_FED_INFO(fed_id);
// Read the number of upstream and downstream connections
fed->enclave.num_upstream = extract_int32(&(connection_info_header[1]));
fed->enclave.num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)]));
LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_upstream,
fed->enclave.num_downstream, fed_id);
fed->enclave.num_immediate_upstreams = extract_int32(&(connection_info_header[1]));
fed->enclave.num_immediate_downstreams = extract_int32(&(connection_info_header[1 + sizeof(int32_t)]));
LF_PRINT_DEBUG("RTI got %d upstreams and %d downstreams from federate %d.", fed->enclave.num_immediate_upstreams,
fed->enclave.num_immediate_downstreams, fed_id);

// Allocate memory for the upstream and downstream pointers
if (fed->enclave.num_upstream > 0) {
fed->enclave.upstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream);
if (fed->enclave.num_immediate_upstreams > 0) {
fed->enclave.immediate_upstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_upstreams);
// Allocate memory for the upstream delay pointers
fed->enclave.upstream_delay = (interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream_delay);
fed->enclave.immediate_upstream_delays =
(interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_immediate_upstreams);
} else {
fed->enclave.upstream = (uint16_t*)NULL;
fed->enclave.upstream_delay = (interval_t*)NULL;
fed->enclave.immediate_upstreams = (uint16_t*)NULL;
fed->enclave.immediate_upstream_delays = (interval_t*)NULL;
}
if (fed->enclave.num_downstream > 0) {
fed->enclave.downstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);
LF_ASSERT_NON_NULL(fed->enclave.downstream);
if (fed->enclave.num_immediate_downstreams > 0) {
fed->enclave.immediate_downstreams = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_immediate_downstreams);
} else {
fed->enclave.downstream = (uint16_t*)NULL;
fed->enclave.immediate_downstreams = (uint16_t*)NULL;
}

size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_upstream) +
(sizeof(uint16_t) * fed->enclave.num_downstream);
size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_immediate_upstreams) +
(sizeof(uint16_t) * fed->enclave.num_immediate_downstreams);
unsigned char* connections_info_body = NULL;
if (connections_info_body_size > 0) {
connections_info_body = (unsigned char*)malloc(connections_info_body_size);
Expand All @@ -1360,16 +1387,16 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
// Keep track of where we are in the buffer
size_t message_head = 0;
// First, read the info about upstream federates
for (int i = 0; i < fed->enclave.num_upstream; i++) {
fed->enclave.upstream[i] = extract_uint16(&(connections_info_body[message_head]));
for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) {
fed->enclave.immediate_upstreams[i] = extract_uint16(&(connections_info_body[message_head]));
message_head += sizeof(uint16_t);
fed->enclave.upstream_delay[i] = extract_int64(&(connections_info_body[message_head]));
fed->enclave.immediate_upstream_delays[i] = extract_int64(&(connections_info_body[message_head]));
message_head += sizeof(int64_t);
}

// Next, read the info about downstream federates
for (int i = 0; i < fed->enclave.num_downstream; i++) {
fed->enclave.downstream[i] = extract_uint16(&(connections_info_body[message_head]));
for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) {
fed->enclave.immediate_downstreams[i] = extract_uint16(&(connections_info_body[message_head]));
message_head += sizeof(uint16_t);
}

Expand Down Expand Up @@ -1738,6 +1765,7 @@ void initialize_RTI(rti_remote_t* rti) {
rti_remote->clock_sync_exchanges_per_interval = 10;
rti_remote->authentication_enabled = false;
rti_remote->base.tracing_enabled = false;
rti_remote->base.dnet_enabled = false;
rti_remote->stop_in_progress = false;
}

Expand All @@ -1748,17 +1776,17 @@ void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
invalidate_min_delays();
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL) {
free(node->upstream);
free(node->upstream_delay);
}
if (node->min_delays != NULL) {
free(node->min_delays);
if (node->immediate_upstreams != NULL) {
free(node->immediate_upstreams);
free(node->immediate_upstream_delays);
// free(node->all_upstreams);
}
if (node->downstream != NULL) {
free(node->downstream);
if (node->immediate_downstreams != NULL) {
free(node->immediate_downstreams);
// free(node->all_downstreams);
}
free(node);
}
Expand Down
1 change: 1 addition & 0 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ typedef struct rti_remote_t {
* Boolean indicating that authentication is enabled.
*/
bool authentication_enabled;

/**
* Boolean indicating that a stop request is already in progress.
*/
Expand Down
Loading
Loading