Skip to content

Commit

Permalink
Handle a message type for address advertisement/query
Browse files Browse the repository at this point in the history
- Functions for MsgType::AddressAdvertisement (Byte value 14) and
  MsgType::AddressQuery (Byte value: 13) are added.
  • Loading branch information
chanijjani committed Feb 18, 2024
1 parent 7ba3d49 commit f58daf2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 17 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ jobs:
- name: Unit tests
run: cd rust/rti; cargo test

# fetch-lf:
# uses: chanijjani/lingua-franca/.github/workflows/extract-ref.yml@master
# with:
# file: 'lingua-franca-ref.txt'

lf-default:
# TODO(chanijjani): Change the pointer to point to the main lingua-franca repo.
uses: chanijjani/lingua-franca/.github/workflows/c-tests-with-rust-rti.yml@integration_tests_with_rust_rti
25 changes: 20 additions & 5 deletions rust/rti/src/federate_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ pub struct FederateInfo {
// value of each message for a more efficient access.
server_hostname: String, // Human-readable IP address and
server_port: i32, // port number of the socket server of the federate
// if it has any incoming direct connections from other federates.
// The port number will be -1 if there is no server or if the
// RTI has not been informed of the port number.
// TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket
// server of the federate.
// if it has any incoming direct connections from other federates.
// The port number will be -1 if there is no server or if the
// RTI has not been informed of the port number.
// TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket
// server of the federate.
server_ip_addr: String, // Information about the IP address of the socket
// server of the federate.
}

impl FederateInfo {
Expand All @@ -58,6 +60,7 @@ impl FederateInfo {
in_transit_message_tags: InTransitMessageRecordQueue::new(),
server_hostname: String::from("localhost"),
server_port: -1,
server_ip_addr: String::from("127.0.0.1"),
}
}

Expand All @@ -81,6 +84,18 @@ impl FederateInfo {
self.clock_synchronization_enabled
}

pub fn server_hostname(&self) -> String {
self.server_hostname.clone()
}

pub fn server_port(&self) -> i32 {
self.server_port
}

pub fn server_ip_addr(&self) -> String {
self.server_ip_addr.clone()
}

pub fn set_requested_stop(&mut self, requested_stop: bool) {
self.requested_stop = requested_stop;
}
Expand Down
3 changes: 3 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub enum MsgType {
StopRequestReply,
StopGranted,
AddressQuery,
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
PortAbsent,
Expand All @@ -125,6 +126,7 @@ impl MsgType {
MsgType::StopRequestReply => 11,
MsgType::StopGranted => 12,
MsgType::AddressQuery => 13,
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::PortAbsent => 23,
Expand All @@ -147,6 +149,7 @@ impl MsgType {
11 => MsgType::StopRequestReply,
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down
98 changes: 91 additions & 7 deletions rust/rti/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,16 @@ impl Server {
cloned_stop_granted.clone(),
)
}
MsgType::AddressQuery => Self::handle_address_query(
fed_id.try_into().unwrap(),
&mut stream,
cloned_rti.clone(),
),
MsgType::AddressAdvertisement => Self::handle_address_ad(
fed_id.try_into().unwrap(),
&mut stream,
cloned_rti.clone(),
),
MsgType::PortAbsent => Self::handle_port_absent_message(
&buffer,
fed_id.try_into().unwrap(),
Expand Down Expand Up @@ -364,13 +374,13 @@ impl Server {
return -1;
} else {
// Received federate_info ID.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let u16_size = mem::size_of::<u16>();
fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap());
println!("RTI received federate_info ID: {}.", fed_id);

// Read the federation ID. First read the length, which is one byte.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federation_id_length = u8::from_le_bytes(
first_buffer[(1 + u16_size)..(1 + u16_size + 1)]
.try_into()
Expand Down Expand Up @@ -550,7 +560,7 @@ impl Server {
let mut message_head: usize = 0;
// First, read the info about upstream federates
for i in 0..num_upstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand All @@ -562,7 +572,7 @@ impl Server {
"upstream_id: {}, message_head: {}",
upstream_id, message_head
);
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_delay = i64::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<i64>())]
.try_into()
Expand All @@ -578,7 +588,7 @@ impl Server {

// Next, read the info about downstream federates
for i in 0..num_downstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let downstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand Down Expand Up @@ -629,7 +639,7 @@ impl Server {

if clock_sync_global_status >= ClockSyncStat::ClockSyncInit {
// If no initial clock sync, no need perform initial clock sync.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federate_udp_port_number =
u16::from_le_bytes(response[1..3].try_into().unwrap());

Expand Down Expand Up @@ -1587,6 +1597,80 @@ impl Server {
);
}

fn handle_address_query(fed_id: u16, stream: &mut TcpStream, _f_rti: Arc<Mutex<RTIRemote>>) {
// Use buffer both for reading and constructing the reply.
// The length is what is needed for the reply.
let mut buffer = vec![0 as u8; 1 + mem::size_of::<i32>()];
NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, fed_id, "address query.");
let remote_fed_id =
u16::from_le_bytes(buffer[0..mem::size_of::<u16>()].try_into().unwrap());

println!(
"RTI received address query from {} for {}.",
fed_id, remote_fed_id
);

// NOTE: server_port initializes to -1, which means the RTI does not know
// the port number because it has not yet received an MsgType::AddressAdvertisement message
// from this federate. In that case, it will respond by sending -1.

// Response message is also of type MsgType::AddressQuery.
buffer[0] = MsgType::AddressQuery.to_byte();

// Encode the port number.
let server_hostname;
let server_port;
let server_ip_addr;
{
let mut locked_rti = _f_rti.lock().unwrap();
let idx: usize = remote_fed_id.into();
let remote_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
server_hostname = remote_fed.server_hostname();
server_port = remote_fed.server_port();
server_ip_addr = remote_fed.server_ip_addr();
}
// Send the port number (which could be -1).
NetUtil::encode_int32(server_port, &mut buffer, 1);
NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "port number");

// Send the server IP address to federate.
NetUtil::write_to_socket_fail_on_error(
stream,
&(server_ip_addr.as_str().as_bytes().to_vec()),
fed_id,
"ip address",
);

println!(
"Replied to address query from federate {} with address {}({}):{}.",
fed_id, server_hostname, server_ip_addr, server_port
);
}

fn handle_address_ad(federate_id: u16, stream: &mut TcpStream, _f_rti: Arc<Mutex<RTIRemote>>) {
// Read the port number of the federate that can be used for physical
// connections to other federates
let mut buffer = vec![0 as u8; mem::size_of::<i32>()];
NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, federate_id, "port data");

// FIXME: Update from_le_bytes if endian types should be considered.
let server_port = i32::from_le_bytes(buffer.try_into().unwrap());

assert!(server_port <= u16::MAX.into());

{
let mut locked_rti = _f_rti.lock().unwrap();
let idx: usize = federate_id.into();
let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
fed.set_server_port(server_port);
}

println!(
"Received address advertisement with port {} from federate {}.",
server_port, federate_id
);
}

fn handle_port_absent_message(
buffer: &Vec<u8>,
fed_id: u16,
Expand All @@ -1607,7 +1691,7 @@ impl Server {
);

let u16_size = mem::size_of::<u16>();
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let reactor_port_id = u16::from_le_bytes(header_buffer[0..(u16_size)].try_into().unwrap());
let federate_id = u16::from_le_bytes(
header_buffer[(u16_size)..(u16_size + u16_size)]
Expand Down

0 comments on commit f58daf2

Please sign in to comment.