diff --git a/rust/rti/src/constants.rs b/rust/rti/src/constants.rs
index f85c5df..49b6f4f 100644
--- a/rust/rti/src/constants.rs
+++ b/rust/rti/src/constants.rs
@@ -8,5 +8,3 @@
  */
 
 pub const STARTING_PORT: u16 = 15045;
-
-pub const INET_ADDRSTRLEN: usize = 16;
diff --git a/rust/rti/src/enclave.rs b/rust/rti/src/enclave.rs
deleted file mode 100644
index 4123efa..0000000
--- a/rust/rti/src/enclave.rs
+++ /dev/null
@@ -1,886 +0,0 @@
-use crate::net_common::MsgType;
-use crate::net_util::NetUtil;
-use crate::tag;
-use crate::tag::{Instant, Interval, Tag};
-use crate::FedState::*;
-use crate::Federate;
-/**
- * @file enclave.rs
- * @author Edward A. Lee (eal@berkeley.edu)
- * @author Soroush Bateni (soroush@utdallas.edu)
- * @author Erling Jellum (erling.r.jellum@ntnu.no)
- * @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
- * @author Chanhee Lee (chanheel@asu.edu)
- * @author Hokeun Kim (hokeun@asu.edu)
- * @copyright (c) 2020-2023, The University of California at Berkeley
- * License in [BSD 2-clause](..)
- * @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.
- * This file extends enclave.h with RTI features that are specific to federations and are not
- * used by scheduling enclaves.
- */
-use crate::FederationRTI;
-
-use std::io::Write;
-use std::mem;
-use std::sync::{Arc, Condvar, Mutex};
-
-enum ExecutionMode {
-    FAST,
-    REALTIME,
-}
-
-#[derive(PartialEq, Clone, Debug)]
-pub enum FedState {
-    NotConnected, // The federate has not connected.
-    Granted,      // Most recent MsgType::NextEventTag has been granted.
-    Pending,      // Waiting for upstream federates.
-}
-
-struct TagAdvanceGrant {
-    tag: Tag,
-    is_provisional: bool,
-}
-
-impl TagAdvanceGrant {
-    pub fn new(tag: Tag, is_provisional: bool) -> TagAdvanceGrant {
-        TagAdvanceGrant {
-            tag,
-            is_provisional,
-        }
-    }
-
-    pub fn tag(&self) -> Tag {
-        self.tag.clone()
-    }
-
-    pub fn is_provisional(&self) -> bool {
-        self.is_provisional
-    }
-
-    pub fn set_tag(&mut self, tag: Tag) {
-        self.tag = tag.clone();
-    }
-
-    pub fn set_provisional(&mut self, is_provisional: bool) {
-        self.is_provisional = is_provisional;
-    }
-}
-
-pub struct Enclave {
-    id: u16, // ID of this enclave.
-    completed: Tag, // The largest logical tag completed by the federate (or NEVER if no LTC has been received).
-    last_granted: Tag, // The maximum Tag that has been granted so far (or NEVER if none granted)
-    last_provisionally_granted: Tag, // The maximum PTAG that has been provisionally granted (or NEVER if none granted)
-    next_event: Tag, // Most recent NET received from the federate (or NEVER if none received).
-    state: FedState, // State of the federate.
-    upstream: Vec<i32>, // Array of upstream federate ids.
-    upstream_delay: Vec<Interval>, // Minimum delay on connections from upstream federates.
-    // Here, NEVER encodes no delay. 0LL is a microstep delay.
-    num_upstream: i32, // Size of the array of upstream federates and delays.
-    downstream: Vec<i32>, // Array of downstream federate ids.
-    num_downstream: i32, // Size of the array of downstream federates.
-    mode: ExecutionMode, // FAST or REALTIME.
-    // TODO: lf_cond_t next_event_condition; // Condition variable used by enclaves to notify an enclave
-    // that it's call to next_event_tag() should unblock.
-}
-
-impl Enclave {
-    pub fn new() -> Enclave {
-        Enclave {
-            id: 0,
-            completed: Tag::never_tag(),
-            last_granted: Tag::never_tag(),
-            last_provisionally_granted: Tag::never_tag(),
-            next_event: Tag::never_tag(),
-            state: FedState::NotConnected,
-            upstream: Vec::new(),
-            upstream_delay: Vec::new(),
-            num_upstream: 0,
-            downstream: Vec::new(),
-            num_downstream: 0,
-            mode: ExecutionMode::REALTIME,
-            // TODO: lf_cond_t next_event_condition;
-        }
-    }
-
-    pub fn initialize_enclave(&mut self, id: u16) {
-        self.id = id;
-        // Initialize the next event condition variable.
-        // TODO: lf_cond_init(&e->next_event_condition, &rti_mutex);
-    }
-
-    pub fn id(&self) -> u16 {
-        self.id
-    }
-
-    pub fn completed(&self) -> Tag {
-        self.completed.clone()
-    }
-
-    pub fn last_granted(&self) -> Tag {
-        self.last_granted.clone()
-    }
-
-    pub fn last_provisionally_granted(&self) -> Tag {
-        self.last_provisionally_granted.clone()
-    }
-
-    pub fn next_event(&self) -> Tag {
-        self.next_event.clone()
-    }
-
-    pub fn state(&self) -> FedState {
-        self.state.clone()
-    }
-
-    pub fn upstream(&self) -> &Vec<i32> {
-        &self.upstream
-    }
-
-    pub fn upstream_delay(&self) -> &Vec<Interval> {
-        &self.upstream_delay
-    }
-
-    pub fn num_upstream(&self) -> i32 {
-        self.num_upstream
-    }
-
-    pub fn downstream(&self) -> &Vec<i32> {
-        &self.downstream
-    }
-
-    pub fn num_downstream(&self) -> i32 {
-        self.num_downstream
-    }
-
-    pub fn set_last_granted(&mut self, tag: Tag) {
-        self.last_granted = tag;
-    }
-
-    pub fn set_last_provisionally_granted(&mut self, tag: Tag) {
-        self.last_provisionally_granted = tag;
-    }
-
-    pub fn set_next_event(&mut self, next_event_tag: Tag) {
-        self.next_event = next_event_tag;
-    }
-
-    pub fn set_state(&mut self, state: FedState) {
-        self.state = state;
-    }
-
-    pub fn set_upstream_id_at(&mut self, upstream_id: u16, idx: usize) {
-        self.upstream.insert(idx, upstream_id as i32);
-    }
-
-    pub fn set_completed(&mut self, completed: Tag) {
-        self.completed = completed.clone()
-    }
-
-    pub fn set_upstream_delay_at(&mut self, upstream_delay: tag::Interval, idx: usize) {
-        self.upstream_delay.insert(idx, upstream_delay);
-    }
-
-    pub fn set_num_upstream(&mut self, num_upstream: i32) {
-        self.num_upstream = num_upstream;
-    }
-
-    pub fn set_downstream_id_at(&mut self, downstream_id: u16, idx: usize) {
-        self.downstream.insert(idx, downstream_id as i32);
-    }
-
-    pub fn set_num_downstream(&mut self, num_downstream: i32) {
-        self.num_downstream = num_downstream;
-    }
-
-    pub fn update_enclave_next_event_tag_locked(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        next_event_tag: Tag,
-        start_time: Instant,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-    ) {
-        let id;
-        let num_upstream;
-        let number_of_enclaves;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            number_of_enclaves = locked_rti.number_of_enclaves();
-            let idx: usize = fed_id.into();
-            let fed: &mut Federate = &mut locked_rti.enclaves()[idx];
-            let e = fed.enclave();
-            e.set_next_event(next_event_tag.clone());
-
-            id = e.id();
-            num_upstream = e.num_upstream();
-        }
-        println!(
-            "RTI: Updated the recorded next event tag for federate/enclave {} to ({},{})",
-            id,
-            next_event_tag.time() - start_time,
-            next_event_tag.microstep()
-        );
-
-        // Check to see whether we can reply now with a tag advance grant.
-        // If the enclave has no upstream enclaves, then it does not wait for
-        // nor expect a reply. It just proceeds to advance time.
-        if num_upstream > 0 {
-            Self::notify_advance_grant_if_safe(
-                _f_rti.clone(),
-                fed_id,
-                number_of_enclaves,
-                start_time,
-                sent_start_time.clone(),
-            );
-        }
-        // Check downstream enclaves to see whether they should now be granted a TAG.
-        // To handle cycles, need to create a boolean array to keep
-        // track of which upstream enclaves have been visited.
-        let mut visited = vec![false as bool; number_of_enclaves as usize]; // Initializes to 0.
-        Self::notify_downstream_advance_grant_if_safe(
-            _f_rti.clone(),
-            fed_id,
-            number_of_enclaves,
-            start_time,
-            &mut visited,
-            sent_start_time,
-        );
-    }
-
-    fn notify_advance_grant_if_safe(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        number_of_enclaves: i32,
-        start_time: Instant,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-    ) {
-        let grant =
-            Self::tag_advance_grant_if_safe(_f_rti.clone(), fed_id, number_of_enclaves, start_time);
-        if Tag::lf_tag_compare(&grant.tag(), &Tag::never_tag()) != 0 {
-            if grant.is_provisional() {
-                Self::notify_provisional_tag_advance_grant(
-                    _f_rti,
-                    fed_id,
-                    number_of_enclaves,
-                    grant.tag(),
-                    start_time,
-                    sent_start_time,
-                );
-            } else {
-                Self::notify_tag_advance_grant(
-                    _f_rti,
-                    fed_id,
-                    grant.tag(),
-                    start_time,
-                    sent_start_time,
-                );
-            }
-        }
-    }
-
-    fn tag_advance_grant_if_safe(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        number_of_enclaves: i32,
-        start_time: Instant,
-    ) -> TagAdvanceGrant {
-        let mut result = TagAdvanceGrant::new(Tag::never_tag(), false);
-
-        // Find the earliest LTC of upstream enclaves (M).
-        {
-            let mut min_upstream_completed = Tag::forever_tag();
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let idx: usize = fed_id.into();
-            let enclaves = locked_rti.enclaves();
-            let fed = &enclaves[idx];
-            let e = fed.e();
-            let upstreams = e.upstream();
-            let upstream_delay = e.upstream_delay();
-            for j in 0..upstreams.len() {
-                let delay = upstream_delay[j];
-                // FIXME: Replace "as usize" properly.
-                let upstream = &enclaves[upstreams[j] as usize].e();
-                // Ignore this enclave if it no longer connected.
-                if upstream.state() == FedState::NotConnected {
-                    continue;
-                }
-
-                // Adjust by the "after" delay.
-                // Note that "no delay" is encoded as NEVER,
-                // whereas one microstep delay is encoded as 0LL.
-                let candidate = Tag::lf_delay_strict(&upstream.completed(), delay);
-
-                if Tag::lf_tag_compare(&candidate, &min_upstream_completed) < 0 {
-                    min_upstream_completed = candidate.clone();
-                }
-            }
-            println!(
-                "Minimum upstream LTC for federate/enclave {} is ({},{}) (adjusted by after delay).",
-                e.id(),
-                // FIXME: Check the below calculation
-                min_upstream_completed.time(), // - start_time,
-                min_upstream_completed.microstep()
-            );
-            if Tag::lf_tag_compare(&min_upstream_completed, &e.last_granted()) > 0
-                && Tag::lf_tag_compare(&min_upstream_completed, &e.next_event()) >= 0
-            // The enclave has to advance its tag
-            {
-                result.set_tag(min_upstream_completed);
-                return result;
-            }
-        }
-
-        // Can't make progress based only on upstream LTCs.
-        // If all (transitive) upstream enclaves of the enclave
-        // have earliest event tags such that the
-        // enclave can now advance its tag, then send it a TAG message.
-        // Find the earliest event time of each such upstream enclave,
-        // adjusted by delays on the connections.
-
-        // To handle cycles, need to create a boolean array to keep
-        // track of which upstream enclave have been visited.
-        let mut visited = vec![false as bool; number_of_enclaves.try_into().unwrap()];
-
-        // Find the tag of the earliest possible incoming message from
-        // upstream enclaves.
-        let mut t_d_nonzero_delay = Tag::forever_tag();
-        // The tag of the earliest possible incoming message from a zero-delay connection.
-        // Delayed connections are not guarded from STP violations by the MLAA; this property is
-        // acceptable because delayed connections impose no deadlock risk and in some cases (startup)
-        // this property is necessary to avoid deadlocks. However, it requires some special care here
-        // when potentially sending a PTAG because we must not send a PTAG for a tag at which data may
-        // still be received over nonzero-delay connections.
-        let mut t_d_zero_delay = Tag::forever_tag();
-        println!(
-            "NOTE: FOREVER is displayed as ({},{}) and NEVER as ({},{})",
-            i64::MAX - start_time,
-            u32::MAX,
-            // FIXME: Check the below calculation
-            i64::MIN + i64::MAX + i64::MAX - start_time + 2,
-            0
-        );
-
-        let next_event_tag;
-        let last_provisionally_granted_tag;
-        let last_granted_tag;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let idx: usize = fed_id.into();
-            let enclaves = locked_rti.enclaves();
-            let fed = &enclaves[idx];
-            let e = fed.e();
-            next_event_tag = e.next_event();
-            last_provisionally_granted_tag = e.last_provisionally_granted();
-            last_granted_tag = e.last_granted();
-            let upstreams = e.upstream();
-            for j in 0..upstreams.len() {
-                let upstream = &enclaves[j].e();
-
-                // Ignore this enclave if it is no longer connected.
-                if upstream.state() == FedState::NotConnected {
-                    continue;
-                }
-
-                // Find the (transitive) next event tag upstream.
-                let upstream_next_event = Self::transitive_next_event(
-                    enclaves,
-                    upstream,
-                    upstream.next_event(),
-                    &mut visited,
-                    start_time,
-                );
-
-                println!(
-                    "Earliest next event upstream of fed/encl {} at fed/encl {} has tag ({},{}).",
-                    e.id(),
-                    upstream.id(),
-                    upstream_next_event.time() - start_time,
-                    upstream_next_event.microstep()
-                );
-
-                // Adjust by the "after" delay.
-                // Note that "no delay" is encoded as NEVER,
-                // whereas one microstep delay is encoded as 0LL.
-                // FIXME: Replace "as usize" properly.
-                let candidate = Tag::lf_delay_strict(&upstream_next_event, e.upstream_delay[j]);
-
-                if e.upstream_delay[j] == Some(i64::MIN) {
-                    if Tag::lf_tag_compare(&candidate, &t_d_zero_delay) < 0 {
-                        t_d_zero_delay = candidate;
-                    }
-                } else {
-                    if Tag::lf_tag_compare(&candidate, &t_d_nonzero_delay) < 0 {
-                        t_d_nonzero_delay = candidate;
-                    }
-                }
-            }
-        }
-
-        let t_d;
-        if Tag::lf_tag_compare(&t_d_zero_delay, &t_d_nonzero_delay) < 0 {
-            t_d = t_d_zero_delay.clone();
-        } else {
-            t_d = t_d_nonzero_delay.clone();
-        }
-        println!(
-            "Earliest next event upstream has tag ({},{}).",
-            t_d.time() - start_time,
-            t_d.microstep()
-        );
-
-        println!("t_d={}, e.next_event={}", t_d.time(), next_event_tag.time());
-        println!(
-            "t_d={}, e.last_provisionally_granted={}",
-            t_d.time(),
-            last_provisionally_granted_tag.time()
-        );
-        println!(
-            "t_d={}, e.last_granted={}",
-            t_d.time(),
-            last_granted_tag.time()
-        );
-        if Tag::lf_tag_compare(&t_d, &next_event_tag) > 0 // The enclave has something to do.
-            && Tag::lf_tag_compare(&t_d, &last_provisionally_granted_tag) >= 0 // The grant is not redundant
-            // (equal is important to override any previous
-            // PTAGs).
-            && Tag::lf_tag_compare(&t_d, &last_granted_tag) > 0
-        // The grant is not redundant.
-        {
-            // All upstream enclaves have events with a larger tag than fed, so it is safe to send a TAG.
-            println!("Earliest upstream message time for fed/encl {} is ({},{}) (adjusted by after delay). Granting tag advance for ({},{})",
-                fed_id,
-                t_d.time() - start_time, t_d.microstep(),
-                next_event_tag.time(), // - start_time,
-                next_event_tag.microstep());
-            result.set_tag(next_event_tag);
-        } else if Tag::lf_tag_compare(&t_d_zero_delay, &next_event_tag) == 0 // The enclave has something to do.
-            && Tag::lf_tag_compare(&t_d_zero_delay, &t_d_nonzero_delay) < 0 // The statuses of nonzero-delay connections are known at tag t_d_zero_delay
-            && Tag::lf_tag_compare(&t_d_zero_delay, &last_provisionally_granted_tag) > 0 // The grant is not redundant.
-            && Tag::lf_tag_compare(&t_d_zero_delay, &last_granted_tag) > 0
-        // The grant is not redundant.
-        {
-            // Some upstream enclaves has an event that has the same tag as fed's next event, so we can only provisionally
-            // grant a TAG (via a PTAG).
-            println!("Earliest upstream message time for fed/encl {} is ({},{}) (adjusted by after delay). Granting provisional tag advance.",
-                fed_id,
-                t_d_zero_delay.time() - start_time, t_d_zero_delay.microstep());
-            result.set_tag(t_d_zero_delay);
-            result.set_provisional(true);
-        }
-
-        result
-    }
-
-    fn transitive_next_event(
-        enclaves: &Vec<Federate>,
-        e: &Enclave,
-        candidate: Tag,
-        visited: &mut Vec<bool>,
-        start_time: Instant,
-    ) -> Tag {
-        // FIXME: Replace "as usize" properly.
-        if visited[e.id() as usize] || e.state() == FedState::NotConnected {
-            // Enclave has stopped executing or we have visited it before.
-            // No point in checking upstream enclaves.
-            return candidate.clone();
-        }
-
-        // FIXME: Replace "as usize" properly.
-        visited[e.id() as usize] = true;
-        let mut result = e.next_event();
-
-        // If the candidate is less than this enclave's next_event, use the candidate.
-        if Tag::lf_tag_compare(&candidate, &result) < 0 {
-            result = candidate.clone();
-        }
-
-        // The result cannot be earlier than the start time.
-        if result.time() < start_time {
-            // Earliest next event cannot be before the start time.
-            result = Tag::new(start_time, 0);
-        }
-
-        // Check upstream enclaves to see whether any of them might send
-        // an event that would result in an earlier next event.
-        for i in 0..e.upstream().len() {
-            // FIXME: Replace "as usize" properly.
-            let upstream = enclaves[e.upstream()[i] as usize].e();
-            let mut upstream_result = Self::transitive_next_event(
-                enclaves,
-                upstream,
-                result.clone(),
-                visited,
-                start_time,
-            );
-
-            // Add the "after" delay of the connection to the result.
-            upstream_result = Tag::lf_delay_tag(&upstream_result, e.upstream_delay()[i]);
-
-            // If the adjusted event time is less than the result so far, update the result.
-            if Tag::lf_tag_compare(&upstream_result, &result) < 0 {
-                result = upstream_result;
-            }
-        }
-        let completed = e.completed();
-        if Tag::lf_tag_compare(&result, &completed) < 0 {
-            result = completed;
-        }
-
-        result
-    }
-
-    fn notify_tag_advance_grant(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        tag: Tag,
-        start_time: Instant,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-    ) {
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let enclaves = locked_rti.enclaves();
-            let idx: usize = fed_id.into();
-            let fed: &Federate = &enclaves[idx];
-            let e = fed.e();
-            if e.state() == FedState::NotConnected
-                || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0
-                || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0
-            {
-                return;
-            }
-            // Need to make sure that the destination federate's thread has already
-            // sent the starting MSG_TYPE_TIMESTAMP message.
-            while e.state() == FedState::Pending {
-                // Need to wait here.
-                let (lock, condvar) = &*sent_start_time;
-                let mut notified = lock.lock().unwrap();
-                while !*notified {
-                    notified = condvar.wait(notified).unwrap();
-                }
-            }
-        }
-        let message_length = 1 + mem::size_of::<i64>() + mem::size_of::<u32>();
-        // FIXME: Replace "as usize" properly.
-        let mut buffer = vec![0 as u8; message_length as usize];
-        buffer[0] = MsgType::TagAdvanceGrant.to_byte();
-        NetUtil::encode_int64(tag.time(), &mut buffer, 1);
-        // FIXME: Replace "as i32" properly.
-        NetUtil::encode_int32(
-            tag.microstep() as i32,
-            &mut buffer,
-            1 + mem::size_of::<i64>(),
-        );
-
-        // This function is called in notify_advance_grant_if_safe(), which is a long
-        // function. During this call, the socket might close, causing the following write_to_socket
-        // to fail. Consider a failure here a soft failure and update the federate's status.
-        let mut error_occurred = false;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let enclaves = locked_rti.enclaves();
-            // FIXME: Replace "as usize" properly.
-            let fed: &Federate = &enclaves[fed_id as usize];
-            let e = fed.e();
-            let mut stream = fed.stream().as_ref().unwrap();
-            match stream.write(&buffer) {
-                Ok(bytes_written) => {
-                    if bytes_written < message_length {
-                        println!(
-                            "RTI failed to send tag advance grant to federate {}.",
-                            e.id()
-                        );
-                    }
-                }
-                Err(_err) => {
-                    error_occurred = true;
-                }
-            }
-        }
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            // FIXME: Replace "as usize" properly.
-            let mut_fed: &mut Federate = &mut locked_rti.enclaves()[fed_id as usize];
-            let enclave = mut_fed.enclave();
-            if error_occurred {
-                enclave.set_state(FedState::NotConnected);
-                // FIXME: We need better error handling, but don't stop other execution here.
-            } else {
-                enclave.set_last_granted(tag.clone());
-                println!(
-                    "RTI sent to federate {} the Tag Advance Grant (TAG) ({},{}).",
-                    enclave.id(),
-                    tag.time() - start_time,
-                    tag.microstep()
-                );
-            }
-        }
-    }
-
-    fn notify_provisional_tag_advance_grant(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        number_of_enclaves: i32,
-        tag: Tag,
-        start_time: Instant,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-    ) {
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let enclaves = locked_rti.enclaves();
-            let idx: usize = fed_id.into();
-            let fed: &Federate = &enclaves[idx];
-            let e = fed.e();
-            if e.state() == FedState::NotConnected
-                || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0
-                || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0
-            {
-                return;
-            }
-            // Need to make sure that the destination federate's thread has already
-            // sent the starting MSG_TYPE_TIMESTAMP message.
-            while e.state() == FedState::Pending {
-                // Need to wait here.
-                let (lock, condvar) = &*sent_start_time;
-                let mut notified = lock.lock().unwrap();
-                while !*notified {
-                    notified = condvar.wait(notified).unwrap();
-                }
-            }
-        }
-        let message_length = 1 + mem::size_of::<i64>() + mem::size_of::<u32>();
-        // FIXME: Replace "as usize" properly.
-        let mut buffer = vec![0 as u8; message_length as usize];
-        buffer[0] = MsgType::PropositionalTagAdvanceGrant.to_byte();
-        NetUtil::encode_int64(tag.time(), &mut buffer, 1);
-        NetUtil::encode_int32(
-            tag.microstep().try_into().unwrap(),
-            &mut buffer,
-            1 + mem::size_of::<i64>(),
-        );
-
-        // This function is called in notify_advance_grant_if_safe(), which is a long
-        // function. During this call, the socket might close, causing the following write_to_socket
-        // to fail. Consider a failure here a soft failure and update the federate's status.
-        let mut error_occurred = false;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let enclaves = locked_rti.enclaves();
-            // FIXME: Replace "as usize" properly.
-            let fed: &Federate = &enclaves[fed_id as usize];
-            let e = fed.e();
-            let mut stream = fed.stream().as_ref().unwrap();
-            match stream.write(&buffer) {
-                Ok(bytes_written) => {
-                    if bytes_written < message_length {
-                        println!(
-                            "RTI failed to send tag advance grant to federate {}.",
-                            e.id()
-                        );
-                        return;
-                    }
-                }
-                Err(_err) => {
-                    error_occurred = true;
-                }
-            }
-        }
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            // FIXME: Replace "as usize" properly.
-            let mut_fed: &mut Federate = &mut locked_rti.enclaves()[fed_id as usize];
-            let enclave = mut_fed.enclave();
-            if error_occurred {
-                enclave.set_state(FedState::NotConnected);
-                // FIXME: We need better error handling, but don't stop other execution here.
-            }
-
-            enclave.set_last_provisionally_granted(tag.clone());
-            println!(
-                "RTI sent to federate {} the Provisional Tag Advance Grant (PTAG) ({},{}).",
-                enclave.id(),
-                tag.time() - start_time,
-                tag.microstep()
-            );
-        }
-
-        // Send PTAG to all upstream federates, if they have not had
-        // a later or equal PTAG or TAG sent previously and if their transitive
-        // NET is greater than or equal to the tag.
-        // NOTE: This could later be replaced with a TNET mechanism once
-        // we have an available encoding of causality interfaces.
-        // That might be more efficient.
-        let num_upstream;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let enclaves = locked_rti.enclaves();
-            let idx: usize = fed_id.into();
-            let fed: &Federate = &enclaves[idx];
-            let e = fed.e();
-            num_upstream = e.num_upstream();
-        }
-        for j in 0..num_upstream {
-            let e_id;
-            let upstream_next_event;
-            {
-                let mut locked_rti = _f_rti.lock().unwrap();
-                let enclaves = locked_rti.enclaves();
-                let idx: usize = fed_id.into();
-                let fed: &Federate = &enclaves[idx];
-                // FIXME: Replace "as usize" properly.
-                e_id = fed.e().upstream()[j as usize];
-                // FIXME: Replace "as usize" properly.
-                let upstream: &Federate = &enclaves[e_id as usize];
-
-                // Ignore this federate if it has resigned.
-                if upstream.e().state() == NotConnected {
-                    continue;
-                }
-                // To handle cycles, need to create a boolean array to keep
-                // track of which upstream federates have been visited.
-                // FIXME: Replace "as usize" properly.
-                let mut visited = vec![false; number_of_enclaves as usize];
-
-                // Find the (transitive) next event tag upstream.
-                upstream_next_event = Self::transitive_next_event(
-                    &enclaves,
-                    upstream.e(),
-                    upstream.e().next_event(),
-                    &mut visited,
-                    start_time,
-                );
-            }
-            // If these tags are equal, then
-            // a TAG or PTAG should have already been granted,
-            // in which case, another will not be sent. But it
-            // may not have been already granted.
-            if Tag::lf_tag_compare(&upstream_next_event, &tag) >= 0 {
-                Self::notify_provisional_tag_advance_grant(
-                    _f_rti.clone(),
-                    // FIXME: Handle unwrap properly.
-                    e_id.try_into().unwrap(),
-                    number_of_enclaves,
-                    tag.clone(),
-                    start_time,
-                    sent_start_time.clone(),
-                );
-            }
-        }
-    }
-
-    pub fn notify_downstream_advance_grant_if_safe(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        number_of_enclaves: i32,
-        start_time: Instant,
-        visited: &mut Vec<bool>,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-    ) {
-        // FIXME: Replace "as usize" properly.
-        visited[fed_id as usize] = true;
-        let num_downstream;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let idx: usize = fed_id.into();
-            let fed: &Federate = &locked_rti.enclaves()[idx];
-            let e = fed.e();
-            num_downstream = e.num_downstream();
-        }
-        for i in 0..num_downstream {
-            let e_id;
-            {
-                let mut locked_rti = _f_rti.lock().unwrap();
-                let enclaves = locked_rti.enclaves();
-                let idx: usize = fed_id.into();
-                let fed: &Federate = &enclaves[idx];
-                let downstreams = fed.e().downstream();
-                // FIXME: Replace "as u16" properly.
-                e_id = downstreams[i as usize] as u16;
-                // FIXME: Replace "as usize" properly.
-                if visited[e_id as usize] {
-                    continue;
-                }
-            }
-            Self::notify_advance_grant_if_safe(
-                _f_rti.clone(),
-                e_id,
-                number_of_enclaves,
-                start_time,
-                sent_start_time.clone(),
-            );
-            Self::notify_downstream_advance_grant_if_safe(
-                _f_rti.clone(),
-                e_id,
-                number_of_enclaves,
-                start_time,
-                visited,
-                sent_start_time.clone(),
-            );
-        }
-    }
-
-    pub fn logical_tag_complete(
-        _f_rti: Arc<Mutex<FederationRTI>>,
-        fed_id: u16,
-        number_of_enclaves: i32,
-        start_time: Instant,
-        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
-        completed: Tag,
-    ) {
-        // FIXME: Consolidate this message with NET to get NMR (Next Message Request).
-        // Careful with handling startup and shutdown.
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let idx: usize = fed_id.into();
-            let fed: &mut Federate = &mut locked_rti.enclaves()[idx];
-            let enclave = fed.enclave();
-            enclave.set_completed(completed);
-
-            println!(
-                "RTI received from federate/enclave {} the Logical Tag Complete (LTC) ({},{}).",
-                enclave.id(),
-                enclave.completed().time() - start_time,
-                enclave.completed().microstep()
-            );
-        }
-
-        // Check downstream enclaves to see whether they should now be granted a TAG.
-        let num_downstream;
-        {
-            let mut locked_rti = _f_rti.lock().unwrap();
-            let idx: usize = fed_id.into();
-            let fed: &Federate = &locked_rti.enclaves()[idx];
-            let e = fed.e();
-            num_downstream = e.num_downstream();
-        }
-        for i in 0..num_downstream {
-            let e_id;
-            {
-                let mut locked_rti = _f_rti.lock().unwrap();
-                let enclaves = locked_rti.enclaves();
-                let idx: usize = fed_id.into();
-                let fed: &Federate = &enclaves[idx];
-                let downstreams = fed.e().downstream();
-                // FIXME: Replace "as u16" properly.
-                e_id = downstreams[i as usize] as u16;
-            }
-            // Notify downstream enclave if appropriate.
-            Self::notify_advance_grant_if_safe(
-                _f_rti.clone(),
-                e_id,
-                number_of_enclaves,
-                start_time,
-                sent_start_time.clone(),
-            );
-            let mut visited = vec![false as bool; number_of_enclaves as usize]; // Initializes to 0.
-            // Notify enclaves downstream of downstream if appropriate.
-            Self::notify_downstream_advance_grant_if_safe(
-                _f_rti.clone(),
-                e_id,
-                number_of_enclaves,
-                start_time,
-                &mut visited,
-                sent_start_time.clone(),
-            );
-        }
-    }
-}
diff --git a/rust/rti/src/federate.rs b/rust/rti/src/federate_info.rs
similarity index 90%
rename from rust/rti/src/federate.rs
rename to rust/rti/src/federate_info.rs
index cbdff30..2ed3cea 100644
--- a/rust/rti/src/federate.rs
+++ b/rust/rti/src/federate_info.rs
@@ -1,3 +1,4 @@
+use crate::message_record::message_record::InTransitMessageRecordQueue;
 /**
  * @file
  * @author Edward A. Lee (eal@berkeley.edu)
@@ -9,11 +10,10 @@
  * @copyright (c) 2020-2023, The University of California at Berkeley
  * License in [BSD 2-clause](..)
  * @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.
- * This file extends enclave.h with RTI features that are specific to federations and are not
+ * This file extends rti_common.h with RTI features that are specific to federations and are not
  * used by scheduling enclaves.
  */
-use crate::enclave::*;
-use crate::message_record::message_record::InTransitMessageRecordQueue;
+use crate::rti_common::*;
 
 use std::net::TcpStream;
 use std::option::Option;
@@ -26,8 +26,8 @@
  * denoted with ~>) because those connections do not impose
  * any scheduling constraints.
  */
-pub struct Federate {
-    enclave: Enclave,
+pub struct FederateInfo {
+    enclave: SchedulingNode,
     requested_stop: bool, // Indicates that the federate has requested stop or has replied
     // to a request for stop from the RTI. Used to prevent double-counting
    // a federate when handling lf_request_stop().
@@ -48,10 +48,10 @@
     // server of the federate.
 }
 
-impl Federate {
-    pub fn new() -> Federate {
-        Federate {
-            enclave: Enclave::new(),
+impl FederateInfo {
+    pub fn new() -> FederateInfo {
+        FederateInfo {
+            enclave: SchedulingNode::new(),
             requested_stop: false,
             stream: None::<TcpStream>,
             clock_synchronization_enabled: true,
@@ -61,11 +61,11 @@
         }
     }
 
-    pub fn e(&self) -> &Enclave {
+    pub fn e(&self) -> &SchedulingNode {
         &self.enclave
     }
 
-    pub fn enclave(&mut self) -> &mut Enclave {
+    pub fn enclave(&mut self) -> &mut SchedulingNode {
         &mut self.enclave
     }
 
diff --git a/rust/rti/src/lib.rs b/rust/rti/src/lib.rs
index d7c61b3..4e1050b 100644
--- a/rust/rti/src/lib.rs
+++ b/rust/rti/src/lib.rs
@@ -7,9 +7,9 @@
  * @brief ..
  */
 mod constants;
-mod enclave;
-mod federate;
-mod federation_rti;
+mod federate_info;
+mod rti_common;
+mod rti_remote;
 mod message_record {
     pub mod message_record;
     pub mod rti_pqueue_support;
@@ -22,9 +22,9 @@ mod tag;
 
 use std::error::Error;
 
 use crate::constants::*;
-use crate::enclave::*;
-use crate::federate::*;
-use crate::federation_rti::*;
+use crate::federate_info::*;
+use crate::rti_common::*;
+use crate::rti_remote::*;
 
 use server::Server;
 
@@ -45,7 +45,7 @@ impl ClockSyncStat {
     }
 }
 
-pub fn process_args(rti: &mut FederationRTI, argv: &[String]) -> Result<(), &'static str> {
+pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static str> {
     let mut idx = 1;
     let argc = argv.len();
     while idx < argc {
@@ -82,8 +82,12 @@
                    return Err("Fail to parse a string to i64");
                }
            };
-            rti.set_number_of_enclaves(num_federates.try_into().unwrap()); // FIXME: panic if the converted value doesn't fit
-            println!("RTI: Number of federates: {}", rti.number_of_enclaves());
+            rti.base()
+                .set_number_of_scheduling_nodes(num_federates.try_into().unwrap()); // FIXME: panic if the converted value doesn't fit
+            println!(
+                "RTI: Number of federates: {}",
+                rti.base().number_of_scheduling_nodes()
+            );
         } else if arg == "-p" || arg == "--port" {
             if argc < idx + 2 {
                 println!(
@@ -130,7 +134,7 @@
         }
         idx += 1;
     }
-    if rti.number_of_enclaves() == 0 {
+    if rti.base().number_of_scheduling_nodes() == 0 {
         println!("--number_of_federates needs a valid positive integer argument.");
         usage(argc, argv);
         return Err("Invalid number of enclaves");
@@ -167,25 +171,25 @@
     }
 }
 
-pub fn initialize_federates(rti: &mut FederationRTI) {
-    let mut i: u16 = 0;
-    while i32::from(i) < rti.number_of_enclaves() {
-        let mut federate = Federate::new();
-        initialize_federate(&mut federate, i);
-        let enclaves: &mut Vec<Federate> = rti.enclaves();
-        enclaves.push(federate);
-        i += 1;
+pub fn initialize_federates(rti: &mut RTIRemote) {
+    for i in 0..rti.base().number_of_scheduling_nodes() {
+        let mut federate = FederateInfo::new();
+        // FIXME: Handle "as u16" properly.
+        initialize_federate(&mut federate, i as u16);
+        let scheduling_nodes: &mut Vec<FederateInfo> = rti.base().scheduling_nodes();
+        // FIXME: Handle "as usize" properly.
+        scheduling_nodes.insert(i as usize, federate);
     }
 }
 
-fn initialize_federate(fed: &mut Federate, id: u16) {
+fn initialize_federate(fed: &mut FederateInfo, id: u16) {
     let enclave = fed.enclave();
-    enclave.initialize_enclave(id);
+    enclave.initialize_scheduling_node(id);
     // TODO: fed.set_in_transit_message_tags();
     // TODO: fed.set_server_ip_addr();
 }
 
-pub fn start_rti_server(_f_rti: &mut FederationRTI) -> Result<Server, Box<dyn Error>> {
+pub fn start_rti_server(_f_rti: &mut RTIRemote) -> Result<Server, Box<dyn Error>> {
     // TODO: _lf_initialize_clock();
     Ok(Server::create_server(
         _f_rti.user_specified_port().to_string(),
@@ -209,6 +213,6 @@
     ))
 }
 
-pub fn initialize_rti() -> FederationRTI {
-    FederationRTI::new()
+pub fn initialize_rti() -> RTIRemote {
+    RTIRemote::new()
 }
diff --git a/rust/rti/src/main.rs b/rust/rti/src/main.rs
index f1bb4a4..55c75ee 100644
--- a/rust/rti/src/main.rs
+++ b/rust/rti/src/main.rs
@@ -10,27 +10,27 @@ use std::env;
 use std::process;
 
 fn main() {
-    let mut _f_rti = rti::initialize_rti();
+    let mut rti = rti::initialize_rti();
 
     let args: Vec<String> = env::args().collect();
     // dbg!(args);
 
-    rti::process_args(&mut _f_rti, &args).unwrap_or_else(|err| {
+    rti::process_args(&mut rti, &args).unwrap_or_else(|err| {
         println!("Problem parsing arguments: {err}");
         process::exit(1);
     });
 
     println!(
         "Starting RTI for {} federates in federation ID {}.",
-        _f_rti.number_of_enclaves(),
-        _f_rti.federation_id()
+        rti.base().number_of_scheduling_nodes(),
+        rti.federation_id()
     );
-    assert!(_f_rti.number_of_enclaves() < u16::MAX.into());
+    assert!(rti.base().number_of_scheduling_nodes() < u16::MAX.into());
 
-    rti::initialize_federates(&mut _f_rti);
+    rti::initialize_federates(&mut rti);
 
-    let server = rti::start_rti_server(&mut _f_rti);
+    let server = rti::start_rti_server(&mut rti);
     server
         .expect("Failed to wait for federates")
-        .wait_for_federates(_f_rti);
+        .wait_for_federates(rti);
 }
diff --git a/rust/rti/src/rti_common.rs b/rust/rti/src/rti_common.rs
new file mode 100644
index 0000000..7db8939
--- /dev/null
+++ b/rust/rti/src/rti_common.rs
@@ -0,0 +1,1184 @@
+use crate::net_common::MsgType;
+use crate::net_util::NetUtil;
+/**
+ * @file rti_common.rs
+ * @author Edward A. Lee (eal@berkeley.edu)
+ * @author Soroush Bateni (soroush@utdallas.edu)
+ * @author Erling Jellum (erling.r.jellum@ntnu.no)
+ * @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
+ * @author Chanhee Lee (chanheel@asu.edu)
+ * @author Hokeun Kim (hokeun@asu.edu)
+ * @copyright (c) 2020-2024, The University of California at Berkeley
+ * License in [BSD 2-clause](..)
+ * @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.
+ * This file extends enclave.h with RTI features that are specific to federations and are not
+ * used by scheduling enclaves.
+ */
+use crate::rti_remote::RTIRemote;
+use crate::tag;
+use crate::tag::{Instant, Interval, Tag, FOREVER};
+use crate::FederateInfo;
+use crate::SchedulingNodeState::*;
+
+use std::io::Write;
+use std::mem;
+use std::sync::{Arc, Condvar, Mutex};
+
+const IS_IN_ZERO_DELAY_CYCLE: i32 = 1;
+const IS_IN_CYCLE: i32 = 2;
+
+/** Mode of execution of a federate. */
+enum ExecutionMode {
+    FAST,
+    REALTIME,
+}
+
+#[derive(PartialEq, Clone, Debug)]
+pub enum SchedulingNodeState {
+    NotConnected, // The scheduling node has not connected.
+    Granted,      // Most recent MsgType::NextEventTag has been granted.
+    Pending,      // Waiting for upstream scheduling nodes.
+}
+
+/** Struct for minimum delays from upstream nodes. */
+pub struct MinimumDelay {
+    id: i32,        // ID of the upstream node.
+    min_delay: Tag, // Minimum delay from upstream.
+}
+
+impl MinimumDelay {
+    pub fn new(id: i32, min_delay: Tag) -> MinimumDelay {
+        MinimumDelay { id, min_delay }
+    }
+
+    pub fn id(&self) -> i32 {
+        self.id
+    }
+
+    pub fn min_delay(&self) -> &Tag {
+        &self.min_delay
+    }
+}
+
+/**
+ * Information about the scheduling nodes coordinated by the RTI.
+ * The abstract scheduling node could either be an enclave or a federate.
+ * The information includes its runtime state,
+ * mode of execution, and connectivity with other scheduling nodes.
+ * The list of upstream and downstream scheduling nodes does not include
+ * those that are connected via a "physical" connection (one
+ * denoted with ~>) because those connections do not impose
+ * any scheduling constraints.
+ */
+pub struct SchedulingNode {
+    id: u16, // ID of this scheduling node.
+    completed: Tag, // The largest logical tag completed by the federate (or NEVER if no LTC has been received).
+    last_granted: Tag, // The maximum Tag that has been granted so far (or NEVER if none granted)
+    last_provisionally_granted: Tag, // The maximum PTAG that has been provisionally granted (or NEVER if none granted)
+    next_event: Tag, // Most recent NET received from the federate (or NEVER if none received).
+    state: SchedulingNodeState, // State of the federate.
+    upstream: Vec<i32>, // Array of upstream federate ids.
+    upstream_delay: Vec<Interval>, // Minimum delay on connections from upstream federates.
+    // Here, NEVER encodes no delay. 0LL is a microstep delay.
+    num_upstream: i32, // Size of the array of upstream federates and delays.
+    downstream: Vec<i32>, // Array of downstream federate ids.
+    num_downstream: i32, // Size of the array of downstream federates.
+    mode: ExecutionMode, // FAST or REALTIME.
+    min_delays: Vec<MinimumDelay>, // Array of minimum delays from upstream nodes, not including this node.
+    num_min_delays: u64, // Size of min_delays array.
+    flags: i32, // Bitwise OR of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE.
+}
+
+impl SchedulingNode {
+    pub fn new() -> SchedulingNode {
+        SchedulingNode {
+            id: 0,
+            completed: Tag::never_tag(),
+            last_granted: Tag::never_tag(),
+            last_provisionally_granted: Tag::never_tag(),
+            next_event: Tag::never_tag(),
+            state: SchedulingNodeState::NotConnected,
+            upstream: Vec::new(),
+            upstream_delay: Vec::new(),
+            num_upstream: 0,
+            downstream: Vec::new(),
+            num_downstream: 0,
+            mode: ExecutionMode::REALTIME,
+            min_delays: Vec::new(),
+            num_min_delays: 0,
+            flags: 0,
+        }
+    }
+
+    pub fn initialize_scheduling_node(&mut self, id: u16) {
+        self.id = id;
+        // Initialize the next event condition variable.
+        // TODO: lf_cond_init(&e->next_event_condition, &rti_mutex);
+    }
+
+    pub fn id(&self) -> u16 {
+        self.id
+    }
+
+    pub fn completed(&self) -> Tag {
+        self.completed.clone()
+    }
+
+    pub fn last_granted(&self) -> Tag {
+        self.last_granted.clone()
+    }
+
+    pub fn last_provisionally_granted(&self) -> Tag {
+        self.last_provisionally_granted.clone()
+    }
+
+    pub fn next_event(&self) -> Tag {
+        self.next_event.clone()
+    }
+
+    pub fn state(&self) -> SchedulingNodeState {
+        self.state.clone()
+    }
+
+    pub fn upstream(&self) -> &Vec<i32> {
+        &self.upstream
+    }
+
+    pub fn upstream_delay(&self) -> &Vec<Interval> {
+        &self.upstream_delay
+    }
+
+    pub fn num_upstream(&self) -> i32 {
+        self.num_upstream
+    }
+
+    pub fn downstream(&self) -> &Vec<i32> {
+        &self.downstream
+    }
+
+    pub fn num_downstream(&self) -> i32 {
+        self.num_downstream
+    }
+
+    pub fn min_delays(&mut self) -> &mut Vec<MinimumDelay> {
+        &mut self.min_delays
+    }
+
+    pub fn num_min_delays(&self) -> u64 {
+        self.num_min_delays
+    }
+
+    pub fn flags(&self) -> i32 {
+        self.flags
+    }
+
+    pub fn set_last_granted(&mut self, tag: Tag) {
+        self.last_granted = tag;
+    }
+
+    pub fn set_last_provisionally_granted(&mut self, tag: Tag) {
+        self.last_provisionally_granted = tag;
+    }
+
+    pub fn set_next_event(&mut self, next_event_tag: Tag) {
+        self.next_event = next_event_tag;
+    }
+
+    pub fn set_state(&mut self, state: SchedulingNodeState) {
+        self.state = state;
+    }
+
+    pub fn set_upstream_id_at(&mut self, upstream_id: u16, idx: usize) {
+        self.upstream.insert(idx, upstream_id as i32);
+    }
+
+    pub fn set_completed(&mut self, completed: Tag) {
+        self.completed = completed.clone()
+    }
+
+    pub fn set_upstream_delay_at(&mut self, upstream_delay: tag::Interval, idx: usize) {
+        self.upstream_delay.insert(idx, upstream_delay);
+    }
+
+    pub fn set_num_upstream(&mut self, num_upstream: i32) {
+        self.num_upstream = num_upstream;
+    }
+
+    pub fn set_downstream_id_at(&mut self, downstream_id: u16, idx: usize) {
+        self.downstream.insert(idx, downstream_id as i32);
+    }
+
+    pub fn set_num_downstream(&mut self, num_downstream: i32) {
+        self.num_downstream = num_downstream;
+    }
+
+    pub fn set_min_delays(&mut self, min_delays: Vec<MinimumDelay>) {
+        self.min_delays = min_delays;
+    }
+
+    pub fn set_num_min_delays(&mut self, num_min_delays: u64) {
+        self.num_min_delays = num_min_delays;
+    }
+
+    pub fn set_flags(&mut self, flags: i32) {
+        self.flags = flags;
+    }
+
+    pub fn update_scheduling_node_next_event_tag_locked(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        next_event_tag: Tag,
+        start_time: Instant,
+        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
+    ) {
+        let num_upstream;
+        let number_of_scheduling_nodes;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let fed = &mut locked_rti.base().scheduling_nodes()[idx];
+            let e = fed.enclave();
+            e.set_next_event(next_event_tag.clone());
+            num_upstream = e.num_upstream();
+        }
+        println!(
+            "RTI: Updated the recorded next event tag for federate/enclave {} to ({},{})",
+            fed_id,
+            next_event_tag.time() - start_time,
+            next_event_tag.microstep()
+        );
+
+        // Check to see whether we can reply now with a tag advance grant.
+        // If the enclave has no upstream enclaves, then it does not wait for
+        // nor expect a reply. It just proceeds to advance time.
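+        // (Hence a node with no upstream connections never blocks on a grant from
+        // the RTI; its recorded NET still feeds the downstream notifications below.)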
+        if num_upstream > 0 {
+            Self::notify_advance_grant_if_safe(
+                _f_rti.clone(),
+                fed_id,
+                number_of_scheduling_nodes,
+                start_time,
+                sent_start_time.clone(),
+            );
+        }
+        // Check downstream enclaves to see whether they should now be granted a TAG.
+        // To handle cycles, need to create a boolean array to keep
+        // track of which upstream enclaves have been visited.
+        let mut visited = vec![false as bool; number_of_scheduling_nodes as usize]; // Initializes to 0.
+        Self::notify_downstream_advance_grant_if_safe(
+            _f_rti.clone(),
+            fed_id,
+            number_of_scheduling_nodes,
+            start_time,
+            &mut visited,
+            sent_start_time,
+        );
+    }
+
+    fn notify_advance_grant_if_safe(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        number_of_enclaves: i32,
+        start_time: Instant,
+        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
+    ) {
+        let grant = Self::tag_advance_grant_if_safe(_f_rti.clone(), fed_id, start_time);
+        if Tag::lf_tag_compare(&grant.tag(), &Tag::never_tag()) != 0 {
+            if grant.is_provisional() {
+                Self::notify_provisional_tag_advance_grant(
+                    _f_rti,
+                    fed_id,
+                    number_of_enclaves,
+                    grant.tag(),
+                    start_time,
+                    sent_start_time,
+                );
+            } else {
+                Self::notify_tag_advance_grant(
+                    _f_rti,
+                    fed_id,
+                    grant.tag(),
+                    start_time,
+                    sent_start_time,
+                );
+            }
+        }
+    }
+
+    fn tag_advance_grant_if_safe(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        // number_of_enclaves: i32,
+        start_time: Instant,
+    ) -> TagAdvanceGrant {
+        let mut result = TagAdvanceGrant::new(Tag::never_tag(), false);
+
+        // Find the earliest LTC of upstream scheduling_nodes (M).
+        {
+            let mut min_upstream_completed = Tag::forever_tag();
+
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let e = scheduling_nodes[idx].e();
+            let upstreams = e.upstream();
+            let upstream_delay = e.upstream_delay();
+            for j in 0..upstreams.len() {
+                let delay = upstream_delay[j];
+                // FIXME: Replace "as usize" properly.
+                let upstream = &scheduling_nodes[upstreams[j] as usize].e();
+                // Ignore this enclave if it is no longer connected.
+                if upstream.state() == SchedulingNodeState::NotConnected {
+                    continue;
+                }
+
+                // Adjust by the "after" delay.
+                // Note that "no delay" is encoded as NEVER,
+                // whereas one microstep delay is encoded as 0LL.
+                let candidate = Tag::lf_delay_strict(&upstream.completed(), delay);
+
+                if Tag::lf_tag_compare(&candidate, &min_upstream_completed) < 0 {
+                    min_upstream_completed = candidate.clone();
+                }
+            }
+            if min_upstream_completed.time() >= start_time {
+                println!(
+                    "Minimum upstream LTC for federate/enclave {} is ({},{}) (adjusted by after delay).",
+                    e.id(),
+                    min_upstream_completed.time() - start_time,
+                    min_upstream_completed.microstep()
+                );
+            } else {
+                println!(
+                    "Minimum upstream LTC for federate/enclave {} is ({},{}) (adjusted by after delay).\nWARNING!!! min_upstream_completed.time() < start_time",
+                    e.id(),
+                    // FIXME: Check the below calculation
+                    min_upstream_completed.time(), // - start_time,
+                    min_upstream_completed.microstep()
+                );
+            }
+            if Tag::lf_tag_compare(&min_upstream_completed, &e.last_granted()) > 0
+                && Tag::lf_tag_compare(&min_upstream_completed, &e.next_event()) >= 0
+            // The enclave has to advance its tag
+            {
+                result.set_tag(min_upstream_completed);
+                return result;
+            }
+        }
+
+        // Can't make progress based only on upstream LTCs.
+        // If all (transitive) upstream enclaves of the enclave
+        // have earliest event tags such that the
+        // enclave can now advance its tag, then send it a TAG message.
+        // Find the tag of the earliest event that may be later received from an upstream enclave
+        // or federate (which includes any after delays on the connections).
+        let t_d =
+            Self::earliest_future_incoming_message_tag(_f_rti.clone(), fed_id as u16, start_time);
+
+        if t_d.time() >= start_time {
+            println!(
+                "RTI: Earliest next event upstream of node {} has tag ({},{}).",
+                fed_id,
+                t_d.time() - start_time,
+                t_d.microstep()
+            );
+        } else {
+            println!("   t_d.time < start_time ({},{})", t_d.time(), start_time);
+        }
+
+        // Given an EIMT (earliest incoming message tag) there are these possible scenarios:
+        //  1) The EIMT is greater than the NET we want to advance to. Grant a TAG.
+        //  2) The EIMT is equal to the NET and the federate is part of a zero-delay cycle (ZDC).
+        //  3) The EIMT is equal to the NET and the federate is not part of a ZDC.
+        //  4) The EIMT is less than the NET.
+        // In (1) we can give a TAG to NET. In (2) we can give a PTAG.
+        // In (3) and (4), we wait for further updates from upstream federates.
+        let next_event;
+        let last_provisionally_granted;
+        let last_granted;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let e = scheduling_nodes[idx].e();
+            next_event = e.next_event();
+            last_provisionally_granted = e.last_provisionally_granted();
+            last_granted = e.last_granted();
+        }
+
+        // Scenario (1) above
+        if Tag::lf_tag_compare(&t_d, &next_event) > 0 // EIMT greater than NET
+            && Tag::lf_tag_compare(&t_d, &last_provisionally_granted) >= 0 // The grant is not redundant
+            // (equal is important to override any previous
+            // PTAGs).
+            // The grant is not redundant.
+            && Tag::lf_tag_compare(&t_d, &last_granted) > 0
+        {
+            // No upstream node can send events that will be received with a tag less than or equal to
+            // e->next_event, so it is safe to send a TAG.
+            println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting tag advance (TAG) for ({},{})",
+                fed_id,
+                t_d.time() - start_time, t_d.microstep(),
+                next_event.time(), // TODO: - start_time,
+                next_event.microstep());
+            result.set_tag(next_event);
+        } else if
+        // Scenario (2) or (3) above
+        Tag::lf_tag_compare(&t_d, &next_event) == 0 // EIMT equal to NET
+            && Self::is_in_zero_delay_cycle(_f_rti.clone(), fed_id) // The node is part of a ZDC
+            && Tag::lf_tag_compare(&t_d, &last_provisionally_granted) > 0 // The grant is not redundant
+            // The grant is not redundant.
+            && Tag::lf_tag_compare(&t_d, &last_granted) > 0
+        {
+            // Some upstream node may send an event that has the same tag as this node's next event,
+            // so we can only grant a PTAG.
+            println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting provisional tag advance (PTAG) for ({},{})",
+                fed_id,
+                t_d.time() - start_time, t_d.microstep(),
+                next_event.time() - start_time,
+                next_event.microstep());
+            result.set_tag(next_event);
+            result.set_provisional(true);
+        }
+        result
+    }
+
+    fn is_in_zero_delay_cycle(_f_rti: Arc<Mutex<RTIRemote>>, fed_id: u16) -> bool {
+        let is_first_time;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0;
+        }
+        if is_first_time {
+            Self::update_min_delays_upstream(_f_rti.clone(), fed_id);
+        }
+        let flags;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let node = scheduling_nodes[idx].e();
+            flags = node.flags()
+        }
+        (flags & IS_IN_ZERO_DELAY_CYCLE) != 0
+    }
+
+    fn notify_tag_advance_grant(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        tag: Tag,
+        start_time: Instant,
+        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
+    ) {
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let enclaves = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let fed: &FederateInfo = &enclaves[idx];
+            let e = fed.e();
+            if e.state() == SchedulingNodeState::NotConnected
+                || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0
+                || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0
+            {
+                return;
+            }
+            // Need to make sure that the destination federate's thread has already
+            // sent the starting MSG_TYPE_TIMESTAMP message.
+            while e.state() == SchedulingNodeState::Pending {
+                // Need to wait here.
+                let (lock, condvar) = &*sent_start_time;
+                let mut notified = lock.lock().unwrap();
+                while !*notified {
+                    notified = condvar.wait(notified).unwrap();
+                }
+            }
+        }
+        let message_length = 1 + mem::size_of::<i64>() + mem::size_of::<u32>();
+        // FIXME: Replace "as usize" properly.
+        let mut buffer = vec![0 as u8; message_length as usize];
+        buffer[0] = MsgType::TagAdvanceGrant.to_byte();
+        NetUtil::encode_int64(tag.time(), &mut buffer, 1);
+        // FIXME: Replace "as i32" properly.
+        NetUtil::encode_int32(
+            tag.microstep() as i32,
+            &mut buffer,
+            1 + mem::size_of::<i64>(),
+        );
+
+        // This function is called in notify_advance_grant_if_safe(), which is a long
+        // function. During this call, the socket might close, causing the following write_to_socket
+        // to fail. Consider a failure here a soft failure and update the federate's status.
+        let mut error_occurred = false;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            // FIXME: Replace "as usize" properly.
+            let fed: &FederateInfo = &scheduling_nodes[fed_id as usize];
+            let e = fed.e();
+            let mut stream = fed.stream().as_ref().unwrap();
+            match stream.write(&buffer) {
+                Ok(bytes_written) => {
+                    if bytes_written < message_length {
+                        println!(
+                            "RTI failed to send tag advance grant to federate {}.",
+                            e.id()
+                        );
+                    }
+                }
+                Err(_err) => {
+                    error_occurred = true;
+                }
+            }
+        }
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            // FIXME: Replace "as usize" properly.
+            let mut_fed: &mut FederateInfo =
+                &mut locked_rti.base().scheduling_nodes()[fed_id as usize];
+            let enclave = mut_fed.enclave();
+            if error_occurred {
+                enclave.set_state(SchedulingNodeState::NotConnected);
+                // FIXME: We need better error handling, but don't stop other execution here.
+            } else {
+                enclave.set_last_granted(tag.clone());
+                println!(
+                    "RTI sent to federate {} the Tag Advance Grant (TAG) ({},{}).",
+                    enclave.id(),
+                    tag.time() - start_time,
+                    tag.microstep()
+                );
+            }
+        }
+    }
+
+    fn notify_provisional_tag_advance_grant(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        number_of_enclaves: i32,
+        tag: Tag,
+        start_time: Instant,
+        sent_start_time: Arc<(Mutex<bool>, Condvar)>,
+    ) {
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let enclaves = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let fed: &FederateInfo = &enclaves[idx];
+            let e = fed.e();
+            if e.state() == SchedulingNodeState::NotConnected
+                || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0
+                || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0
+            {
+                return;
+            }
+            // Need to make sure that the destination federate's thread has already
+            // sent the starting MSG_TYPE_TIMESTAMP message.
+            while e.state() == SchedulingNodeState::Pending {
+                // Need to wait here.
+                let (lock, condvar) = &*sent_start_time;
+                let mut notified = lock.lock().unwrap();
+                while !*notified {
+                    notified = condvar.wait(notified).unwrap();
+                }
+            }
+        }
+        let message_length = 1 + mem::size_of::<i64>() + mem::size_of::<u32>();
+        // FIXME: Replace "as usize" properly.
+        let mut buffer = vec![0 as u8; message_length as usize];
+        buffer[0] = MsgType::PropositionalTagAdvanceGrant.to_byte();
+        NetUtil::encode_int64(tag.time(), &mut buffer, 1);
+        // FIXME: Sometime panic occurs in 'lingua-franca/test/C/fed-gen/SimpleFederated' test.
+        NetUtil::encode_int32(
+            tag.microstep().try_into().unwrap(),
+            &mut buffer,
+            1 + mem::size_of::<i64>(),
+        );
+
+        // This function is called in notify_advance_grant_if_safe(), which is a long
+        // function. During this call, the socket might close, causing the following write_to_socket
+        // to fail. Consider a failure here a soft failure and update the federate's status.
+        let mut error_occurred = false;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let enclaves = locked_rti.base().scheduling_nodes();
+            // FIXME: Replace "as usize" properly.
+            let fed: &FederateInfo = &enclaves[fed_id as usize];
+            let e = fed.e();
+            let mut stream = fed.stream().as_ref().unwrap();
+            match stream.write(&buffer) {
+                Ok(bytes_written) => {
+                    if bytes_written < message_length {
+                        println!(
+                            "RTI failed to send tag advance grant to federate {}.",
+                            e.id()
+                        );
+                        return;
+                    }
+                }
+                Err(_err) => {
+                    error_occurred = true;
+                }
+            }
+        }
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            // FIXME: Replace "as usize" properly.
+            let mut_fed: &mut FederateInfo =
+                &mut locked_rti.base().scheduling_nodes()[fed_id as usize];
+            let enclave = mut_fed.enclave();
+            if error_occurred {
+                enclave.set_state(SchedulingNodeState::NotConnected);
+                // FIXME: We need better error handling, but don't stop other execution here.
+            }
+
+            enclave.set_last_provisionally_granted(tag.clone());
+            println!(
+                "RTI sent to federate {} the Provisional Tag Advance Grant (PTAG) ({},{}).",
+                enclave.id(),
+                tag.time() - start_time,
+                tag.microstep()
+            );
+        }
+
+        // Send PTAG to all upstream federates, if they have not had
+        // a later or equal PTAG or TAG sent previously and if their transitive
+        // NET is greater than or equal to the tag.
+        // NOTE: This could later be replaced with a TNET mechanism once
+        // we have an available encoding of causality interfaces.
+        // That might be more efficient.
+        // NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
+        // It's only needed for federates, which is why this is implemented here.
+        let num_upstream;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let enclaves = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let fed: &FederateInfo = &enclaves[idx];
+            let e = fed.e();
+            num_upstream = e.num_upstream();
+        }
+        for j in 0..num_upstream {
+            let e_id;
+            {
+                let mut locked_rti = _f_rti.lock().unwrap();
+                let enclaves = locked_rti.base().scheduling_nodes();
+                let idx: usize = fed_id.into();
+                let fed: &FederateInfo = &enclaves[idx];
+                // FIXME: Replace "as usize" properly.
+                e_id = fed.e().upstream()[j as usize];
+                // FIXME: Replace "as usize" properly.
+                let upstream: &FederateInfo = &enclaves[e_id as usize];
+
+                // Ignore this federate if it has resigned.
+                if upstream.e().state() == NotConnected {
+                    continue;
+                }
+            }
+            // FIXME: Replace "as u16" properly.
+            let earliest =
+                Self::earliest_future_incoming_message_tag(_f_rti.clone(), e_id as u16, start_time);
+
+            // If these tags are equal, then a TAG or PTAG should have already been granted,
+            // in which case, another will not be sent. But it may not have been already granted.
+            if Tag::lf_tag_compare(&earliest, &tag) >= 0 {
+                Self::notify_provisional_tag_advance_grant(
+                    _f_rti.clone(),
+                    // FIXME: Handle unwrap properly.
+                    e_id.try_into().unwrap(),
+                    number_of_enclaves,
+                    tag.clone(),
+                    start_time,
+                    sent_start_time.clone(),
+                );
+            }
+        }
+    }
+
+    fn earliest_future_incoming_message_tag(
+        _f_rti: Arc<Mutex<RTIRemote>>,
+        fed_id: u16,
+        start_time: Instant,
+    ) -> Tag {
+        // First, we need to find the shortest (minimum delay) path to each upstream node
+        // and then find the minimum of the node's recorded NET plus the minimum path delay.
+        // Update the shortest paths, if necessary.
+        let is_first_time;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let scheduling_nodes = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0;
+        }
+        if is_first_time {
+            Self::update_min_delays_upstream(_f_rti.clone(), fed_id);
+        }
+
+        // Next, find the tag of the earliest possible incoming message from upstream enclaves or
+        // federates, which will be the smallest upstream NET plus the least delay.
+        // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node.
+        let mut t_d = Tag::forever_tag();
+        let num_min_delays;
+        {
+            let mut locked_rti = _f_rti.lock().unwrap();
+            let enclaves = locked_rti.base().scheduling_nodes();
+            let idx: usize = fed_id.into();
+            let fed: &FederateInfo = &enclaves[idx];
+            let e = fed.e();
+            num_min_delays = e.num_min_delays();
+        }
+        for i in 0..num_min_delays {
+            let upstream_id;
+            {
+                let mut locked_rti = _f_rti.lock().unwrap();
+                let enclaves = locked_rti.base().scheduling_nodes();
+                let idx: usize = fed_id.into();
+                let fed: &FederateInfo = &enclaves[idx];
+                let e = fed.e();
+                // FIXME: Handle "as usize" properly.
+                upstream_id = e.min_delays[i as usize].id() as usize;
+            }
+            let upstream_next_event;
+            {
+                // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay.
+                let mut locked_rti = _f_rti.lock().unwrap();
+                let enclaves = locked_rti.base().scheduling_nodes();
+                let fed: &mut FederateInfo = &mut enclaves[upstream_id];
+                let upstream = fed.enclave();
+                // If we haven't heard from the upstream node, then assume it can send an event at the start time.
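+                // (A NEVER next_event below means exactly that: no NET has been received
+                // from this upstream node yet, so the start tag is the earliest tag it
+                // could possibly send.)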
+ upstream_next_event = upstream.next_event(); + if Tag::lf_tag_compare(&upstream_next_event, &Tag::never_tag()) == 0 { + let start_tag = Tag::new(start_time, 0); + upstream.set_next_event(start_tag); + } + } + let min_delay; + let earliest_tag_from_upstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let fed: &mut FederateInfo = &mut enclaves[idx]; + let e = fed.enclave(); + // FIXME: Handle "as usize" properly. + min_delay = e.min_delays()[i as usize].min_delay(); + earliest_tag_from_upstream = Tag::lf_tag_add(&upstream_next_event, &min_delay); + if earliest_tag_from_upstream.time() >= start_time { + println!("RTI: Earliest next event upstream of fed/encl {} at fed/encl {} has tag ({},{}).", + fed_id, + upstream_id, + earliest_tag_from_upstream.time() - start_time, + earliest_tag_from_upstream.microstep()); + } else { + println!( + " earliest_tag_from_upstream.time() < start_time, ({},{})", + earliest_tag_from_upstream.time(), + start_time + ); + } + } + if Tag::lf_tag_compare(&earliest_tag_from_upstream, &t_d) < 0 { + t_d = earliest_tag_from_upstream.clone(); + } + } + t_d + } + + fn update_min_delays_upstream(_f_rti: Arc>, node_idx: u16) { + let num_min_delays; + let number_of_scheduling_nodes; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = node_idx.into(); + num_min_delays = scheduling_nodes[idx].enclave().min_delays().len(); + number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); + } + // Check whether cached result is valid. + if num_min_delays == 0 { + // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. + // There must be a name for this algorithm. + + // Array of results on the stack: + let mut path_delays = Vec::new(); + // This will be the number of non-FOREVER entries put into path_delays. + let mut count: u64 = 0; + + for _i in 0..number_of_scheduling_nodes { + path_delays.push(Tag::forever_tag()); + } + // FIXME:: Handle "as i32" properly. + Self::_update_min_delays_upstream( + _f_rti.clone(), + node_idx as i32, + -1, + &mut path_delays, + &mut count, + ); + + // Put the results onto the node's struct. + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = node_idx.into(); + let node = scheduling_nodes[idx].enclave(); + node.set_num_min_delays(count); + node.set_min_delays(Vec::new()); + println!( + "++++ Node {}(is in ZDC: {}), COUNT = {}, flags = {}, number_of_scheduling_nodes = {}\n", + node_idx, + node.flags() & IS_IN_ZERO_DELAY_CYCLE, count, node.flags(), number_of_scheduling_nodes + ); + + let mut k = 0; + for i in 0..number_of_scheduling_nodes { + // FIXME: Handle "as usize" properly. + if Tag::lf_tag_compare(&path_delays[i as usize], &Tag::forever_tag()) < 0 { + // Node i is upstream. + if k >= count { + println!( + "Internal error! Count of upstream nodes {} for node {} is wrong!", + count, i + ); + std::process::exit(1); + } + // FIXME: Handle "as usize" properly. + let min_delay = MinimumDelay::new(i, path_delays[i as usize].clone()); + // FIXME: Handle "as usize" properly. + if node.min_delays().len() > k as usize { + let _ = + std::mem::replace(&mut node.min_delays()[k as usize], min_delay); + } else { + node.min_delays().insert(k as usize, min_delay); + } + k = k + 1; + // N^2 debug statement could be a problem with large benchmarks. 
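For reference, this is how path delays accumulate hop by hop in the recursion invoked above, using the lf_delay_tag() semantics from tag.rs: a Some(0) interval is a pure microstep delay, while a positive interval advances time and resets the microstep. A small worked example (illustration only):

fn path_delay_example() {
    // Start from the zero path delay, as on the first recursive call.
    let start = Tag::zero_tag();
    // Hop 1: a microstep delay (Some(0)) => tag (0, 1).
    let after_hop1 = Tag::lf_delay_tag(&start, Some(0));
    assert_eq!((after_hop1.time(), after_hop1.microstep()), (0, 1));
    // Hop 2: a 5 ns delay (Some(5)) => tag (5, 0); the microstep resets.
    let after_hop2 = Tag::lf_delay_tag(&after_hop1, Some(5));
    assert_eq!((after_hop2.time(), after_hop2.microstep()), (5, 0));
}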
+ // println!( + // "++++ Node {} is upstream with delay ({},{}), k = {}", + // i, + // path_delays[i as usize].time(), + // path_delays[i as usize].microstep(), + // k + // ); + } + } + } + } + } + + // Local function used recursively to find minimum delays upstream. + // Return in count the number of non-FOREVER_TAG entries in path_delays[]. + fn _update_min_delays_upstream( + _f_rti: Arc>, + end_idx: i32, + mut intermediate_idx: i32, + path_delays: &mut Vec, + count: &mut u64, + ) { + // On first call, intermediate will be NULL, so the path delay is initialized to zero. + let mut delay_from_intermediate_so_far = Tag::zero_tag(); + if intermediate_idx < 0 { + intermediate_idx = end_idx; + } else { + // Not the first call, so intermediate is upstream of end. + // FIXME: Handle "as usize" properly. + delay_from_intermediate_so_far = path_delays[intermediate_idx as usize].clone(); + } + { + let mut locked_rti = _f_rti.lock().unwrap(); + let fed: &FederateInfo = + &locked_rti.base().scheduling_nodes()[intermediate_idx as usize]; + let intermediate = fed.e(); + if intermediate.state() == SchedulingNodeState::NotConnected { + // Enclave or federate is not connected. + // No point in checking upstream scheduling_nodes. + return; + } + } + // Check nodes upstream of intermediate (or end on first call). + // NOTE: It would be better to iterate through these sorted by minimum delay, + // but for most programs, the gain might be negligible since there are relatively few + // upstream nodes. + let num_upstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let fed: &FederateInfo = + &locked_rti.base().scheduling_nodes()[intermediate_idx as usize]; + let e = fed.e(); + num_upstream = e.num_upstream(); + } + for i in 0..num_upstream { + let upstream_id; + let upstream_delay; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + // FIXME: Handle "as usize" properly. + let e = scheduling_nodes[intermediate_idx as usize].e(); + // FIXME: Handle "as usize" properly. + upstream_id = e.upstream[i as usize]; + // FIXME: Handle "as usize" properly. + upstream_delay = e.upstream_delay[i as usize]; + } + // Add connection delay to path delay so far. + let path_delay = Tag::lf_delay_tag(&delay_from_intermediate_so_far, upstream_delay); + // If the path delay is less than the so-far recorded path delay from upstream, update upstream. + // FIXME: Handle "as usize" properly. + if Tag::lf_tag_compare(&path_delay, &path_delays[upstream_id as usize]) < 0 { + // FIXME: Handle "as usize" properly. + if path_delays[upstream_id as usize].time() == FOREVER { + // Found a finite path. + *count = *count + 1; + } + // FIXME: Handle "as usize" properly. + if path_delays.len() > upstream_id as usize { + let _ = std::mem::replace( + &mut path_delays[upstream_id as usize], + path_delay.clone(), + ); + } else { + path_delays.insert(upstream_id as usize, path_delay.clone()); + } + // Since the path delay to upstream has changed, recursively update those upstream of it. + // Do not do this, however, if the upstream node is the end node because this means we have + // completed a cycle. + if end_idx != upstream_id { + Self::_update_min_delays_upstream( + _f_rti.clone(), + end_idx, + upstream_id, + path_delays, + count, + ); + } else { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + // FIXME: Handle "as usize" properly. 
+ let end: &mut SchedulingNode = scheduling_nodes[end_idx as usize].enclave(); + // Found a cycle. + end.set_flags(end.flags() | IS_IN_CYCLE); + // Is it a zero-delay cycle? + if Tag::lf_tag_compare(&path_delay, &Tag::zero_tag()) == 0 + && upstream_delay < Some(0) + { + end.set_flags(end.flags() | IS_IN_ZERO_DELAY_CYCLE); + } else { + // Clear the flag. + end.set_flags(end.flags() & !IS_IN_ZERO_DELAY_CYCLE); + } + } + } + } + } + + pub fn notify_downstream_advance_grant_if_safe( + _f_rti: Arc>, + fed_id: u16, + number_of_enclaves: i32, + start_time: Instant, + visited: &mut Vec, + sent_start_time: Arc<(Mutex, Condvar)>, + ) { + // FIXME: Replace "as usize" properly. + visited[fed_id as usize] = true; + let num_downstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let e = fed.e(); + num_downstream = e.num_downstream(); + } + for i in 0..num_downstream { + let e_id; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &enclaves[idx]; + let downstreams = fed.e().downstream(); + // FIXME: Replace "as u16" properly. + e_id = downstreams[i as usize] as u16; + // FIXME: Replace "as usize" properly. + if visited[e_id as usize] { + continue; + } + } + Self::notify_advance_grant_if_safe( + _f_rti.clone(), + e_id, + number_of_enclaves, + start_time, + sent_start_time.clone(), + ); + Self::notify_downstream_advance_grant_if_safe( + _f_rti.clone(), + e_id, + number_of_enclaves, + start_time, + visited, + sent_start_time.clone(), + ); + } + } + + pub fn logical_tag_complete( + _f_rti: Arc>, + fed_id: u16, + number_of_enclaves: i32, + start_time: Instant, + sent_start_time: Arc<(Mutex, Condvar)>, + completed: Tag, + ) { + // FIXME: Consolidate this message with NET to get NMR (Next Message Request). + // Careful with handling startup and shutdown. + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let enclave = fed.enclave(); + enclave.set_completed(completed); + + println!( + "RTI received from federate/enclave {} the Logical Tag Complete (LTC) ({},{}).", + enclave.id(), + enclave.completed().time() - start_time, + enclave.completed().microstep() + ); + } + + // Check downstream enclaves to see whether they should now be granted a TAG. + let num_downstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let e = fed.e(); + num_downstream = e.num_downstream(); + } + for i in 0..num_downstream { + let e_id; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let downstreams = fed.e().downstream(); + // FIXME: Replace "as u16" properly. + e_id = downstreams[i as usize] as u16; + } + // Notify downstream enclave if appropriate. + Self::notify_advance_grant_if_safe( + _f_rti.clone(), + e_id, + number_of_enclaves, + start_time, + sent_start_time.clone(), + ); + let mut visited = vec![false as bool; number_of_enclaves as usize]; // Initializes to 0. + // Notify enclaves downstream of downstream if appropriate. 
+ Self::notify_downstream_advance_grant_if_safe( + _f_rti.clone(), + e_id, + number_of_enclaves, + start_time, + &mut visited, + sent_start_time.clone(), + ); + } + } +} + +pub struct RTICommon { + // The scheduling nodes. + scheduling_nodes: Vec, + + // Number of scheduling nodes + number_of_scheduling_nodes: i32, + + // RTI's decided stop tag for the scheduling nodes + max_stop_tag: Tag, + + // Number of scheduling nodes handling stop + num_scheduling_nodes_handling_stop: i32, + + // Boolean indicating that tracing is enabled. + tracing_enabled: bool, + // Pointer to a tracing object + // TODO: trace_t* trace; + + // The RTI mutex for making thread-safe access to the shared state. + // TODO: lf_mutex_t* mutex; +} + +impl RTICommon { + pub fn new() -> RTICommon { + RTICommon { + scheduling_nodes: Vec::new(), + number_of_scheduling_nodes: 0, + max_stop_tag: Tag::never_tag(), + num_scheduling_nodes_handling_stop: 0, + tracing_enabled: false, + } + } + + pub fn scheduling_nodes(&mut self) -> &mut Vec { + &mut self.scheduling_nodes + } + + pub fn number_of_scheduling_nodes(&self) -> i32 { + self.number_of_scheduling_nodes + } + + pub fn max_stop_tag(&self) -> Tag { + self.max_stop_tag.clone() + } + + pub fn num_scheduling_nodes_handling_stop(&self) -> i32 { + self.num_scheduling_nodes_handling_stop + } + + pub fn set_max_stop_tag(&mut self, max_stop_tag: Tag) { + self.max_stop_tag = max_stop_tag.clone(); + } + + pub fn set_number_of_scheduling_nodes(&mut self, number_of_scheduling_nodes: i32) { + self.number_of_scheduling_nodes = number_of_scheduling_nodes; + } + + pub fn set_num_scheduling_nodes_handling_stop( + &mut self, + num_scheduling_nodes_handling_stop: i32, + ) { + self.num_scheduling_nodes_handling_stop = num_scheduling_nodes_handling_stop; + } +} + +struct TagAdvanceGrant { + tag: Tag, // NEVER if there is no tag advance grant. + is_provisional: bool, // True for PTAG, false for TAG. +} + +impl TagAdvanceGrant { + pub fn new(tag: Tag, is_provisional: bool) -> TagAdvanceGrant { + TagAdvanceGrant { + tag, + is_provisional, + } + } + + pub fn tag(&self) -> Tag { + self.tag.clone() + } + + pub fn is_provisional(&self) -> bool { + self.is_provisional + } + + pub fn set_tag(&mut self, tag: Tag) { + self.tag = tag.clone(); + } + + pub fn set_provisional(&mut self, is_provisional: bool) { + self.is_provisional = is_provisional; + } +} diff --git a/rust/rti/src/federation_rti.rs b/rust/rti/src/rti_remote.rs similarity index 73% rename from rust/rti/src/federation_rti.rs rename to rust/rti/src/rti_remote.rs index c7c9ea7..e884d27 100644 --- a/rust/rti/src/federation_rti.rs +++ b/rust/rti/src/rti_remote.rs @@ -13,35 +13,15 @@ * used by scheduling enclaves. */ use crate::constants::*; -use crate::federate::*; -use crate::tag::Tag; use crate::ClockSyncStat; +use crate::RTICommon; /** * Structure that an RTI instance uses to keep track of its own and its * corresponding federates' state. */ -pub struct FederationRTI { - ////////////////// Enclave specific attributes ////////////////// - - // The federates. - enclaves: Vec, - - // Number of enclaves - number_of_enclaves: i32, - - // RTI's decided stop tag for enclaves - max_stop_tag: Tag, - - // Number of enclaves handling stop - num_enclaves_handling_stop: i32, - - // Boolean indicating that tracing is enabled. 
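The rename that follows splits the old FederationRTI state in two: the fields shared with scheduling enclaves move into RTICommon, and the renamed RTIRemote keeps only federation-specific state, reaching the shared part through base(). A minimal sketch of the resulting access pattern, using only accessors defined in this patch:

fn access_pattern_sketch() {
    let mut rti = RTIRemote::new();
    // Shared (enclave-level) state now lives behind base():
    rti.base().set_number_of_scheduling_nodes(2);
    assert_eq!(rti.base().number_of_scheduling_nodes(), 2);
    // Federation-only state stays on RTIRemote itself:
    rti.set_max_start_time(0);
}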
- tracing_enabled: bool, - - // Pointer to a tracing object - // TODO: trace:Trace, - ////////////// Federation only specific attributes ////////////// +pub struct RTIRemote { + base: RTICommon, // Maximum start time seen so far from the federates. max_start_time: i64, @@ -62,7 +42,7 @@ pub struct FederationRTI { /** * The ID of the federation that this RTI will supervise. * This should be overridden with a command-line -i option to ensure - * that each federate only joins its assigned federation. + * that each federate_info only joins its assigned federation. */ federation_id: String, @@ -113,15 +93,10 @@ pub struct FederationRTI { stop_in_progress: bool, } -impl FederationRTI { - pub fn new() -> FederationRTI { - FederationRTI { - enclaves: Vec::new(), - // enclave_rti related initializations - max_stop_tag: Tag::never_tag(), - number_of_enclaves: 0, - num_enclaves_handling_stop: 0, - // federation_rti related initializations +impl RTIRemote { + pub fn new() -> RTIRemote { + RTIRemote { + base: RTICommon::new(), max_start_time: 0, num_feds_proposed_start: 0, // all_federates_exited:false, @@ -135,25 +110,12 @@ impl FederationRTI { clock_sync_period_ns: 10 * 1000000, clock_sync_exchanges_per_interval: 10, authentication_enabled: false, - tracing_enabled: false, stop_in_progress: false, } } - pub fn enclaves(&mut self) -> &mut Vec { - &mut self.enclaves - } - - pub fn max_stop_tag(&self) -> Tag { - self.max_stop_tag.clone() - } - - pub fn number_of_enclaves(&self) -> i32 { - self.number_of_enclaves - } - - pub fn num_enclaves_handling_stop(&self) -> i32 { - self.num_enclaves_handling_stop + pub fn base(&mut self) -> &mut RTICommon { + &mut self.base } pub fn max_start_time(&self) -> i64 { @@ -184,18 +146,6 @@ impl FederationRTI { self.stop_in_progress } - pub fn set_max_stop_tag(&mut self, max_stop_tag: Tag) { - self.max_stop_tag = max_stop_tag.clone(); - } - - pub fn set_number_of_enclaves(&mut self, number_of_enclaves: i32) { - self.number_of_enclaves = number_of_enclaves; - } - - pub fn set_num_enclaves_handling_stop(&mut self, num_enclaves_handling_stop: i32) { - self.num_enclaves_handling_stop = num_enclaves_handling_stop; - } - pub fn set_max_start_time(&mut self, max_start_time: i64) { self.max_start_time = max_start_time; } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 238ff06..0486f9d 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -20,10 +20,10 @@ use crate::net_util::*; use crate::tag; use crate::tag::*; use crate::ClockSyncStat; -use crate::Enclave; -use crate::FedState; -use crate::Federate; -use crate::FederationRTI; +use crate::FederateInfo; +use crate::RTIRemote; +use crate::SchedulingNode; +use crate::SchedulingNodeState; struct StopGranted { _lf_rti_stop_granted_already_sent_to_federates: bool, @@ -59,16 +59,16 @@ impl Server { Server { port } } - pub fn wait_for_federates(&mut self, _f_rti: FederationRTI) { + pub fn wait_for_federates(&mut self, _f_rti: RTIRemote) { + println!("Server listening on port {}", self.port); let mut address = String::from("0.0.0.0:"); address.push_str(self.port.as_str()); let socket = TcpListener::bind(address).unwrap(); - // accept connections and process them, spawning a new thread for each one - println!("Server listening on port {}", self.port); let start_time = Arc::new(Mutex::new(StartTime::new())); let received_start_times = Arc::new((Mutex::new(false), Condvar::new())); let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); let stop_granted = 
Arc::new(Mutex::new(StopGranted::new())); + // Wait for connections from federates and create a thread for each. let handles = self.connect_to_federates( socket, _f_rti, @@ -78,24 +78,27 @@ impl Server { stop_granted, ); + // All federates have connected. println!("RTI: All expected federates have connected. Starting execution."); - for handle in handles { - handle.join().unwrap(); - } - - // TODO: _f_rti.set_all_federates_exited(true); - // The socket server will not continue to accept connections after all the federates // have joined. // In case some other federation's federates are trying to join the wrong // federation, need to respond. Start a separate thread to do that. // TODO: lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + // Wait for federate_info threads to exit. + for handle in handles { + handle.join().unwrap(); + } + + // TODO: _f_rti.set_all_federates_exited(true); + // Shutdown and close the socket so that the accept() call in // respond_to_erroneous_connections returns. That thread should then // check _f_rti->all_federates_exited and it should exit. - // TODO: drop(socket); + // TODO: shutdown(socket); + // NOTE: In all common TCP/IP stacks, there is a time period, // typically between 30 and 120 seconds, called the TIME_WAIT period, // before the port is released after this close. This is because @@ -107,26 +110,30 @@ impl Server { fn connect_to_federates( &mut self, socket: TcpListener, - mut _f_rti: FederationRTI, + mut _f_rti: RTIRemote, start_time: Arc>, received_start_times: Arc<(Mutex, Condvar)>, sent_start_time: Arc<(Mutex, Condvar)>, stop_granted: Arc>, ) -> Vec> { // TODO: Error-handling of unwrap() - let number_of_enclaves: usize = _f_rti.number_of_enclaves().try_into().unwrap(); + let number_of_enclaves: usize = _f_rti + .base() + .number_of_scheduling_nodes() + .try_into() + .unwrap(); let arc_rti = Arc::new(Mutex::new(_f_rti)); let mut handle_list: Vec> = vec![]; for _i in 0..number_of_enclaves { let cloned_rti = Arc::clone(&arc_rti); // Wait for an incoming connection request. - // The following blocks until a federate connects. + // The following blocks until a federate_info connects. for stream in socket.incoming() { match stream { Ok(mut stream) => { println!("\nNew connection: {}", stream.peer_addr().unwrap()); - // The first message from the federate should contain its ID and the federation ID. + // The first message from the federate_info should contain its ID and the federation ID. let fed_id = self.receive_and_check_fed_id_message(&mut stream, cloned_rti.clone()); // TODO: Error-handling of fed_id.try_into().unwrap() @@ -142,7 +149,7 @@ impl Server { cloned_rti.clone(), ) { - // Create a thread to communicate with the federate. + // Create a thread to communicate with the federate_info. // This has to be done after clock synchronization is finished // or that thread may end up attempting to handle incoming clock // synchronization messages. @@ -155,8 +162,8 @@ impl Server { { let mut locked_rti = cloned_rti.lock().unwrap(); // FIXME: Handle "as usize" properly. - let fed: &mut Federate = - &mut locked_rti.enclaves()[fed_id as usize]; + let fed: &mut FederateInfo = + &mut locked_rti.base().scheduling_nodes()[fed_id as usize]; fed.set_stream(stream.try_clone().unwrap()); } @@ -165,15 +172,15 @@ impl Server { // are forwarded piece by piece. let mut buffer = vec![0 as u8; 1]; - // Listen for messages from the federate. + // Listen for messages from the federate_info. 
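Each accepted connection is handled on its own thread, with all handlers sharing the RTI state through an Arc<Mutex<..>>. A self-contained sketch of that pattern, independent of the RTI types (illustration only):

use std::sync::{Arc, Mutex};
use std::thread;

fn spawn_handlers(n: usize) -> Vec<thread::JoinHandle<()>> {
    let shared = Arc::new(Mutex::new(0u32)); // stands in for Arc<Mutex<RTIRemote>>
    (0..n)
        .map(|i| {
            let cloned = Arc::clone(&shared);
            thread::spawn(move || {
                // Each handler takes the lock briefly, just as the
                // per-federate threads do with locked_rti.
                let mut state = cloned.lock().unwrap();
                *state += 1;
                println!("handler {} connected", i);
            })
        })
        .collect()
}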
loop { { let mut locked_rti = cloned_rti.lock().unwrap(); - let enclaves = locked_rti.enclaves(); + let enclaves = locked_rti.base().scheduling_nodes(); // FIXME: Replace "as usize" properly. - let fed: &mut Federate = &mut enclaves[fed_id as usize]; + let fed: &mut FederateInfo = &mut enclaves[fed_id as usize]; let enclave = fed.enclave(); - if enclave.state() == FedState::NotConnected { + if enclave.state() == SchedulingNodeState::NotConnected { break; } } @@ -186,18 +193,18 @@ impl Server { ); if bytes_read < 1 { // Socket is closed - println!("RTI: Socket to federate {} is closed. Exiting the thread.", + println!("RTI: Socket to federate_info {} is closed. Exiting the thread.", fed_id); let mut locked_rti = cloned_rti.lock().unwrap(); - let enclaves = locked_rti.enclaves(); + let enclaves = locked_rti.base().scheduling_nodes(); // FIXME: Replace "as usize" properly. - let fed: &mut Federate = &mut enclaves[fed_id as usize]; - fed.enclave().set_state(FedState::NotConnected); + let fed: &mut FederateInfo = &mut enclaves[fed_id as usize]; + fed.enclave().set_state(SchedulingNodeState::NotConnected); // FIXME: We need better error handling here, but do not stop execution here. break; } println!( - "RTI: Received message type {} from federate {}.", + "RTI: Received message type {} from federate_info {}.", buffer[0], fed_id ); match MsgType::to_msg_type(buffer[0]) { @@ -273,23 +280,22 @@ impl Server { ), _ => { let mut locked_rti = cloned_rti.lock().unwrap(); - let fed: &mut Federate = - &mut locked_rti.enclaves()[fed_id as usize]; - println!("RTI received from federate {} an unrecognized TCP message type: {}.", fed.enclave().id(), buffer[0]); + let fed: &mut FederateInfo = + &mut locked_rti.base().scheduling_nodes() + [fed_id as usize]; + println!("RTI received from federate_info {} an unrecognized TCP message type: {}.", fed.enclave().id(), buffer[0]); } } } }); - // TODO: Need to set handle to federate.thread_id? + // TODO: Need to set handle to federate_info.thread_id? handle_list.push(_handle); } break; } Err(e) => { + // Try again println!("RTI failed to accept the socket. {}.", e); - /* connection failed */ - // FIXME: This should not exit on error, but rather just reject the connection. - std::process::exit(1); } } } @@ -303,10 +309,12 @@ impl Server { if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { // Create the thread that performs periodic PTP clock synchronization sessions // over the UDP channel, but only if the UDP channel is open and at least one - // federate is performing runtime clock synchronization. + // federate_info is performing runtime clock synchronization. let mut clock_sync_enabled = false; - for i in 0..locked_rti.number_of_enclaves() { - if locked_rti.enclaves()[i as usize].clock_synchronization_enabled() { + for i in 0..locked_rti.base().number_of_scheduling_nodes() { + // FIXME: Handle "as usize" properly. + if locked_rti.base().scheduling_nodes()[i as usize].clock_synchronization_enabled() + { clock_sync_enabled = true; break; } @@ -324,24 +332,25 @@ impl Server { fn receive_and_check_fed_id_message( &mut self, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> i32 { - // Buffer for message ID, federate ID, and federation ID length. + // Buffer for message ID, federate_info ID, and federation ID length. let length = 1 + mem::size_of::() + 1; let mut first_buffer = vec![0 as u8; length]; - // Initialize to an invalid value. - let fed_id; - let cloned_rti = Arc::clone(&_f_rti); + // Read bytes from the socket. We need 4 bytes. 
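For orientation, the frame being parsed here has the following layout (the 4-byte fixed header just mentioned, followed by a variable-length federation ID read in a second pass):

  byte 0        : MsgType::FedIds
  bytes 1..3    : federate ID, u16, little-endian
  byte 3        : federation ID length n, in bytes
  next n bytes  : federation ID (UTF-8)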
// FIXME: This should not exit with error but rather should just reject the connection. NetUtil::read_from_stream_errexit(stream, &mut first_buffer, 0, ""); + // Initialize to an invalid value. + let fed_id; + // First byte received is the message type. if first_buffer[0] != MsgType::FedIds.to_byte() { if first_buffer[0] == MsgType::P2pSendingFedId.to_byte() || first_buffer[0] == MsgType::P2pTaggedMessage.to_byte() { - // The federate is trying to connect to a peer, not to the RTI. + // The federate_info is trying to connect to a peer, not to the RTI. // It has connected to the RTI instead. // FIXME: This should not happen, but apparently has been observed. // It should not happen because the peers get the port and IP address @@ -358,11 +367,11 @@ impl Server { ); return -1; } else { - // Received federate ID. + // Received federate_info ID. // FIXME: Change from_le_bytes properly. let u16_size = mem::size_of::(); fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap()); - println!("RTI received federate ID: {}.", fed_id); + println!("RTI received federate_info ID: {}.", fed_id); // Read the federation ID. First read the length, which is one byte. // FIXME: Change from_le_bytes properly. @@ -372,6 +381,8 @@ impl Server { .unwrap(), ); let mut federation_id_buffer = vec![0 as u8; federation_id_length.into()]; + // Next read the actual federation ID. + // FIXME: This should not exit on error, but rather just reject the connection. NetUtil::read_from_stream_errexit( stream, &mut federation_id_buffer, @@ -393,40 +404,42 @@ impl Server { } println!("RTI received federation ID: {}.", federation_id_received); + let cloned_rti = Arc::clone(&_f_rti); let number_of_enclaves; let federation_id; { - let locked_rti = cloned_rti.lock().unwrap(); - number_of_enclaves = locked_rti.number_of_enclaves(); + let mut locked_rti = cloned_rti.lock().unwrap(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); federation_id = locked_rti.federation_id(); } // Compare the received federation ID to mine. if federation_id_received != federation_id { // Federation IDs do not match. Send back a MSG_TYPE_Reject message. println!( - "WARNING: Federate from another federation {} attempted to connect to RTI in federation {}.", + "WARNING: FederateInfo from another federation {} attempted to connect to RTI in federation {}.", federation_id_received, federation_id ); Self::send_reject(stream, ErrType::FederationIdDoesNotMatch.to_byte()); - std::process::exit(1); + return -1; } else { if i32::from(fed_id) >= number_of_enclaves { - // Federate ID is out of range. + // FederateInfo ID is out of range. 
println!( - "RTI received federate ID {}, which is out of range.", + "RTI received federate_info ID {}, which is out of range.", fed_id ); Self::send_reject(stream, ErrType::FederateIdOutOfRange.to_byte()); - std::process::exit(1); + return -1; } else { let mut locked_rti = cloned_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let federate: &mut Federate = &mut locked_rti.enclaves()[idx]; - let enclave = federate.enclave(); - if enclave.state() != FedState::NotConnected { - println!("RTI received duplicate federate ID: {}.", fed_id); + let federate_info: &mut FederateInfo = + &mut locked_rti.base().scheduling_nodes()[idx]; + let enclave = federate_info.enclave(); + if enclave.state() != SchedulingNodeState::NotConnected { + println!("RTI received duplicate federate_info ID: {}.", fed_id); Self::send_reject(stream, ErrType::FederateIdInUse.to_byte()); - std::process::exit(1); + return -1; } } } @@ -435,19 +448,23 @@ impl Server { federation_id_received, federation_id ); - // TODO: Assign the address information for federate. + // TODO: Assign the address information for federate_info. - // Set the federate's state as pending + // Set the federate_info's state as pending // because it is waiting for the start time to be // sent by the RTI before beginning its execution. { let mut locked_rti = cloned_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let federate: &mut Federate = &mut locked_rti.enclaves()[idx]; - let enclave: &mut Enclave = federate.enclave(); - enclave.set_state(FedState::Pending); + let federate_info: &mut FederateInfo = + &mut locked_rti.base().scheduling_nodes()[idx]; + let enclave: &mut SchedulingNode = federate_info.enclave(); + enclave.set_state(SchedulingNodeState::Pending); } - println!("RTI responding with MsgType::Ack to federate {}.", fed_id); + println!( + "RTI responding with MsgType::Ack to federate_info {}.", + fed_id + ); // Send an MsgType::Ack message. let ack_message: Vec = vec![MsgType::Ack.to_byte()]; NetUtil::write_to_stream_errexit(stream, &ack_message, fed_id, "MsgType::Ack message"); @@ -480,10 +497,10 @@ impl Server { &mut self, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> bool { println!( - "RTI waiting for MsgType::NeighborStructure from federate {}.", + "RTI waiting for MsgType::NeighborStructure from federate_info {}.", fed_id ); let cloned_rti = Arc::clone(&_f_rti); @@ -498,17 +515,18 @@ impl Server { ); if connection_info_header[0] != MsgType::NeighborStructure.to_byte() { - println!("RTI was expecting a MsgType::NeighborStructure message from federate {}. Got {} instead. Rejecting federate.", fed_id, connection_info_header[0]); + println!("RTI was expecting a MsgType::NeighborStructure message from federate_info {}. Got {} instead. 
Rejecting federate_info.", fed_id, connection_info_header[0]); Self::send_reject(stream, ErrType::UnexpectedMessage.to_byte()); return false; } else { let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; - let enclave: &mut Enclave = fed.enclave(); + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let enclave: &mut SchedulingNode = fed.enclave(); + // Read the number of upstream and downstream connections enclave.set_num_upstream(connection_info_header[1].into()); enclave.set_num_downstream(connection_info_header[1 + mem::size_of::()].into()); println!( - "RTI got {} upstreams and {} downstreams from federate {}.", + "RTI got {} upstreams and {} downstreams from federate_info {}.", enclave.num_upstream(), enclave.num_downstream(), fed_id @@ -580,13 +598,15 @@ impl Server { &mut self, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> bool { - // Read the MsgType::UdpPort message from the federate regardless of the status of - // clock synchronization. This message will tell the RTI whether the federate + // Read the MsgType::UdpPort message from the federate_info regardless of the status of + // clock synchronization. This message will tell the RTI whether the federate_info // is doing clock synchronization, and if it is, what port to use for UDP. - println!("RTI waiting for MsgType::UdpPort from federate {}.", fed_id); - let cloned_rti = Arc::clone(&_f_rti); + println!( + "RTI waiting for MsgType::UdpPort from federate_info {}.", + fed_id + ); let mut response = vec![0 as u8; 1 + mem::size_of::()]; NetUtil::read_from_stream_errexit( stream, @@ -595,10 +615,11 @@ impl Server { "MsgType::UdpPort message", ); if response[0] != MsgType::UdpPort.to_byte() { - println!("RTI was expecting a MsgType::UdpPort message from federate {}. Got {} instead. Rejecting federate.", fed_id, response[0]); + println!("RTI was expecting a MsgType::UdpPort message from federate_info {}. Got {} instead. Rejecting federate_info.", fed_id, response[0]); Self::send_reject(stream, ErrType::UnexpectedMessage.to_byte()); return false; } else { + let cloned_rti = Arc::clone(&_f_rti); let clock_sync_global_status; { let locked_rti = cloned_rti.lock().unwrap(); @@ -612,21 +633,21 @@ impl Server { u16::from_le_bytes(response[1..3].try_into().unwrap()); println!( - "RTI got MsgType::UdpPort {} from federate {}.", + "RTI got MsgType::UdpPort {} from federate_info {}.", federate_udp_port_number, fed_id ); // A port number of UINT16_MAX means initial clock sync should not be performed. if federate_udp_port_number != u16::MAX { // TODO: Implement this if body println!( - "RTI finished initial clock synchronization with federate {}.", + "RTI finished initial clock synchronization with federate_info {}.", fed_id ); } if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { // If no runtime clock sync, no need to set up the UDP port. if federate_udp_port_number > 0 { - // Initialize the UDP_addr field of the federate struct + // Initialize the UDP_addr field of the federate_info struct // TODO: Handle below assignments // fed.UDP_addr.sin_family = AF_INET; // fed.UDP_addr.sin_port = htons(federate_udp_port_number); @@ -636,7 +657,7 @@ impl Server { // Disable clock sync after initial round. 
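The 2-byte payload of MsgType::UdpPort is sentinel-coded: u16::MAX (federates with clock sync disabled send -1, which wraps to u16::MAX) means initial clock synchronization is skipped, while any other value is the UDP port the federate will use for clock-sync exchanges. The decode step, as this patch performs it:

let federate_udp_port_number = u16::from_le_bytes(response[1..3].try_into().unwrap());
let does_clock_sync = federate_udp_port_number != u16::MAX;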
let mut locked_rti = cloned_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; fed.set_clock_synchronization_enabled(false); } } else { @@ -646,7 +667,7 @@ impl Server { // Note that the federates are still going to send a MSG_TYPE_UdpPort message but with a payload (port) of -1. let mut locked_rti = cloned_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; fed.set_clock_synchronization_enabled(false); } } @@ -656,7 +677,7 @@ impl Server { fn handle_timestamp( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, received_start_times: Arc<(Mutex, Condvar)>, sent_start_time: Arc<(Mutex, Condvar)>, @@ -664,7 +685,7 @@ impl Server { let mut buffer = vec![0 as u8; mem::size_of::()]; let bytes_read = NetUtil::read_from_stream(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { - println!("ERROR reading timestamp from federate {}.", fed_id); + println!("ERROR reading timestamp from federate_info {}.", fed_id); } // FIXME: Check whether swap_bytes_if_big_endian_int64() is implemented correctly @@ -675,7 +696,7 @@ impl Server { let number_of_enclaves; { let mut locked_rti = _f_rti.lock().unwrap(); - number_of_enclaves = locked_rti.number_of_enclaves(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); let max_start_time = locked_rti.max_start_time(); num_feds_proposed_start = locked_rti.num_feds_proposed_start(); num_feds_proposed_start += 1; @@ -708,7 +729,7 @@ impl Server { } } - // Send back to the federate the maximum time plus an offset on a Timestamp + // Send back to the federate_info the maximum time plus an offset on a Timestamp // message. let mut start_time_buffer = vec![0 as u8; MSG_TYPE_TIMESTAMP_LENGTH]; start_time_buffer[0] = MsgType::Timestamp.to_byte(); @@ -726,24 +747,27 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let my_fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let stream = my_fed.stream().as_ref().unwrap(); let bytes_written = NetUtil::write_to_stream(stream, &start_time_buffer, fed_id); if bytes_written < MSG_TYPE_TIMESTAMP_LENGTH { - println!("Failed to send the starting time to federate {}.", fed_id); + println!( + "Failed to send the starting time to federate_info {}.", + fed_id + ); } - // Update state for the federate to indicate that the MSG_TYPE_Timestamp + // Update state for the federate_info to indicate that the MSG_TYPE_Timestamp // message has been sent. That MSG_TYPE_Timestamp message grants time advance to - // the federate to the start time. - my_fed.enclave().set_state(FedState::Granted); + // the federate_info to the start time. 
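The start time sent back is the maximum of the timestamps received from all federates plus a fixed offset, so that every federate has time to receive it and get ready. A sketch of that computation; the offset constant here is a hypothetical stand-in for the one the RTI actually uses:

// Hypothetical stand-in for the RTI's actual start-time offset constant.
const DELAY_START: i64 = 1_000_000_000; // 1 s, in nanoseconds

fn start_time_from_proposals(proposals: &[i64]) -> i64 {
    // Take the latest proposed timestamp and pad it by the offset.
    let max_start_time = proposals.iter().copied().max().unwrap_or(0);
    max_start_time + DELAY_START
}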
+            my_fed.enclave().set_state(SchedulingNodeState::Granted);
             let sent_start_time_notifier = Arc::clone(&sent_start_time);
             let (lock, condvar) = &*sent_start_time_notifier;
             let mut notified = lock.lock().unwrap();
             *notified = true;
             condvar.notify_all();
             println!(
-                "RTI sent start time {} to federate {}.",
+                "RTI sent start time {} to federate_info {}.",
                 locked_start_time.start_time(),
                 my_fed.enclave().id()
             );
         }
 
@@ -752,7 +776,7 @@
     fn handle_federate_resign(
         fed_id: u16,
-        _f_rti: Arc<Mutex<FederationRTI>>,
+        _f_rti: Arc<Mutex<RTIRemote>>,
         start_time: Arc<Mutex<StartTime>>,
         sent_start_time: Arc<(Mutex<bool>, Condvar)>,
     ) {
@@ -761,15 +785,17 @@
         {
             let mut locked_rti = _f_rti.lock().unwrap();
             let idx: usize = fed_id.into();
-            let my_fed: &mut Federate = &mut locked_rti.enclaves()[idx];
-            my_fed.enclave().set_state(FedState::NotConnected);
+            let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
+            my_fed
+                .enclave()
+                .set_state(SchedulingNodeState::NotConnected);
         }
 
-        // Indicate that there will no further events from this federate.
+        // Indicate that there will be no further events from this federate_info.
         {
             let mut locked_rti = _f_rti.lock().unwrap();
             let idx: usize = fed_id.into();
-            let my_fed: &mut Federate = &mut locked_rti.enclaves()[idx];
+            let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
             my_fed.enclave().set_next_event(Tag::forever_tag());
         }
 
@@ -780,24 +806,24 @@
         {
             let mut locked_rti = _f_rti.lock().unwrap();
             let idx: usize = fed_id.into();
-            let my_fed: &mut Federate = &mut locked_rti.enclaves()[idx];
+            let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
             my_fed
                 .stream()
                 .as_ref()
                 .unwrap()
                 .shutdown(Shutdown::Both)
                 .unwrap();
-
-            println!("Federate {} has resigned.", fed_id);
         }
 
+        println!("FederateInfo {} has resigned.", fed_id);
+
         // Check downstream federates to see whether they should now be granted a TAG.
         // To handle cycles, need to create a boolean array to keep
         // track of which upstream federates have been visited.
         let number_of_enclaves;
         {
-            let locked_rti = _f_rti.lock().unwrap();
-            number_of_enclaves = locked_rti.number_of_enclaves();
+            let mut locked_rti = _f_rti.lock().unwrap();
+            number_of_enclaves = locked_rti.base().number_of_scheduling_nodes();
         }
         let start_time_value;
         {
@@ -806,7 +832,7 @@
         }
         // FIXME: Handle unwrap properly.
         let mut visited = vec![false as bool; number_of_enclaves.try_into().unwrap()]; // Initializes to 0.
-        Enclave::notify_downstream_advance_grant_if_safe(
+        SchedulingNode::notify_downstream_advance_grant_if_safe(
             _f_rti.clone(),
             fed_id,
             number_of_enclaves,
@@ -820,7 +846,7 @@
         message_type: u8,
         fed_id: u16,
         stream: &mut TcpStream,
-        _f_rti: Arc<Mutex<FederationRTI>>,
+        _f_rti: Arc<Mutex<RTIRemote>>,
         start_time: Arc<Mutex<StartTime>>,
         sent_start_time: Arc<(Mutex<bool>, Condvar)>,
     ) {
@@ -877,7 +903,7 @@
             let locked_start_time = start_time.lock().unwrap();
             start_time_value = locked_start_time.start_time();
         }
-        println!("RTI received message from federate {} for federate {} port {} with intended tag ({}, {}). Forwarding.",
+        println!("RTI received message from federate_info {} for federate_info {} port {} with intended tag ({}, {}). Forwarding.",
             fed_id,
             federate_id,
             reactor_port_id,
             intended_tag.time() - start_time_value,
             intended_tag.microstep());
@@ -895,14 +921,14 @@
         // issue a TAG before this message has been forwarded.
let mut locked_rti = _f_rti.lock().unwrap(); - // If the destination federate is no longer connected, issue a warning + // If the destination federate_info is no longer connected, issue a warning // and return. let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let enclave = fed.enclave(); - if enclave.state() == FedState::NotConnected { + if enclave.state() == SchedulingNodeState::NotConnected { println!( - "RTI: Destination federate {} is no longer connected. Dropping message.", + "RTI: Destination federate_info {} is no longer connected. Dropping message.", federate_id ); println!("Fed status: next_event ({}, {}), completed ({}, {}), last_granted ({}, {}), last_provisionally_granted ({}, {}).", @@ -922,29 +948,29 @@ impl Server { } println!( - "RTI forwarding message to port {} of federate {} of length {}.", + "RTI forwarding message to port {} of federate_info {} of length {}.", reactor_port_id, federate_id, length ); - // Record this in-transit message in federate's in-transit message queue. + // Record this in-transit message in federate_info's in-transit message queue. if Tag::lf_tag_compare(&completed, &intended_tag) < 0 { - // Add a record of this message to the list of in-transit messages to this federate. + // Add a record of this message to the list of in-transit messages to this federate_info. let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; MessageRecord::add_in_transit_message_record( fed.in_transit_message_tags(), intended_tag.clone(), ); println!( - "RTI: Adding a message with tag ({}, {}) to the list of in-transit messages for federate {}.", + "RTI: Adding a message with tag ({}, {}) to the list of in-transit messages for federate_info {}.", intended_tag.time() - start_time_value, intended_tag.microstep(), federate_id ); } else { println!( - "RTI: Federate {} has already completed tag ({}, {}), but there is an in-transit message with tag ({}, {}) from federate {}. This is going to cause an STP violation under centralized coordination.", + "RTI: FederateInfo {} has already completed tag ({}, {}), but there is an in-transit message with tag ({}, {}) from federate_info {}. This is going to cause an STP violation under centralized coordination.", federate_id, completed.time() - start_time_value, completed.microstep(), @@ -952,16 +978,16 @@ impl Server { intended_tag.microstep(), fed_id ); - // FIXME: Drop the federate? + // FIXME: Drop the federate_info? } - // Need to make sure that the destination federate's thread has already + // Need to make sure that the destination federate_info's thread has already // sent the starting MsgType::Timestamp message. { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; - while fed.enclave().state() == FedState::Pending { + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + while fed.enclave().state() == SchedulingNodeState::Pending { // Need to wait here. 
let (lock, condvar) = &*sent_start_time; let mut notified = lock.lock().unwrap(); @@ -1011,7 +1037,7 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; // FIXME: Handle unwrap properly. let destination_stream = fed.stream().as_ref().unwrap(); NetUtil::write_to_stream_errexit( @@ -1033,7 +1059,7 @@ impl Server { } fn update_federate_next_event_tag_locked( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, mut next_event_tag: Tag, start_time: Instant, @@ -1043,7 +1069,7 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; min_in_transit_tag = MessageRecord::get_minimum_in_transit_message_tag( fed.in_transit_message_tags(), start_time, @@ -1052,7 +1078,7 @@ impl Server { if Tag::lf_tag_compare(&min_in_transit_tag, &next_event_tag) < 0 { next_event_tag = min_in_transit_tag.clone(); } - Enclave::update_enclave_next_event_tag_locked( + SchedulingNode::update_scheduling_node_next_event_tag_locked( _f_rti, fed_id, next_event_tag, @@ -1064,7 +1090,7 @@ impl Server { fn handle_next_event_tag( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1082,7 +1108,7 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; enclave_id = fed.enclave().id(); } let intended_tag = NetUtil::extract_tag( @@ -1096,7 +1122,7 @@ impl Server { start_time_value = locked_start_time.start_time(); } println!( - "RTI received from federate {} the Next Event Tag (NET) ({},{})", + "RTI received from federate_info {} the Next Event Tag (NET) ({},{})", enclave_id, intended_tag.time() - start_time_value, intended_tag.microstep() @@ -1113,7 +1139,7 @@ impl Server { fn handle_logical_tag_complete( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1131,15 +1157,15 @@ impl Server { ); let number_of_enclaves; { - let locked_rti = _f_rti.lock().unwrap(); - number_of_enclaves = locked_rti.number_of_enclaves(); + let mut locked_rti = _f_rti.lock().unwrap(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } let start_time_value; { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } - Enclave::logical_tag_complete( + SchedulingNode::logical_tag_complete( _f_rti.clone(), fed_id, number_of_enclaves, @@ -1152,7 +1178,7 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let in_transit_message_tags = fed.in_transit_message_tags(); MessageRecord::clean_in_transit_message_record_up_to_tag( in_transit_message_tags, @@ -1165,11 +1191,11 @@ impl Server { fn handle_stop_request_message( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, stop_granted: Arc>, ) { - println!("RTI handling stop_request from federate {}.", fed_id); + println!("RTI handling 
stop_request from federate_info {}.", fed_id); let mut header_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_LENGTH - 1]; NetUtil::read_from_stream_errexit( @@ -1184,17 +1210,17 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; // Check whether we have already received a stop_tag - // from this federate + // from this federate_info if fed.requested_stop() { // Ignore this request return; } } - // Extract the proposed stop tag for the federate + // Extract the proposed stop tag for the federate_info let proposed_stop_tag = NetUtil::extract_tag( header_buffer[0..(mem::size_of::() + mem::size_of::())] .try_into() @@ -1209,19 +1235,21 @@ impl Server { } { let mut locked_rti = _f_rti.lock().unwrap(); - if Tag::lf_tag_compare(&proposed_stop_tag, &locked_rti.max_stop_tag()) > 0 { - locked_rti.set_max_stop_tag(proposed_stop_tag.clone()); + if Tag::lf_tag_compare(&proposed_stop_tag, &locked_rti.base().max_stop_tag()) > 0 { + locked_rti + .base() + .set_max_stop_tag(proposed_stop_tag.clone()); } } println!( - "RTI received from federate {} a MsgType::StopRequest message with tag ({},{}).", + "RTI received from federate_info {} a MsgType::StopRequest message with tag ({},{}).", fed_id, proposed_stop_tag.time() - start_time_value, proposed_stop_tag.microstep() ); - // If this federate has not already asked + // If this federate_info has not already asked // for a stop, add it to the tally. Self::mark_federate_requesting_stop( fed_id, @@ -1231,8 +1259,10 @@ impl Server { ); { - let locked_rti = _f_rti.lock().unwrap(); - if locked_rti.num_enclaves_handling_stop() == locked_rti.number_of_enclaves() { + let mut locked_rti = _f_rti.lock().unwrap(); + if locked_rti.base().num_scheduling_nodes_handling_stop() + == locked_rti.base().number_of_scheduling_nodes() + { // We now have information about the stop time of all // federates. This is extremely unlikely, but it can occur // all federates call lf_request_stop() at the same tag. @@ -1243,11 +1273,11 @@ impl Server { // also issued a stop request. let mut stop_request_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_LENGTH]; { - let locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.lock().unwrap(); Self::encode_stop_request( &mut stop_request_buffer, - locked_rti.max_stop_tag().time(), - locked_rti.max_stop_tag().microstep(), + locked_rti.base().max_stop_tag().time(), + locked_rti.base().max_stop_tag().microstep(), ); } @@ -1262,15 +1292,15 @@ impl Server { } let number_of_enclaves; { - let locked_rti = _f_rti.lock().unwrap(); - number_of_enclaves = locked_rti.number_of_enclaves(); + let mut locked_rti = _f_rti.lock().unwrap(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } for i in 0..number_of_enclaves { let mut locked_rti = _f_rti.lock().unwrap(); // FIXME: Handle usize properly. 
- let f: &mut Federate = &mut locked_rti.enclaves()[i as usize]; + let f: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; if f.e().id() != fed_id && f.requested_stop() == false { - if f.e().state() == FedState::NotConnected { + if f.e().state() == SchedulingNodeState::NotConnected { Self::mark_federate_requesting_stop( f.e().id(), _f_rti.clone(), @@ -1290,51 +1320,53 @@ impl Server { } } { - let locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.lock().unwrap(); println!( "RTI forwarded to federates MsgType::StopRequest with tag ({}, {}).", - locked_rti.max_stop_tag().time() - start_time_value, - locked_rti.max_stop_tag().microstep() + locked_rti.base().max_stop_tag().time() - start_time_value, + locked_rti.base().max_stop_tag().microstep() ); } } fn mark_federate_requesting_stop( fed_id: u16, - _f_rti: Arc>, + _f_rti: Arc>, stop_granted: Arc>, start_time_value: Instant, ) { let mut num_enclaves_handling_stop; { - let locked_rti = _f_rti.lock().unwrap(); - num_enclaves_handling_stop = locked_rti.num_enclaves_handling_stop(); + let mut locked_rti = _f_rti.lock().unwrap(); + num_enclaves_handling_stop = locked_rti.base().num_scheduling_nodes_handling_stop(); } { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; if !fed.requested_stop() { - // Assume that the federate + // Assume that the federate_info // has requested stop - locked_rti.set_num_enclaves_handling_stop(num_enclaves_handling_stop + 1); + locked_rti + .base() + .set_num_scheduling_nodes_handling_stop(num_enclaves_handling_stop + 1); } } { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; if !fed.requested_stop() { - // Assume that the federate + // Assume that the federate_info // has requested stop fed.set_requested_stop(true); } } let number_of_enclaves; { - let locked_rti = _f_rti.lock().unwrap(); - num_enclaves_handling_stop = locked_rti.num_enclaves_handling_stop(); - number_of_enclaves = locked_rti.number_of_enclaves(); + let mut locked_rti = _f_rti.lock().unwrap(); + num_enclaves_handling_stop = locked_rti.base().num_scheduling_nodes_handling_stop(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } if num_enclaves_handling_stop == number_of_enclaves { // We now have information about the stop time of all @@ -1351,12 +1383,12 @@ impl Server { * Once the RTI has seen proposed tags from all connected federates, * it will broadcast a MSG_TYPE_StopGranted carrying the _RTI.max_stop_tag. * This function also checks the most recently received NET from - * each federate and resets that be no greater than the _RTI.max_stop_tag. + * each federate_info and resets that be no greater than the _RTI.max_stop_tag. * * This function assumes the caller holds the _RTI.rti_mutex lock. 
*/ fn _lf_rti_broadcast_stop_time_to_federates_locked( - _f_rti: Arc>, + _f_rti: Arc>, stop_granted: Arc>, start_time_value: Instant, ) { @@ -1369,18 +1401,18 @@ impl Server { // Reply with a stop granted to all federates let mut outgoing_buffer = vec![0 as u8; MSG_TYPE_STOP_GRANTED_LENGTH]; { - let locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.lock().unwrap(); Self::encode_stop_granted( &mut outgoing_buffer, - locked_rti.max_stop_tag().time(), - locked_rti.max_stop_tag().microstep(), + locked_rti.base().max_stop_tag().time(), + locked_rti.base().max_stop_tag().microstep(), ); } let number_of_enclaves; { - let locked_rti = _f_rti.lock().unwrap(); - number_of_enclaves = locked_rti.number_of_enclaves(); + let mut locked_rti = _f_rti.lock().unwrap(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } // Iterate over federates and send each the message. for i in 0..number_of_enclaves { @@ -1388,18 +1420,18 @@ impl Server { let max_stop_tag; { let mut locked_rti = _f_rti.lock().unwrap(); - max_stop_tag = locked_rti.max_stop_tag(); + max_stop_tag = locked_rti.base().max_stop_tag(); // FIXME: Handle usize properly. - let fed: &Federate = &locked_rti.enclaves()[i as usize]; + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[i as usize]; next_event = fed.e().next_event(); - if fed.e().state() == FedState::NotConnected { + if fed.e().state() == SchedulingNodeState::NotConnected { continue; } } { let mut locked_rti = _f_rti.lock().unwrap(); // FIXME: Handle usize properly. - let fed: &mut Federate = &mut locked_rti.enclaves()[i as usize]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; if Tag::lf_tag_compare(&next_event, &max_stop_tag) >= 0 { // Need the next_event to be no greater than the stop tag. fed.enclave().set_next_event(max_stop_tag); @@ -1408,7 +1440,7 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); // FIXME: Handle usize properly. - let fed: &mut Federate = &mut locked_rti.enclaves()[i as usize]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; // FIXME: Handle unwrap properly. 
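encode_stop_granted() appears to follow the same framing as the other tag-carrying RTI messages in this patch: one message-type byte, an i64 time, then a u32 microstep, 13 bytes in total. A sketch of such an encoder under that assumption, reusing the NetUtil helpers seen earlier (the actual MSG_TYPE_STOP_GRANTED_LENGTH constant is defined elsewhere):

fn encode_tag_message(msg_type: u8, time: i64, microstep: u32) -> Vec<u8> {
    let mut buffer = vec![0u8; 1 + std::mem::size_of::<i64>() + std::mem::size_of::<u32>()];
    buffer[0] = msg_type;
    NetUtil::encode_int64(time, &mut buffer, 1);
    // The microstep follows the 8-byte time as a 32-bit integer.
    NetUtil::encode_int32(microstep as i32, &mut buffer, 1 + std::mem::size_of::<i64>());
    buffer
}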
let stream = fed.stream().as_ref().unwrap(); NetUtil::write_to_stream_errexit( @@ -1421,11 +1453,11 @@ impl Server { } { - let locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.lock().unwrap(); println!( "RTI sent to federates MsgType::StopGranted with tag ({}, {}).", - locked_rti.max_stop_tag().time() - start_time_value, - locked_rti.max_stop_tag().microstep() + locked_rti.base().max_stop_tag().time() - start_time_value, + locked_rti.base().max_stop_tag().microstep() ); } { @@ -1459,7 +1491,7 @@ impl Server { fn handle_stop_request_reply( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, stop_granted: Arc>, ) { @@ -1483,22 +1515,22 @@ impl Server { start_time_value = locked_start_time.start_time(); } println!( - "RTI received from federate {} STOP reply tag ({}, {}).", + "RTI received from federate_info {} STOP reply tag ({}, {}).", fed_id, federate_stop_tag.time() - start_time_value, federate_stop_tag.microstep() ); // Acquire the mutex lock so that we can change the state of the RTI - // If the federate has not requested stop before, count the reply + // If the federate_info has not requested stop before, count the reply let max_stop_tag; { - let locked_rti = _f_rti.lock().unwrap(); - max_stop_tag = locked_rti.max_stop_tag(); + let mut locked_rti = _f_rti.lock().unwrap(); + max_stop_tag = locked_rti.base().max_stop_tag(); } if Tag::lf_tag_compare(&federate_stop_tag, &max_stop_tag) > 0 { let mut locked_rti = _f_rti.lock().unwrap(); - locked_rti.set_max_stop_tag(federate_stop_tag); + locked_rti.base().set_max_stop_tag(federate_stop_tag); } Self::mark_federate_requesting_stop( fed_id, @@ -1512,7 +1544,7 @@ impl Server { buffer: &Vec, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1555,14 +1587,14 @@ impl Server { { let mut locked_rti = _f_rti.lock().unwrap(); - // If the destination federate is no longer connected, issue a warning + // If the destination federate_info is no longer connected, issue a warning // and return. let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let enclave = fed.enclave(); - if enclave.state() == FedState::NotConnected { + if enclave.state() == SchedulingNodeState::NotConnected { println!( - "RTI: Destination federate {} is no longer connected. Dropping message.", + "RTI: Destination federate_info {} is no longer connected. Dropping message.", federate_id ); println!("Fed status: next_event ({}, {}), completed ({}, {}), last_granted ({}, {}), last_provisionally_granted ({}, {}).", @@ -1579,17 +1611,17 @@ impl Server { } } println!( - "RTI forwarding port absent message for port {} to federate {}.", + "RTI forwarding port absent message for port {} to federate_info {}.", reactor_port_id, federate_id ); - // Need to make sure that the destination federate's thread has already + // Need to make sure that the destination federate_info's thread has already // sent the starting MsgType::Timestamp message. { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut Federate = &mut locked_rti.enclaves()[idx]; - while fed.enclave().state() == FedState::Pending { + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + while fed.enclave().state() == SchedulingNodeState::Pending { // Need to wait here. 
@@ -1512,7 +1544,7 @@ impl Server {
         buffer: &Vec,
         fed_id: u16,
         stream: &mut TcpStream,
-        _f_rti: Arc>,
+        _f_rti: Arc>,
         start_time: Arc>,
         sent_start_time: Arc<(Mutex, Condvar)>,
     ) {
@@ -1555,14 +1587,14 @@ impl Server {
         {
             let mut locked_rti = _f_rti.lock().unwrap();
-            // If the destination federate is no longer connected, issue a warning
+            // If the destination federate_info is no longer connected, issue a warning
             // and return.
             let idx: usize = federate_id.into();
-            let fed: &mut Federate = &mut locked_rti.enclaves()[idx];
+            let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
             let enclave = fed.enclave();
-            if enclave.state() == FedState::NotConnected {
+            if enclave.state() == SchedulingNodeState::NotConnected {
                 println!(
-                    "RTI: Destination federate {} is no longer connected. Dropping message.",
+                    "RTI: Destination federate_info {} is no longer connected. Dropping message.",
                     federate_id
                 );
                 println!("Fed status: next_event ({}, {}), completed ({}, {}), last_granted ({}, {}), last_provisionally_granted ({}, {}).",
@@ -1579,17 +1611,17 @@ impl Server {
             }
         }
         println!(
-            "RTI forwarding port absent message for port {} to federate {}.",
+            "RTI forwarding port absent message for port {} to federate_info {}.",
             reactor_port_id, federate_id
         );
-        // Need to make sure that the destination federate's thread has already
+        // Need to make sure that the destination federate_info's thread has already
         // sent the starting MsgType::Timestamp message.
         {
             let mut locked_rti = _f_rti.lock().unwrap();
             let idx: usize = federate_id.into();
-            let fed: &mut Federate = &mut locked_rti.enclaves()[idx];
-            while fed.enclave().state() == FedState::Pending {
+            let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
+            while fed.enclave().state() == SchedulingNodeState::Pending {
                 // Need to wait here.
                 let (lock, condvar) = &*sent_start_time;
                 let mut notified = lock.lock().unwrap();
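The wait loop above uses the standard `Mutex`/`Condvar` pairing from `std::sync`; the `Mutex`'s type parameter is not visible in this diff, so the `bool` flag below is an assumption. A self-contained sketch of the same pattern:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // The pair plays the role of `sent_start_time`.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, condvar) = &*pair2;
        *lock.lock().unwrap() = true;
        condvar.notify_all(); // wake every waiting handler thread
    });

    let (lock, condvar) = &*pair;
    let mut notified = lock.lock().unwrap();
    while !*notified {
        // wait() releases the lock while blocked and re-acquires it on
        // wakeup; the surrounding `while` guards against spurious wakeups.
        notified = condvar.wait(notified).unwrap();
    }
}
```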
diff --git a/rust/rti/src/tag.rs b/rust/rti/src/tag.rs
index a3d5f7f..35440ae 100644
--- a/rust/rti/src/tag.rs
+++ b/rust/rti/src/tag.rs
@@ -31,7 +31,9 @@ pub type Interval = std::option::Option;
  */
 pub type Microstep = u32;
 
-const NEVER: i64 = i64::MIN;
+pub const NEVER: i64 = i64::MIN;
+pub const FOREVER: i64 = i64::MAX;
+pub const FOREVER_MICROSTEP: u32 = u32::MAX;
 
 pub struct StartTime {
     start_time: Instant,
@@ -54,7 +56,7 @@ impl StartTime {
 /**
  * A tag is a time, microstep pair.
  */
-#[derive(Hash, Eq, PartialEq, Clone)]
+#[derive(Hash, Eq, PartialEq, Clone, Debug)]
 pub struct Tag {
     time: Instant,
     microstep: Microstep,
@@ -67,17 +69,24 @@ impl Tag {
         Tag { time, microstep }
     }
 
+    pub fn zero_tag() -> Tag {
+        Tag {
+            time: 0,
+            microstep: 0,
+        }
+    }
+
     pub fn never_tag() -> Tag {
         Tag {
-            time: i64::MIN,
+            time: NEVER,
             microstep: 0,
         }
     }
 
     pub fn forever_tag() -> Tag {
         Tag {
-            time: i64::MAX,
-            microstep: u32::MAX,
+            time: FOREVER,
+            microstep: FOREVER_MICROSTEP,
         }
     }
 
@@ -119,14 +128,13 @@ impl Tag {
     }
 
     pub fn lf_delay_tag(tag: &Tag, interval: Interval) -> Tag {
-        if tag.time() == i64::MIN || interval < Some(0) {
-            // println!(
-            //     "tag.time() == i64::MIN || interval < Some(0), (interval, time) = ({:?},{})",
-            //     interval,
-            //     tag.time()
-            // );
+        if tag.time() == NEVER || interval < Some(0) {
             return tag.clone();
         }
+        // Note that overflow in C is undefined for signed variables.
+        if tag.time() >= FOREVER - interval.unwrap() {
+            return Tag::forever_tag(); // Overflow.
+        }
         let mut result = tag.clone();
         if interval == Some(0) {
             // Note that unsigned variables will wrap on overflow.
@@ -139,38 +147,25 @@ impl Tag {
             //     result.microstep()
             // );
         } else {
-            // Note that overflow in C is undefined for signed variables.
-            if i64::MAX - interval.unwrap() < result.time() {
-                result.set_time(i64::MAX);
-                // println!(
-                //     "i64::MAX - interval.unwrap() < result.time() (time, microstep) = ({},{})",
-                //     result.time(),
-                //     result.microstep()
-                // );
-            } else {
-                // FIXME: Handle unwrap() properly.
-                result.set_time(result.time() + interval.unwrap());
-                println!("result.set_time(result.time() + interval.unwrap()), (time, microstep) = ({},{})", result.time(), result.microstep());
-            }
+            // FIXME: Handle unwrap() properly.
+            result.set_time(result.time() + interval.unwrap());
             result.set_microstep(0);
         }
-
         result
     }
 
     pub fn lf_delay_strict(tag: &Tag, interval: Interval) -> Tag {
         let mut result = Self::lf_delay_tag(tag, interval);
         if interval != Some(0)
-            && interval != Some(i64::MIN)
-            && interval != Some(i64::MAX)
-            && result.time() != i64::MIN
-            && result.time() != i64::MAX
+            && interval != Some(NEVER)
+            && interval != Some(FOREVER)
+            && result.time() != NEVER
+            && result.time() != FOREVER
         {
             // println!("interval={:?}, result time={}", interval, result.time());
             result.set_time(result.time() - 1);
             result.set_microstep(u32::MAX);
         }
-
         // println!(
         //     "(time, microstep) = ({},{})",
         //     result.time(),
@@ -178,4 +173,24 @@ impl Tag {
         //     );
         result
     }
+
+    pub fn lf_tag_add(a: &Tag, b: &Tag) -> Tag {
+        if a.time() == NEVER || b.time() == NEVER {
+            return Tag::never_tag();
+        }
+        if a.time() == FOREVER || b.time() == FOREVER {
+            return Tag::forever_tag();
+        }
+        let result = Tag::new(a.time() + b.time(), a.microstep() + b.microstep());
+        if result.microstep() < a.microstep() {
+            return Tag::forever_tag();
+        }
+        if result.time() < a.time() && b.time() > 0 {
+            return Tag::forever_tag();
+        }
+        if result.time() > a.time() && b.time() < 0 {
+            return Tag::never_tag();
+        }
+        result
+    }
 }
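One caveat worth recording about `lf_tag_add`: its wraparound checks (e.g. `result.microstep() < a.microstep()`) assume the additions wrap, but in Rust `+` panics on overflow in debug builds, so the saturation paths only behave as intended under release-mode wrapping semantics. A standalone sketch of the same saturating addition over `(time, microstep)` tuples that requests wrapping explicitly:

```rust
const NEVER: i64 = i64::MIN;
const FOREVER: i64 = i64::MAX;

// Saturating tag addition: NEVER absorbs everything, FOREVER saturates
// upward, and overflow in either component clamps to the matching extreme.
fn tag_add(a: (i64, u32), b: (i64, u32)) -> (i64, u32) {
    if a.0 == NEVER || b.0 == NEVER {
        return (NEVER, 0); // never_tag
    }
    if a.0 == FOREVER || b.0 == FOREVER {
        return (FOREVER, u32::MAX); // forever_tag
    }
    // wrapping_add makes the wraparound well-defined in all build modes.
    let time = a.0.wrapping_add(b.0);
    let microstep = a.1.wrapping_add(b.1);
    if microstep < a.1 {
        return (FOREVER, u32::MAX); // microstep wrapped around
    }
    if time < a.0 && b.0 > 0 {
        return (FOREVER, u32::MAX); // time overflowed upward
    }
    if time > a.0 && b.0 < 0 {
        return (NEVER, 0); // time overflowed downward
    }
    (time, microstep)
}

fn main() {
    assert_eq!(tag_add((100, 1), (NEVER, 0)), (NEVER, 0));
    assert_eq!(tag_add((FOREVER - 1, 0), (2, 0)), (FOREVER, u32::MAX));
    assert_eq!(tag_add((0, u32::MAX), (0, 1)), (FOREVER, u32::MAX));
    assert_eq!(tag_add((100, 1), (23, 2)), (123, 3));
}
```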