diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index 9eba218..701a8ac 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -95,7 +95,7 @@ pub enum MsgType { NextEventTag, TagAdvanceGrant, PropositionalTagAdvanceGrant, - LogicalTagComplete, + LatestTagComplete, StopRequest, StopRequestReply, StopGranted, @@ -120,7 +120,7 @@ impl MsgType { MsgType::NextEventTag => 6, MsgType::TagAdvanceGrant => 7, MsgType::PropositionalTagAdvanceGrant => 8, - MsgType::LogicalTagComplete => 9, + MsgType::LatestTagComplete => 9, MsgType::StopRequest => 10, MsgType::StopRequestReply => 11, MsgType::StopGranted => 12, @@ -142,7 +142,7 @@ impl MsgType { 5 => MsgType::TaggedMessage, 6 => MsgType::NextEventTag, 8 => MsgType::PropositionalTagAdvanceGrant, - 9 => MsgType::LogicalTagComplete, + 9 => MsgType::LatestTagComplete, 10 => MsgType::StopRequest, 11 => MsgType::StopRequestReply, 12 => MsgType::StopGranted, diff --git a/rust/rti/src/net_util.rs b/rust/rti/src/net_util.rs index b0ea568..13af183 100644 --- a/rust/rti/src/net_util.rs +++ b/rust/rti/src/net_util.rs @@ -15,7 +15,7 @@ use crate::tag::Tag; pub struct NetUtil {} impl NetUtil { - pub fn read_from_stream_errexit( + pub fn read_from_socket_fail_on_error( stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16, @@ -57,7 +57,7 @@ impl NetUtil { bytes_read } - pub fn write_to_stream_errexit( + pub fn write_to_socket_fail_on_error( mut stream: &TcpStream, buffer: &Vec, fed_id: u16, diff --git a/rust/rti/src/rti_common.rs b/rust/rti/src/rti_common.rs index a52d428..5f68b88 100644 --- a/rust/rti/src/rti_common.rs +++ b/rust/rti/src/rti_common.rs @@ -8,11 +8,10 @@ use crate::net_util::NetUtil; * @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn) * @author Chanhee Lee (chanheel@asu.edu) * @author Hokeun Kim (hokeun@asu.edu) - * @copyright (c) 2020-2024, The University of California at Berkeley - * License in [BSD 2-clause](..) - * @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs. - * This file extends enclave.h with RTI features that are specific to federations and are not - * used by scheduling enclaves. + * @copyright (c) 2020-2023, The University of California at Berkeley + * License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md) + * @brief Common declarations for runtime infrastructure (RTI) for scheduling enclaves + * and distributed Lingua Franca programs. */ use crate::rti_remote::RTIRemote; use crate::tag; @@ -223,6 +222,13 @@ impl SchedulingNode { self.flags = flags; } + /** + * @brief Update the next event tag of an scheduling node. + * + * This will notify downstream scheduling nodes with a TAG or PTAG if appropriate. + * + * This function assumes that the caller is holding the RTI mutex. + */ pub fn update_scheduling_node_next_event_tag_locked( _f_rti: Arc>, fed_id: u16, @@ -259,6 +265,12 @@ impl SchedulingNode { start_time, sent_start_time.clone(), ); + } else { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed = &mut locked_rti.base().scheduling_nodes()[idx]; + let e = fed.enclave(); + e.set_last_granted(next_event_tag.clone()); } // Check downstream enclaves to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -274,6 +286,13 @@ impl SchedulingNode { ); } + /** + * @brief Either send to a federate or unblock an enclave to give it a tag. + * This function requires two different implementations, one for enclaves + * and one for federates. + * + * This assumes the caller holds the RTI mutex. + */ fn notify_advance_grant_if_safe( _f_rti: Arc>, fed_id: u16, @@ -304,6 +323,32 @@ impl SchedulingNode { } } + /** + * Determine whether the specified scheduling node is eligible for a tag advance grant, + * (TAG) and, if so, return the details. This is called upon receiving a LTC, NET + * or resign from an upstream node. + * + * This function calculates the minimum M over + * all upstream scheduling nodes of the "after" delay plus the most recently + * received LTC from that node. If M is greater than the + * most recent TAG to e or greater than or equal to the most + * recent PTAG, then return TAG(M). + * + * If the above conditions do not result in returning a TAG, then find the + * minimum M of the earliest possible future message from upstream federates. + * This is calculated by transitively looking at the most recently received + * NET calls from upstream scheduling nodes. + * If M is greater than the NET of e or the most recent PTAG to e, then + * return a TAG with tag equal to the NET of e or the PTAG. + * If M is equal to the NET of the federate, then return PTAG(M). + * + * This should be called whenever an immediately upstream federate sends to + * the RTI an LTC (latest tag complete), or when a transitive upstream + * federate sends a NET (Next Event Tag) message. + * It is also called when an upstream federate resigns from the federation. + * + * This function assumes that the caller holds the RTI mutex. + */ fn tag_advance_grant_if_safe( _f_rti: Arc>, fed_id: u16, @@ -348,7 +393,7 @@ impl SchedulingNode { ); } else { println!( - "Minimum upstream LTC for federate/enclave {} is ({},{}) (adjusted by after delay).\nWARNING!!! min_upstream_completed.time() < start_time", + "[tag_advance_grant_if_safe:396, federate/enclave {}] WARNING!!! min_upstream_completed.time({}) < start_time({})", e.id(), // FIXME: Check the below calculation min_upstream_completed.time(), // - start_time, @@ -372,6 +417,9 @@ impl SchedulingNode { // or federate (which includes any after delays on the connections). let t_d = Self::earliest_future_incoming_message_tag(_f_rti.clone(), fed_id as u16, start_time); + // Non-ZDC version of the above. This is a tag that must be strictly greater than + // that of the next granted PTAG. + let t_d_strict = Self::eimt_strict(_f_rti.clone(), fed_id as u16, start_time); if t_d.time() >= start_time { println!( @@ -381,16 +429,19 @@ impl SchedulingNode { t_d.microstep() ); } else { - println!(" t_d.time < start_time ({},{}", t_d.time(), start_time); + println!( + "[tag_advance_grant_if_safe:432] WARNING!!! t_d.time < start_time ({},{}", + // FIXME: Check the below calculation + t_d.time(), // - start_time, + start_time + ); } // Given an EIMT (earliest incoming message tag) there are these possible scenarios: // 1) The EIMT is greater than the NET we want to advance to. Grant a TAG. - // 2) The EIMT is equal to the NET and the federate is part of a zero-delay cycle (ZDC). - // 3) The EIMT is equal to the NET and the federate is not part of a ZDC. - // 4) The EIMT is less than the NET - // In (1) we can give a TAG to NET. In (2) we can give a PTAG. - // In (3) and (4), we wait for further updates from upstream federates. + // 2) The EIMT is equal to the NET and the strict EIMT is greater than the net + // and the federate is part of a zero-delay cycle (ZDC). Grant a PTAG. + // 3) Otherwise, grant nothing and wait for further updates. let next_event; let last_provisionally_granted; let last_granted; @@ -406,6 +457,7 @@ impl SchedulingNode { // Scenario (1) above if Tag::lf_tag_compare(&t_d, &next_event) > 0 // EIMT greater than NET + && Tag::lf_tag_compare(&next_event, &Tag::never_tag()) > 0 // NET is not NEVER_TAG && Tag::lf_tag_compare(&t_d, &last_provisionally_granted) >= 0 // The grant is not redundant // (equal is important to override any previous // PTAGs). @@ -414,33 +466,234 @@ impl SchedulingNode { { // No upstream node can send events that will be received with a tag less than or equal to // e->next_event, so it is safe to send a TAG. - println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting tag advance (TAG) for ({},{})", + if next_event.time() >= start_time { + println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting tag advance (TAG) for ({},{})", fed_id, t_d.time() - start_time, t_d.microstep(), - next_event.time(), // TODO: - start_time, + next_event.time() - start_time, next_event.microstep()); + } else { + println!("[tag_advance_grant_if_safe:471] WARNING!!! t_d.time({}) or next_event.time({}) < start_time({})", + // FIXME: Check the below calculation + t_d.time(), // - start_time, + next_event.time(), // - start_time, + start_time); + } result.set_tag(next_event); } else if - // Scenario (2) or (3) above - Tag::lf_tag_compare(&t_d, &next_event) == 0 // EIMT equal to NET - && Self::is_in_zero_delay_cycle(_f_rti.clone(), fed_id) // The node is part of a ZDC + // Scenario (2) above + Tag::lf_tag_compare(&t_d, &next_event) == 0 // EIMT equal to NET + && Self::is_in_zero_delay_cycle(_f_rti.clone(), fed_id) // The node is part of a ZDC + && Tag::lf_tag_compare(&t_d_strict, &next_event) > 0 // The strict EIMT is greater than the NET && Tag::lf_tag_compare(&t_d, &last_provisionally_granted) > 0 // The grant is not redundant // The grant is not redundant. && Tag::lf_tag_compare(&t_d, &last_granted) > 0 { // Some upstream node may send an event that has the same tag as this node's next event, // so we can only grant a PTAG. - println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting provisional tag advance (PTAG) for ({},{})", + if t_d.time() >= start_time && next_event.time() >= start_time { + println!("RTI: Earliest upstream message time for fed/encl {} is ({},{})(adjusted by after delay). Granting provisional tag advance (PTAG) for ({},{})", fed_id, t_d.time() - start_time, t_d.microstep(), next_event.time() - start_time, next_event.microstep()); + } else { + println!("[tag_advance_grant_if_safe:492] WARNING!!! next_event.time({}) or t_d.time({}) < start_time({})", + // FIXME: Check the below calculation + t_d.time(), // - start_time, + next_event.time(), // - start_time, + start_time); + } result.set_tag(next_event); result.set_provisional(true); } result } + /** + * Given a node (enclave or federate), find the tag of the earliest possible incoming + * message (EIMT) from upstream enclaves or federates, which will be the smallest upstream NET + * plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some + * upstream node. + */ + fn earliest_future_incoming_message_tag( + _f_rti: Arc>, + fed_id: u16, + start_time: Instant, + ) -> Tag { + // First, we need to find the shortest path (minimum delay) path to each upstream node + // and then find the minimum of the node's recorded NET plus the minimum path delay. + // Update the shortest paths, if necessary. + let is_first_time; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0; + } + if is_first_time { + Self::update_min_delays_upstream(_f_rti.clone(), fed_id); + } + + // Next, find the tag of the earliest possible incoming message from upstream enclaves or + // federates, which will be the smallest upstream NET plus the least delay. + // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. + let mut t_d = Tag::forever_tag(); + let num_min_delays; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &enclaves[idx]; + let e = fed.e(); + num_min_delays = e.num_min_delays(); + } + for i in 0..num_min_delays { + let upstream_id; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let fed: &FederateInfo = &enclaves[idx]; + let e = fed.e(); + upstream_id = e.min_delays[i as usize].id() as usize; + } + let upstream_next_event; + { + // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let fed: &mut FederateInfo = &mut enclaves[upstream_id]; + let upstream = fed.enclave(); + // If we haven't heard from the upstream node, then assume it can send an event at the start time. + upstream_next_event = upstream.next_event(); + if Tag::lf_tag_compare(&upstream_next_event, &Tag::never_tag()) == 0 { + let start_tag = Tag::new(start_time, 0); + upstream.set_next_event(start_tag); + } + } + // The min_delay here is a tag_t, not an interval_t because it may account for more than + // one connection. No delay at all is represented by (0,0). A delay of 0 is represented + // by (0,1). If the time part of the delay is greater than 0, then we want to ignore + // the microstep in upstream.next_event() because that microstep will have been lost. + // Otherwise, we want preserve it and add to it. This is handled by lf_tag_add(). + let min_delay; + let earliest_tag_from_upstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let enclaves = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let fed: &mut FederateInfo = &mut enclaves[idx]; + let e = fed.enclave(); + min_delay = e.min_delays()[i as usize].min_delay(); + earliest_tag_from_upstream = Tag::lf_tag_add(&upstream_next_event, &min_delay); + /* Following debug message is too verbose for normal use: + if earliest_tag_from_upstream.time() >= start_time { + println!("RTI: Earliest next event upstream of fed/encl {} at fed/encl {} has tag ({},{}).", + fed_id, + upstream_id, + earliest_tag_from_upstream.time() - start_time, + earliest_tag_from_upstream.microstep()); + } else { + println!( + " earliest_tag_from_upstream.time() < start_time, ({},{})", + earliest_tag_from_upstream.time(), + start_time + ); + } + */ + } + if Tag::lf_tag_compare(&earliest_tag_from_upstream, &t_d) < 0 { + t_d = earliest_tag_from_upstream.clone(); + } + } + t_d + } + + /** + * For the given scheduling node (enclave or federate), if necessary, update the `min_delays`, + * `num_min_delays`, and the fields that indicate cycles. These fields will be + * updated only if they have not been previously updated or if invalidate_min_delays_upstream + * has been called since they were last updated. + */ + fn update_min_delays_upstream(_f_rti: Arc>, node_idx: u16) { + let num_min_delays; + let number_of_scheduling_nodes; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = node_idx.into(); + num_min_delays = scheduling_nodes[idx].enclave().min_delays().len(); + number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); + } + // Check whether cached result is valid. + if num_min_delays == 0 { + // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. + // There must be a name for this algorithm. + + // Array of results on the stack: + let mut path_delays = Vec::new(); + // This will be the number of non-FOREVER entries put into path_delays. + let mut count: u64 = 0; + + for _i in 0..number_of_scheduling_nodes { + path_delays.push(Tag::forever_tag()); + } + // FIXME:: Handle "as i32" properly. + Self::_update_min_delays_upstream( + _f_rti.clone(), + node_idx as i32, + -1, + &mut path_delays, + &mut count, + ); + + // Put the results onto the node's struct. + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = node_idx.into(); + let node = scheduling_nodes[idx].enclave(); + node.set_num_min_delays(count); + node.set_min_delays(Vec::new()); + println!( + "++++ Node {}(is in ZDC: {}), COUNT = {}, flags = {}, number_of_scheduling_nodes = {}\n", + node_idx, + node.flags() & IS_IN_ZERO_DELAY_CYCLE, count, node.flags(), number_of_scheduling_nodes + ); + let mut k = 0; + for i in 0..number_of_scheduling_nodes { + if Tag::lf_tag_compare(&path_delays[i as usize], &Tag::forever_tag()) < 0 { + // Node i is upstream. + if k >= count { + println!( + "Internal error! Count of upstream nodes {} for node {} is wrong!", + count, i + ); + std::process::exit(1); + } + let min_delay = MinimumDelay::new(i, path_delays[i as usize].clone()); + if node.min_delays().len() > k as usize { + let _ = + std::mem::replace(&mut node.min_delays()[k as usize], min_delay); + } else { + node.min_delays().insert(k as usize, min_delay); + } + k = k + 1; + // N^2 debug statement could be a problem with large benchmarks. + // println!( + // "++++ Node {} is upstream with delay ({},{}), k = {}", + // i, + // path_delays[i as usize].time(), + // path_delays[i as usize].microstep(), + // k + // ); + } + } + } + } + } + fn is_in_zero_delay_cycle(_f_rti: Arc>, fed_id: u16) -> bool { let is_first_time; { @@ -463,6 +716,103 @@ impl SchedulingNode { (flags & IS_IN_ZERO_DELAY_CYCLE) != 0 } + /** + * Given a node (enclave or federate), find the earliest incoming message tag (EIMT) from + * any immediately upstream node that is not part of zero-delay cycle (ZDC). + * These tags are treated strictly by the RTI when deciding whether to grant a PTAG. + * Since the upstream node is not part of a ZDC, there is no need to block on the input + * from that node since we can simply wait for it to complete its tag without chance of + * introducing a deadlock. This will return FOREVER_TAG if there are no non-ZDC upstream nodes. + * @return The earliest possible incoming message tag from a non-ZDC upstream node. + */ + fn eimt_strict(_f_rti: Arc>, fed_id: u16, start_time: Instant) -> Tag { + // Find the tag of the earliest possible incoming message from immediately upstream + // enclaves or federates that are not part of a zero-delay cycle. + // This will be the smallest upstream NET plus the least delay. + // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. + let num_upstream; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let e = scheduling_nodes[idx].e(); + num_upstream = e.num_upstream(); + } + let mut t_d = Tag::forever_tag(); + for i in 0..num_upstream { + let upstream_id; + let upstream_delay; + let next_event; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let idx: usize = fed_id.into(); + let e = scheduling_nodes[idx].e(); + // let upstreams = e.upstream(); + // let upstream_id = upstreams[i] as usize; + upstream_id = e.upstream()[i as usize] as usize; + upstream_delay = e.upstream_delay()[i as usize]; + next_event = e.next_event(); + } + // Skip this node if it is part of a zero-delay cycle. + if Self::is_in_zero_delay_cycle(_f_rti.clone(), upstream_id as u16) { + continue; + } + // If we haven't heard from the upstream node, then assume it can send an event at the start time. + if Tag::lf_tag_compare(&next_event, &Tag::never_tag()) == 0 { + let mut locked_rti = _f_rti.lock().unwrap(); + let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let upstream = scheduling_nodes[upstream_id].enclave(); + let start_tag = Tag::new(start_time, 0); + upstream.set_next_event(start_tag); + } + // Need to consider nodes that are upstream of the upstream node because those + // nodes may send messages to the upstream node. + let mut earliest = Self::earliest_future_incoming_message_tag( + _f_rti.clone(), + upstream_id as u16, + start_time, + ); + // If the next event of the upstream node is earlier, then use that. + if Tag::lf_tag_compare(&next_event, &earliest) < 0 { + earliest = next_event; + } + let earliest_tag_from_upstream = Tag::lf_delay_tag(&earliest, upstream_delay); + if earliest_tag_from_upstream.time() >= start_time { + println!( + "RTI: Strict EIMT of fed/encl {} at fed/encl {} has tag ({},{}).", + fed_id, + upstream_id, + earliest_tag_from_upstream.time() - start_time, + earliest_tag_from_upstream.microstep() + ); + } else { + println!( + "[eimt_strict:782] WARNING!!! RTI: Strict EIMT of fed/encl {} at fed/encl {} -> earliest_tag_from_upstream.time() < start_timehas tag = ({} < {}).", + fed_id, + upstream_id, + // FIXME: Check the below calculation + earliest_tag_from_upstream.time(), // - start_time, + earliest_tag_from_upstream.microstep() + ); + } + if Tag::lf_tag_compare(&earliest_tag_from_upstream, &t_d) < 0 { + t_d = earliest_tag_from_upstream; + } + } + t_d + } + + /** + * Notify a tag advance grant (TAG) message to the specified scheduling node. + * Do not notify it if a previously sent PTAG was greater or if a + * previously sent TAG was greater or equal. + * + * This function will keep a record of this TAG in the node's last_granted + * field. + * + * This function assumes that the caller holds the RTI mutex. + */ fn notify_tag_advance_grant( _f_rti: Arc>, fed_id: u16, @@ -548,6 +898,15 @@ impl SchedulingNode { } } + /** + * Notify a provisional tag advance grant (PTAG) message to the specified scheduling node. + * Do not notify it if a previously sent PTAG or TAG was greater or equal. + * + * This function will keep a record of this PTAG in the node's last_provisionally_granted + * field. + * + * This function assumes that the caller holds the RTI mutex. + */ fn notify_provisional_tag_advance_grant( _f_rti: Arc>, fed_id: u16, @@ -686,172 +1045,6 @@ impl SchedulingNode { } } - fn earliest_future_incoming_message_tag( - _f_rti: Arc>, - fed_id: u16, - start_time: Instant, - ) -> Tag { - // First, we need to find the shortest path (minimum delay) path to each upstream node - // and then find the minimum of the node's recorded NET plus the minimum path delay. - // Update the shortest paths, if necessary. - let is_first_time; - { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let idx: usize = fed_id.into(); - is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0; - } - if is_first_time { - Self::update_min_delays_upstream(_f_rti.clone(), fed_id); - } - - // Next, find the tag of the earliest possible incoming message from upstream enclaves or - // federates, which will be the smallest upstream NET plus the least delay. - // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. - let mut t_d = Tag::forever_tag(); - let num_min_delays; - { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); - let idx: usize = fed_id.into(); - let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); - num_min_delays = e.num_min_delays(); - } - for i in 0..num_min_delays { - let upstream_id; - { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); - let idx: usize = fed_id.into(); - let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); - upstream_id = e.min_delays[i as usize].id() as usize; - } - let upstream_next_event; - { - // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); - let fed: &mut FederateInfo = &mut enclaves[upstream_id]; - let upstream = fed.enclave(); - // If we haven't heard from the upstream node, then assume it can send an event at the start time. - upstream_next_event = upstream.next_event(); - if Tag::lf_tag_compare(&upstream_next_event, &Tag::never_tag()) == 0 { - let start_tag = Tag::new(start_time, 0); - upstream.set_next_event(start_tag); - } - } - let min_delay; - let earliest_tag_from_upstream; - { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); - let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut enclaves[idx]; - let e = fed.enclave(); - min_delay = e.min_delays()[i as usize].min_delay(); - earliest_tag_from_upstream = Tag::lf_tag_add(&upstream_next_event, &min_delay); - if earliest_tag_from_upstream.time() >= start_time { - println!("RTI: Earliest next event upstream of fed/encl {} at fed/encl {} has tag ({},{}).", - fed_id, - upstream_id, - earliest_tag_from_upstream.time() - start_time, - earliest_tag_from_upstream.microstep()); - } else { - println!( - " earliest_tag_from_upstream.time() < start_time, ({},{})", - earliest_tag_from_upstream.time(), - start_time - ); - } - } - if Tag::lf_tag_compare(&earliest_tag_from_upstream, &t_d) < 0 { - t_d = earliest_tag_from_upstream.clone(); - } - } - t_d - } - - fn update_min_delays_upstream(_f_rti: Arc>, node_idx: u16) { - let num_min_delays; - let number_of_scheduling_nodes; - { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let idx: usize = node_idx.into(); - num_min_delays = scheduling_nodes[idx].enclave().min_delays().len(); - number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); - } - // Check whether cached result is valid. - if num_min_delays == 0 { - // This is not Dijkstra's algorithm, but rather one optimized for sparse upstream nodes. - // There must be a name for this algorithm. - - // Array of results on the stack: - let mut path_delays = Vec::new(); - // This will be the number of non-FOREVER entries put into path_delays. - let mut count: u64 = 0; - - for _i in 0..number_of_scheduling_nodes { - path_delays.push(Tag::forever_tag()); - } - // FIXME:: Handle "as i32" properly. - Self::_update_min_delays_upstream( - _f_rti.clone(), - node_idx as i32, - -1, - &mut path_delays, - &mut count, - ); - - // Put the results onto the node's struct. - { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let idx: usize = node_idx.into(); - let node = scheduling_nodes[idx].enclave(); - node.set_num_min_delays(count); - node.set_min_delays(Vec::new()); - println!( - "++++ Node {}(is in ZDC: {}), COUNT = {}, flags = {}, number_of_scheduling_nodes = {}\n", - node_idx, - node.flags() & IS_IN_ZERO_DELAY_CYCLE, count, node.flags(), number_of_scheduling_nodes - ); - - let mut k = 0; - for i in 0..number_of_scheduling_nodes { - if Tag::lf_tag_compare(&path_delays[i as usize], &Tag::forever_tag()) < 0 { - // Node i is upstream. - if k >= count { - println!( - "Internal error! Count of upstream nodes {} for node {} is wrong!", - count, i - ); - std::process::exit(1); - } - let min_delay = MinimumDelay::new(i, path_delays[i as usize].clone()); - if node.min_delays().len() > k as usize { - let _ = - std::mem::replace(&mut node.min_delays()[k as usize], min_delay); - } else { - node.min_delays().insert(k as usize, min_delay); - } - k = k + 1; - // N^2 debug statement could be a problem with large benchmarks. - // println!( - // "++++ Node {} is upstream with delay ({},{}), k = {}", - // i, - // path_delays[i as usize].time(), - // path_delays[i as usize].microstep(), - // k - // ); - } - } - } - } - } - // Local function used recursively to find minimum delays upstream. // Return in count the number of non-FOREVER_TAG entries in path_delays[]. fn _update_min_delays_upstream( @@ -949,6 +1142,12 @@ impl SchedulingNode { } } + /** + * For all scheduling nodes downstream of the specified node, determine + * whether they should be notified of a TAG or PTAG and notify them if so. + * + * This assumes the caller holds the RTI mutex. + */ pub fn notify_downstream_advance_grant_if_safe( _f_rti: Arc>, fed_id: u16, @@ -998,7 +1197,7 @@ impl SchedulingNode { } } - pub fn logical_tag_complete( + pub fn _logical_tag_complete( _f_rti: Arc>, fed_id: u16, number_of_enclaves: i32, diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 0b9e5d2..283e906 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -69,7 +69,7 @@ impl Server { let sent_start_time = Arc::new((Mutex::new(false), Condvar::new())); let stop_granted = Arc::new(Mutex::new(StopGranted::new())); // Wait for connections from federates and create a thread for each. - let handles = self.connect_to_federates( + let handles = self.lf_connect_to_federates( socket, _f_rti, start_time, @@ -107,7 +107,7 @@ impl Server { // TODO: close(socket_descriptor); } - fn connect_to_federates( + fn lf_connect_to_federates( &mut self, socket: TcpListener, mut _f_rti: RTIRemote, @@ -238,8 +238,8 @@ impl Server { cloned_start_time.clone(), cloned_sent_start_time.clone(), ), - MsgType::LogicalTagComplete => { - Self::handle_logical_tag_complete( + MsgType::LatestTagComplete => { + Self::handle_latest_tag_complete( fed_id.try_into().unwrap(), &mut stream, cloned_rti.clone(), @@ -336,7 +336,7 @@ impl Server { // Read bytes from the socket. We need 4 bytes. // FIXME: This should not exit with error but rather should just reject the connection. - NetUtil::read_from_stream_errexit(stream, &mut first_buffer, 0, ""); + NetUtil::read_from_socket_fail_on_error(stream, &mut first_buffer, 0, ""); // Initialize to an invalid value. let fed_id; @@ -379,7 +379,7 @@ impl Server { let mut federation_id_buffer = vec![0 as u8; federation_id_length.into()]; // Next read the actual federation ID. // FIXME: This should not exit on error, but rather just reject the connection. - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut federation_id_buffer, fed_id, @@ -463,7 +463,12 @@ impl Server { ); // Send an MsgType::Ack message. let ack_message: Vec = vec![MsgType::Ack.to_byte()]; - NetUtil::write_to_stream_errexit(stream, &ack_message, fed_id, "MsgType::Ack message"); + NetUtil::write_to_socket_fail_on_error( + stream, + &ack_message, + fed_id, + "MsgType::Ack message", + ); } fed_id.into() @@ -503,7 +508,7 @@ impl Server { let mut locked_rti = cloned_rti.lock().unwrap(); let mut connection_info_header = vec![0 as u8; MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut connection_info_header, fed_id, @@ -534,7 +539,7 @@ impl Server { * num_upstream) + (mem::size_of::() * num_downstream); let mut connection_info_body = vec![0 as u8; connections_info_body_size]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut connection_info_body, fed_id, @@ -604,7 +609,7 @@ impl Server { fed_id ); let mut response = vec![0 as u8; 1 + mem::size_of::()]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut response, fed_id, @@ -670,6 +675,10 @@ impl Server { true } + /** + * A function to handle timestamp messages. + * This function assumes the caller does not hold the mutex. + */ fn handle_timestamp( fed_id: u16, stream: &mut TcpStream, @@ -679,6 +688,7 @@ impl Server { sent_start_time: Arc<(Mutex, Condvar)>, ) { let mut buffer = vec![0 as u8; mem::size_of::()]; + // Read bytes from the socket. We need 8 bytes. let bytes_read = NetUtil::read_from_stream(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { println!("ERROR reading timestamp from federate_info {}.", fed_id); @@ -770,6 +780,16 @@ impl Server { } } + /** + * Handle MSG_TYPE_RESIGN sent by a federate. This message is sent at the time of termination + * after all shutdown events are processed on the federate. + * + * This function assumes the caller does not hold the mutex. + * + * @note At this point, the RTI might have outgoing messages to the federate. This + * function thus first performs a shutdown on the socket, which sends an EOF. It then + * waits for the remote socket to be closed before closing the socket itself. + */ fn handle_federate_resign( fed_id: u16, _f_rti: Arc>, @@ -778,6 +798,8 @@ impl Server { ) { // Nothing more to do. Close the socket and exit. + println!("FederateInfo {} has resigned.", fed_id); + { let mut locked_rti = _f_rti.lock().unwrap(); let idx: usize = fed_id.into(); @@ -811,7 +833,12 @@ impl Server { .unwrap(); } - println!("FederateInfo {} has resigned.", fed_id); + // Wait for the federate to send an EOF or a socket error to occur. + // Discard any incoming bytes. Normally, this read should return 0 because + // the federate is resigning and should itself invoke shutdown. + // TODO: Check we need the below two lines. + // unsigned char buffer[10]; + // while (read(my_fed->socket, buffer, 10) > 0); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -853,7 +880,7 @@ impl Server { + mem::size_of::(); // Read the header, minus the first byte which has already been read. let mut header_buffer = vec![0 as u8; (header_size - 1) as usize]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, @@ -903,13 +930,19 @@ impl Server { intended_tag.time() - start_time_value, intended_tag.microstep()); let mut message_buffer = vec![0 as u8; bytes_to_read.try_into().unwrap()]; - NetUtil::read_from_stream_errexit(stream, &mut message_buffer, fed_id, "timed message"); + NetUtil::read_from_socket_fail_on_error( + stream, + &mut message_buffer, + fed_id, + "timed message", + ); // FIXME: Handle "as i32" properly. let bytes_read = bytes_to_read + header_size as i32; // Following only works for string messages. // println!("Message received by RTI: {}.", buffer + header_size); let completed; + let next_event; { // Need to acquire the mutex lock to ensure that the thread handling // messages coming from the socket connected to the destination does not @@ -921,14 +954,15 @@ impl Server { let idx: usize = federate_id.into(); let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let enclave = fed.enclave(); + next_event = enclave.next_event(); if enclave.state() == SchedulingNodeState::NotConnected { println!( "RTI: Destination federate_info {} is no longer connected. Dropping message.", federate_id ); println!("Fed status: next_event ({}, {}), completed ({}, {}), last_granted ({}, {}), last_provisionally_granted ({}, {}).", - enclave.next_event().time() - start_time_value, - enclave.next_event().microstep(), + next_event.time() - start_time_value, + next_event.microstep(), enclave.completed().time() - start_time_value, enclave.completed().microstep(), enclave.last_granted().time() - start_time_value, @@ -947,35 +981,6 @@ impl Server { reactor_port_id, federate_id, length ); - // Record this in-transit message in federate_info's in-transit message queue. - if Tag::lf_tag_compare(&completed, &intended_tag) < 0 { - // Add a record of this message to the list of in-transit messages to this federate_info. - let mut locked_rti = _f_rti.lock().unwrap(); - let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; - MessageRecord::add_in_transit_message_record( - fed.in_transit_message_tags(), - intended_tag.clone(), - ); - println!( - "RTI: Adding a message with tag ({}, {}) to the list of in-transit messages for federate_info {}.", - intended_tag.time() - start_time_value, - intended_tag.microstep(), - federate_id - ); - } else { - println!( - "RTI: FederateInfo {} has already completed tag ({}, {}), but there is an in-transit message with tag ({}, {}) from federate_info {}. This is going to cause an STP violation under centralized coordination.", - federate_id, - completed.time() - start_time_value, - completed.microstep(), - intended_tag.time() - start_time_value, - intended_tag.microstep(), - fed_id - ); - // FIXME: Drop the federate_info? - } - // Need to make sure that the destination federate_info's thread has already // sent the starting MsgType::Timestamp message. { @@ -996,7 +1001,7 @@ impl Server { let mut result_buffer = vec![0 as u8; 1]; result_buffer[0] = message_type; result_buffer = vec![result_buffer.clone(), header_buffer, message_buffer].concat(); - NetUtil::write_to_stream_errexit( + NetUtil::write_to_socket_fail_on_error( destination_stream, &result_buffer, federate_id, @@ -1016,7 +1021,7 @@ impl Server { bytes_to_read = fed_com_buffer_size; } let mut forward_buffer = vec![0 as u8; bytes_to_read as usize]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut forward_buffer, fed_id, @@ -1034,7 +1039,7 @@ impl Server { let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; // FIXME: Handle unwrap properly. let destination_stream = fed.stream().as_ref().unwrap(); - NetUtil::write_to_stream_errexit( + NetUtil::write_to_socket_fail_on_error( destination_stream, &forward_buffer, federate_id, @@ -1043,13 +1048,47 @@ impl Server { } } - Self::update_federate_next_event_tag_locked( - _f_rti, - federate_id, - intended_tag, - start_time_value, - sent_start_time, - ); + // Record this in-transit message in federate_info's in-transit message queue. + if Tag::lf_tag_compare(&completed, &intended_tag) < 0 { + // Add a record of this message to the list of in-transit messages to this federate_info. + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = federate_id.into(); + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + // TODO: Replace 'MessageRecord::add_in_transit_message_record()' into 'pqueue_tag_insert_if_no_match()'. + MessageRecord::add_in_transit_message_record( + fed.in_transit_message_tags(), + intended_tag.clone(), + ); + println!( + "RTI: Adding a message with tag ({}, {}) to the list of in-transit messages for federate_info {}.", + intended_tag.time() - start_time_value, + intended_tag.microstep(), + federate_id + ); + } else { + println!( + "RTI: FederateInfo {} has already completed tag ({}, {}), but there is an in-transit message with tag ({}, {}) from federate_info {}. This is going to cause an STP violation under centralized coordination.", + federate_id, + completed.time() - start_time_value, + completed.microstep(), + intended_tag.time() - start_time_value, + intended_tag.microstep(), + fed_id + ); + // FIXME: Drop the federate_info? + } + + // If the message tag is less than the most recently received NET from the federate, + // then update the federate's next event tag to match the message tag. + if Tag::lf_tag_compare(&intended_tag, &next_event) < 0 { + Self::update_federate_next_event_tag_locked( + _f_rti, + federate_id, + intended_tag, + start_time_value, + sent_start_time, + ); + } } fn update_federate_next_event_tag_locked( @@ -1089,7 +1128,7 @@ impl Server { sent_start_time: Arc<(Mutex, Condvar)>, ) { let mut header_buffer = vec![0 as u8; mem::size_of::() + mem::size_of::()]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, @@ -1130,7 +1169,7 @@ impl Server { ); } - fn handle_logical_tag_complete( + fn handle_latest_tag_complete( fed_id: u16, stream: &mut TcpStream, _f_rti: Arc>, @@ -1138,11 +1177,11 @@ impl Server { sent_start_time: Arc<(Mutex, Condvar)>, ) { let mut header_buffer = vec![0 as u8; mem::size_of::() + mem::size_of::()]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, - "the content of the logical tag complete", + "the content of the latest tag complete", ); let completed = NetUtil::extract_tag( header_buffer[0..(mem::size_of::() + mem::size_of::())] @@ -1159,7 +1198,7 @@ impl Server { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } - SchedulingNode::logical_tag_complete( + SchedulingNode::_logical_tag_complete( _f_rti.clone(), fed_id, number_of_enclaves, @@ -1174,6 +1213,7 @@ impl Server { let idx: usize = fed_id.into(); let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let in_transit_message_tags = fed.in_transit_message_tags(); + // TODO: Replace 'MessageRecord::clean_in_transit_message_record_up_to_tag()' into 'pqueue_tag_remove_up_to()'. MessageRecord::clean_in_transit_message_record_up_to_tag( in_transit_message_tags, completed, @@ -1192,28 +1232,13 @@ impl Server { println!("RTI handling stop_request from federate_info {}.", fed_id); let mut header_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_LENGTH - 1]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, "the MsgType::StopRequest payload", ); - // Acquire a mutex lock to ensure that this state does change while a - // message is in transport or being used to determine a TAG. - { - let mut locked_rti = _f_rti.lock().unwrap(); - let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; - - // Check whether we have already received a stop_tag - // from this federate_info - if fed.requested_stop() { - // Ignore this request - return; - } - } - // Extract the proposed stop tag for the federate_info let proposed_stop_tag = NetUtil::extract_tag( header_buffer[0..(mem::size_of::() + mem::size_of::())] @@ -1221,12 +1246,48 @@ impl Server { .unwrap(), ); - // Update the maximum stop tag received from federates let start_time_value; { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + println!( + "RTI received from federate_info {} a MsgType::StopRequest message with tag ({},{}).", + fed_id, + proposed_stop_tag.time() - start_time_value, + proposed_stop_tag.microstep() + ); + + // Acquire a mutex lock to ensure that this state does change while a + // message is in transport or being used to determine a TAG. + let requested_stop; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = fed_id.into(); + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + requested_stop = fed.requested_stop(); + } + // Check whether we have already received a stop_tag + // from this federate_info + if requested_stop { + // If stop request messages have already been broadcast, treat this as if it were a reply. + let stop_in_progress; + { + let locked_rti = _f_rti.lock().unwrap(); + stop_in_progress = locked_rti.stop_in_progress(); + } + if stop_in_progress { + Self::mark_federate_requesting_stop( + fed_id, + _f_rti.clone(), + stop_granted.clone(), + start_time_value, + ); + } + return; + } + + // Update the maximum stop tag received from federates { let mut locked_rti = _f_rti.lock().unwrap(); if Tag::lf_tag_compare(&proposed_stop_tag, &locked_rti.base().max_stop_tag()) > 0 { @@ -1236,33 +1297,17 @@ impl Server { } } - println!( - "RTI received from federate_info {} a MsgType::StopRequest message with tag ({},{}).", - fed_id, - proposed_stop_tag.time() - start_time_value, - proposed_stop_tag.microstep() - ); - - // If this federate_info has not already asked - // for a stop, add it to the tally. - Self::mark_federate_requesting_stop( + // If all federates have replied, send stop request granted. + if Self::mark_federate_requesting_stop( fed_id, _f_rti.clone(), stop_granted.clone(), start_time_value, - ); - - { - let mut locked_rti = _f_rti.lock().unwrap(); - if locked_rti.base().num_scheduling_nodes_handling_stop() - == locked_rti.base().number_of_scheduling_nodes() - { - // We now have information about the stop time of all - // federates. This is extremely unlikely, but it can occur - // all federates call lf_request_stop() at the same tag. - return; - } + ) { + // Have send stop request granted to all federates. Nothing more to do. + return; } + // Forward the stop request to all other federates that have not // also issued a stop request. let mut stop_request_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_LENGTH]; @@ -1284,6 +1329,10 @@ impl Server { } locked_rti.set_stop_in_progress(true); } + // Need a timeout here in case a federate never replies. + // TODO: Implement 'wait_for_stop_request_reply' function. + // lf_thread_create(&timeout_thread, wait_for_stop_request_reply, NULL); + let number_of_enclaves; { let mut locked_rti = _f_rti.lock().unwrap(); @@ -1304,7 +1353,7 @@ impl Server { } // FIXME: Handle unwrap properly. let stream = f.stream().as_ref().unwrap(); - NetUtil::write_to_stream_errexit( + NetUtil::write_to_socket_fail_on_error( stream, &stop_request_buffer, f.e().id(), @@ -1322,12 +1371,17 @@ impl Server { } } + /** + * Mark a federate requesting stop. If the number of federates handling stop reaches the + * NUM_OF_FEDERATES, broadcast MsgType::StopGranted to every federate. + * @return true if stop time has been sent to all federates and false otherwise. + */ fn mark_federate_requesting_stop( fed_id: u16, _f_rti: Arc>, stop_granted: Arc>, start_time_value: Instant, - ) { + ) -> bool { let mut num_enclaves_handling_stop; { let mut locked_rti = _f_rti.lock().unwrap(); @@ -1369,7 +1423,9 @@ impl Server { stop_granted, start_time_value, ); + return true; } + false } /** @@ -1434,7 +1490,7 @@ impl Server { let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; // FIXME: Handle unwrap properly. let stream = fed.stream().as_ref().unwrap(); - NetUtil::write_to_stream_errexit( + NetUtil::write_to_socket_fail_on_error( stream, &outgoing_buffer, fed.e().id(), @@ -1487,7 +1543,7 @@ impl Server { stop_granted: Arc>, ) { let mut header_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, @@ -1543,7 +1599,7 @@ impl Server { mem::size_of::() * 2 + mem::size_of::() + mem::size_of::(); let mut header_buffer = vec![0 as u8; message_size]; - NetUtil::read_from_stream_errexit( + NetUtil::read_from_socket_fail_on_error( stream, &mut header_buffer, fed_id, @@ -1559,7 +1615,7 @@ impl Server { .unwrap(), ); - // TODO: Will be used when tracing_enabled + // TODO: Can be used when tracing_enabled // let start_idx = u16_size * 2; // let tag = NetUtil::extract_tag( // header_buffer[start_idx..(start_idx + mem::size_of::() + mem::size_of::())] @@ -1626,7 +1682,7 @@ impl Server { let mut result_buffer = vec![0 as u8]; result_buffer[0] = buffer[0]; result_buffer = vec![result_buffer.clone(), header_buffer].concat(); - NetUtil::write_to_stream_errexit( + NetUtil::write_to_socket_fail_on_error( destination_stream, &result_buffer, federate_id,