From 4474728be9f26686ca1bc944c7431988cf4c6b76 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 28 May 2024 03:28:29 +0000 Subject: [PATCH 01/16] chore: add dependency --- Cargo.toml | 3 ++- crates/base/Cargo.toml | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e270d4f0..fb3f2457 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,8 +68,9 @@ url = { version = "< 2.5.0", features = ["serde", "expose_internals"] } # upgrad eszip = "=0.72.2" log = "0.4.20" anyhow = "1.0.57" -libc = "0.2.126" +libc = "0.2.144" libz-sys = { version = "1.1", default-features = false } +num-traits = "0.2" enum-as-inner = "0.6.0" serde = { version = "1.0.149", features = ["derive"] } serde_json = "1.0.85" diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index cb094ed4..99e7b67b 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -76,8 +76,10 @@ rustls-pemfile.workspace = true tracing.workspace = true reqwest_v011.workspace = true tracing-subscriber = { workspace = true, optional = true, features = ["env-filter", "tracing-log"] } +num-traits.workspace = true tls-listener = { version = "0.10", features = ["rustls"] } +strum = { version = "0.25", features = ["derive"] } flume = "0.11.0" cooked-waker = "5" tokio-rustls = "0.25.0" From 06b911515b0a5fb549c349a2cd4f46165b437928 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 28 May 2024 03:28:42 +0000 Subject: [PATCH 02/16] chore: update `Cargo.lock` --- Cargo.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 1d0bc452..c438cbb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -805,6 +805,7 @@ dependencies = [ "log", "monch", "notify", + "num-traits", "once_cell", "pin-project", "reqwest 0.11.27", @@ -822,6 +823,7 @@ dependencies = [ "scopeguard", "serde", "serial_test", + "strum", "thiserror", "tls-listener", "tokio", From afad0525d7fa52d13a288423986327ba27c6133e Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 28 May 2024 03:30:05 +0000 Subject: [PATCH 03/16] stamp: add a utility function --- crates/base/src/utils/units.rs | 17 +++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/base/src/utils/units.rs b/crates/base/src/utils/units.rs index 5e238df0..df86f2b9 100644 --- a/crates/base/src/utils/units.rs +++ b/crates/base/src/utils/units.rs @@ -1,3 +1,5 @@ +use num_traits::NumCast; + // bytes size for 1 kibibyte pub const KIB: u64 = 1_024; // bytes size for 1 mebibyte @@ -36,3 +38,18 @@ pub fn human_elapsed(elapsed: u64) -> String { let seconds_remainder = seconds % 60; format!("{}m{}s", minutes, seconds_remainder) } + +pub fn percentage_value<T, U>(value: U, percent: T) -> Option<U> +where + T: NumCast + Ord, + U: NumCast, +{ + let percent = std::cmp::min(percent, T::from(100)?).to_f64()?; + let p = percent / 100.0f64; + + if p.is_normal() { + U::from(value.to_f64()? * p) + } else { + None + } +} From e476f566027094ee9c717de236151a857c316053 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 28 May 2024 03:32:12 +0000 Subject: [PATCH 04/16] stamp: update flags --- crates/base/src/server.rs | 5 +++++ crates/cli/src/flags.rs | 15 +++++++++++++++ crates/cli/src/main.rs | 27 ++++++++++++++++++--------- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 09c2fd89..4eb1f370 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -246,6 +246,7 @@ pub struct ServerFlags { pub no_module_cache: bool, pub allow_main_inspector: bool, pub tcp_nodelay: bool, + pub graceful_exit_deadline_sec: u64, pub graceful_exit_keepalive_deadline_ms: Option<u64>, pub event_worker_exit_deadline_sec: u64, @@ -253,6 +254,10 @@ pub struct ServerFlags { pub request_idle_timeout_ms: Option<u64>, pub request_read_timeout_ms: Option<u64>, pub request_buffer_size: Option<u64>, + + pub willterminate_wallclock_pct: Option<u8>, + pub willterminate_cpu_pct: Option<u8>, + pub willterminate_memory_pct: Option<u8>, } #[derive(Debug)] diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 34a5d969..4d0877ae 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -232,6 +232,21 @@ fn get_start_command() -> Command { .value_parser(value_parser!(u64)) .default_value("16384"), ) + .arg( + arg!(--"dispatch-willterminate-wallclock-ratio" ) + .value_parser(value_parser!(u8).range(..=99)) + .default_value("90") + ) + .arg( + arg!(--"dispatch-willterminate-cpu-ratio" ) + .value_parser(value_parser!(u8).range(..=99)) + .default_value("90") + ) + .arg( + arg!(--"dispatch-willterminate-memory-ratio" ) + .value_parser(value_parser!(u8).range(..=99)) + .default_value("90") + ) } fn get_bundle_command() -> Command { diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index fec89305..480f0448 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -10,6 +10,7 @@ use base::commands::start_server; use base::rt_worker::worker_pool::{SupervisorPolicy, WorkerPoolPolicy}; use base::server::{ServerFlags, Tls, WorkerEntrypoints}; use base::utils::path::find_up; +use base::utils::units::percentage_value; use base::{CacheSetting, DecoratorType, InspectorOption}; use clap::ArgMatches; use deno_core::url::Url; @@ -130,15 +131,7 @@ fn main() -> Result { return None; } - let deadline_ms = graceful_exit_deadline_sec * 1000; - let percent = std::cmp::min(it, 100) as f64; - let point = percent / 100.0f64; - - if point.is_normal() { - Some(((deadline_ms as f64) * point) as u64) - } else { - None - } + percentage_value(graceful_exit_deadline_sec * 1000, it) }); let event_worker_exit_deadline_sec = sub_matches @@ -154,6 +147,17 @@ fn main() -> Result { sub_matches.get_one::<u64>("request-idle-timeout").cloned(); let maybe_request_read_timeout = sub_matches.get_one::<u64>("request-read-timeout").cloned(); + + let maybe_willterminate_wallclock_pct = sub_matches + .get_one::<u8>("dispatch-willterminate-wallclock-ratio") + .cloned(); + let maybe_willterminate_cpu_pct = sub_matches + .get_one::<u8>("dispatch-willterminate-cpu-ratio") + .cloned(); + let maybe_willterminate_memory_pct = sub_matches + .get_one::<u8>("dispatch-willterminate-memory-ratio") + .cloned(); + let static_patterns = if let Some(val_ref) = sub_matches.get_many::<String>("static") { val_ref.map(|s| s.as_str()).collect::<Vec<&str>>() @@ -190,6 +194,7 @@ fn main() -> Result { no_module_cache, allow_main_inspector, tcp_nodelay, + graceful_exit_deadline_sec, graceful_exit_keepalive_deadline_ms,
event_worker_exit_deadline_sec, @@ -197,6 +202,10 @@ fn main() -> Result { request_idle_timeout_ms: maybe_request_idle_timeout, request_read_timeout_ms: maybe_request_read_timeout, request_buffer_size: Some(request_buffer_size), + + willterminate_wallclock_pct: maybe_willterminate_wallclock_pct, + willterminate_cpu_pct: maybe_willterminate_cpu_pct, + willterminate_memory_pct: maybe_willterminate_memory_pct, }; let maybe_received_signum = start_server( From 72712f72e3a736f33e6203aaf33f813a708b0075 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 30 May 2024 02:18:58 +0000 Subject: [PATCH 05/16] stamp: add dependency --- crates/base/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index 99e7b67b..3f0c0fb6 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -83,6 +83,7 @@ strum = { version = "0.25", features = ["derive"] } flume = "0.11.0" cooked-waker = "5" tokio-rustls = "0.25.0" +arc-swap = "1.7" [dev-dependencies] tokio-util = { workspace = true, features = ["rt", "compat"] } From 6419a639ec3209be3bd3f7cf6bbcfb4bc018c843 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 30 May 2024 02:19:09 +0000 Subject: [PATCH 06/16] chore: update `Cargo.lock` --- Cargo.lock | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index c438cbb6..d02cb80a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.7.4" @@ -759,6 +765,7 @@ name = "base" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "async-trait", "async-tungstenite", "base_mem_check", From 087b5f0356d41ebfb464b3d9ec648ecbd296d082 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 30 May 2024 02:22:22 +0000 Subject: [PATCH 07/16] fix: update `bootstrap.js` to expose the event dispatch functions --- crates/sb_core/js/bootstrap.js | 93 ++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js index dc0148d1..b454af85 100644 --- a/crates/sb_core/js/bootstrap.js +++ b/crates/sb_core/js/bootstrap.js @@ -405,9 +405,74 @@ const DENIED_DENO_FS_API_LIST = ObjectKeys(fsVars) {} ); -globalThis.bootstrapSBEdge = (opts, extraCtx) => { - globalThis_ = globalThis; +function dispatchLoadEvent() { + globalThis_.dispatchEvent(new Event("load")); +} + +function dispatchWillTerminateEvent(reason) { + globalThis_.dispatchEvent(new CustomEvent("willterminate", { + detail: { reason } + })); +} + +function dispatchBeforeUnloadEvent() { + return globalThis_.dispatchEvent( + new Event("beforeunload", { cancelable: true }), + ); +} + +function dispatchUnloadEvent() { + globalThis_.dispatchEvent(new Event("unload")); +} + +// Notification that the core received an unhandled promise rejection that is about to +// terminate the runtime. If we can handle it, attempt to do so. 
+function processUnhandledPromiseRejection(promise, reason) { + const rejectionEvent = new event.PromiseRejectionEvent( + "unhandledrejection", + { + cancelable: true, + promise, + reason, + }, + ); + + // Note that the handler may throw, causing a recursive "error" event + globalThis_.dispatchEvent(rejectionEvent); + + // If event was not yet prevented, try handing it off to Node compat layer + // (if it was initialized) + if ( + !rejectionEvent.defaultPrevented && + typeof internals.nodeProcessUnhandledRejectionCallback !== "undefined" + ) { + internals.nodeProcessUnhandledRejectionCallback(rejectionEvent); + } + // If event was not prevented (or "unhandledrejection" listeners didn't + // throw) we will let Rust side handle it. + if (rejectionEvent.defaultPrevented) { + return true; + } + + return false; +} + +function processRejectionHandled(promise, reason) { + const rejectionHandledEvent = new event.PromiseRejectionEvent( + "rejectionhandled", + { promise, reason }, + ); + + // Note that the handler may throw, causing a recursive "error" event + globalThis_.dispatchEvent(rejectionHandledEvent); + + if (typeof internals.nodeProcessRejectionHandledCallback !== "undefined") { + internals.nodeProcessRejectionHandledCallback(rejectionHandledEvent); + } +} + +globalThis.bootstrapSBEdge = opts => { // We should delete this after initialization, // Deleting it during bootstrapping can backfire delete globalThis.__bootstrap; @@ -417,9 +482,19 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { event.setEventTargetData(globalThis); event.saveGlobalThisReference(globalThis); - const eventHandlers = ['error', 'load', 'beforeunload', 'unload', 'unhandledrejection']; + const eventHandlers = [ + "error", + "load", + "beforeunload", + "unload", + "unhandledrejection", + ]; + eventHandlers.forEach((handlerName) => event.defineEventHandler(globalThis, handlerName)); + // Nothing listens to this, but it warms up the code paths for event dispatch + (new event.EventTarget()).dispatchEvent(new Event("warmup")); + const { 0: target, 1: isUserWorker, @@ -595,3 +670,15 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { delete globalThis.bootstrapSBEdge; }; + +globalThis.bootstrap = { + dispatchLoadEvent, + dispatchWillTerminateEvent, + dispatchUnloadEvent, + dispatchBeforeUnloadEvent, + // dispatchProcessExitEvent, + // dispatchProcessBeforeExitEvent, +}; + +core.setUnhandledPromiseRejectionHandler(processUnhandledPromiseRejection); +core.setHandledPromiseRejectionHandler(processRejectionHandled); \ No newline at end of file From dfc24a77ddaf5caa0a5f8f6900302dafd17cea78 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 30 May 2024 02:25:50 +0000 Subject: [PATCH 08/16] stamp: polishing --- crates/base/src/server.rs | 2 +- crates/cli/src/flags.rs | 2 +- crates/cli/src/main.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 4eb1f370..77e42c87 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -255,7 +255,7 @@ pub struct ServerFlags { pub request_read_timeout_ms: Option, pub request_buffer_size: Option, - pub willterminate_wallclock_pct: Option, + pub willterminate_wall_clock_pct: Option, pub willterminate_cpu_pct: Option, pub willterminate_memory_pct: Option, } diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 4d0877ae..df624c13 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -233,7 +233,7 @@ fn get_start_command() -> Command { .default_value("16384"), ) 
.arg( - arg!(--"dispatch-willterminate-wallclock-ratio" ) + arg!(--"dispatch-willterminate-wall-clock-ratio" ) .value_parser(value_parser!(u8).range(..=99)) .default_value("90") ) diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 480f0448..e45cd9dc 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -148,8 +148,8 @@ fn main() -> Result { let maybe_request_read_timeout = sub_matches.get_one::("request-read-timeout").cloned(); - let maybe_willterminate_wallclock_pct = sub_matches - .get_one::("dispatch-willterminate-wallclock-ratio") + let maybe_willterminate_wall_clock_pct = sub_matches + .get_one::("dispatch-willterminate-wall-clock-ratio") .cloned(); let maybe_willterminate_cpu_pct = sub_matches .get_one::("dispatch-willterminate-cpu-ratio") @@ -203,7 +203,7 @@ fn main() -> Result { request_read_timeout_ms: maybe_request_read_timeout, request_buffer_size: Some(request_buffer_size), - willterminate_wallclock_pct: maybe_willterminate_wallclock_pct, + willterminate_wall_clock_pct: maybe_willterminate_wall_clock_pct, willterminate_cpu_pct: maybe_willterminate_cpu_pct, willterminate_memory_pct: maybe_willterminate_memory_pct, }; From 878d0a1742c23a4798a4f75eb4a32eaba977e1e6 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 30 May 2024 02:29:04 +0000 Subject: [PATCH 09/16] feat: support dispatching the runtime events --- crates/base/src/deno_runtime.rs | 513 +++++++++++++++++- crates/base/src/rt_worker/supervisor/mod.rs | 55 +- .../supervisor/strategy_per_request.rs | 41 +- .../supervisor/strategy_per_worker.rs | 46 +- crates/base/src/rt_worker/worker.rs | 19 +- crates/base/src/rt_worker/worker_ctx.rs | 4 + crates/base/src/rt_worker/worker_pool.rs | 1 + crates/sb_workers/context.rs | 4 + 8 files changed, 632 insertions(+), 51 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 5f928291..afea7e30 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -3,19 +3,21 @@ use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics}; use crate::rt_worker::worker::DuplexStreamEntry; use crate::utils::json; use crate::utils::path::find_up; -use crate::utils::units::{bytes_to_display, mib_to_bytes}; +use crate::server::ServerFlags; +use crate::utils::units::{bytes_to_display, mib_to_bytes, percentage_value}; use anyhow::{anyhow, bail, Error}; use base_mem_check::{MemCheckState, WorkerHeapStatistics}; use base_rt::DenoRuntimeDropToken; use base_rt::{get_current_cpu_time_ns, BlockingScopeCPUUsage}; +use arc_swap::ArcSwapOption; use cooked_waker::{IntoWaker, WakeRef}; use ctor::ctor; -use deno_core::error::AnyError; +use deno_core::error::{AnyError, JsError}; use deno_core::url::Url; -use deno_core::v8::{GCCallbackFlags, GCType, HeapStatistics, Isolate}; +use deno_core::v8::{self, GCCallbackFlags, GCType, HeapStatistics, Isolate}; use deno_core::{ - located_script_name, serde_json, JsRuntime, ModuleCodeString, ModuleId, ModuleLoader, + located_script_name, serde_json, JsRuntime, ModuleCodeString, ModuleId, ModuleLoader, OpState, ModuleSpecifier, OpState, PollEventLoopOptions, ResolutionKind, RuntimeOptions, }; use deno_http::DefaultHttpPropertyExtractor; @@ -24,7 +26,8 @@ use deno_tls::rustls::RootCertStore; use deno_tls::RootCertStoreProvider; use futures_util::future::poll_fn; use futures_util::task::AtomicWaker; -use log::error; +use futures_util::Future; +use log::{error, trace}; use once_cell::sync::{Lazy, OnceCell}; use sb_core::http::sb_core_http; use 
sb_core::http_start::sb_core_http_start; @@ -33,6 +36,7 @@ use sb_fs::prefix_fs::PrefixFs; use sb_fs::s3_fs::S3Fs; use sb_fs::static_fs::StaticFs; use sb_fs::tmp_fs::TmpFs; +use scopeguard::ScopeGuard; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; @@ -48,6 +52,7 @@ use std::sync::{Arc, RwLock}; use std::task::Poll; use std::thread::ThreadId; use std::time::Duration; +use strum::IntoStaticStr; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::time::interval; use tokio_util::sync::{CancellationToken, PollSemaphore}; @@ -142,6 +147,7 @@ fn get_error_class_name(e: &AnyError) -> &'static str { struct MemCheck { exceeded_token: CancellationToken, limit: Option<usize>, + waker: Arc<AtomicWaker>, state: Arc<RwLock<MemCheckState>>, } @@ -193,6 +199,9 @@ impl MemCheck { trace!(malloced_mb = bytes_to_display(total_bytes as u64)); total_bytes } + + fn is_exceeded(&self) -> bool { + self.exceeded_token.is_cancelled() + } } pub trait GetRuntimeContext { @@ -205,6 +214,60 @@ impl GetRuntimeContext for () { } } +#[derive(Debug, Clone)] +struct GlobalMainContext(v8::Global<v8::Context>); + +impl GlobalMainContext { + fn to_local_context<'s>( + &self, + scope: &mut v8::HandleScope<'s, ()>, + ) -> v8::Local<'s, v8::Context> { + v8::Local::new(scope, &self.0) + } +} + +struct DispatchEventFunctions { + dispatch_load_event_fn_global: v8::Global<v8::Function>, + dispatch_willterminate_event_fn_global: v8::Global<v8::Function>, + dispatch_beforeunload_event_fn_global: v8::Global<v8::Function>, + dispatch_unload_event_fn_global: v8::Global<v8::Function>, +} + +#[derive(IntoStaticStr, Debug, Clone, Copy)] +#[strum(serialize_all = "snake_case")] +pub enum WillTerminateReason { + CPU, + Memory, + WallClock, +} + +#[derive(Debug, Clone)] +struct GlobalMainContext(v8::Global<v8::Context>); + +impl GlobalMainContext { + fn to_local_context<'s>( + &self, + scope: &mut v8::HandleScope<'s, ()>, + ) -> v8::Local<'s, v8::Context> { + v8::Local::new(scope, &self.0) + } +} + +struct DispatchEventFunctions { + dispatch_load_event_fn_global: v8::Global<v8::Function>, + dispatch_willterminate_event_fn_global: v8::Global<v8::Function>, + dispatch_beforeunload_event_fn_global: v8::Global<v8::Function>, + dispatch_unload_event_fn_global: v8::Global<v8::Function>, +} + +#[derive(IntoStaticStr, Debug, Clone, Copy)] +#[strum(serialize_all = "snake_case")] +pub enum WillTerminateReason { + CPU, + Memory, + WallClock, +} pub struct DenoRuntime<RuntimeContext = ()> { pub js_runtime: ManuallyDrop<JsRuntime>, pub drop_token: CancellationToken, @@ -223,6 +286,9 @@ pub struct DenoRuntime<RuntimeContext = ()> { mem_check: Arc<MemCheck>, waker: Arc<AtomicWaker>, + willterminate_mem_threshold: Arc<ArcSwapOption<u64>>, + willterminate_cpu_threshold: Arc<ArcSwapOption<u64>>, + _phantom_runtime_context: PhantomData<RuntimeContext>, } @@ -262,6 +328,7 @@ where pub async fn new( opts: WorkerContextInitOpts, maybe_inspector: Option<Inspector>, + flags: Arc<ServerFlags>, ) -> Result<Self, Error> { let WorkerContextInitOpts { mut conf, @@ -543,18 +610,37 @@ where let mut create_params = None; let mut mem_check = MemCheck::default(); + let willterminate_cpu_threshold = ArcSwapOption::<u64>::from_pointee(None); + let willterminate_mem_threshold = ArcSwapOption::<u64>::from_pointee(None); + if conf.is_user_worker() { - let memory_limit = - mib_to_bytes(conf.as_user_worker().unwrap().memory_limit_mb) as usize; + let conf = conf.as_user_worker().unwrap(); + let memory_limit_bytes = mib_to_bytes(conf.memory_limit_mb) as usize; + + willterminate_mem_threshold.store( + flags + .willterminate_memory_pct + .and_then(|it| percentage_value(memory_limit_bytes as u64, it)) + .map(Arc::new), + ); - let allocator = CustomAllocator::new(memory_limit); + if conf.cpu_time_hard_limit_ms > 0 { + willterminate_cpu_threshold.store( + flags + .willterminate_cpu_pct + .and_then(|it| percentage_value(conf.cpu_time_hard_limit_ms, it)) + .map(Arc::new), + ); + } + + let allocator = CustomAllocator::new(memory_limit_bytes); allocator.set_waker(mem_check.waker.clone()); mem_check.limit = Some(memory_limit_bytes); create_params = Some( - deno_core::v8::CreateParams::default() - .heap_limits(mib_to_bytes(0) as usize, memory_limit) + v8::CreateParams::default() + .heap_limits(mib_to_bytes(0) as usize, memory_limit_bytes) .array_buffer_allocator(allocator.into_v8_allocator()), ) }; @@ -574,7 +660,77 @@ where ..Default::default() }; - let mut js_runtime = ManuallyDrop::new(JsRuntime::new(runtime_options)); + let mut js_runtime = JsRuntime::new(runtime_options); + + let dispatch_fns = { + let context = js_runtime.main_context(); + let scope = &mut js_runtime.handle_scope(); + let context_local = v8::Local::new(scope, context); + let global_obj = context_local.global(scope); + let bootstrap_str = + v8::String::new_external_onebyte_static(scope, b"bootstrap").unwrap(); + let bootstrap_ns: v8::Local<v8::Object> = global_obj + .get(scope, bootstrap_str.into()) + .unwrap() + .try_into() + .unwrap(); + + let dispatch_load_event_fn_str = + v8::String::new_external_onebyte_static(scope, b"dispatchLoadEvent").unwrap(); + let dispatch_load_event_fn = bootstrap_ns + .get(scope, dispatch_load_event_fn_str.into()) + .unwrap(); + let dispatch_load_event_fn = + v8::Local::<v8::Function>::try_from(dispatch_load_event_fn).unwrap(); + let dispatch_willterminate_event_fn_str = + v8::String::new_external_onebyte_static(scope, b"dispatchWillTerminateEvent") + .unwrap(); + let dispatch_willterminate_event_fn = bootstrap_ns + .get(scope, dispatch_willterminate_event_fn_str.into()) + .unwrap(); + let dispatch_willterminate_event_fn = + v8::Local::<v8::Function>::try_from(dispatch_willterminate_event_fn).unwrap(); + let dispatch_beforeunload_event_fn_str = + v8::String::new_external_onebyte_static(scope, b"dispatchBeforeUnloadEvent") + .unwrap(); + let dispatch_beforeunload_event_fn = bootstrap_ns + .get(scope, dispatch_beforeunload_event_fn_str.into()) + .unwrap(); + let dispatch_beforeunload_event_fn = + v8::Local::<v8::Function>::try_from(dispatch_beforeunload_event_fn).unwrap(); + let dispatch_unload_event_fn_str = + v8::String::new_external_onebyte_static(scope, b"dispatchUnloadEvent").unwrap(); + let dispatch_unload_event_fn = bootstrap_ns + .get(scope, dispatch_unload_event_fn_str.into()) + .unwrap(); + let dispatch_unload_event_fn = + v8::Local::<v8::Function>::try_from(dispatch_unload_event_fn).unwrap(); + + let dispatch_load_event_fn_global = v8::Global::new(scope, dispatch_load_event_fn); + let dispatch_willterminate_event_fn_global = + v8::Global::new(scope, dispatch_willterminate_event_fn); + let dispatch_beforeunload_event_fn_global = + v8::Global::new(scope, dispatch_beforeunload_event_fn); + let dispatch_unload_event_fn_global = v8::Global::new(scope, dispatch_unload_event_fn); + + DispatchEventFunctions { + dispatch_load_event_fn_global, + dispatch_willterminate_event_fn_global, + dispatch_beforeunload_event_fn_global, + dispatch_unload_event_fn_global, + } + }; + + { + let main_context = js_runtime.main_context(); + + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + + op_state.put(dispatch_fns); + op_state.put(GlobalMainContext(main_context)); + } + let version: Option<&str> = option_env!("GIT_V_TAG"); { @@ -745,6 +901,9 @@ where mem_check, waker: Arc::default(), + willterminate_cpu_threshold: Arc::new(willterminate_cpu_threshold), + willterminate_mem_threshold: Arc::new(willterminate_mem_threshold), +
_phantom_runtime_context: PhantomData, }) } @@ -783,10 +942,8 @@ where self.js_runtime.v8_isolate().enter(); if inspector.is_some() { - let is_terminated = self.is_terminated.clone(); let mut this = scopeguard::guard_on_unwind(&mut *self, |this| { this.js_runtime.v8_isolate().exit(); - is_terminated.raise(); }); { @@ -804,7 +961,6 @@ where if this.termination_request_token.is_cancelled() { this.js_runtime.v8_isolate().exit(); - is_terminated.raise(); return (Ok(()), 0i64); } } @@ -861,6 +1017,17 @@ where if let Err(err) = mod_result { return (Err(err), get_accumulated_cpu_time_ms!()); } + + let mut this = self.get_v8_tls_guard(); + + if let Err(err) = with_cpu_metrics_guard( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_load_event(), + ) { + return (Err(err), get_accumulated_cpu_time_ms!()); + } } if let Err(err) = self @@ -872,12 +1039,59 @@ where .instrument(span) .await { + let mut this = self.get_v8_tls_guard(); + let _ = with_cpu_metrics_guard( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(), + ); + + // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; + // beforeExit) + + if let Err(err) = with_cpu_metrics_guard( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_unload_event(), + ) { + return (Err(err), get_accumulated_cpu_time_ms!()); + } + + // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; exit) + return ( Err(anyhow!("event loop error: {}", err)), get_accumulated_cpu_time_ms!(), ); } + let mut this = self.get_v8_tls_guard(); + + if let Err(err) = with_cpu_metrics_guard( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(), + ) { + return (Err(err), get_accumulated_cpu_time_ms!()); + } + + // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; + // beforeExit) + + if let Err(err) = with_cpu_metrics_guard( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_unload_event(), + ) { + return (Err(err), get_accumulated_cpu_time_ms!()); + } + + // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; exit) + (Ok(()), get_accumulated_cpu_time_ms!()) } @@ -892,7 +1106,10 @@ where let global_waker = self.waker.clone(); let termination_request_token = self.termination_request_token.clone(); - let mem_check_state = is_user_worker.then(|| self.mem_check.clone()); + let willterminate_cpu_threshold = self.willterminate_cpu_threshold.clone(); + let willterminate_mem_threshold = self.willterminate_mem_threshold.clone(); + + let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone()); let mut poll_sem = None::; poll_fn(move |cx| { @@ -968,6 +1185,37 @@ where if is_user_worker { let mem_state = mem_check_state.as_ref().unwrap(); + if let Some(threshold_ms) = willterminate_cpu_threshold.load().as_deref().copied() { + let threshold_ns = (threshold_ms as i128) * 1_000_000; + let accumulated_cpu_time_ns = *accumulated_cpu_time_ns as i128; + + if accumulated_cpu_time_ns >= threshold_ns { + willterminate_cpu_threshold.store(None); + + if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this) 
+ .dispatch_willterminate_event(WillTerminateReason::CPU) + { + return Poll::Ready(Err(err)); + } + } + } else if let Some(threshold_bytes) = + willterminate_mem_threshold.load().as_deref().copied() + { + let total_malloced_bytes = total_malloced_bytes as u64; + + if total_malloced_bytes >= threshold_bytes { + willterminate_mem_threshold.store(None); + + if !mem_state.is_exceeded() { + if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this) + .dispatch_willterminate_event(WillTerminateReason::Memory) + { + return Poll::Ready(Err(err)); + } + } + } + } + mem_state.check(js_runtime.v8_isolate().as_mut()); mem_state.waker.register(waker); } @@ -1038,6 +1286,43 @@ where } } + fn terminate_execution_if_cancelled( + &mut self, + token: CancellationToken, + ) -> ScopeGuard { + extern "C" fn interrupt_fn(isolate: &mut v8::Isolate, _: *mut std::ffi::c_void) { + let _ = isolate.terminate_execution(); + } + + let handle = self.js_runtime.v8_isolate().thread_safe_handle(); + let cancel_task_token = CancellationToken::new(); + let request_interrupt_fn = move || { + let _ = handle.request_interrupt(interrupt_fn, std::ptr::null_mut()); + }; + + drop(rt::SUPERVISOR_RT.spawn({ + let cancel_task_token = cancel_task_token.clone(); + + async move { + if token.is_cancelled() { + request_interrupt_fn(); + } else { + tokio::select! { + _ = token.cancelled_owned() => { + request_interrupt_fn(); + } + + _ = cancel_task_token.cancelled_owned() => {} + } + } + } + })); + + scopeguard::guard(cancel_task_token, |v| { + v.cancel(); + }) + } + fn get_v8_tls_guard<'l>( &'l mut self, ) -> scopeguard::ScopeGuard< @@ -1056,6 +1341,156 @@ where } } +#[allow(dead_code)] +struct Scope<'s> { + context: v8::Local<'s, v8::Context>, + scope: v8::HandleScope<'s, ()>, +} + +impl<'s> Scope<'s> { + fn context_scope<'l>(&'l mut self) -> v8::ContextScope<'l, v8::HandleScope<'s>> { + let context = self.context; + v8::ContextScope::new(&mut self.scope, context) + } +} + +pub enum MaybeDenoRuntime<'l> { + DenoRuntime(&'l mut DenoRuntime), + Isolate(&'l mut v8::Isolate), +} + +impl<'l> MaybeDenoRuntime<'l> { + fn scope(&mut self) -> Scope<'_> { + let op_state = self.op_state(); + let op_state_ref = op_state.borrow(); + let context = op_state_ref + .try_borrow::() + .unwrap() + .clone(); + + let mut scope = match self { + MaybeDenoRuntime::DenoRuntime(v) => v8::HandleScope::new(v.js_runtime.v8_isolate()), + MaybeDenoRuntime::Isolate(v) => v8::HandleScope::new(&mut **v), + }; + + let context = context.to_local_context(&mut scope); + + Scope { context, scope } + } + + fn op_state(&mut self) -> Rc> { + match self { + MaybeDenoRuntime::DenoRuntime(v) => v.js_runtime.op_state(), + MaybeDenoRuntime::Isolate(v) => JsRuntime::op_state_from(v), + } + } + + fn terminate_execution_if_cancelled( + &mut self, + ) -> Option> { + match self { + MaybeDenoRuntime::DenoRuntime(v) => { + Some(v.terminate_execution_if_cancelled(v.termination_request_token.clone())) + } + + MaybeDenoRuntime::Isolate(_) => None, + } + } + + fn dispatch_event_with_callback( + &mut self, + select_dispatch_fn: T, + fn_args_fn: U, + callback_fn: V, + ) -> Result + where + T: for<'r> FnOnce(&'r DispatchEventFunctions) -> &v8::Global, + U: for<'r> FnOnce(&mut v8::HandleScope<'r, ()>) -> Vec>, + V: for<'r> FnOnce(Option>) -> Result, + { + let _guard = self.terminate_execution_if_cancelled(); + + let op_state = self.op_state(); + let op_state_ref = op_state.borrow(); + let dispatch_fns = op_state_ref.try_borrow::().unwrap(); + + let scope = &mut self.scope(); + let ctx_scope = &mut 
scope.context_scope(); + let tc_scope = &mut v8::TryCatch::new(ctx_scope); + + let event_fn = v8::Local::new(tc_scope, select_dispatch_fn(dispatch_fns)); + let undefined = v8::undefined(tc_scope); + let fn_args = &*fn_args_fn(tc_scope); + let fn_ret = event_fn.call(tc_scope, undefined.into(), fn_args); + + if let Some(ex) = tc_scope.exception() { + let err = JsError::from_v8_exception(tc_scope, ex); + + return Err(err.into()); + } + + callback_fn(fn_ret) + } + + /// Dispatches "load" event to the JavaScript runtime. + /// + /// Does not poll event loop, and thus not await any of the "load" event handlers. + pub fn dispatch_load_event(&mut self) -> Result<(), AnyError> { + self.dispatch_event_with_callback( + |fns| &fns.dispatch_load_event_fn_global, + |_| vec![], + |_| Ok(()), + ) + } + + /// Dispatches "willterminate" event to the JavaScript runtime. + /// + /// Does not poll event loop, and thus not await any of the "willterminate" event handlers. + pub fn dispatch_willterminate_event( + &mut self, + reason: WillTerminateReason, + ) -> Result<(), AnyError> { + self.dispatch_event_with_callback( + |fns| &fns.dispatch_willterminate_event_fn_global, + move |scope| { + vec![v8::String::new_external_onebyte_static( + scope, + <&'static str>::from(reason).as_bytes(), + ) + .unwrap() + .into()] + }, + |_| Ok(()), + ) + } + + /// Dispatches "beforeunload" event to the JavaScript runtime. Returns a boolean + /// indicating if the event was prevented and thus event loop should continue + /// running. + pub fn dispatch_beforeunload_event(&mut self) -> Result { + self.dispatch_event_with_callback( + |fns| &fns.dispatch_beforeunload_event_fn_global, + |_| vec![], + |it| Ok(it.unwrap().is_false()), + ) + } + + /// Dispatches "unload" event to the JavaScript runtime. + /// + /// Does not poll event loop, and thus not await any of the "unload" event handlers. 
+ pub fn dispatch_unload_event(&mut self) -> Result<(), AnyError> { + self.dispatch_event_with_callback( + |fns| &fns.dispatch_unload_event_fn_global, + |_| vec![], + |_| Ok(()), + ) + } +} + +fn get_current_cpu_time_ns() -> Result { + get_thread_time().context("can't get current thread time") +} + pub fn import_meta_resolve_callback( loader: &dyn ModuleLoader, specifier: String, @@ -1289,6 +1724,7 @@ mod test { maybe_tmp_fs_config: tmp_fs_config, }, None, + Arc::default(), ) .await .unwrap() @@ -1395,6 +1831,7 @@ mod test { maybe_tmp_fs_config: None, }, None, + Arc::default(), ) .await .expect("It should not panic"); @@ -1442,6 +1879,7 @@ mod test { maybe_tmp_fs_config: None, }, None, + Arc::default(), ) .await; @@ -1508,6 +1946,7 @@ mod test { maybe_tmp_fs_config: None, }, None, + Arc::default(), ) .await; @@ -1535,6 +1974,50 @@ mod test { std::mem::drop(main_mod_ev); } + async fn create_runtime( + path: Option<&str>, + env_vars: Option>, + user_conf: Option, + static_patterns: Vec, + maybe_jsx_import_source_config: Option, + ) -> DenoRuntime { + let (worker_pool_tx, _) = mpsc::unbounded_channel::(); + + DenoRuntime::new( + WorkerContextInitOpts { + service_path: path + .map(PathBuf::from) + .unwrap_or(PathBuf::from("./test_cases/main")), + + no_module_cache: false, + import_map_path: None, + env_vars: env_vars.unwrap_or_default(), + events_rx: None, + timing: None, + maybe_eszip: None, + maybe_entrypoint: None, + maybe_decorator: None, + maybe_module_code: None, + conf: { + if let Some(uc) = user_conf { + uc + } else { + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx, + shared_metric_src: None, + event_worker_metric_src: None, + }) + } + }, + static_patterns, + maybe_jsx_import_source_config, + }, + None, + ) + .await + .unwrap() + } + // Main Runtime should have access to `EdgeRuntime` #[tokio::test] #[serial] diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs index 835f4f11..4f867369 100644 --- a/crates/base/src/rt_worker/supervisor/mod.rs +++ b/crates/base/src/rt_worker/supervisor/mod.rs @@ -1,13 +1,13 @@ pub mod strategy_per_request; pub mod strategy_per_worker; -use std::sync::Arc; +use std::{future::pending, sync::Arc, time::Duration}; use cpu_timer::{CPUAlarmVal, CPUTimer}; -use deno_core::v8::IsolateHandle; +use deno_core::v8; use enum_as_inner::EnumAsInner; use futures_util::task::AtomicWaker; -use log::error; +use log::{error, warn}; use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts}; use tokio::sync::{ mpsc::{self, UnboundedReceiver}, @@ -16,26 +16,29 @@ use tokio::sync::{ use tokio_util::sync::CancellationToken; use uuid::Uuid; +use crate::{ + deno_runtime::{MaybeDenoRuntime, WillTerminateReason}, + server::ServerFlags, + utils::units::percentage_value, +}; + use super::{worker_ctx::TerminationToken, worker_pool::SupervisorPolicy}; #[repr(C)] -pub struct IsolateInterruptData { +pub struct V8HandleTerminationData { pub should_terminate: bool, pub isolate_memory_usage_tx: Option>, } -pub extern "C" fn handle_interrupt( - isolate: &mut deno_core::v8::Isolate, - data: *mut std::ffi::c_void, -) { - let mut boxed_data: Box; +pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) { + let mut boxed_data: Box; unsafe { - boxed_data = Box::from_raw(data as *mut IsolateInterruptData); + boxed_data = Box::from_raw(data as *mut V8HandleTerminationData); } // log memory usage - let mut heap_stats = deno_core::v8::HeapStatistics::default(); + let mut 
heap_stats = v8::HeapStatistics::default(); isolate.get_heap_statistics(&mut heap_stats); @@ -129,9 +132,10 @@ pub struct Arguments { pub memory_limit_rx: mpsc::UnboundedReceiver<()>, pub pool_msg_tx: Option>, pub isolate_memory_usage_tx: oneshot::Sender, - pub thread_safe_handle: IsolateHandle, + pub thread_safe_handle: v8::IsolateHandle, pub waker: Arc, pub tokens: Tokens, + pub flags: Arc, } pub struct CPUUsage { @@ -151,3 +155,30 @@ async fn wait_cpu_alarm(maybe_alarm: Option<&mut UnboundedReceiver<()>>) -> Opti None => None, } } + +async fn create_wall_clock_willterminate_alert(wall_clock_limit_ms: u64, pct: Option) { + let dur = pct + .and_then(|it| percentage_value(wall_clock_limit_ms, it)) + .map(Duration::from_millis); + + if let Some(dur) = dur { + tokio::time::sleep(dur).await; + } else { + pending::<()>().await; + unreachable!() + } +} + +extern "C" fn v8_handle_wall_clock_willterminate( + isolate: &mut v8::Isolate, + _data: *mut std::ffi::c_void, +) { + if let Err(err) = MaybeDenoRuntime::Isolate(isolate) + .dispatch_willterminate_event(WillTerminateReason::WallClock) + { + warn!( + "found an error while dispatching the willterminate event: {}", + err + ); + } +} diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs index 741ece84..5f3a772c 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs @@ -9,7 +9,9 @@ use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs}; use tokio::time::Instant; use crate::rt_worker::supervisor::{ - handle_interrupt, wait_cpu_alarm, CPUUsage, CPUUsageMetrics, IsolateInterruptData, Tokens, + create_wall_clock_willterminate_alert, v8_handle_termination, + v8_handle_wall_clock_willterminate, wait_cpu_alarm, CPUUsage, CPUUsageMetrics, Tokens, + V8HandleTerminationData, }; use super::Arguments; @@ -26,10 +28,12 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) pool_msg_tx, isolate_memory_usage_tx, thread_safe_handle, + waker, tokens: Tokens { termination, supervise, }, + flags, .. } = args; @@ -49,7 +53,12 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) #[cfg(debug_assertions)] let mut current_thread_id = Option::::None; + let wall_clock_limit_ms = runtime_opts.worker_timeout_ms; + + let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; let mut is_worker_entered = false; + let mut is_wall_clock_willterminate_armed = false; + let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap(); let mut cpu_usage_ms = 0i64; let mut cpu_usage_accumulated_ms = 0i64; @@ -58,18 +67,21 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) let mut req_ack_count = 0usize; let mut req_start_ack = false; - let wall_clock_limit_ms = runtime_opts.worker_timeout_ms; - let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; - - let wall_clock_duration = Duration::from_millis(if wall_clock_limit_ms < 1 { + let wall_clock_limit_ms = if wall_clock_limit_ms < 1 { 1 } else { wall_clock_limit_ms - }); + }; + let wall_clock_duration = Duration::from_millis(wall_clock_limit_ms); let wall_clock_duration_alert = tokio::time::sleep(wall_clock_duration); + let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert( + wall_clock_limit_ms, + flags.willterminate_wall_clock_pct, + ); tokio::pin!(wall_clock_duration_alert); + tokio::pin!(wall_clock_willterminate_alert); loop { tokio::select! 
{ @@ -176,6 +188,19 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) } } + _ = &mut wall_clock_willterminate_alert, + if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed + => { + if thread_safe_handle.request_interrupt( + v8_handle_wall_clock_willterminate, + std::ptr::null_mut() + ) { + waker.wake(); + } + + is_wall_clock_willterminate_armed = true; + } + Some(_) = memory_limit_rx.recv() => { error!("memory limit reached for the worker: isolate: {:?}", key); complete_reason = Some(ShutdownReason::Memory); @@ -199,13 +224,13 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) } Some(reason) => { - let data_ptr_mut = Box::into_raw(Box::new(IsolateInterruptData { + let data_ptr_mut = Box::into_raw(Box::new(V8HandleTerminationData { should_terminate: true, isolate_memory_usage_tx: Some(isolate_memory_usage_tx), })); if !thread_safe_handle - .request_interrupt(handle_interrupt, data_ptr_mut as *mut std::ffi::c_void) + .request_interrupt(v8_handle_termination, data_ptr_mut as *mut std::ffi::c_void) { drop(unsafe { Box::from_raw(data_ptr_mut) }); } diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs index 0dfc64af..920f8dd7 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs @@ -7,9 +7,12 @@ use event_worker::events::ShutdownReason; use log::error; use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs}; -use crate::rt_worker::supervisor::{wait_cpu_alarm, CPUUsage, Tokens}; +use crate::rt_worker::supervisor::{ + create_wall_clock_willterminate_alert, v8_handle_wall_clock_willterminate, wait_cpu_alarm, + CPUUsage, Tokens, +}; -use super::{handle_interrupt, Arguments, CPUUsageMetrics, IsolateInterruptData}; +use super::{v8_handle_termination, Arguments, CPUUsageMetrics, V8HandleTerminationData}; pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let Arguments { @@ -23,10 +26,12 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { pool_msg_tx, isolate_memory_usage_tx, thread_safe_handle, + waker, tokens: Tokens { termination, supervise, }, + flags, .. } = args; @@ -45,7 +50,12 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { #[cfg(debug_assertions)] let mut current_thread_id = Option::::None; + let wall_clock_limit_ms = runtime_opts.worker_timeout_ms; + + let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; let mut is_worker_entered = false; + let mut is_wall_clock_willterminate_armed = false; + let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap(); let mut cpu_usage_ms = 0i64; @@ -53,14 +63,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let mut wall_clock_alerts = 0; let mut req_ack_count = 0usize; - let wall_clock_limit_ms = runtime_opts.worker_timeout_ms; - let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; - - let wall_clock_duration = Duration::from_millis(if wall_clock_limit_ms < 2 { + let wall_clock_limit_ms = if wall_clock_limit_ms < 2 { 2 } else { wall_clock_limit_ms - }); + }; + + let wall_clock_duration = Duration::from_millis(wall_clock_limit_ms); // Split wall clock duration into 2 intervals. // At the first interval, we will send a msg to retire the worker. 
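Aside: the wall-clock willterminate alert armed in the next hunk is driven by the `percentage_value` helper from patch 03; the alert fires after `worker_timeout_ms * ratio / 100` milliseconds. Below is a self-contained sketch of that arithmetic. The function body mirrors the patch; the timeout and ratio figures in `main` are illustrative assumptions, not values taken from the series.

```rust
use num_traits::NumCast;

// Mirrors the helper added in patch 03: clamp `percent` to 100, then scale
// `value` by `percent / 100`, returning `None` when a cast fails or the
// resulting fraction is not a normal float (e.g. a ratio of 0).
fn percentage_value<T, U>(value: U, percent: T) -> Option<U>
where
    T: NumCast + Ord,
    U: NumCast,
{
    let percent = std::cmp::min(percent, T::from(100)?).to_f64()?;
    let p = percent / 100.0f64;

    if p.is_normal() {
        U::from(value.to_f64()? * p)
    } else {
        None
    }
}

fn main() {
    // A 60_000 ms worker timeout with the default ratio of 90 arms the
    // willterminate alert at the 54_000 ms mark.
    assert_eq!(percentage_value(60_000u64, 90u8), Some(54_000));

    // A ratio of 0 yields a non-normal fraction, so no alert is scheduled and
    // the supervisor falls back to a forever-pending future.
    assert_eq!(percentage_value(60_000u64, 0u8), None);
}
```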
@@ -70,6 +79,11 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { .unwrap_or(Duration::from_millis(1)), ); + let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert( + wall_clock_limit_ms, + flags.willterminate_wall_clock_pct, + ); + let early_retire_fn = || { // we should raise a retire signal because subsequent incoming requests are unlikely to get // enough wall clock time or cpu time @@ -79,13 +93,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let terminate_fn = { let thread_safe_handle = thread_safe_handle.clone(); move || { - let data_ptr_mut = Box::into_raw(Box::new(IsolateInterruptData { + let data_ptr_mut = Box::into_raw(Box::new(V8HandleTerminationData { should_terminate: true, isolate_memory_usage_tx: Some(isolate_memory_usage_tx), })); if !thread_safe_handle - .request_interrupt(handle_interrupt, data_ptr_mut as *mut std::ffi::c_void) + .request_interrupt(v8_handle_termination, data_ptr_mut as *mut std::ffi::c_void) { drop(unsafe { Box::from_raw(data_ptr_mut) }); } @@ -93,6 +107,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { }; tokio::pin!(wall_clock_duration_alert); + tokio::pin!(wall_clock_willterminate_alert); loop { tokio::select! { @@ -217,6 +232,19 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { } } + _ = &mut wall_clock_willterminate_alert, + if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed + => { + if thread_safe_handle.request_interrupt( + v8_handle_wall_clock_willterminate, + std::ptr::null_mut() + ) { + waker.wake(); + } + + is_wall_clock_willterminate_armed = true; + } + Some(_) = memory_limit_rx.recv() => { terminate_fn(); error!("memory limit reached for the worker: isolate: {:?}", key); diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index 056fa84b..5186558c 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -5,6 +5,7 @@ use crate::rt_worker::utils::{ get_event_metadata, parse_worker_conf, send_event_if_event_worker_available, }; use crate::rt_worker::worker_ctx::create_supervisor; +use crate::server::ServerFlags; use anyhow::Error; use base_mem_check::MemCheckState; @@ -21,6 +22,7 @@ use std::any::Any; use std::future::{pending, Future}; use std::pin::Pin; use std::time::Duration; +use std::sync::Arc; use tokio::io; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -90,6 +92,7 @@ impl Worker { self.supervisor_policy = supervisor_policy.unwrap_or_default(); } + #[allow(clippy::too_many_arguments)] pub fn start( &self, mut opts: WorkerContextInitOpts, @@ -101,6 +104,7 @@ impl Worker { exit: WorkerExit, termination_token: Option, inspector: Option, + flags: Arc, ) { let worker_name = self.worker_name.clone(); let worker_key = self.worker_key; @@ -130,7 +134,7 @@ impl Worker { .unzip(); let permit = DenoRuntime::acquire().await; - let result = match DenoRuntime::new(opts, inspector).await { + let result = match DenoRuntime::new(opts, inspector, flags.clone()).await { Ok(new_runtime) => { let mut runtime = scopeguard::guard(new_runtime, |mut runtime| unsafe { runtime.js_runtime.v8_isolate().enter(); @@ -183,6 +187,7 @@ impl Worker { cancel, timing, termination_token.clone(), + flags, ) else { return; }; @@ -237,12 +242,12 @@ impl Worker { }, )); - if !thread_safe_handle.request_interrupt( - supervisor::handle_interrupt, - data_ptr_mut as *mut std::ffi::c_void, - ) { - 
drop(unsafe { Box::from_raw(data_ptr_mut) }); - } + if !thread_safe_handle.request_interrupt( + supervisor::v8_handle_termination, + data_ptr_mut as *mut std::ffi::c_void, + ) { + drop(unsafe { Box::from_raw(data_ptr_mut) }); + } while !is_terminated.is_raised() { waker.wake(); diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index bc06b66f..9e5e1c14 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -275,6 +275,7 @@ pub fn create_supervisor( cancel: Option, timing: Option, termination_token: Option, + flags: Arc, ) -> Result<(Option, CancellationToken), Error> { let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel(); let (waker, thread_safe_handle) = { @@ -370,6 +371,7 @@ pub fn create_supervisor( thread_safe_handle, waker: waker.clone(), tokens, + flags, }; let (reason, cpu_usage_ms) = { @@ -604,6 +606,7 @@ pub async fn create_worker>( init_opts.into(); let worker_kind = worker_init_opts.conf.to_worker_kind(); + let request_idle_timeout = flags.request_idle_timeout_ms; let exit = WorkerExit::default(); let mut worker = Worker::new(&worker_init_opts)?; @@ -626,6 +629,7 @@ pub async fn create_worker>( exit.clone(), maybe_termination_token.clone(), inspector, + flags, ); // create an async task waiting for requests for worker diff --git a/crates/base/src/rt_worker/worker_pool.rs b/crates/base/src/rt_worker/worker_pool.rs index 39c877e5..ebd91892 100644 --- a/crates/base/src/rt_worker/worker_pool.rs +++ b/crates/base/src/rt_worker/worker_pool.rs @@ -235,6 +235,7 @@ impl WorkerPool { user_workers: HashMap::new(), active_workers: HashMap::new(), maybe_inspector: inspector, + flags: Arc::new(flags), worker_pool_msgs_tx, } } diff --git a/crates/sb_workers/context.rs b/crates/sb_workers/context.rs index 4ae61404..82345751 100644 --- a/crates/sb_workers/context.rs +++ b/crates/sb_workers/context.rs @@ -65,6 +65,10 @@ pub struct UserWorkerRuntimeOpts { pub cpu_time_soft_limit_ms: u64, pub cpu_time_hard_limit_ms: u64, + pub willterminate_wall_clock_pct: Option, + pub willterminate_cpu_pct: Option, + pub willterminate_memory_pct: Option, + pub force_create: bool, pub net_access_disabled: bool, pub allow_net: Option>, From 96ade8d040721476618a1e0c9ac9b51983991319 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 31 May 2024 01:46:39 +0000 Subject: [PATCH 10/16] stamp: must be used `CallbackScope` instead of `HandleScope` while dispatching event --- crates/base/src/deno_runtime.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index afea7e30..a1488232 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -1344,7 +1344,7 @@ where #[allow(dead_code)] struct Scope<'s> { context: v8::Local<'s, v8::Context>, - scope: v8::HandleScope<'s, ()>, + scope: v8::CallbackScope<'s, ()>, } impl<'s> Scope<'s> { @@ -1368,9 +1368,14 @@ impl<'l> MaybeDenoRuntime<'l> { .unwrap() .clone(); - let mut scope = match self { - MaybeDenoRuntime::DenoRuntime(v) => v8::HandleScope::new(v.js_runtime.v8_isolate()), - MaybeDenoRuntime::Isolate(v) => v8::HandleScope::new(&mut **v), + let mut scope = unsafe { + match self { + MaybeDenoRuntime::DenoRuntime(v) => { + v8::CallbackScope::new(v.js_runtime.v8_isolate()) + } + + MaybeDenoRuntime::Isolate(v) => v8::CallbackScope::new(&mut **v), + } }; let context = context.to_local_context(&mut scope); @@ -1419,6 +1424,9 @@ impl<'l> 
MaybeDenoRuntime<'l> { let tc_scope = &mut v8::TryCatch::new(ctx_scope); let event_fn = v8::Local::new(tc_scope, select_dispatch_fn(dispatch_fns)); + + drop(op_state_ref); + let undefined = v8::undefined(tc_scope); let fn_args = &*fn_args_fn(tc_scope); let fn_ret = event_fn.call(tc_scope, undefined.into(), fn_args); From aedb79bda5a48b6a62e9dcd0fec7af5b1107a9ae Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 31 May 2024 02:00:01 +0000 Subject: [PATCH 11/16] stamp: oops --- crates/base/src/deno_runtime.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index a1488232..e316033d 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -1198,7 +1198,9 @@ where return Poll::Ready(Err(err)); } } - } else if let Some(threshold_bytes) = + } + + if let Some(threshold_bytes) = willterminate_mem_threshold.load().as_deref().copied() { let total_malloced_bytes = total_malloced_bytes as u64; From 45fb979b65b09d7fe816d1c8d33bd5de0974164b Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 31 May 2024 04:46:31 +0000 Subject: [PATCH 12/16] stamp: use `beforeunload` instead of `willterminate` --- crates/base/src/deno_runtime.rs | 121 ++++-------------- crates/base/src/rt_worker/supervisor/mod.rs | 8 +- .../supervisor/strategy_per_request.rs | 21 ++- .../supervisor/strategy_per_worker.rs | 18 +-- crates/base/src/server.rs | 6 +- crates/cli/src/flags.rs | 6 +- crates/cli/src/main.rs | 18 +-- crates/sb_core/js/bootstrap.js | 14 +- crates/sb_workers/context.rs | 9 +- 9 files changed, 72 insertions(+), 149 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index e316033d..8e528b6a 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -228,34 +228,6 @@ impl GlobalMainContext { struct DispatchEventFunctions { dispatch_load_event_fn_global: v8::Global, - dispatch_willterminate_event_fn_global: v8::Global, - dispatch_beforeunload_event_fn_global: v8::Global, - dispatch_unload_event_fn_global: v8::Global, -} - -#[derive(IntoStaticStr, Debug, Clone, Copy)] -#[strum(serialize_all = "snake_case")] -pub enum WillTerminateReason { - CPU, - Memory, - WallClock, -} - -#[derive(Debug, Clone)] -struct GlobalMainContext(v8::Global); - -impl GlobalMainContext { - fn to_local_context<'s>( - &self, - scope: &mut v8::HandleScope<'s, ()>, - ) -> v8::Local<'s, v8::Context> { - v8::Local::new(scope, &self.0) - } -} - -struct DispatchEventFunctions { - dispatch_load_event_fn_global: v8::Global, - dispatch_willterminate_event_fn_global: v8::Global, dispatch_beforeunload_event_fn_global: v8::Global, dispatch_unload_event_fn_global: v8::Global, } @@ -286,8 +258,8 @@ pub struct DenoRuntime { mem_check: Arc, waker: Arc, - willterminate_mem_threshold: Arc>, - willterminate_cpu_threshold: Arc>, + beforeunload_mem_threshold: Arc>, + beforeunload_cpu_threshold: Arc>, _phantom_runtime_context: PhantomData, } @@ -610,24 +582,24 @@ where let mut create_params = None; let mut mem_check = MemCheck::default(); - let willterminate_cpu_threshold = ArcSwapOption::::from_pointee(None); - let willterminate_mem_threshold = ArcSwapOption::::from_pointee(None); + let beforeunload_cpu_threshold = ArcSwapOption::::from_pointee(None); + let beforeunload_mem_threshold = ArcSwapOption::::from_pointee(None); if conf.is_user_worker() { let conf = conf.as_user_worker().unwrap(); let memory_limit_bytes = mib_to_bytes(conf.memory_limit_mb) as usize; - 
willterminate_mem_threshold.store( + beforeunload_mem_threshold.store( flags - .willterminate_memory_pct + .beforeunload_memory_pct .and_then(|it| percentage_value(memory_limit_bytes as u64, it)) .map(Arc::new), ); if conf.cpu_time_hard_limit_ms > 0 { - willterminate_cpu_threshold.store( + beforeunload_cpu_threshold.store( flags - .willterminate_cpu_pct + .beforeunload_cpu_pct .and_then(|it| percentage_value(conf.cpu_time_hard_limit_ms, it)) .map(Arc::new), ); @@ -682,14 +654,6 @@ where .unwrap(); let dispatch_load_event_fn = v8::Local::::try_from(dispatch_load_event_fn).unwrap(); - let dispatch_willterminate_event_fn_str = - v8::String::new_external_onebyte_static(scope, b"dispatchWillTerminateEvent") - .unwrap(); - let dispatch_willterminate_event_fn = bootstrap_ns - .get(scope, dispatch_willterminate_event_fn_str.into()) - .unwrap(); - let dispatch_willterminate_event_fn = - v8::Local::::try_from(dispatch_willterminate_event_fn).unwrap(); let dispatch_beforeunload_event_fn_str = v8::String::new_external_onebyte_static(scope, b"dispatchBeforeUnloadEvent") .unwrap(); @@ -707,15 +671,12 @@ where v8::Local::::try_from(dispatch_unload_event_fn).unwrap(); let dispatch_load_event_fn_global = v8::Global::new(scope, dispatch_load_event_fn); - let dispatch_willterminate_event_fn_global = - v8::Global::new(scope, dispatch_willterminate_event_fn); let dispatch_beforeunload_event_fn_global = v8::Global::new(scope, dispatch_beforeunload_event_fn); let dispatch_unload_event_fn_global = v8::Global::new(scope, dispatch_unload_event_fn); DispatchEventFunctions { dispatch_load_event_fn_global, - dispatch_willterminate_event_fn_global, dispatch_beforeunload_event_fn_global, dispatch_unload_event_fn_global, } @@ -901,8 +862,8 @@ where mem_check, waker: Arc::default(), - willterminate_cpu_threshold: Arc::new(willterminate_cpu_threshold), - willterminate_mem_threshold: Arc::new(willterminate_mem_threshold), + beforeunload_cpu_threshold: Arc::new(beforeunload_cpu_threshold), + beforeunload_mem_threshold: Arc::new(beforeunload_mem_threshold), _phantom_runtime_context: PhantomData, }) @@ -1041,16 +1002,6 @@ where { let mut this = self.get_v8_tls_guard(); let _ = with_cpu_metrics_guard( - current_thread_id, - &maybe_cpu_usage_metrics_tx, - &mut accumulated_cpu_time_ns, - || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(), - ); - - // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; - // beforeExit) - - if let Err(err) = with_cpu_metrics_guard( current_thread_id, &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, @@ -1069,18 +1020,6 @@ where let mut this = self.get_v8_tls_guard(); - if let Err(err) = with_cpu_metrics_guard( - current_thread_id, - &maybe_cpu_usage_metrics_tx, - &mut accumulated_cpu_time_ns, - || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_beforeunload_event(), - ) { - return (Err(err), get_accumulated_cpu_time_ms!()); - } - - // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; - // beforeExit) - if let Err(err) = with_cpu_metrics_guard( current_thread_id, &maybe_cpu_usage_metrics_tx, @@ -1106,8 +1045,8 @@ where let global_waker = self.waker.clone(); let termination_request_token = self.termination_request_token.clone(); - let willterminate_cpu_threshold = self.willterminate_cpu_threshold.clone(); - let willterminate_mem_threshold = self.willterminate_mem_threshold.clone(); + let beforeunload_cpu_threshold = self.beforeunload_cpu_threshold.clone(); + let beforeunload_mem_threshold = 
 
         let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone());
         let mut poll_sem = None::;
 
         poll_fn(move |cx| {
@@ -1185,32 +1124,31 @@ where
             if is_user_worker {
                 let mem_state = mem_check_state.as_ref().unwrap();
 
-                if let Some(threshold_ms) = willterminate_cpu_threshold.load().as_deref().copied() {
+                if let Some(threshold_ms) = beforeunload_cpu_threshold.load().as_deref().copied() {
                     let threshold_ns = (threshold_ms as i128) * 1_000_000;
                     let accumulated_cpu_time_ns = *accumulated_cpu_time_ns as i128;
 
                     if accumulated_cpu_time_ns >= threshold_ns {
-                        willterminate_cpu_threshold.store(None);
+                        beforeunload_cpu_threshold.store(None);
 
                         if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this)
-                            .dispatch_willterminate_event(WillTerminateReason::CPU)
+                            .dispatch_beforeunload_event(WillTerminateReason::CPU)
                         {
                             return Poll::Ready(Err(err));
                         }
                     }
                 }
 
-                if let Some(threshold_bytes) =
-                    willterminate_mem_threshold.load().as_deref().copied()
+                if let Some(threshold_bytes) = beforeunload_mem_threshold.load().as_deref().copied()
                 {
                     let total_malloced_bytes = total_malloced_bytes as u64;
 
                     if total_malloced_bytes >= threshold_bytes {
-                        willterminate_mem_threshold.store(None);
+                        beforeunload_mem_threshold.store(None);
 
                         if !mem_state.is_exceeded() {
                             if let Err(err) = MaybeDenoRuntime::DenoRuntime(&mut this)
-                                .dispatch_willterminate_event(WillTerminateReason::Memory)
+                                .dispatch_beforeunload_event(WillTerminateReason::Memory)
                             {
                                 return Poll::Ready(Err(err));
                             }
@@ -1453,15 +1391,15 @@ impl<'l> MaybeDenoRuntime<'l> {
         )
     }
 
-    /// Dispatches "willterminate" event to the JavaScript runtime.
-    ///
-    /// Does not poll event loop, and thus not await any of the "willterminate" event handlers.
-    pub fn dispatch_willterminate_event(
+    /// Dispatches "beforeunload" event to the JavaScript runtime. Returns a boolean
+    /// indicating if the event was prevented and thus event loop should continue
+    /// running.
+    pub fn dispatch_beforeunload_event(
         &mut self,
         reason: WillTerminateReason,
-    ) -> Result<(), AnyError> {
+    ) -> Result<bool, AnyError> {
         self.dispatch_event_with_callback(
-            |fns| &fns.dispatch_willterminate_event_fn_global,
+            |fns| &fns.dispatch_beforeunload_event_fn_global,
             move |scope| {
                 vec![v8::String::new_external_onebyte_static(
                     scope,
                     reason.into(),
                 )
                 .unwrap()
                 .into()]
             },
-            |_| Ok(()),
-        )
-    }
-
-    /// Dispatches "beforeunload" event to the JavaScript runtime. Returns a boolean
-    /// indicating if the event was prevented and thus event loop should continue
-    /// running.
-    pub fn dispatch_beforeunload_event(&mut self) -> Result<bool, AnyError> {
-        self.dispatch_event_with_callback(
-            |fns| &fns.dispatch_beforeunload_event_fn_global,
-            |_| vec![],
             |it| Ok(it.unwrap().is_false()),
         )
     }
diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs
index 4f867369..deeb278b 100644
--- a/crates/base/src/rt_worker/supervisor/mod.rs
+++ b/crates/base/src/rt_worker/supervisor/mod.rs
@@ -156,7 +156,7 @@ async fn wait_cpu_alarm(maybe_alarm: Option<&mut UnboundedReceiver<()>>) -> Option<()> {
     }
 }
 
-async fn create_wall_clock_willterminate_alert(wall_clock_limit_ms: u64, pct: Option<u8>) {
+async fn create_wall_clock_beforeunload_alert(wall_clock_limit_ms: u64, pct: Option<u8>) {
     let dur = pct
         .and_then(|it| percentage_value(wall_clock_limit_ms, it))
         .map(Duration::from_millis);
@@ -169,15 +169,15 @@
     }
 }
 
-extern "C" fn v8_handle_wall_clock_willterminate(
+extern "C" fn v8_handle_wall_clock_beforeunload(
     isolate: &mut v8::Isolate,
     _data: *mut std::ffi::c_void,
 ) {
     if let Err(err) = MaybeDenoRuntime::Isolate(isolate)
-        .dispatch_willterminate_event(WillTerminateReason::WallClock)
+        .dispatch_beforeunload_event(WillTerminateReason::WallClock)
     {
         warn!(
-            "found an error while dispatching the willterminate event: {}",
+            "found an error while dispatching the beforeunload event: {}",
             err
         );
     }
diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs
index 5f3a772c..56afb6ea 100644
--- a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs
+++ b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs
@@ -9,9 +9,8 @@ use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
 use tokio::time::Instant;
 
 use crate::rt_worker::supervisor::{
-    create_wall_clock_willterminate_alert, v8_handle_termination,
-    v8_handle_wall_clock_willterminate, wait_cpu_alarm, CPUUsage, CPUUsageMetrics, Tokens,
-    V8HandleTerminationData,
+    create_wall_clock_beforeunload_alert, v8_handle_termination, v8_handle_wall_clock_beforeunload,
+    wait_cpu_alarm, CPUUsage, CPUUsageMetrics, Tokens, V8HandleTerminationData,
 };
 
 use super::Arguments;
@@ -57,7 +56,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
     let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0;
 
     let mut is_worker_entered = false;
-    let mut is_wall_clock_willterminate_armed = false;
+    let mut is_wall_clock_beforeunload_armed = false;
 
     let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
     let mut cpu_usage_ms = 0i64;
@@ -75,13 +74,13 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
 
     let wall_clock_duration = Duration::from_millis(wall_clock_limit_ms);
     let wall_clock_duration_alert = tokio::time::sleep(wall_clock_duration);
-    let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert(
+    let wall_clock_beforeunload_alert = create_wall_clock_beforeunload_alert(
         wall_clock_limit_ms,
-        flags.willterminate_wall_clock_pct,
+        flags.beforeunload_wall_clock_pct,
     );
 
     tokio::pin!(wall_clock_duration_alert);
-    tokio::pin!(wall_clock_willterminate_alert);
+    tokio::pin!(wall_clock_beforeunload_alert);
 
     loop {
         tokio::select!
{ @@ -188,17 +187,17 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) } } - _ = &mut wall_clock_willterminate_alert, - if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed + _ = &mut wall_clock_beforeunload_alert, + if !is_wall_clock_limit_disabled && !is_wall_clock_beforeunload_armed => { if thread_safe_handle.request_interrupt( - v8_handle_wall_clock_willterminate, + v8_handle_wall_clock_beforeunload, std::ptr::null_mut() ) { waker.wake(); } - is_wall_clock_willterminate_armed = true; + is_wall_clock_beforeunload_armed = true; } Some(_) = memory_limit_rx.recv() => { diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs index 920f8dd7..f52e5e05 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs @@ -8,7 +8,7 @@ use log::error; use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs}; use crate::rt_worker::supervisor::{ - create_wall_clock_willterminate_alert, v8_handle_wall_clock_willterminate, wait_cpu_alarm, + create_wall_clock_beforeunload_alert, v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens, }; @@ -54,7 +54,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; let mut is_worker_entered = false; - let mut is_wall_clock_willterminate_armed = false; + let mut is_wall_clock_beforeunload_armed = false; let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap(); let mut cpu_usage_ms = 0i64; @@ -79,9 +79,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { .unwrap_or(Duration::from_millis(1)), ); - let wall_clock_willterminate_alert = create_wall_clock_willterminate_alert( + let wall_clock_beforeunload_alert = create_wall_clock_beforeunload_alert( wall_clock_limit_ms, - flags.willterminate_wall_clock_pct, + flags.beforeunload_wall_clock_pct, ); let early_retire_fn = || { @@ -107,7 +107,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { }; tokio::pin!(wall_clock_duration_alert); - tokio::pin!(wall_clock_willterminate_alert); + tokio::pin!(wall_clock_beforeunload_alert); loop { tokio::select! 
{
@@ -232,17 +232,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
                 }
             }
 
-            _ = &mut wall_clock_willterminate_alert,
-            if !is_wall_clock_limit_disabled && !is_wall_clock_willterminate_armed
+            _ = &mut wall_clock_beforeunload_alert,
+            if !is_wall_clock_limit_disabled && !is_wall_clock_beforeunload_armed
             => {
                 if thread_safe_handle.request_interrupt(
-                    v8_handle_wall_clock_willterminate,
+                    v8_handle_wall_clock_beforeunload,
                     std::ptr::null_mut()
                 ) {
                     waker.wake();
                 }
 
-                is_wall_clock_willterminate_armed = true;
+                is_wall_clock_beforeunload_armed = true;
             }
 
             Some(_) = memory_limit_rx.recv() => {
diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs
index 77e42c87..fa6b2b93 100644
--- a/crates/base/src/server.rs
+++ b/crates/base/src/server.rs
@@ -255,9 +255,9 @@ pub struct ServerFlags {
     pub request_read_timeout_ms: Option<u64>,
     pub request_buffer_size: Option<u64>,
 
-    pub willterminate_wall_clock_pct: Option<u8>,
-    pub willterminate_cpu_pct: Option<u8>,
-    pub willterminate_memory_pct: Option<u8>,
+    pub beforeunload_wall_clock_pct: Option<u8>,
+    pub beforeunload_cpu_pct: Option<u8>,
+    pub beforeunload_memory_pct: Option<u8>,
 }
 
 #[derive(Debug)]
diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs
index df624c13..ae74ab33 100644
--- a/crates/cli/src/flags.rs
+++ b/crates/cli/src/flags.rs
@@ -233,17 +233,17 @@ fn get_start_command() -> Command {
                 .default_value("16384"),
         )
         .arg(
-            arg!(--"dispatch-willterminate-wall-clock-ratio" )
+            arg!(--"dispatch-beforeunload-wall-clock-ratio" )
                 .value_parser(value_parser!(u8).range(..=99))
                 .default_value("90")
        )
        .arg(
-            arg!(--"dispatch-willterminate-cpu-ratio" )
+            arg!(--"dispatch-beforeunload-cpu-ratio" )
                 .value_parser(value_parser!(u8).range(..=99))
                 .default_value("90")
        )
        .arg(
-            arg!(--"dispatch-willterminate-memory-ratio" )
+            arg!(--"dispatch-beforeunload-memory-ratio" )
                 .value_parser(value_parser!(u8).range(..=99))
                 .default_value("90")
        )
diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs
index e45cd9dc..ae47ac13 100644
--- a/crates/cli/src/main.rs
+++ b/crates/cli/src/main.rs
@@ -148,14 +148,14 @@ fn main() -> Result {
     let maybe_request_read_timeout =
         sub_matches.get_one::<u64>("request-read-timeout").cloned();
 
-    let maybe_willterminate_wall_clock_pct = sub_matches
-        .get_one::<u8>("dispatch-willterminate-wall-clock-ratio")
-        .cloned();
-    let maybe_willterminate_cpu_pct = sub_matches
-        .get_one::<u8>("dispatch-willterminate-cpu-ratio")
-        .cloned();
-    let maybe_willterminate_memory_pct = sub_matches
-        .get_one::<u8>("dispatch-willterminate-memory-ratio")
-        .cloned();
+    let maybe_beforeunload_wall_clock_pct = sub_matches
+        .get_one::<u8>("dispatch-beforeunload-wall-clock-ratio")
+        .cloned();
+    let maybe_beforeunload_cpu_pct = sub_matches
+        .get_one::<u8>("dispatch-beforeunload-cpu-ratio")
+        .cloned();
+    let maybe_beforeunload_memory_pct = sub_matches
+        .get_one::<u8>("dispatch-beforeunload-memory-ratio")
+        .cloned();
 
     let static_patterns =
@@ -203,9 +203,9 @@ fn main() -> Result {
         request_read_timeout_ms: maybe_request_read_timeout,
         request_buffer_size: Some(request_buffer_size),
 
-        willterminate_wall_clock_pct: maybe_willterminate_wall_clock_pct,
-        willterminate_cpu_pct: maybe_willterminate_cpu_pct,
-        willterminate_memory_pct: maybe_willterminate_memory_pct,
+        beforeunload_wall_clock_pct: maybe_beforeunload_wall_clock_pct,
+        beforeunload_cpu_pct: maybe_beforeunload_cpu_pct,
+        beforeunload_memory_pct: maybe_beforeunload_memory_pct,
     };
 
     let maybe_received_signum = start_server(
diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js
index b454af85..603f7971 100644
--- a/crates/sb_core/js/bootstrap.js
+++ b/crates/sb_core/js/bootstrap.js @@ -409,18 +409,13 @@ function dispatchLoadEvent() { globalThis_.dispatchEvent(new Event("load")); } -function dispatchWillTerminateEvent(reason) { - globalThis_.dispatchEvent(new CustomEvent("willterminate", { - detail: { reason } +function dispatchBeforeUnloadEvent(reason) { + globalThis_.dispatchEvent(new CustomEvent("beforeunload", { + cancelable: true, + detail: { reason: reason ?? null } })); } -function dispatchBeforeUnloadEvent() { - return globalThis_.dispatchEvent( - new Event("beforeunload", { cancelable: true }), - ); -} - function dispatchUnloadEvent() { globalThis_.dispatchEvent(new Event("unload")); } @@ -673,7 +668,6 @@ globalThis.bootstrapSBEdge = opts => { globalThis.bootstrap = { dispatchLoadEvent, - dispatchWillTerminateEvent, dispatchUnloadEvent, dispatchBeforeUnloadEvent, // dispatchProcessExitEvent, diff --git a/crates/sb_workers/context.rs b/crates/sb_workers/context.rs index 82345751..ce865bb1 100644 --- a/crates/sb_workers/context.rs +++ b/crates/sb_workers/context.rs @@ -65,9 +65,9 @@ pub struct UserWorkerRuntimeOpts { pub cpu_time_soft_limit_ms: u64, pub cpu_time_hard_limit_ms: u64, - pub willterminate_wall_clock_pct: Option, - pub willterminate_cpu_pct: Option, - pub willterminate_memory_pct: Option, + pub beforeunload_wall_clock_pct: Option, + pub beforeunload_cpu_pct: Option, + pub beforeunload_memory_pct: Option, pub force_create: bool, pub net_access_disabled: bool, @@ -89,6 +89,9 @@ impl Default for UserWorkerRuntimeOpts { worker_timeout_ms: env!("SUPABASE_RESOURCE_LIMIT_TIMEOUT_MS").parse().unwrap(), cpu_time_soft_limit_ms: env!("SUPABASE_RESOURCE_LIMIT_CPU_SOFT_MS").parse().unwrap(), cpu_time_hard_limit_ms: env!("SUPABASE_RESOURCE_LIMIT_CPU_HARD_MS").parse().unwrap(), + beforeunload_wall_clock_pct: None, + beforeunload_cpu_pct: None, + beforeunload_memory_pct: None, force_create: false, key: None, From 399fd979ea022ef936d686bbf57a441c01b6a6cb Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 22 Nov 2024 10:17:17 +0000 Subject: [PATCH 13/16] stamp: resolve merge conflicts --- crates/base/src/deno_runtime.rs | 96 ++++++--------------- crates/base/src/rt_worker/supervisor/mod.rs | 2 +- crates/base/src/rt_worker/worker.rs | 4 +- crates/base/src/rt_worker/worker_ctx.rs | 3 +- crates/base/src/rt_worker/worker_pool.rs | 1 - crates/sb_core/js/bootstrap.js | 4 +- 6 files changed, 34 insertions(+), 76 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 8e528b6a..6cbaa120 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -1,23 +1,23 @@ use crate::inspector_server::Inspector; use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics}; use crate::rt_worker::worker::DuplexStreamEntry; +use crate::server::ServerFlags; use crate::utils::json; use crate::utils::path::find_up; -use crate::server::ServerFlags; use crate::utils::units::{bytes_to_display, mib_to_bytes, percentage_value}; use anyhow::{anyhow, bail, Error}; +use arc_swap::ArcSwapOption; use base_mem_check::{MemCheckState, WorkerHeapStatistics}; use base_rt::DenoRuntimeDropToken; use base_rt::{get_current_cpu_time_ns, BlockingScopeCPUUsage}; -use arc_swap::ArcSwapOption; use cooked_waker::{IntoWaker, WakeRef}; use ctor::ctor; use deno_core::error::{AnyError, JsError}; use deno_core::url::Url; use deno_core::v8::{self, GCCallbackFlags, GCType, HeapStatistics, Isolate}; use deno_core::{ - located_script_name, serde_json, JsRuntime, ModuleCodeString, ModuleId, ModuleLoader, OpState, + 
located_script_name, serde_json, JsRuntime, ModuleCodeString, ModuleId, ModuleLoader, ModuleSpecifier, OpState, PollEventLoopOptions, ResolutionKind, RuntimeOptions, }; use deno_http::DefaultHttpPropertyExtractor; @@ -26,8 +26,7 @@ use deno_tls::rustls::RootCertStore; use deno_tls::RootCertStoreProvider; use futures_util::future::poll_fn; use futures_util::task::AtomicWaker; -use futures_util::Future; -use log::{error, trace}; +use log::error; use once_cell::sync::{Lazy, OnceCell}; use sb_core::http::sb_core_http; use sb_core::http_start::sb_core_http_start; @@ -147,7 +146,6 @@ fn get_error_class_name(e: &AnyError) -> &'static str { struct MemCheck { exceeded_token: CancellationToken, limit: Option, - waker: Arc, state: Arc>, } @@ -202,6 +200,7 @@ impl MemCheck { fn is_exceeded(&self) -> bool { self.exceeded_token.is_cancelled() + } } pub trait GetRuntimeContext { @@ -609,7 +608,7 @@ where allocator.set_waker(mem_check.waker.clone()); - mem_check.limit = Some(memory_limit); + mem_check.limit = Some(memory_limit_bytes); create_params = Some( v8::CreateParams::default() .heap_limits(mib_to_bytes(0) as usize, memory_limit_bytes) @@ -632,7 +631,7 @@ where ..Default::default() }; - let mut js_runtime = JsRuntime::new(runtime_options); + let mut js_runtime = ManuallyDrop::new(JsRuntime::new(runtime_options)); let dispatch_fns = { let context = js_runtime.main_context(); @@ -903,8 +902,10 @@ where self.js_runtime.v8_isolate().enter(); if inspector.is_some() { + let is_terminated = self.is_terminated.clone(); let mut this = scopeguard::guard_on_unwind(&mut *self, |this| { this.js_runtime.v8_isolate().exit(); + is_terminated.raise(); }); { @@ -922,6 +923,7 @@ where if this.termination_request_token.is_cancelled() { this.js_runtime.v8_isolate().exit(); + is_terminated.raise(); return (Ok(()), 0i64); } } @@ -983,9 +985,10 @@ where if let Err(err) = with_cpu_metrics_guard( current_thread_id, + this.js_runtime.op_state(), &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, - || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_load_event(), + || MaybeDenoRuntime::DenoRuntime(*this).dispatch_load_event(), ) { return (Err(err), get_accumulated_cpu_time_ms!()); } @@ -1003,13 +1006,12 @@ where let mut this = self.get_v8_tls_guard(); let _ = with_cpu_metrics_guard( current_thread_id, + this.js_runtime.op_state(), &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_unload_event(), - ) { - return (Err(err), get_accumulated_cpu_time_ms!()); - } - + ); + // TODO(Nyannyacha): Here we also need to trigger the event for node platform (i.e; exit) return ( @@ -1022,6 +1024,7 @@ where if let Err(err) = with_cpu_metrics_guard( current_thread_id, + this.js_runtime.op_state(), &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, || MaybeDenoRuntime::DenoRuntime(&mut this).dispatch_unload_event(), @@ -1048,7 +1051,7 @@ where let beforeunload_cpu_threshold = self.beforeunload_cpu_threshold.clone(); let beforeunload_mem_threshold = self.beforeunload_mem_threshold.clone(); - let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone()); + let mem_check_state = is_user_worker.then(|| self.mem_check.clone()); let mut poll_sem = None::; poll_fn(move |cx| { @@ -1123,6 +1126,9 @@ where if is_user_worker { let mem_state = mem_check_state.as_ref().unwrap(); + let total_malloced_bytes = mem_state.check(js_runtime.v8_isolate().as_mut()); + + mem_state.waker.register(waker); if let Some(threshold_ms) = 
beforeunload_cpu_threshold.load().as_deref().copied() { let threshold_ns = (threshold_ms as i128) * 1_000_000; @@ -1155,9 +1161,6 @@ where } } } - - mem_state.check(js_runtime.v8_isolate().as_mut()); - mem_state.waker.register(waker); } // NOTE(Nyannyacha): If tasks are empty or V8 is not evaluating the @@ -1240,7 +1243,7 @@ where let _ = handle.request_interrupt(interrupt_fn, std::ptr::null_mut()); }; - drop(rt::SUPERVISOR_RT.spawn({ + drop(base_rt::SUPERVISOR_RT.spawn({ let cancel_task_token = cancel_task_token.clone(); async move { @@ -1294,12 +1297,15 @@ impl<'s> Scope<'s> { } } -pub enum MaybeDenoRuntime<'l> { - DenoRuntime(&'l mut DenoRuntime), +pub enum MaybeDenoRuntime<'l, RuntimeContext> { + DenoRuntime(&'l mut DenoRuntime), Isolate(&'l mut v8::Isolate), } -impl<'l> MaybeDenoRuntime<'l> { +impl<'l, RuntimeContext> MaybeDenoRuntime<'l, RuntimeContext> +where + RuntimeContext: GetRuntimeContext, +{ fn scope(&mut self) -> Scope<'_> { let op_state = self.op_state(); let op_state_ref = op_state.borrow(); @@ -1424,10 +1430,6 @@ impl<'l> MaybeDenoRuntime<'l> { } } -fn get_current_cpu_time_ns() -> Result { - get_thread_time().context("can't get current thread time") -} - pub fn import_meta_resolve_callback( loader: &dyn ModuleLoader, specifier: String, @@ -1911,50 +1913,6 @@ mod test { std::mem::drop(main_mod_ev); } - async fn create_runtime( - path: Option<&str>, - env_vars: Option>, - user_conf: Option, - static_patterns: Vec, - maybe_jsx_import_source_config: Option, - ) -> DenoRuntime { - let (worker_pool_tx, _) = mpsc::unbounded_channel::(); - - DenoRuntime::new( - WorkerContextInitOpts { - service_path: path - .map(PathBuf::from) - .unwrap_or(PathBuf::from("./test_cases/main")), - - no_module_cache: false, - import_map_path: None, - env_vars: env_vars.unwrap_or_default(), - events_rx: None, - timing: None, - maybe_eszip: None, - maybe_entrypoint: None, - maybe_decorator: None, - maybe_module_code: None, - conf: { - if let Some(uc) = user_conf { - uc - } else { - WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx, - shared_metric_src: None, - event_worker_metric_src: None, - }) - } - }, - static_patterns, - maybe_jsx_import_source_config, - }, - None, - ) - .await - .unwrap() - } - // Main Runtime should have access to `EdgeRuntime` #[tokio::test] #[serial] diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs index deeb278b..c2b5308b 100644 --- a/crates/base/src/rt_worker/supervisor/mod.rs +++ b/crates/base/src/rt_worker/supervisor/mod.rs @@ -173,7 +173,7 @@ extern "C" fn v8_handle_wall_clock_beforeunload( isolate: &mut v8::Isolate, _data: *mut std::ffi::c_void, ) { - if let Err(err) = MaybeDenoRuntime::Isolate(isolate) + if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate) .dispatch_beforeunload_event(WillTerminateReason::WallClock) { warn!( diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index 5186558c..4dc27d33 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -21,8 +21,8 @@ use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts, WorkerExit, Wor use std::any::Any; use std::future::{pending, Future}; use std::pin::Pin; -use std::time::Duration; use std::sync::Arc; +use std::time::Duration; use tokio::io; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -236,7 +236,7 @@ impl Worker { termination_request_token.cancel(); let 
data_ptr_mut = Box::into_raw(Box::new( - supervisor::IsolateInterruptData { + supervisor::V8HandleTerminationData { should_terminate: true, isolate_memory_usage_tx: None, }, diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 9e5e1c14..df6ba846 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -606,7 +606,6 @@ pub async fn create_worker>( init_opts.into(); let worker_kind = worker_init_opts.conf.to_worker_kind(); - let request_idle_timeout = flags.request_idle_timeout_ms; let exit = WorkerExit::default(); let mut worker = Worker::new(&worker_init_opts)?; @@ -629,7 +628,7 @@ pub async fn create_worker>( exit.clone(), maybe_termination_token.clone(), inspector, - flags, + flags.clone(), ); // create an async task waiting for requests for worker diff --git a/crates/base/src/rt_worker/worker_pool.rs b/crates/base/src/rt_worker/worker_pool.rs index ebd91892..39c877e5 100644 --- a/crates/base/src/rt_worker/worker_pool.rs +++ b/crates/base/src/rt_worker/worker_pool.rs @@ -235,7 +235,6 @@ impl WorkerPool { user_workers: HashMap::new(), active_workers: HashMap::new(), maybe_inspector: inspector, - flags: Arc::new(flags), worker_pool_msgs_tx, } } diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js index 603f7971..1654c043 100644 --- a/crates/sb_core/js/bootstrap.js +++ b/crates/sb_core/js/bootstrap.js @@ -467,7 +467,9 @@ function processRejectionHandled(promise, reason) { } } -globalThis.bootstrapSBEdge = opts => { +globalThis.bootstrapSBEdge = (opts, extraCtx) => { + globalThis_ = globalThis; + // We should delete this after initialization, // Deleting it during bootstrapping can backfire delete globalThis.__bootstrap; From 4b0a591f4dd930fcd2f4ab12e53e8d77e509adc6 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sat, 23 Nov 2024 05:14:11 +0000 Subject: [PATCH 14/16] stamp: polishing --- crates/base/src/deno_runtime.rs | 143 ++++++++++++++++++++++---------- 1 file changed, 97 insertions(+), 46 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 6cbaa120..7ff672a1 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -1231,39 +1231,11 @@ where fn terminate_execution_if_cancelled( &mut self, - token: CancellationToken, - ) -> ScopeGuard { - extern "C" fn interrupt_fn(isolate: &mut v8::Isolate, _: *mut std::ffi::c_void) { - let _ = isolate.terminate_execution(); - } - - let handle = self.js_runtime.v8_isolate().thread_safe_handle(); - let cancel_task_token = CancellationToken::new(); - let request_interrupt_fn = move || { - let _ = handle.request_interrupt(interrupt_fn, std::ptr::null_mut()); - }; - - drop(base_rt::SUPERVISOR_RT.spawn({ - let cancel_task_token = cancel_task_token.clone(); - - async move { - if token.is_cancelled() { - request_interrupt_fn(); - } else { - tokio::select! 
{
-                        _ = token.cancelled_owned() => {
-                            request_interrupt_fn();
-                        }
-
-                        _ = cancel_task_token.cancelled_owned() => {}
-                    }
-                }
-            }
-        }));
-
-        scopeguard::guard(cancel_task_token, |v| {
-            v.cancel();
-        })
+    ) -> ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>> {
+        terminate_execution_if_cancelled(
+            self.js_runtime.v8_isolate(),
+            self.termination_request_token.clone(),
+        )
     }
 
     fn get_v8_tls_guard<'l>(
@@ -1297,9 +1269,34 @@ impl<'s> Scope<'s> {
     }
 }
 
+pub struct IsolateWithCancellationToken<'l>(&'l mut v8::Isolate, CancellationToken);
+
+impl std::ops::Deref for IsolateWithCancellationToken<'_> {
+    type Target = v8::Isolate;
+
+    fn deref(&self) -> &Self::Target {
+        &*self.0
+    }
+}
+
+impl std::ops::DerefMut for IsolateWithCancellationToken<'_> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.0
+    }
+}
+
+impl IsolateWithCancellationToken<'_> {
+    fn terminate_execution_if_cancelled(
+        &mut self,
+    ) -> ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>> {
+        terminate_execution_if_cancelled(self.0, self.1.clone())
+    }
+}
+
 pub enum MaybeDenoRuntime<'l, RuntimeContext> {
     DenoRuntime(&'l mut DenoRuntime<RuntimeContext>),
     Isolate(&'l mut v8::Isolate),
+    IsolateWithCancellationToken(IsolateWithCancellationToken<'l>),
 }
 
 impl<'l, RuntimeContext> MaybeDenoRuntime<'l, RuntimeContext>
@@ -1316,11 +1313,9 @@ where
 
         let mut scope = unsafe {
             match self {
-                MaybeDenoRuntime::DenoRuntime(v) => {
-                    v8::CallbackScope::new(v.js_runtime.v8_isolate())
-                }
-
-                MaybeDenoRuntime::Isolate(v) => v8::CallbackScope::new(&mut **v),
+                Self::DenoRuntime(v) => v8::CallbackScope::new(v.js_runtime.v8_isolate()),
+                Self::Isolate(v) => v8::CallbackScope::new(&mut **v),
+                Self::IsolateWithCancellationToken(v) => v8::CallbackScope::new(&mut **v),
             }
         };
 
@@ -1329,22 +1324,30 @@ where
         Scope { context, scope }
     }
 
+    #[allow(unused)]
+    fn v8_isolate(&mut self) -> &mut v8::Isolate {
+        match self {
+            Self::DenoRuntime(v) => v.js_runtime.v8_isolate(),
+            Self::Isolate(v) => v,
+            Self::IsolateWithCancellationToken(v) => v.0,
+        }
+    }
+
     fn op_state(&mut self) -> Rc<RefCell<OpState>> {
         match self {
-            MaybeDenoRuntime::DenoRuntime(v) => v.js_runtime.op_state(),
-            MaybeDenoRuntime::Isolate(v) => JsRuntime::op_state_from(v),
+            Self::DenoRuntime(v) => v.js_runtime.op_state(),
+            Self::Isolate(v) => JsRuntime::op_state_from(v),
+            Self::IsolateWithCancellationToken(v) => JsRuntime::op_state_from(v.0),
         }
     }
 
     fn terminate_execution_if_cancelled(
         &mut self,
-    ) -> Option<ScopeGuard<CancellationToken, impl FnOnce(CancellationToken)>> {
+    ) -> Option<ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>>> {
         match self {
-            MaybeDenoRuntime::DenoRuntime(v) => {
-                Some(v.terminate_execution_if_cancelled(v.termination_request_token.clone()))
-            }
-
-            MaybeDenoRuntime::Isolate(_) => None,
+            Self::DenoRuntime(v) => Some(v.terminate_execution_if_cancelled()),
+            Self::IsolateWithCancellationToken(v) => Some(v.terminate_execution_if_cancelled()),
+            Self::Isolate(_) => None,
         }
     }
 
@@ -1422,6 +1425,14 @@ where
     /// Dispatches "unload" event to the JavaScript runtime.
     ///
     /// Does not poll event loop, and thus not await any of the "unload" event handlers.
     pub fn dispatch_unload_event(&mut self) -> Result<(), AnyError> {
+        // NOTE(Nyannyacha): It is currently not possible to dispatch this event because the
+        // supervisor has forcibly pulled the isolate out of the running state and the
+        // `CancellationToken` prevents function invocation.
+        //
+        // If we want to dispatch this event, we may need to provide an extra margin for the
+        // invocation.
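+        //
+        // A rough sketch of such a margin (not implemented here) could be to lift
+        // the pending termination and re-arm the cancellation guard around the
+        // dispatch:
+        //
+        //     self.v8_isolate().cancel_terminate_execution();
+        //     let _guard = self.terminate_execution_if_cancelled();
+        //     // ...dispatch the "unload" handlers inside this margin...
+        //
+        // The commented-out call below is the first half of that idea.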
+ + // self.v8_isolate().cancel_terminate_execution(); self.dispatch_event_with_callback( |fns| &fns.dispatch_unload_event_fn_global, |_| vec![], @@ -1498,6 +1509,46 @@ fn get_cpu_metrics_guard<'l>( }) } +fn terminate_execution_if_cancelled( + isolate: &mut v8::Isolate, + token: CancellationToken, +) -> ScopeGuard> { + extern "C" fn interrupt_fn(isolate: &mut v8::Isolate, _: *mut std::ffi::c_void) { + let _ = isolate.terminate_execution(); + } + + let handle = isolate.thread_safe_handle(); + let cancel_task_token = CancellationToken::new(); + let request_interrupt_fn = move || { + let _ = handle.request_interrupt(interrupt_fn, std::ptr::null_mut()); + }; + + drop(base_rt::SUPERVISOR_RT.spawn({ + let cancel_task_token = cancel_task_token.clone(); + + async move { + if token.is_cancelled() { + request_interrupt_fn(); + } else { + tokio::select! { + _ = token.cancelled_owned() => { + request_interrupt_fn(); + } + + _ = cancel_task_token.cancelled_owned() => {} + } + } + } + })); + + scopeguard::guard( + cancel_task_token, + Box::new(|v| { + v.cancel(); + }), + ) +} + fn set_v8_flags() { let v8_flags = std::env::var("V8_FLAGS").unwrap_or("".to_string()); let mut vec = vec![""]; From b84b79db61f8773bc1d15b0829481223319e4fb4 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sat, 23 Nov 2024 05:14:20 +0000 Subject: [PATCH 15/16] chore: add integration tests --- .../test_cases/runtime-event/cpu/index.ts | 34 +++++ crates/base/test_cases/runtime-event/index.ts | 91 ++++++++++++++ .../test_cases/runtime-event/mem/index.ts | 32 +++++ .../test_cases/runtime-event/unload/index.ts | 10 ++ .../runtime-event/wall-clock/index.ts | 20 +++ crates/base/tests/integration_tests.rs | 116 ++++++++++++++++++ 6 files changed, 303 insertions(+) create mode 100644 crates/base/test_cases/runtime-event/cpu/index.ts create mode 100644 crates/base/test_cases/runtime-event/index.ts create mode 100644 crates/base/test_cases/runtime-event/mem/index.ts create mode 100644 crates/base/test_cases/runtime-event/unload/index.ts create mode 100644 crates/base/test_cases/runtime-event/wall-clock/index.ts diff --git a/crates/base/test_cases/runtime-event/cpu/index.ts b/crates/base/test_cases/runtime-event/cpu/index.ts new file mode 100644 index 00000000..1bc23b44 --- /dev/null +++ b/crates/base/test_cases/runtime-event/cpu/index.ts @@ -0,0 +1,34 @@ +addEventListener("beforeunload", (ev) => { + if (ev instanceof CustomEvent) { + console.log("triggered", ev.detail?.["reason"]); + } +}); + +function sleep(ms: number) { + return new Promise(res => { + setTimeout(() => { + res(void 0); + }, ms) + }); +} + +function mySlowFunction(baseNumber: number) { + const now = Date.now(); + let result = 0; + for (let i = Math.pow(baseNumber, 7); i >= 0; i--) { + result += Math.atan(i) * Math.tan(i); + } + const duration = Date.now() - now; + return { result: result, duration: duration }; +} + +export default { + async fetch() { + for (let i = 0; i < Number.MAX_VALUE; i++) { + mySlowFunction(8); + await sleep(10); + } + + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/test_cases/runtime-event/index.ts b/crates/base/test_cases/runtime-event/index.ts new file mode 100644 index 00000000..53295810 --- /dev/null +++ b/crates/base/test_cases/runtime-event/index.ts @@ -0,0 +1,91 @@ +import * as path from "jsr:@std/path"; + +Deno.serve(async (req: Request) => { + console.log(req.url); + const url = new URL(req.url); + const { pathname } = url; + const service_name = pathname; + + if (!service_name || service_name === "") { + 
const error = { msg: "missing function name in request" } + return new Response( + JSON.stringify(error), + { status: 400, headers: { "Content-Type": "application/json" } }, + ) + } + + const servicePath = path.join("test_cases/runtime-event", pathname); + let configs = { + memoryLimitMb: 150 * 1, + workerTimeoutMs: 60 * 1000, + cpuTimeSoftLimitMs: 1000 * 2, + cpuTimeHardLimitMs: 1000 * 4, + } as const; + + switch (pathname) { + case "/cpu": + configs = { + ...configs, + cpuTimeSoftLimitMs: 250, + cpuTimeHardLimitMs: 500, + }; + break; + + case "/mem": + configs = { + ...configs, + memoryLimitMb: 50 + }; + break; + + case "/wall-clock": + configs = { + ...configs, + workerTimeoutMs: 1000 * 5 + }; + break; + + case "/unload": + configs = { + ...configs, + workerTimeoutMs: 1000 * 2 + }; + break; + + default: + return new Response(null, { status: 200 }); + } + + const createWorker = async () => { + + const noModuleCache = false; + const importMapPath = null; + const envVarsObj = Deno.env.toObject(); + const envVars = Object.keys(envVarsObj).map(k => [k, envVarsObj[k]]); + + return await EdgeRuntime.userWorkers.create({ + ...configs, + servicePath, + noModuleCache, + importMapPath, + envVars + }); + } + + const callWorker = async () => { + try { + const worker = await createWorker(); + return await worker.fetch(req); + } catch (e) { + console.error(e); + + const error = { msg: e.toString() } + return new Response( + JSON.stringify(error), + { status: 500, headers: { "Content-Type": "application/json" } }, + ); + } + } + + return await callWorker(); +}) diff --git a/crates/base/test_cases/runtime-event/mem/index.ts b/crates/base/test_cases/runtime-event/mem/index.ts new file mode 100644 index 00000000..59513236 --- /dev/null +++ b/crates/base/test_cases/runtime-event/mem/index.ts @@ -0,0 +1,32 @@ +addEventListener("beforeunload", (ev) => { + if (ev instanceof CustomEvent) { + console.log("triggered", ev.detail?.["reason"]); + } +}); + +function sleep(ms: number) { + return new Promise(res => { + setTimeout(() => { + res(void 0); + }, ms) + }); +} + +const arr = []; + +function memHog() { + const x = new Uint8Array(100000); + x.fill(999, 0); + arr.push(x); +} + +export default { + async fetch() { + for (let i = 0; i < Number.MAX_SAFE_INTEGER; i++) { + memHog(); + await sleep(10); + } + + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/test_cases/runtime-event/unload/index.ts b/crates/base/test_cases/runtime-event/unload/index.ts new file mode 100644 index 00000000..f306e586 --- /dev/null +++ b/crates/base/test_cases/runtime-event/unload/index.ts @@ -0,0 +1,10 @@ +addEventListener("unload", () => { + console.log("triggered", "unload"); +}); + + +export default { + fetch() { + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/test_cases/runtime-event/wall-clock/index.ts b/crates/base/test_cases/runtime-event/wall-clock/index.ts new file mode 100644 index 00000000..9acdf606 --- /dev/null +++ b/crates/base/test_cases/runtime-event/wall-clock/index.ts @@ -0,0 +1,20 @@ +addEventListener("beforeunload", (ev) => { + if (ev instanceof CustomEvent) { + console.log("triggered", ev.detail?.["reason"]); + } +}); + +function sleep(ms: number) { + return new Promise(res => { + setTimeout(() => { + res(void 0); + }, ms) + }); +} + +export default { + async fetch() { + await sleep(1000 * 30); + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs 
index 58d209b6..d7012caa 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -2,6 +2,7 @@ #![allow(clippy::async_yields_async)] use deno_config::JsxImportSourceConfig; +use event_worker::events::{LogLevel, WorkerEvents}; use http_v02 as http; use hyper_v014 as hyper; use reqwest_v011 as reqwest; @@ -3083,6 +3084,121 @@ async fn test_ort_vision_zero_shot_image_classification() { ); } +async fn test_runtime_beforeunload_event(kind: &'static str, pct: u8) { + let (tx, mut rx) = mpsc::unbounded_channel(); + let tb = TestBedBuilder::new("./test_cases/runtime-event") + .with_per_worker_policy(None) + .with_worker_event_sender(Some(tx)) + .with_server_flags(ServerFlags { + beforeunload_wall_clock_pct: Some(pct), + beforeunload_cpu_pct: Some(pct), + beforeunload_memory_pct: Some(pct), + ..Default::default() + }) + .build() + .await; + + let resp = tb + .request(|b| { + b.uri(format!("/{}", kind)) + .method("GET") + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_ne!(resp.status().as_u16(), StatusCode::OK); + + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + let mut found_triggered = false; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Log(ev) = ev.event else { + continue; + }; + if ev.level != LogLevel::Info { + continue; + } + + found_triggered = ev + .msg + .contains(&format!("triggered {}", kind.replace("-", "_"))); + + if found_triggered { + break; + } + } + + assert!(found_triggered); +} + +#[tokio::test] +#[serial] +async fn test_runtime_event_beforeunload_cpu() { + test_runtime_beforeunload_event("cpu", 50).await; +} + +#[tokio::test] +#[serial] +async fn test_runtime_event_beforeunload_wall_clock() { + test_runtime_beforeunload_event("wall-clock", 50).await; +} + +#[tokio::test] +#[serial] +async fn test_runtime_event_beforeunload_mem() { + test_runtime_beforeunload_event("mem", 50).await; +} + +// NOTE(Nyannyacha): We cannot enable this test unless we clarify the trigger point of the unload +// event. 
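+//
+// For reference, the disabled test below mirrors the `beforeunload` cases above:
+// it would spin up a worker for `./test_cases/runtime-event`, request `/unload`
+// once, and then scan the worker logs for the "triggered unload" line emitted by
+// the worker's `unload` handler.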
+// #[tokio::test]
+// #[serial]
+// async fn test_runtime_event_unload() {
+//     let (tx, mut rx) = mpsc::unbounded_channel();
+//     let tb = TestBedBuilder::new("./test_cases/runtime-event")
+//         .with_per_worker_policy(None)
+//         .with_worker_event_sender(Some(tx))
+//         .build()
+//         .await;
+
+//     let resp = tb
+//         .request(|b| {
+//             b.uri("/unload")
+//                 .method("GET")
+//                 .body(Body::empty())
+//                 .context("can't make request")
+//         })
+//         .await
+//         .unwrap();
+
+//     assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+//     sleep(Duration::from_secs(8)).await;
+//     tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+
+//     let mut found_triggered = false;
+
+//     while let Some(ev) = rx.recv().await {
+//         let WorkerEvents::Log(ev) = ev.event else {
+//             continue;
+//         };
+//         if ev.level != LogLevel::Info {
+//             continue;
+//         }
+
+//         found_triggered = ev.msg.contains("triggered unload");
+
+//         if found_triggered {
+//             break;
+//         }
+//     }
+
+//     assert!(found_triggered);
+// }
+
 #[derive(Deserialize)]
 struct ErrorResponsePayload {
     msg: String,

From 8d7110143322e0a985788a4a0ececd108b1ff841 Mon Sep 17 00:00:00 2001
From: Nyannyacha
Date: Sat, 23 Nov 2024 05:31:30 +0000
Subject: [PATCH 16/16] stamp: make clippy happy

---
 crates/base/src/deno_runtime.rs        | 7 +++++--
 crates/base/tests/integration_tests.rs | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs
index 7ff672a1..64d52391 100644
--- a/crates/base/src/deno_runtime.rs
+++ b/crates/base/src/deno_runtime.rs
@@ -1256,6 +1256,9 @@ where
     }
 }
 
+type TerminateExecutionIfCancelledReturnType =
+    ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>>;
+
 #[allow(dead_code)]
 struct Scope<'s> {
     context: v8::Local<'s, v8::Context>,
@@ -1343,7 +1346,7 @@ where
 
     fn terminate_execution_if_cancelled(
         &mut self,
-    ) -> Option<ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>>> {
+    ) -> Option<TerminateExecutionIfCancelledReturnType> {
         match self {
             Self::DenoRuntime(v) => Some(v.terminate_execution_if_cancelled()),
             Self::IsolateWithCancellationToken(v) => Some(v.terminate_execution_if_cancelled()),
@@ -1512,7 +1515,7 @@ fn get_cpu_metrics_guard<'l>(
 fn terminate_execution_if_cancelled(
     isolate: &mut v8::Isolate,
     token: CancellationToken,
-) -> ScopeGuard<CancellationToken, Box<dyn FnOnce(CancellationToken)>> {
+) -> TerminateExecutionIfCancelledReturnType {
     extern "C" fn interrupt_fn(isolate: &mut v8::Isolate, _: *mut std::ffi::c_void) {
         let _ = isolate.terminate_execution();
     }
diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs
index d7012caa..01237eb2 100644
--- a/crates/base/tests/integration_tests.rs
+++ b/crates/base/tests/integration_tests.rs
@@ -3124,7 +3124,7 @@ async fn test_runtime_beforeunload_event(kind: &'static str, pct: u8) {
 
         found_triggered = ev
             .msg
-            .contains(&format!("triggered {}", kind.replace("-", "_")));
+            .contains(&format!("triggered {}", kind.replace('-', "_")));
 
         if found_triggered {
             break;