Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add guest-profiler support to serve mode #301

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(dead_code)))))]
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(unused_variables)))))]

use std::path::PathBuf;
use std::process::ExitCode;

use wasi_common::I32Exit;
Expand All @@ -39,7 +40,16 @@ use {
/// Create a new server, bind it to an address, and serve responses until an error occurs.
pub async fn serve(serve_args: ServeArgs) -> Result<(), Error> {
// Load the wasm module into an execution context
let ctx = create_execution_context(serve_args.shared(), true).await?;
let ctx = create_execution_context(
serve_args.shared(),
true,
serve_args.profile_guest().cloned(),
)
.await?;

if let Some(guest_profile_path) = serve_args.profile_guest() {
std::fs::create_dir_all(guest_profile_path)?;
}

let addr = serve_args.addr();
ViceroyService::new(ctx).serve(addr).await?;
Expand Down Expand Up @@ -93,18 +103,14 @@ pub async fn main() -> ExitCode {
/// Execute a Wasm program in the Viceroy environment.
pub async fn run_wasm_main(run_args: RunArgs) -> Result<(), anyhow::Error> {
// Load the wasm module into an execution context
let ctx = create_execution_context(run_args.shared(), false).await?;
let ctx = create_execution_context(run_args.shared(), false, run_args.profile_guest().cloned())
.await?;
let input = run_args.shared().input();
let program_name = match input.file_stem() {
Some(stem) => stem.to_string_lossy(),
None => panic!("program cannot be a directory"),
};
ctx.run_main(
&program_name,
run_args.wasm_args(),
run_args.profile_guest(),
)
.await
ctx.run_main(&program_name, run_args.wasm_args()).await
}

fn install_tracing_subscriber(verbosity: u8) {
Expand Down Expand Up @@ -233,11 +239,17 @@ impl<'a> MakeWriter<'a> for StdWriter {
async fn create_execution_context(
args: &SharedArgs,
check_backends: bool,
guest_profile_path: Option<PathBuf>,
) -> Result<ExecuteCtx, anyhow::Error> {
let input = args.input();
let mut ctx = ExecuteCtx::new(input, args.profiling_strategy(), args.wasi_modules())?
.with_log_stderr(args.log_stderr())
.with_log_stdout(args.log_stdout());
let mut ctx = ExecuteCtx::new(
input,
args.profiling_strategy(),
args.wasi_modules(),
guest_profile_path,
)?
.with_log_stderr(args.log_stderr())
.with_log_stdout(args.log_stdout());

if let Some(config_path) = args.config_path() {
let config = FastlyConfig::from_file(config_path)?;
Expand Down
10 changes: 10 additions & 0 deletions cli/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub struct ServeArgs {
#[arg(long = "addr")]
socket_addr: Option<SocketAddr>,

/// Whether to profile the wasm guest. Takes an optional directory to save
/// per-request profiles to
#[arg(long, default_missing_value = "guest-profiles", num_args=0..=1, require_equals=true)]
profile_guest: Option<PathBuf>,

#[command(flatten)]
shared: SharedArgs,
}
Expand Down Expand Up @@ -99,6 +104,11 @@ impl ServeArgs {
.unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7676))
}

/// The path to write guest profiles to
pub fn profile_guest(&self) -> Option<&PathBuf> {
self.profile_guest.as_ref()
}

pub fn shared(&self) -> &SharedArgs {
&self.shared
}
Expand Down
23 changes: 14 additions & 9 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,20 @@ impl Test {
self.backends.start_servers().await;
}

let ctx = ExecuteCtx::new(&self.module_path, ProfilingStrategy::None, HashSet::new())
.expect("failed to set up execution context")
.with_backends(self.backends.backend_configs().await)
.with_dictionaries(self.dictionaries.clone())
.with_geolocation(self.geolocation.clone())
.with_object_stores(self.object_stores.clone())
.with_secret_stores(self.secret_stores.clone())
.with_log_stderr(self.log_stderr)
.with_log_stdout(self.log_stdout);
let ctx = ExecuteCtx::new(
&self.module_path,
ProfilingStrategy::None,
HashSet::new(),
None,
)
.expect("failed to set up execution context")
.with_backends(self.backends.backend_configs().await)
.with_dictionaries(self.dictionaries.clone())
.with_geolocation(self.geolocation.clone())
.with_object_stores(self.object_stores.clone())
.with_secret_stores(self.secret_stores.clone())
.with_log_stderr(self.log_stderr)
.with_log_stdout(self.log_stdout);

if self.via_hyper {
let svc = ViceroyService::new(ctx);
Expand Down
2 changes: 1 addition & 1 deletion cli/tests/trap-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub type TestResult = Result<(), Error>;
#[tokio::test(flavor = "multi_thread")]
async fn fatal_error_traps() -> TestResult {
let module_path = format!("{RUST_FIXTURE_PATH}/response.wasm");
let ctx = ExecuteCtx::new(module_path, ProfilingStrategy::None, HashSet::new())?;
let ctx = ExecuteCtx::new(module_path, ProfilingStrategy::None, HashSet::new(), None)?;
let req = Request::get("http://127.0.0.1:7676/").body(Body::from(""))?;
let resp = ctx
.handle_request_with_runtime_error(req, "127.0.0.1".parse().unwrap())
Expand Down
85 changes: 56 additions & 29 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Guest code execution.

use std::time::SystemTime;

use wasmtime::GuestProfiler;

use {
Expand Down Expand Up @@ -68,6 +70,10 @@ pub struct ExecuteCtx {
// `Arc` for the two fields below because this struct must be `Clone`.
epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
epoch_increment_stop: Arc<AtomicBool>,
/// Path to write profiling results from the guest. In serve mode,
/// this must refer to a directory, while in run mode it names
/// a file.
guest_profile_path: Arc<Option<PathBuf>>,
}

impl ExecuteCtx {
Expand All @@ -76,6 +82,7 @@ impl ExecuteCtx {
module_path: impl AsRef<Path>,
profiling_strategy: ProfilingStrategy,
wasi_modules: HashSet<ExperimentalModule>,
guest_profile_path: Option<PathBuf>,
) -> Result<Self, Error> {
let config = &configure_wasmtime(profiling_strategy);
let engine = Engine::new(config)?;
Expand Down Expand Up @@ -112,6 +119,7 @@ impl ExecuteCtx {
secret_stores: Arc::new(SecretStores::new()),
epoch_increment_thread,
epoch_increment_stop,
guest_profile_path: Arc::new(guest_profile_path),
})
}

Expand Down Expand Up @@ -217,7 +225,7 @@ impl ExecuteCtx {
/// # use viceroy_lib::{Error, ExecuteCtx, ProfilingStrategy, ViceroyService};
/// # async fn f() -> Result<(), Error> {
/// # let req = Request::new(Body::from(""));
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new())?;
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None)?;
/// let resp = ctx.handle_request(req, "127.0.0.1".parse().unwrap()).await?;
/// # Ok(())
/// # }
Expand Down Expand Up @@ -310,11 +318,28 @@ impl ExecuteCtx {
self.object_store.clone(),
self.secret_stores.clone(),
);

let guest_profile_path = self.guest_profile_path.as_deref().map(|path| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
path.join(format!("{}-{}.json", now, req_id))
});
let profiler = guest_profile_path.is_some().then(|| {
let program_name = "main";
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
vec![(program_name.to_string(), self.module.clone())],
)
});

// We currently have to postpone linking and instantiation to the guest task
// due to wasmtime limitations, in particular the fact that `Instance` is not `Send`.
// However, the fact that the module itself is created within `ExecuteCtx::new`
// means that the heavy lifting happens only once.
let mut store = create_store(&self, session, None).map_err(ExecutionError::Context)?;
let mut store = create_store(&self, session, profiler).map_err(ExecutionError::Context)?;

let instance = self
.instance_pre
Expand Down Expand Up @@ -346,6 +371,9 @@ impl ExecuteCtx {
}
};

// If we collected a profile, write it to the file
write_profile(&mut store, guest_profile_path.as_ref());

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();
Expand All @@ -364,12 +392,7 @@ impl ExecuteCtx {
outcome
}

pub async fn run_main(
self,
program_name: &str,
args: &[String],
guest_profile_path: Option<&PathBuf>,
) -> Result<(), anyhow::Error> {
pub async fn run_main(self, program_name: &str, args: &[String]) -> Result<(), anyhow::Error> {
Comment on lines -368 to +395
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: We need to release this as 0.7.x since we are changing the signature of a public crate function

// placeholders for request, result sender channel, and remote IP
let req = Request::get("http://example.com/").body(Body::empty())?;
let req_id = 0;
Expand All @@ -390,7 +413,7 @@ impl ExecuteCtx {
self.secret_stores.clone(),
);

let profiler = guest_profile_path.map(|_| {
let profiler = self.guest_profile_path.is_some().then(|| {
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
Expand Down Expand Up @@ -419,26 +442,7 @@ impl ExecuteCtx {
let result = main_func.call_async(&mut store, ()).await;

// If we collected a profile, write it to the file
if let (Some(profile), Some(path)) =
(store.data_mut().take_guest_profiler(), guest_profile_path)
{
if let Err(e) = std::fs::File::create(&path)
.map_err(anyhow::Error::new)
.and_then(|output| profile.finish(std::io::BufWriter::new(output)))
{
event!(
Level::ERROR,
"failed writing profile at {}: {e:#}",
path.display()
);
} else {
event!(
Level::INFO,
"\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
path.display()
);
}
}
write_profile(&mut store, self.guest_profile_path.as_ref().as_ref());

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
Expand All @@ -453,6 +457,29 @@ impl ExecuteCtx {
}
}

fn write_profile(store: &mut wasmtime::Store<WasmCtx>, guest_profile_path: Option<&PathBuf>) {
if let (Some(profile), Some(path)) =
(store.data_mut().take_guest_profiler(), guest_profile_path)
{
if let Err(e) = std::fs::File::create(&path)
.map_err(anyhow::Error::new)
.and_then(|output| profile.finish(std::io::BufWriter::new(output)))
{
event!(
Level::ERROR,
"failed writing profile at {}: {e:#}",
path.display()
);
} else {
event!(
Level::INFO,
"\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
path.display()
);
}
}
}

impl Drop for ExecuteCtx {
fn drop(&mut self) {
if let Some(arc) = self.epoch_increment_thread.take() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ViceroyService {
/// # use std::collections::HashSet;
/// use viceroy_lib::{Error, ExecuteCtx, ProfilingStrategy, ViceroyService};
/// # fn f() -> Result<(), Error> {
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new())?;
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None)?;
/// let svc = ViceroyService::new(ctx);
/// # Ok(())
/// # }
Expand Down