Skip to content

Commit

Permalink
Merge pull request #1004 from microsoft/enhancement-ab-sched
Browse files Browse the repository at this point in the history
[demikernel] Switch to `runtime::pack_result()` for `catcollar` and `catpowder`
  • Loading branch information
anandbonde authored Nov 13, 2023
2 parents ef7e1f1 + 70b0a5d commit 7b92046
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 210 deletions.
96 changes: 2 additions & 94 deletions src/rust/catcollar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ use crate::{
QType,
},
types::{
demi_accept_result_t,
demi_opcode_t,
demi_qr_value_t,
demi_qresult_t,
demi_sgarray_t,
},
Expand Down Expand Up @@ -675,8 +672,8 @@ impl CatcollarLibOS {
}

pub fn pack_result(&mut self, handle: TaskHandle, qt: QToken) -> Result<demi_qresult_t, Fail> {
let (qd, r): (QDesc, OperationResult) = self.take_result(handle);
Ok(pack_result(&self.transport.clone(), r, qd, qt.into()))
let result: demi_qresult_t = self.runtime.remove_coroutine_and_get_result(&handle, qt.into());
Ok(result)
}

/// Allocates a scatter-gather array.
Expand All @@ -691,14 +688,6 @@ impl CatcollarLibOS {
self.runtime.free_sgarray(sga)
}

/// Takes out the operation result descriptor associated with the target scheduler handle.
fn take_result(&mut self, handle: TaskHandle) -> (QDesc, OperationResult) {
self.runtime
.remove_coroutine(&handle)
.get_result()
.expect("The coroutine has not finished")
}

fn get_shared_queue(&self, qd: &QDesc) -> Result<CatcollarQueue, Fail> {
Ok(self.runtime.get_shared_queue::<CatcollarQueue>(qd)?.clone())
}
Expand All @@ -714,84 +703,3 @@ impl CatcollarLibOS {
}
}
}
//======================================================================================================================
// Standalone Functions
//======================================================================================================================

/// Packs a [OperationResult] into a [demi_qresult_t].
fn pack_result(rt: &SharedIoUringRuntime, result: OperationResult, qd: QDesc, qt: u64) -> demi_qresult_t {
match result {
OperationResult::Connect => demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_CONNECT,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: 0,
qr_value: unsafe { mem::zeroed() },
},
OperationResult::Accept((new_qd, addr)) => {
let saddr: SockAddr = linux::socketaddrv4_to_sockaddr(&addr);
let qr_value: demi_qr_value_t = demi_qr_value_t {
ares: demi_accept_result_t {
qd: new_qd.into(),
addr: saddr,
},
};
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_ACCEPT,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: 0,
qr_value,
}
},
OperationResult::Push => demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_PUSH,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: 0,
qr_value: unsafe { mem::zeroed() },
},
OperationResult::Pop(addr, bytes) => match rt.into_sgarray(bytes) {
Ok(mut sga) => {
if let Some(addr) = addr {
sga.sga_addr = linux::socketaddrv4_to_sockaddr(&addr);
}
let qr_value: demi_qr_value_t = demi_qr_value_t { sga };
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_POP,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: 0,
qr_value,
}
},
Err(e) => {
warn!("Operation Failed: {:?}", e);
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_FAILED,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: e.errno as i64,
qr_value: unsafe { mem::zeroed() },
}
},
},
OperationResult::Close => demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_CLOSE,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: 0,
qr_value: unsafe { mem::zeroed() },
},
OperationResult::Failed(e) => {
warn!("Operation Failed: {:?}", e);
demi_qresult_t {
qr_opcode: demi_opcode_t::DEMI_OPC_FAILED,
qr_qd: qd.into(),
qr_qt: qt,
qr_ret: e.errno as i64,
qr_value: unsafe { mem::zeroed() },
}
},
}
}
6 changes: 1 addition & 5 deletions src/rust/catloop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use crate::{
QType,
};
use ::std::{
mem,
net::{
Ipv4Addr,
SocketAddr,
Expand Down Expand Up @@ -525,10 +524,7 @@ impl SharedCatloopLibOS {
_ => {
match self.get_queue(&QDesc::from(result.qr_qd)) {
Ok(mut queue) => queue.remove_pending_op(&handle),
Err(_) => warn!(
"Catloop::take_result() qd={:?}, lingering pending op found",
result.qr_qd
),
Err(_) => warn!("catloop: qd={:?}, lingering pending op found", result.qr_qd),
};
},
}
Expand Down
5 changes: 1 addition & 4 deletions src/rust/catnap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,7 @@ impl SharedCatnapLibOS {
_ => {
match self.get_shared_queue(&QDesc::from(result.qr_qd)) {
Ok(mut queue) => queue.remove_pending_op(&handle),
Err(_) => warn!(
"Catnap::take_result() qd={:?}, lingering pending op found",
result.qr_qd
),
Err(_) => warn!("catnap: qd={:?}, lingering pending op found", result.qr_qd),
};
},
}
Expand Down
99 changes: 0 additions & 99 deletions src/rust/catpowder/interop.rs

This file was deleted.

11 changes: 3 additions & 8 deletions src/rust/catpowder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@
// Licensed under the MIT license.

mod config;
mod interop;
pub mod runtime;

//==============================================================================
// Imports
//==============================================================================

use self::{
interop::pack_result,
runtime::LinuxRuntime,
};
use self::runtime::LinuxRuntime;
use crate::{
demikernel::config::Config,
inetstack::InetStack,
Expand All @@ -27,7 +23,6 @@ use crate::{
demi_qresult_t,
demi_sgarray_t,
},
OperationResult,
QDesc,
QToken,
SharedBox,
Expand Down Expand Up @@ -133,8 +128,8 @@ impl CatpowderLibOS {
}

pub fn pack_result(&mut self, handle: TaskHandle, qt: QToken) -> Result<demi_qresult_t, Fail> {
let (qd, r): (QDesc, OperationResult) = self.take_operation(handle);
Ok(pack_result(&self.transport, r, qd, qt.into()))
let result: demi_qresult_t = self.runtime.remove_coroutine_and_get_result(&handle, qt.into());
Ok(result)
}

/// Allocates a scatter-gather array.
Expand Down

0 comments on commit 7b92046

Please sign in to comment.