Skip to content

Commit

Permalink
Handle a message type for address advertisement
Browse files Browse the repository at this point in the history
- MsgType::AddressAdvertisement which has an integer value 14
  • Loading branch information
chanijjani committed Feb 17, 2024
1 parent 7ba3d49 commit fa81dd2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
3 changes: 3 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub enum MsgType {
StopRequestReply,
StopGranted,
AddressQuery,
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
PortAbsent,
Expand All @@ -125,6 +126,7 @@ impl MsgType {
MsgType::StopRequestReply => 11,
MsgType::StopGranted => 12,
MsgType::AddressQuery => 13,
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::PortAbsent => 23,
Expand All @@ -147,6 +149,7 @@ impl MsgType {
11 => MsgType::StopRequestReply,
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down
43 changes: 36 additions & 7 deletions rust/rti/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ impl Server {
cloned_stop_granted.clone(),
)
}
MsgType::AddressAdvertisement => Self::handle_address_ad(
fed_id.try_into().unwrap(),
&mut stream,
cloned_rti.clone(),
),
MsgType::PortAbsent => Self::handle_port_absent_message(
&buffer,
fed_id.try_into().unwrap(),
Expand Down Expand Up @@ -364,13 +369,13 @@ impl Server {
return -1;
} else {
// Received federate_info ID.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let u16_size = mem::size_of::<u16>();
fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap());
println!("RTI received federate_info ID: {}.", fed_id);

// Read the federation ID. First read the length, which is one byte.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federation_id_length = u8::from_le_bytes(
first_buffer[(1 + u16_size)..(1 + u16_size + 1)]
.try_into()
Expand Down Expand Up @@ -550,7 +555,7 @@ impl Server {
let mut message_head: usize = 0;
// First, read the info about upstream federates
for i in 0..num_upstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand All @@ -562,7 +567,7 @@ impl Server {
"upstream_id: {}, message_head: {}",
upstream_id, message_head
);
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_delay = i64::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<i64>())]
.try_into()
Expand All @@ -578,7 +583,7 @@ impl Server {

// Next, read the info about downstream federates
for i in 0..num_downstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let downstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand Down Expand Up @@ -629,7 +634,7 @@ impl Server {

if clock_sync_global_status >= ClockSyncStat::ClockSyncInit {
// If no initial clock sync, no need perform initial clock sync.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federate_udp_port_number =
u16::from_le_bytes(response[1..3].try_into().unwrap());

Expand Down Expand Up @@ -1587,6 +1592,30 @@ impl Server {
);
}

fn handle_address_ad(federate_id: u16, stream: &mut TcpStream, _f_rti: Arc<Mutex<RTIRemote>>) {
// Read the port number of the federate that can be used for physical
// connections to other federates
let mut buffer = vec![0 as u8; mem::size_of::<i32>()];
NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, federate_id, "port data");

// FIXME: Update from_le_bytes if endian types should be considered.
let server_port = i32::from_le_bytes(buffer.try_into().unwrap());

assert!(server_port <= u16::MAX.into());

{
let mut locked_rti = _f_rti.lock().unwrap();
let idx: usize = federate_id.into();
let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx];
fed.set_server_port(server_port);
}

println!(
"Received address advertisement with port {} from federate {}.",
server_port, federate_id
);
}

fn handle_port_absent_message(
buffer: &Vec<u8>,
fed_id: u16,
Expand All @@ -1607,7 +1636,7 @@ impl Server {
);

let u16_size = mem::size_of::<u16>();
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let reactor_port_id = u16::from_le_bytes(header_buffer[0..(u16_size)].try_into().unwrap());
let federate_id = u16::from_le_bytes(
header_buffer[(u16_size)..(u16_size + u16_size)]
Expand Down

0 comments on commit fa81dd2

Please sign in to comment.