From f58daf23b8739894c75963938d825f68855d8c81 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sat, 17 Feb 2024 12:46:41 -0700 Subject: [PATCH] Handle a message type for address advertisement/query - Functions for MsgType::AddressAdvertisement (Byte value 14) and MsgType::AddressQuery (Byte value: 13) are added. --- .github/workflows/rust.yml | 5 -- rust/rti/src/federate_info.rs | 25 +++++++-- rust/rti/src/net_common.rs | 3 ++ rust/rti/src/server.rs | 98 ++++++++++++++++++++++++++++++++--- 4 files changed, 114 insertions(+), 17 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bcc8be5..3d4ecf0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -26,11 +26,6 @@ jobs: - name: Unit tests run: cd rust/rti; cargo test - # fetch-lf: - # uses: chanijjani/lingua-franca/.github/workflows/extract-ref.yml@master - # with: - # file: 'lingua-franca-ref.txt' - lf-default: # TODO(chanijjani): Change the pointer to point to the main lingua-franca repo. uses: chanijjani/lingua-franca/.github/workflows/c-tests-with-rust-rti.yml@integration_tests_with_rust_rti diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index 2ed3cea..7e36072 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -41,11 +41,13 @@ pub struct FederateInfo { // value of each message for a more efficient access. server_hostname: String, // Human-readable IP address and server_port: i32, // port number of the socket server of the federate - // if it has any incoming direct connections from other federates. - // The port number will be -1 if there is no server or if the - // RTI has not been informed of the port number. - // TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket - // server of the federate. + // if it has any incoming direct connections from other federates. + // The port number will be -1 if there is no server or if the + // RTI has not been informed of the port number. + // TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket + // server of the federate. + server_ip_addr: String, // Information about the IP address of the socket + // server of the federate. } impl FederateInfo { @@ -58,6 +60,7 @@ impl FederateInfo { in_transit_message_tags: InTransitMessageRecordQueue::new(), server_hostname: String::from("localhost"), server_port: -1, + server_ip_addr: String::from("127.0.0.1"), } } @@ -81,6 +84,18 @@ impl FederateInfo { self.clock_synchronization_enabled } + pub fn server_hostname(&self) -> String { + self.server_hostname.clone() + } + + pub fn server_port(&self) -> i32 { + self.server_port + } + + pub fn server_ip_addr(&self) -> String { + self.server_ip_addr.clone() + } + pub fn set_requested_stop(&mut self, requested_stop: bool) { self.requested_stop = requested_stop; } diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index 701a8ac..cd679ad 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -100,6 +100,7 @@ pub enum MsgType { StopRequestReply, StopGranted, AddressQuery, + AddressAdvertisement, P2pSendingFedId, P2pTaggedMessage, PortAbsent, @@ -125,6 +126,7 @@ impl MsgType { MsgType::StopRequestReply => 11, MsgType::StopGranted => 12, MsgType::AddressQuery => 13, + MsgType::AddressAdvertisement => 14, MsgType::P2pSendingFedId => 15, MsgType::P2pTaggedMessage => 17, MsgType::PortAbsent => 23, @@ -147,6 +149,7 @@ impl MsgType { 11 => MsgType::StopRequestReply, 12 => MsgType::StopGranted, 13 => MsgType::AddressQuery, + 14 => MsgType::AddressAdvertisement, 23 => MsgType::PortAbsent, _ => MsgType::Ignore, } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 283e906..e4334d4 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -267,6 +267,16 @@ impl Server { cloned_stop_granted.clone(), ) } + MsgType::AddressQuery => Self::handle_address_query( + fed_id.try_into().unwrap(), + &mut stream, + cloned_rti.clone(), + ), + MsgType::AddressAdvertisement => Self::handle_address_ad( + fed_id.try_into().unwrap(), + &mut stream, + cloned_rti.clone(), + ), MsgType::PortAbsent => Self::handle_port_absent_message( &buffer, fed_id.try_into().unwrap(), @@ -364,13 +374,13 @@ impl Server { return -1; } else { // Received federate_info ID. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let u16_size = mem::size_of::(); fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap()); println!("RTI received federate_info ID: {}.", fed_id); // Read the federation ID. First read the length, which is one byte. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let federation_id_length = u8::from_le_bytes( first_buffer[(1 + u16_size)..(1 + u16_size + 1)] .try_into() @@ -550,7 +560,7 @@ impl Server { let mut message_head: usize = 0; // First, read the info about upstream federates for i in 0..num_upstream { - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let upstream_id = u16::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -562,7 +572,7 @@ impl Server { "upstream_id: {}, message_head: {}", upstream_id, message_head ); - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let upstream_delay = i64::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -578,7 +588,7 @@ impl Server { // Next, read the info about downstream federates for i in 0..num_downstream { - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let downstream_id = u16::from_le_bytes( connection_info_body[message_head..(message_head + mem::size_of::())] .try_into() @@ -629,7 +639,7 @@ impl Server { if clock_sync_global_status >= ClockSyncStat::ClockSyncInit { // If no initial clock sync, no need perform initial clock sync. - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let federate_udp_port_number = u16::from_le_bytes(response[1..3].try_into().unwrap()); @@ -1587,6 +1597,80 @@ impl Server { ); } + fn handle_address_query(fed_id: u16, stream: &mut TcpStream, _f_rti: Arc>) { + // Use buffer both for reading and constructing the reply. + // The length is what is needed for the reply. + let mut buffer = vec![0 as u8; 1 + mem::size_of::()]; + NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, fed_id, "address query."); + let remote_fed_id = + u16::from_le_bytes(buffer[0..mem::size_of::()].try_into().unwrap()); + + println!( + "RTI received address query from {} for {}.", + fed_id, remote_fed_id + ); + + // NOTE: server_port initializes to -1, which means the RTI does not know + // the port number because it has not yet received an MsgType::AddressAdvertisement message + // from this federate. In that case, it will respond by sending -1. + + // Response message is also of type MsgType::AddressQuery. + buffer[0] = MsgType::AddressQuery.to_byte(); + + // Encode the port number. + let server_hostname; + let server_port; + let server_ip_addr; + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = remote_fed_id.into(); + let remote_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + server_hostname = remote_fed.server_hostname(); + server_port = remote_fed.server_port(); + server_ip_addr = remote_fed.server_ip_addr(); + } + // Send the port number (which could be -1). + NetUtil::encode_int32(server_port, &mut buffer, 1); + NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "port number"); + + // Send the server IP address to federate. + NetUtil::write_to_socket_fail_on_error( + stream, + &(server_ip_addr.as_str().as_bytes().to_vec()), + fed_id, + "ip address", + ); + + println!( + "Replied to address query from federate {} with address {}({}):{}.", + fed_id, server_hostname, server_ip_addr, server_port + ); + } + + fn handle_address_ad(federate_id: u16, stream: &mut TcpStream, _f_rti: Arc>) { + // Read the port number of the federate that can be used for physical + // connections to other federates + let mut buffer = vec![0 as u8; mem::size_of::()]; + NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, federate_id, "port data"); + + // FIXME: Update from_le_bytes if endian types should be considered. + let server_port = i32::from_le_bytes(buffer.try_into().unwrap()); + + assert!(server_port <= u16::MAX.into()); + + { + let mut locked_rti = _f_rti.lock().unwrap(); + let idx: usize = federate_id.into(); + let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + fed.set_server_port(server_port); + } + + println!( + "Received address advertisement with port {} from federate {}.", + server_port, federate_id + ); + } + fn handle_port_absent_message( buffer: &Vec, fed_id: u16, @@ -1607,7 +1691,7 @@ impl Server { ); let u16_size = mem::size_of::(); - // FIXME: Change from_le_bytes properly. + // FIXME: Update from_le_bytes if endian types should be considered. let reactor_port_id = u16::from_le_bytes(header_buffer[0..(u16_size)].try_into().unwrap()); let federate_id = u16::from_le_bytes( header_buffer[(u16_size)..(u16_size + u16_size)]