From 1e848526ba487836d72a29b5390689f712b50e0d Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 26 Mar 2024 23:09:41 +0800 Subject: [PATCH] refactor(cmd_all): refactor playground to use `single_node --in-memory` internally (#15897) --- src/cmd_all/src/bin/risingwave.rs | 20 ++- src/cmd_all/src/common.rs | 20 --- src/cmd_all/src/lib.rs | 2 - src/cmd_all/src/playground.rs | 260 ------------------------------ src/cmd_all/src/single_node.rs | 9 ++ src/cmd_all/src/standalone.rs | 16 +- 6 files changed, 33 insertions(+), 294 deletions(-) delete mode 100644 src/cmd_all/src/playground.rs diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index c7be6f9ac25c..4fc326f1089a 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -20,7 +20,7 @@ use anyhow::Result; use clap::error::ErrorKind; use clap::{command, ArgMatches, Args, Command, FromArgMatches}; use risingwave_cmd::{compactor, compute, ctl, frontend, meta}; -use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts}; +use risingwave_cmd_all::{SingleNodeOpts, StandaloneOpts}; use risingwave_common::git_sha; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -123,7 +123,7 @@ impl Component { Self::Frontend => frontend(parse_opts(matches)), Self::Compactor => compactor(parse_opts(matches)), Self::Ctl => ctl(parse_opts(matches)), - Self::Playground => playground(parse_opts(matches)), + Self::Playground => single_node(SingleNodeOpts::new_for_playground()), Self::Standalone => standalone(parse_opts(matches)), Self::SingleNode => single_node(parse_opts(matches)), } @@ -151,7 +151,7 @@ impl Component { Component::Frontend => FrontendOpts::augment_args(cmd), Component::Compactor => CompactorOpts::augment_args(cmd), Component::Ctl => CtlOpts::augment_args(cmd), - Component::Playground => PlaygroundOpts::augment_args(cmd), + Component::Playground => cmd, Component::Standalone => StandaloneOpts::augment_args(cmd), Component::SingleNode => SingleNodeOpts::augment_args(cmd), } @@ -161,8 +161,14 @@ impl Component { fn commands() -> Vec { Self::iter() .map(|c| { + let is_playground = matches!(c, Component::Playground); let name: &'static str = c.into(); let command = Command::new(name).visible_aliases(c.aliases()); + let command = if is_playground { + command.hide(true) + } else { + command + }; c.augment_args(command) }) .collect() @@ -221,14 +227,6 @@ fn main() -> Result<()> { Ok(()) } -fn playground(opts: PlaygroundOpts) { - let settings = risingwave_rt::LoggerSettings::from_opts(&opts) - .with_target("risingwave_storage", Level::WARN) - .with_thread_name(true); - risingwave_rt::init_risingwave_logger(settings); - risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap(); -} - fn standalone(opts: StandaloneOpts) { let opts = risingwave_cmd_all::parse_standalone_opt_args(&opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 68e9090578ab..cfc55c0d1742 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -17,23 +17,3 @@ use std::ffi::OsString; pub fn osstrs + AsRef>(s: impl AsRef<[T]>) -> Vec { s.as_ref().iter().map(OsString::from).collect() } - -pub enum RisingWaveService { - Compute(Vec), - Meta(Vec), - Frontend(Vec), - #[allow(dead_code)] - Compactor(Vec), -} - -impl RisingWaveService { - /// Extend additional arguments to the service. - pub fn extend_args(&mut self, args: &[&str]) { - match self { - RisingWaveService::Compute(args0) - | RisingWaveService::Meta(args0) - | RisingWaveService::Frontend(args0) - | RisingWaveService::Compactor(args0) => args0.extend(args.iter().map(|s| s.into())), - } - } -} diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 54ee3243bc66..a872d7de12b3 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -15,12 +15,10 @@ #![feature(lazy_cell)] mod common; -pub mod playground; mod standalone; pub mod single_node; -pub use playground::*; pub use single_node::*; pub use standalone::*; diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs deleted file mode 100644 index 1b03048d5d6e..000000000000 --- a/src/cmd_all/src/playground.rs +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::env; -use std::ffi::OsString; -use std::io::Write; -use std::path::Path; -use std::sync::LazyLock; - -use anyhow::Result; -use clap::Parser; -use risingwave_common::util::meta_addr::MetaAddressStrategy; -use tempfile::TempPath; -use tokio::signal; - -use crate::common::{osstrs as common_osstrs, RisingWaveService}; - -const IDLE_EXIT_SECONDS: u64 = 1800; - -/// Embed the config file and create a temporary file at runtime. -static CONFIG_PATH_WITH_IDLE_EXIT: LazyLock = LazyLock::new(|| { - let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); - write!( - file, - "[meta] -disable_recovery = true -dangerous_max_idle_secs = {IDLE_EXIT_SECONDS} -max_heartbeat_interval_secs = 600", - ) - .expect("failed to write config file"); - file.into_temp_path() -}); - -fn osstrs(s: [&str; N]) -> Vec { - common_osstrs(s) -} - -fn get_services(profile: &str) -> (Vec, bool) { - let mut services = match profile { - "playground" => vec![ - RisingWaveService::Meta(osstrs([ - "--dashboard-host", - "0.0.0.0:5691", - "--state-store", - "hummock+memory", - "--data-directory", - "hummock_001", - "--advertise-addr", - "127.0.0.1:5690", - ])), - RisingWaveService::Compute(osstrs([])), - RisingWaveService::Frontend(osstrs([])), - ], - "playground-3cn" => vec![ - RisingWaveService::Meta(osstrs([ - "--dashboard-host", - "0.0.0.0:5691", - "--advertise-addr", - "127.0.0.1:5690", - "--state-store", - "hummock+memory-shared", - "--data-directory", - "hummock_001", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5687", - "--parallelism", - "4", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5688", - "--parallelism", - "4", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "127.0.0.1:5689", - "--parallelism", - "4", - ])), - RisingWaveService::Frontend(osstrs([])), - ], - "online-docker-playground" | "docker-playground" => vec![ - RisingWaveService::Meta(osstrs([ - "--listen-addr", - "0.0.0.0:5690", - "--advertise-addr", - "127.0.0.1:5690", - "--dashboard-host", - "0.0.0.0:5691", - "--state-store", - "hummock+memory", - "--data-directory", - "hummock_001", - ])), - RisingWaveService::Compute(osstrs([ - "--listen-addr", - "0.0.0.0:5688", - "--advertise-addr", - "127.0.0.1:5688", - ])), - RisingWaveService::Frontend(osstrs([ - "--listen-addr", - "0.0.0.0:4566", - "--advertise-addr", - "127.0.0.1:4566", - ])), - ], - _ => { - tracing::warn!("Unknown playground profile. All components will be started using the default command line options."); - return get_services("playground"); - } - }; - let idle_exit = profile != "docker-playground"; - if idle_exit { - services.iter_mut().for_each(|s| { - s.extend_args(&[ - "--config-path", - &CONFIG_PATH_WITH_IDLE_EXIT.as_os_str().to_string_lossy(), - ]) - }) - } - (services, idle_exit) -} - -#[derive(Debug, Clone, Parser)] -#[command(about = "The quick way to start an in-memory RisingWave cluster for playing around")] -pub struct PlaygroundOpts { - /// The profile to use. - #[clap(short, long, env = "PLAYGROUND_PROFILE", default_value = "playground")] - profile: String, -} - -impl risingwave_common::opts::Opts for PlaygroundOpts { - fn name() -> &'static str { - "playground" - } - - fn meta_addr(&self) -> MetaAddressStrategy { - "http://0.0.0.0:5690".parse().unwrap() // hard-coded - } -} - -pub async fn playground(opts: PlaygroundOpts) -> Result<()> { - let profile = opts.profile; - - tracing::info!("launching playground with profile `{}`", profile); - - let (services, idle_exit) = get_services(&profile); - - for service in services { - match service { - RisingWaveService::Meta(mut opts) => { - opts.insert(0, "meta-node".into()); - tracing::info!("starting meta-node thread with cli args: {:?}", opts); - let opts = risingwave_meta_node::MetaNodeOpts::parse_from(opts); - let _meta_handle = tokio::spawn(async move { - risingwave_meta_node::start(opts).await; - tracing::warn!("meta is stopped, shutdown all nodes"); - // As a playground, it's fine to just kill everything. - if idle_exit { - eprintln!("{}", - console::style(format_args!( - "RisingWave playground exited after being idle for {IDLE_EXIT_SECONDS} seconds. Bye!" - )).bold()); - } - std::process::exit(0); - }); - // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - RisingWaveService::Compute(mut opts) => { - opts.insert(0, "compute-node".into()); - tracing::info!("starting compute-node thread with cli args: {:?}", opts); - let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts); - let _compute_handle = - tokio::spawn(async move { risingwave_compute::start(opts).await }); - } - RisingWaveService::Frontend(mut opts) => { - opts.insert(0, "frontend-node".into()); - tracing::info!("starting frontend-node thread with cli args: {:?}", opts); - let opts = risingwave_frontend::FrontendOpts::parse_from(opts); - let _frontend_handle = - tokio::spawn(async move { risingwave_frontend::start(opts).await }); - } - RisingWaveService::Compactor(mut opts) => { - opts.insert(0, "compactor".into()); - tracing::info!("starting compactor thread with cli args: {:?}", opts); - let opts = risingwave_compactor::CompactorOpts::parse_from(opts); - let _compactor_handle = - tokio::spawn(async move { risingwave_compactor::start(opts).await }); - } - } - } - - // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - eprintln!("-------------------------------"); - eprintln!("RisingWave playground is ready."); - eprint!( - "* {} RisingWave playground SHOULD NEVER be used in benchmarks and production environment!!!\n It is fully in-memory", - console::style("WARNING:").red().bold(), - ); - if idle_exit { - eprintln!( - " and will be automatically stopped after being idle for {}.", - console::style(format_args!("{IDLE_EXIT_SECONDS}s")).dim() - ); - } else { - eprintln!(); - } - let psql_cmd = if let Ok("1") = env::var("RISEDEV").as_deref() { - // Started with `./risedev playground`. - eprintln!( - "* Use {} instead if you want to start a full cluster.", - console::style("./risedev d").blue().bold() - ); - - // This is a specialization of `generate_risedev_env` in - // `src/risedevtool/src/risedev_env.rs`. - let risedev_env = r#" - RW_META_ADDR="http://0.0.0.0:5690" - RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0" - RW_FRONTEND_PORT="4566" - "#; - std::fs::write( - Path::new(&env::var("PREFIX_CONFIG")?).join("risedev-env"), - risedev_env, - )?; - - "./risedev psql" - } else { - "psql -h localhost -p 4566 -d dev -U root" - }; - eprintln!( - "* Run {} in a different terminal to start Postgres interactive shell.", - console::style(psql_cmd).blue().bold() - ); - eprintln!("-------------------------------"); - - // TODO: should we join all handles? - // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. - signal::ctrl_c().await.unwrap(); - tracing::info!("Ctrl+C received, now exiting"); - - Ok(()) -} diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index aa4fe888196c..ccbab12edb0e 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -53,6 +53,15 @@ pub struct SingleNodeOpts { node_opts: NodeSpecificOpts, } +impl SingleNodeOpts { + pub fn new_for_playground() -> Self { + let empty_args = vec![] as Vec; + let mut opts = SingleNodeOpts::parse_from(empty_args); + opts.in_memory = true; + opts + } +} + /// # Node-Specific Options /// /// ## Which node-specific options should be here? diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 420ccbf60077..384648f9211d 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -14,6 +14,7 @@ use anyhow::Result; use clap::Parser; +use risingwave_common::config::MetaBackend; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -185,7 +186,9 @@ pub async fn standalone( ) -> Result<()> { tracing::info!("launching Risingwave in standalone mode"); + let mut is_in_memory = false; if let Some(opts) = meta_opts { + is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem)); tracing::info!("starting meta-node thread with cli args: {:?}", opts); let _meta_handle = tokio::spawn(async move { @@ -210,9 +213,20 @@ pub async fn standalone( } // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(5000)).await; eprintln!("-------------------------------"); eprintln!("RisingWave standalone mode is ready."); + if is_in_memory { + eprintln!( + "{}", + console::style( + "WARNING: You are using RisingWave's in-memory mode. +It SHOULD NEVER be used in benchmarks and production environment!!!" + ) + .red() + .bold() + ); + } // TODO: should we join all handles? // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.