diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs
index 90ba35b99d626..d2d12a12be208 100644
--- a/src/cmd_all/src/standalone.rs
+++ b/src/cmd_all/src/standalone.rs
@@ -359,6 +359,7 @@ mod test {
                             connector_rpc_sink_payload_format: None,
                             config_path: "src/config/test.toml",
                             total_memory_bytes: 34359738368,
+                            reserved_memory_bytes: None,
                             parallelism: 10,
                             role: Both,
                             metrics_level: None,
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index c3d82dddfde94..344e0c781eb5e 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -90,6 +90,15 @@ pub struct ComputeNodeOpts {
     #[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
     pub total_memory_bytes: usize,
 
+    /// Reserved memory for the compute node in bytes.
+    /// If not set, a portion (30% by default) of `total_memory_bytes` is used as the reserved
+    /// memory. At least 512 MB is always reserved, and the reserved size must not exceed
+    /// `total_memory_bytes`.
+    ///
+    /// The total memory compute and storage can use is `total_memory_bytes` - `reserved_memory_bytes`.
+    #[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
+    pub reserved_memory_bytes: Option<usize>,
+
     /// The parallelism that the compute node will register to the scheduler of the meta service.
     #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
     #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs
index ca04f2e411b64..552b8189a81a6 100644
--- a/src/compute/src/memory/config.rs
+++ b/src/compute/src/memory/config.rs
@@ -19,6 +19,8 @@ use risingwave_common::config::{
 };
 use risingwave_common::util::pretty_bytes::convert;
 
+use crate::ComputeNodeOpts;
+
 /// The minimal memory requirement of computing tasks in megabytes.
 pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
 /// The memory reserved for system usage (stack and code segment of processes, allocation
@@ -41,20 +43,44 @@ const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
 /// Each compute node reserves some memory for stack and code segment of processes, allocation
 /// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory
 /// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
+///
+/// An explicitly set `opts.reserved_memory_bytes` takes precedence over the proportion.
+///
+/// Returns `(reserved, non_reserved)` in bytes, where the two always sum to
+/// `opts.total_memory_bytes`.
+///
+/// # Panics
+///
+/// Panics if `opts.total_memory_bytes` is less than `MIN_COMPUTE_MEMORY_MB` megabytes, or if the
+/// reserved size exceeds `opts.total_memory_bytes`.
-pub fn reserve_memory_bytes(total_memory_bytes: usize) -> (usize, usize) {
-    if total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
+pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
+    if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
         panic!(
             "The total memory size ({}) is too small. It must be at least {} MB.",
-            convert(total_memory_bytes as _),
+            convert(opts.total_memory_bytes as _),
             MIN_COMPUTE_MEMORY_MB
         );
     }
 
-    let reserved = std::cmp::max(
-        (total_memory_bytes as f64 * SYSTEM_RESERVED_MEMORY_PROPORTION).ceil() as usize,
-        MIN_SYSTEM_RESERVED_MEMORY_MB << 20,
-    );
-    (reserved, total_memory_bytes - reserved)
+    // If `reserved_memory_bytes` is not set, use `SYSTEM_RESERVED_MEMORY_PROPORTION` * `total_memory_bytes`.
+    let reserved = opts.reserved_memory_bytes.unwrap_or_else(|| {
+        (opts.total_memory_bytes as f64 * SYSTEM_RESERVED_MEMORY_PROPORTION).ceil() as usize
+    });
+
+    // Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
+    let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);
+
+    // An explicitly configured reservation may exceed the total memory. Fail loudly instead of
+    // letting the subtraction below underflow (it wraps around when overflow checks are disabled).
+    if reserved > opts.total_memory_bytes {
+        panic!(
+            "The reserved memory size ({}) exceeds the total memory size ({}).",
+            convert(reserved as _),
+            convert(opts.total_memory_bytes as _)
+        );
+    }
+
+    (reserved, opts.total_memory_bytes - reserved)
 }
 
 /// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
@@ -230,21 +256,51 @@ pub fn storage_memory_config(
 
 #[cfg(test)]
 mod tests {
+    use clap::Parser;
     use risingwave_common::config::StorageConfig;
 
     use super::{reserve_memory_bytes, storage_memory_config};
+    use crate::ComputeNodeOpts;
 
     #[test]
     fn test_reserve_memory_bytes() {
         // at least 512 MB
-        let (reserved, non_reserved) = reserve_memory_bytes(1536 << 20);
-        assert_eq!(reserved, 512 << 20);
-        assert_eq!(non_reserved, 1024 << 20);
+        {
+            let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
+            opts.total_memory_bytes = 1536 << 20;
+            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
+            assert_eq!(reserved, 512 << 20);
+            assert_eq!(non_reserved, 1024 << 20);
+        }
 
         // reserve based on proportion
-        let (reserved, non_reserved) = reserve_memory_bytes(10 << 30);
-        assert_eq!(reserved, 3 << 30);
-        assert_eq!(non_reserved, 7 << 30);
+        {
+            let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
+            opts.total_memory_bytes = 10 << 30;
+            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
+            assert_eq!(reserved, 3 << 30);
+            assert_eq!(non_reserved, 7 << 30);
+        }
+
+        // reserve based on opts
+        {
+            let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
+            opts.total_memory_bytes = 10 << 30;
+            opts.reserved_memory_bytes = Some(2 << 30);
+            let (reserved, non_reserved) = reserve_memory_bytes(&opts);
+            assert_eq!(reserved, 2 << 30);
+            assert_eq!(non_reserved, 8 << 30);
+        }
     }
+
+    #[test]
+    #[should_panic(expected = "exceeds the total memory size")]
+    fn test_reserve_memory_bytes_exceeds_total() {
+        // an explicit reservation larger than the total must be rejected
+        let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
+        opts.total_memory_bytes = 1 << 30;
+        opts.reserved_memory_bytes = Some(2 << 30);
+        reserve_memory_bytes(&opts);
+    }
 
     #[test]
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index bdf47cbd1309f..9f1c10a0c15bf 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -103,7 +103,7 @@ pub async fn compute_node_serve(
 
     // Register to the cluster. We're not ready to serve until activate is called.
     let (meta_client, system_params) = MetaClient::register_new(
-        opts.meta_address,
+        opts.meta_address.clone(),
         WorkerType::ComputeNode,
         &advertise_addr,
         Property {
@@ -122,8 +122,7 @@ pub async fn compute_node_serve(
     let embedded_compactor_enabled =
         embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);
 
-    let (reserved_memory_bytes, non_reserved_memory_bytes) =
-        reserve_memory_bytes(opts.total_memory_bytes);
+    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
     let storage_memory_config = storage_memory_config(
         non_reserved_memory_bytes,
         embedded_compactor_enabled,