From 88873e865b9c1b4f675797f89eeeb3a143c92d7e Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sat, 17 Feb 2024 12:46:41 -0700 Subject: [PATCH] Handle a message type for address advertisement/query - Functions for MsgType::AddressAdvertisement (Byte value 14) and MsgType::AddressQuery (Byte value: 13) are added. --- rust/rti/Cargo.toml | 2 +- rust/rti/src/federate_info.rs | 38 ++++++++--- rust/rti/src/net_common.rs | 3 + rust/rti/src/server.rs | 124 +++++++++++++++++++++++++++++++--- 4 files changed, 147 insertions(+), 20 deletions(-) diff --git a/rust/rti/Cargo.toml b/rust/rti/Cargo.toml index 720c8a4..577406a 100644 --- a/rust/rti/Cargo.toml +++ b/rust/rti/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] byteorder = "1" -priority-queue = "1.3.2" +priority-queue = "2.0.2" diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index 29dbf55..376b997 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -7,7 +7,7 @@ use crate::message_record::message_record::InTransitMessageRecordQueue; * @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn) * @author Chanhee Lee (chanheel@asu.edu) * @author Hokeun Kim (hokeun@asu.edu) - * @copyright (c) 2020-2023, The University of California at Berkeley + * @copyright (c) 2020-2024, The University of California at Berkeley * License in [BSD 2-clause](..) * @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs. * This file extends rti_common.h with RTI features that are specific to federations and are not @@ -15,7 +15,8 @@ use crate::message_record::message_record::InTransitMessageRecordQueue; */ use crate::rti_common::*; -use std::net::TcpStream; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; + use std::option::Option; /** @@ -31,9 +32,9 @@ pub struct FederateInfo { requested_stop: bool, // Indicates that the federate has requested stop or has replied // to a request for stop from the RTI. Used to prevent double-counting // a federate when handling lf_request_stop(). - // TODO: lf_thread_t thread_id; // The ID of the thread handling communication with this federate. + // TODO: lf_thread_t thread_id; stream: Option, // The TCP socket descriptor for communicating with this federate. - // TODO: struct sockaddr_in UDP_addr; // The UDP address for the federate. + // TODO: struct sockaddr_in UDP_addr; clock_synchronization_enabled: bool, // Indicates the status of clock synchronization // for this federate. Enabled by default. in_transit_message_tags: InTransitMessageRecordQueue, // Record of in-transit messages to this federate that are not @@ -41,11 +42,13 @@ pub struct FederateInfo { // value of each message for a more efficient access. server_hostname: String, // Human-readable IP address and server_port: i32, // port number of the socket server of the federate - // if it has any incoming direct connections from other federates. - // The port number will be -1 if there is no server or if the - // RTI has not been informed of the port number. - // TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket - // server of the federate. + // if it has any incoming direct connections from other federates. + // The port number will be -1 if there is no server or if the + // RTI has not been informed of the port number. + // TODO: struct in_addr server_ip_addr; + // server of the federate. + server_ip_addr: SocketAddr, // Information about the IP address of the socket + // server of the federate. } impl FederateInfo { @@ -58,6 +61,7 @@ impl FederateInfo { in_transit_message_tags: InTransitMessageRecordQueue::new(), server_hostname: String::from("localhost"), server_port: -1, + server_ip_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), } } @@ -81,6 +85,18 @@ impl FederateInfo { self.clock_synchronization_enabled } + pub fn server_hostname(&self) -> String { + self.server_hostname.clone() + } + + pub fn server_port(&self) -> i32 { + self.server_port + } + + pub fn server_ip_addr(&self) -> SocketAddr { + self.server_ip_addr.clone() + } + pub fn set_requested_stop(&mut self, requested_stop: bool) { self.requested_stop = requested_stop; } @@ -104,4 +120,8 @@ impl FederateInfo { pub fn set_server_port(&mut self, server_port: i32) { self.server_port = server_port; } + + pub fn set_server_ip_addr(&mut self, server_ip_addr: SocketAddr) { + self.server_ip_addr = server_ip_addr; + } } diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index 701a8ac..cd679ad 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -100,6 +100,7 @@ pub enum MsgType { StopRequestReply, StopGranted, AddressQuery, + AddressAdvertisement, P2pSendingFedId, P2pTaggedMessage, PortAbsent, @@ -125,6 +126,7 @@ impl MsgType { MsgType::StopRequestReply => 11, MsgType::StopGranted => 12, MsgType::AddressQuery => 13, + MsgType::AddressAdvertisement => 14, MsgType::P2pSendingFedId => 15, MsgType::P2pTaggedMessage => 17, MsgType::PortAbsent => 23, @@ -147,6 +149,7 @@ impl MsgType { 11 => MsgType::StopRequestReply, 12 => MsgType::StopGranted, 13 => MsgType::AddressQuery, + 14 => MsgType::AddressAdvertisement, 23 => MsgType::PortAbsent, _ => MsgType::Ignore, } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 574e9e1..567b72c 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -8,7 +8,7 @@ */ use std::io::Write; use std::mem; -use std::net::{Shutdown, TcpListener, TcpStream}; +use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; @@ -269,6 +269,16 @@ impl Server { cloned_stop_granted.clone(), ) } + MsgType::AddressQuery => Self::handle_address_query( + fed_id.try_into().unwrap(), + &mut stream, + cloned_rti.clone(), + ), + MsgType::AddressAdvertisement => Self::handle_address_ad( + fed_id.try_into().unwrap(), + &mut stream, + cloned_rti.clone(), + ), MsgType::PortAbsent => Self::handle_port_absent_message( &buffer, fed_id.try_into().unwrap(), @@ -365,13 +375,13 @@ impl Server { return -1; } else { // Received federate_info ID. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let u16_size = mem::size_of::(); fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap()); println!("RTI received federate_info ID: {}.", fed_id); // Read the federation ID. First read the length, which is one byte. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let federation_id_length = u8::from_le_bytes( first_buffer[(1 + u16_size)..(1 + u16_size + 1)] .try_into() @@ -443,8 +453,20 @@ impl Server { "Federation ID matches! \"{}(received)\" <-> \"{}(_f_rti)\"", federation_id_received, federation_id ); - - // TODO: Assign the address information for federate_info. + { + let mut locked_rti = cloned_rti.write().unwrap(); + let idx: usize = fed_id.into(); + let federate_info: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + // The MsgType::FedIds message has the right federation ID. + // Assign the address information for federate. + // The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful + // to create sockets and can be efficiently sent over the network. + // First, convert the sockaddr structure into a sockaddr_in that contains an internet address. + // Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server + // TODO: Handle unwrap() in the below line properly. + federate_info.set_server_ip_addr(stream.peer_addr().unwrap()); + } // Set the federate_info's state as pending // because it is waiting for the start time to be @@ -550,7 +572,7 @@ impl Server { let mut message_head: usize = 0; // First, read the info about upstream federates for i in 0..num_upstream { - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let upstream_id = u16::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -562,7 +584,7 @@ impl Server { "upstream_id: {}, message_head: {}", upstream_id, message_head ); - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let upstream_delay = i64::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -578,7 +600,7 @@ impl Server { // Next, read the info about downstream federates for i in 0..num_downstream { - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let downstream_id = u16::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -629,7 +651,7 @@ impl Server { if clock_sync_global_status >= ClockSyncStat::ClockSyncInit { // If no initial clock sync, no need perform initial clock sync. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let federate_udp_port_number = u16::from_le_bytes(response[1..3].try_into().unwrap()); @@ -1589,6 +1611,88 @@ impl Server { ); } + fn handle_address_query(fed_id: u16, stream: &mut TcpStream, _f_rti: Arc>) { + // Use buffer both for reading and constructing the reply. + // The length is what is needed for the reply. + let mut buffer = vec![0 as u8; mem::size_of::()]; + NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, fed_id, "address query."); + let remote_fed_id = u16::from_le_bytes(buffer.clone().try_into().unwrap()); + + println!( + "RTI received address query from {} for {}.", + fed_id, remote_fed_id + ); + + // NOTE: server_port initializes to -1, which means the RTI does not know + // the port number because it has not yet received an MsgType::AddressAdvertisement message + // from this federate. In that case, it will respond by sending -1. + + // Response message is also of type MsgType::AddressQuery. + let mut response_message = vec![0 as u8; 1 + mem::size_of::()]; + response_message[0] = MsgType::AddressQuery.to_byte(); + + // Encode the port number. + let server_hostname; + let server_port; + let server_ip_addr; + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = remote_fed_id.into(); + let remote_fed = &locked_rti.base().scheduling_nodes()[idx]; + server_hostname = remote_fed.server_hostname(); + server_port = remote_fed.server_port(); + server_ip_addr = remote_fed.server_ip_addr(); + } + // Send the port number (which could be -1). + NetUtil::encode_int32(server_port, &mut response_message, 1); + NetUtil::write_to_socket_fail_on_error(stream, &response_message, fed_id, "port number"); + + // Send the server IP address to federate. + match server_ip_addr.ip() { + IpAddr::V4(ip_addr) => { + NetUtil::write_to_socket_fail_on_error( + stream, + &(ip_addr.octets().to_vec()), + fed_id, + "ip address", + ); + } + IpAddr::V6(_e) => { + println!("IPv6 is not supported yet. Exit."); + std::process::exit(1); + } + } + + println!( + "Replied to address query from federate {} with address {}({}):{}.", + fed_id, server_hostname, server_ip_addr, server_port + ); + } + + fn handle_address_ad(federate_id: u16, stream: &mut TcpStream, _f_rti: Arc>) { + // Read the port number of the federate that can be used for physical + // connections to other federates + let mut buffer = vec![0 as u8; mem::size_of::()]; + NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, federate_id, "port data"); + + // FIXME: Update from_le_bytes if endian types should be considered. + let server_port = i32::from_le_bytes(buffer.try_into().unwrap()); + + assert!(server_port <= u16::MAX.into()); + + { + let mut locked_rti = _f_rti.write().unwrap(); + let idx: usize = federate_id.into(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + fed.set_server_port(server_port); + } + + println!( + "Received address advertisement with port {} from federate {}.", + server_port, federate_id + ); + } + fn handle_port_absent_message( buffer: &Vec, fed_id: u16, @@ -1609,7 +1713,7 @@ impl Server { ); let u16_size = mem::size_of::(); - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let reactor_port_id = u16::from_le_bytes(header_buffer[0..(u16_size)].try_into().unwrap()); let federate_id = u16::from_le_bytes( header_buffer[(u16_size)..(u16_size + u16_size)]