From 7423382f339995af1d590d2d5413f61f5d0feb2b Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 10 Mar 2024 20:48:20 -0700 Subject: [PATCH] Enable clock synchronization with federates --- rust/rti/src/federate_info.rs | 4 + rust/rti/src/net_common.rs | 18 +++ rust/rti/src/net_util.rs | 4 +- rust/rti/src/rti_remote.rs | 4 + rust/rti/src/server.rs | 274 ++++++++++++++++++++++++++++++++-- 5 files changed, 289 insertions(+), 15 deletions(-) diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index 96cb46a..1c4748c 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -78,6 +78,10 @@ impl FederateInfo { &self.stream } + pub fn stream_mut(&mut self) -> &mut Option { + &mut self.stream + } + pub fn clock_synchronization_enabled(&self) -> bool { self.clock_synchronization_enabled } diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index cd679ad..99d36d7 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -103,6 +103,10 @@ pub enum MsgType { AddressAdvertisement, P2pSendingFedId, P2pTaggedMessage, + ClockSyncT1, + ClockSyncT3, + ClockSyncT4, + ClockSyncCodedProbe, PortAbsent, NeighborStructure, Ignore, @@ -129,6 +133,10 @@ impl MsgType { MsgType::AddressAdvertisement => 14, MsgType::P2pSendingFedId => 15, MsgType::P2pTaggedMessage => 17, + MsgType::ClockSyncT1 => 19, + MsgType::ClockSyncT3 => 20, + MsgType::ClockSyncT4 => 21, + MsgType::ClockSyncCodedProbe => 22, MsgType::PortAbsent => 23, MsgType::NeighborStructure => 24, MsgType::Ignore => 250, @@ -150,6 +158,10 @@ impl MsgType { 12 => MsgType::StopGranted, 13 => MsgType::AddressQuery, 14 => MsgType::AddressAdvertisement, + 19 => MsgType::ClockSyncT1, + 20 => MsgType::ClockSyncT3, + 21 => MsgType::ClockSyncT4, + 22 => MsgType::ClockSyncCodedProbe, 23 => MsgType::PortAbsent, _ => MsgType::Ignore, } @@ -182,3 +194,9 @@ impl ErrType { } } } + +#[derive(PartialEq, Clone)] +pub enum SocketType { + TCP, + UDP, +} diff --git a/rust/rti/src/net_util.rs b/rust/rti/src/net_util.rs index 13af183..5b75f6e 100644 --- a/rust/rti/src/net_util.rs +++ b/rust/rti/src/net_util.rs @@ -36,7 +36,7 @@ impl NetUtil { // println!("\n"); } - pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { + pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { let mut bytes_read = 0; while match stream.read(buffer) { Ok(msg_size) => { @@ -73,7 +73,7 @@ impl NetUtil { } } - pub fn write_to_stream(mut stream: &TcpStream, buffer: &Vec, fed_id: u16) -> usize { + pub fn write_to_socket(mut stream: &TcpStream, buffer: &Vec, fed_id: u16) -> usize { let mut bytes_written = 0; match stream.write(&buffer) { Ok(bytes_size) => { diff --git a/rust/rti/src/rti_remote.rs b/rust/rti/src/rti_remote.rs index e883923..253dfe0 100644 --- a/rust/rti/src/rti_remote.rs +++ b/rust/rti/src/rti_remote.rs @@ -146,6 +146,10 @@ impl RTIRemote { self.clock_sync_global_status.clone() } + pub fn clock_sync_period_ns(&self) -> u64 { + self.clock_sync_period_ns + } + pub fn stop_in_progress(&self) -> bool { self.stop_in_progress } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index a473f16..060f8b8 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -12,6 +12,7 @@ use std::net::{IpAddr, Shutdown, TcpListener, TcpStream}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; use crate::in_transit_message_queue::InTransitMessageQueue; use crate::net_common; @@ -187,7 +188,7 @@ impl Server { } // Read no more than one byte to get the message type. // FIXME: Handle unwrap properly. - let bytes_read = NetUtil::read_from_stream( + let bytes_read = NetUtil::read_from_socket( &mut stream, &mut buffer, fed_id.try_into().unwrap(), @@ -324,24 +325,193 @@ impl Server { println!("All federates have connected to RTI."); let cloned_rti = Arc::clone(&arc_rti); - let locked_rti = cloned_rti.read().unwrap(); - let clock_sync_global_status = locked_rti.clock_sync_global_status(); + let clock_sync_global_status; + let number_of_scheduling_nodes; + let final_port_udp; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_global_status = locked_rti.clock_sync_global_status(); + number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); + final_port_udp = locked_rti.final_port_udp(); + } if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { // Create the thread that performs periodic PTP clock synchronization sessions // over the UDP channel, but only if the UDP channel is open and at least one // federate_info is performing runtime clock synchronization. let mut clock_sync_enabled = false; - for i in 0..locked_rti.base().number_of_scheduling_nodes() { - if locked_rti.base().scheduling_nodes()[i as usize].clock_synchronization_enabled() + for i in 0..number_of_scheduling_nodes { { - clock_sync_enabled = true; - break; + let locked_rti = cloned_rti.read().unwrap(); + if locked_rti.base().scheduling_nodes()[i as usize] + .clock_synchronization_enabled() + { + clock_sync_enabled = true; + break; + } } } - if locked_rti.final_port_udp() != u16::MAX && clock_sync_enabled { - println!("\tNEED to create clock_synchronization_thread thread.."); - // TODO: Implement the following. - // lf_thread_create(&_f_rti->clock_thread, clock_synchronization_thread, NULL); + + // let cloned_start_time = Arc::clone(&start_time); + // let cloned_received_start_times = Arc::clone(&received_start_times); + + if final_port_udp != u16::MAX && clock_sync_enabled { + let handle = thread::spawn(move || { + // Wait until all federates have been notified of the start time. + // FIXME: Use lf_ version of this when merged with master. + { + let locked_rti = cloned_rti.read().unwrap(); + while locked_rti.num_feds_proposed_start() + < locked_rti.base().number_of_scheduling_nodes() + { + // Need to wait here. + let received_start_times_notifier = Arc::clone(&received_start_times); + let (lock, condvar) = &*received_start_times_notifier; + let mut notified = lock.lock().unwrap(); + while !*notified { + notified = condvar.wait(notified).unwrap(); + } + } + } + + // Wait until the start time before starting clock synchronization. + // The above wait ensures that start_time has been set. + + let start_time_value; + { + let locked_start_time = start_time.lock().unwrap(); + start_time_value = locked_start_time.start_time(); + } + let ns_to_wait = start_time_value - Tag::lf_time_physical(); + + if ns_to_wait > 0 { + // TODO: Handle unwrap() properly. + let ns = Duration::from_nanos(ns_to_wait.try_into().unwrap()); + thread::sleep(ns); + } + + // Initiate a clock synchronization every rti->clock_sync_period_ns + // Initiate a clock synchronization every rti->clock_sync_period_ns + // let sleep_time = {(time_t)rti_remote->clock_sync_period_ns / BILLION, + // rti_remote->clock_sync_period_ns % BILLION}; + // let remaining_time; + + let mut any_federates_connected = true; + while any_federates_connected { + // Sleep + let clock_sync_period_ns; + let number_of_scheduling_nodes; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_period_ns = locked_rti.clock_sync_period_ns(); + number_of_scheduling_nodes = + locked_rti.base().number_of_scheduling_nodes(); + } + let ns = Duration::from_nanos(clock_sync_period_ns); // Can be interrupted + thread::sleep(ns); + any_federates_connected = false; + for fed_id in 0..number_of_scheduling_nodes { + let state; + let clock_synchronization_enabled; + { + let locked_rti = cloned_rti.read().unwrap(); + let idx: usize = fed_id as usize; + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + clock_synchronization_enabled = fed.clock_synchronization_enabled(); + } + if state == SchedulingNodeState::NotConnected { + // FIXME: We need better error handling here, but clock sync failure + // should not stop execution. + println!( + "[ERROR] Clock sync failed with federate {}. Not connected.", + fed_id + ); + continue; + } else if !clock_synchronization_enabled { + continue; + } + // Send the RTI's current physical time to the federate + // Send on UDP. + println!( + "[DEBUG] RTI sending T1 message to initiate clock sync round." + ); + // TODO: Handle unwrap() properly. + Self::send_physical_clock( + fed_id.try_into().unwrap(), + cloned_rti.clone(), + MsgType::ClockSyncT1.to_byte(), + SocketType::UDP, + ); + + // Listen for reply message, which should be T3. + let message_size = 1 + std::mem::size_of::(); + let mut buffer = vec![0 as u8; message_size]; + // Maximum number of messages that we discard before giving up on this cycle. + // If the T3 message from this federate does not arrive and we keep receiving + // other messages, then give up on this federate and move to the next federate. + let mut remaining_attempts = 5; + while remaining_attempts > 0 { + remaining_attempts -= 1; + let read_failed; + { + let mut locked_rti = cloned_rti.write().unwrap(); + let idx: usize = fed_id as usize; + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let stream: &mut TcpStream = + &mut fed.stream_mut().as_mut().unwrap(); + // TODO: Handle unwrap() properly. + read_failed = NetUtil::read_from_socket( + stream, + &mut buffer, + fed_id.try_into().unwrap(), + ); + } + // If any errors occur, either discard the message or the clock sync round. + if read_failed == 0 { + if buffer[0] == MsgType::ClockSyncT3.to_byte() { + // TODO: Change from_le_bytes properly. + let fed_id_2 = i32::from_le_bytes( + buffer[1..1 + std::mem::size_of::()] + .try_into() + .unwrap(), + ); + // Check that this message came from the correct federate. + if fed_id_2 != fed_id { + // Message is from the wrong federate. Discard the message. + println!("[WARNING] Clock sync: Received T3 message from federate {}, but expected one from {}. Discarding message.", + fed_id_2, fed_id); + continue; + } + println!("[EDBUG] Clock sync: RTI received T3 message from federate {}.", fed_id_2); + // TODO: Handle unwrap() properly. + Self::handle_physical_clock_sync_message( + fed_id_2.try_into().unwrap(), + cloned_rti.clone(), + SocketType::UDP, + ); + break; + } else { + // The message is not a T3 message. Discard the message and + // continue waiting for the T3 message. This is possibly a message + // from a previous cycle that was discarded. + println!("[WARNING] Clock sync: Unexpected UDP message {}. Expected MsgType::ClockSyncT3 from federate {}. Discarding message.", + buffer[0], fed_id); + continue; + } + } else { + println!("[WARNING] Clock sync: Read from UDP socket failed: Skipping clock sync round for federate {}.", + fed_id); + remaining_attempts -= 1; + } + } + if remaining_attempts > 0 { + any_federates_connected = true; + } + } + } + }); + handle_list.push(handle); } } @@ -591,6 +761,7 @@ impl Server { fed_id ); let cloned_rti = Arc::clone(&_f_rti); + // TODO: Handle unwrap() properly. let mut connection_info_header = vec![0 as u8; MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()]; NetUtil::read_from_socket_fail_on_error( @@ -776,7 +947,7 @@ impl Server { ) { let mut buffer = vec![0 as u8; mem::size_of::()]; // Read bytes from the socket. We need 8 bytes. - let bytes_read = NetUtil::read_from_stream(stream, &mut buffer, fed_id); + let bytes_read = NetUtil::read_from_socket(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { println!("ERROR reading timestamp from federate_info {}.", fed_id); } @@ -859,7 +1030,7 @@ impl Server { let idx: usize = fed_id.into(); let my_fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; let stream = my_fed.stream().as_ref().unwrap(); - let bytes_written = NetUtil::write_to_stream(stream, &start_time_buffer, fed_id); + let bytes_written = NetUtil::write_to_socket(stream, &start_time_buffer, fed_id); if bytes_written < MSG_TYPE_TIMESTAMP_LENGTH { println!( "Failed to send the starting time to federate_info {}.", @@ -2020,4 +2191,81 @@ impl Server { ); } } + + fn send_physical_clock( + fed_id: u16, + _f_rti: Arc>, + message_type: u8, + socket_type: SocketType, + ) { + let state; + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + } + if state == SchedulingNodeState::NotConnected { + println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}. Socket not connected.\n", + fed_id); + return; + } + let mut buffer = vec![0 as u8; std::mem::size_of::() + 1]; + buffer[0] = message_type; + let current_physical_time = Tag::lf_time_physical(); + NetUtil::encode_int64(current_physical_time, &mut buffer, 1); + + // Send the message + if socket_type == SocketType::UDP { + println!("Sending messages through UDP is not supported yet."); + // TODO: Enable the following code. + // FIXME: UDP_addr is never initialized. + // println!("[Debug] Clock sync: RTI sending UDP message type %u.", buffer[0]); + // let bytes_written = sendto(rti_remote->socket_descriptor_UDP, buffer, 1 + sizeof(int64_t), 0, + // (struct sockaddr*)&fed->UDP_addr, sizeof(fed->UDP_addr)); + // if bytes_written < std::mem::size_of() + 1 { + // println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}: \n", fed_id); + // return; + // } + } else if socket_type == SocketType::TCP { + println!( + "[DEBUG] Clock sync: RTI sending TCP message type {}.", + buffer[0] + ); + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + let stream = fed.stream().as_ref().unwrap(); + NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "physical time"); + } + println!("[DEBUG] Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp ({}) to federate {}.", + current_physical_time, fed_id); + } + + fn handle_physical_clock_sync_message( + fed_id: u16, + _f_rti: Arc>, + socket_type: SocketType, + ) { + // Lock the mutex to prevent interference between sending the two + // coded probe messages. + let _locked_rti = _f_rti.write().unwrap(); + // Reply with a T4 type message + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncT4.to_byte(), + socket_type.clone(), + ); + // Send the corresponding coded probe immediately after, + // but only if this is a UDP channel. + if socket_type == SocketType::UDP { + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncCodedProbe.to_byte(), + socket_type, + ); + } + } }