diff --git a/Cargo.lock b/Cargo.lock
index 3079a56189a5..3fa2f49f21eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8310,6 +8310,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "percent-encoding",
  "pin-project",
+ "prometheus-client",
  "prost",
  "quick-xml 0.29.0",
  "redis",
diff --git a/Cargo.toml b/Cargo.toml
index d81acb998d5f..72e13f5e4da8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,6 +108,7 @@ members = [
 sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false }
 opendal = { version = "0.40", features = [
     "layers-minitrace",
+    "layers-prometheus-client",
     "services-ipfs",
     "services-moka",
     "services-redis",
diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs
index c1d35d44131c..f80b4924c52c 100644
--- a/src/common/storage/src/operator.rs
+++ b/src/common/storage/src/operator.rs
@@ -38,9 +38,11 @@ use common_meta_app::storage::StorageParams;
 use common_meta_app::storage::StorageRedisConfig;
 use common_meta_app::storage::StorageS3Config;
 use common_meta_app::storage::StorageWebhdfsConfig;
+use common_metrics::load_global_prometheus_registry;
 use opendal::layers::ImmutableIndexLayer;
 use opendal::layers::LoggingLayer;
 use opendal::layers::MinitraceLayer;
+use opendal::layers::PrometheusClientLayer;
 use opendal::layers::RetryLayer;
 use opendal::layers::TimeoutLayer;
 use opendal::raw::HttpClient;
@@ -108,7 +110,10 @@ pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
         .layer(LoggingLayer::default())
         // Add tracing
         .layer(MinitraceLayer)
-        // TODO(liyz): add PrometheusClientLayer
+        // Add PrometheusClientLayer
+        .layer(PrometheusClientLayer::new(
+            &mut load_global_prometheus_registry(),
+        ))
         .finish();
 
     Ok(op)
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 6a365f690bdc..6062de36f999 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -112,7 +112,9 @@ pub trait TableContext: Send + Sync {
     fn get_scan_progress(&self) -> Arc<Progress>;
     fn get_scan_progress_value(&self) -> ProgressValues;
     fn get_write_progress(&self) -> Arc<Progress>;
+    fn get_spill_progress(&self) -> Arc<Progress>;
     fn get_write_progress_value(&self) -> ProgressValues;
+    fn get_spill_progress_value(&self) -> ProgressValues;
     fn get_result_progress(&self) -> Arc<Progress>;
     fn get_result_progress_value(&self) -> ProgressValues;
     fn get_status_info(&self) -> String;
diff --git a/src/query/service/src/interpreters/interpreter_query_log.rs b/src/query/service/src/interpreters/interpreter_query_log.rs
index c23ebdfb090b..5fa014468d0e 100644
--- a/src/query/service/src/interpreters/interpreter_query_log.rs
+++ b/src/query/service/src/interpreters/interpreter_query_log.rs
@@ -105,6 +105,8 @@ impl InterpreterQueryLog {
         let result_bytes = 0u64;
         let cpu_usage = ctx.get_settings().get_max_threads()? as u32;
         let memory_usage = ctx.get_current_session().get_memory_usage() as u64;
+        let join_spilled_rows = 0u64;
+        let join_spilled_bytes = 0u64;
 
         // Client.
         let client_address = match ctx.get_client_address() {
@@ -161,6 +163,8 @@ impl InterpreterQueryLog {
             result_bytes,
             cpu_usage,
             memory_usage,
+            join_spilled_bytes,
+            join_spilled_rows,
             client_info: "".to_string(),
             client_address,
             user_agent,
@@ -212,6 +216,9 @@
         let cpu_usage = ctx.get_settings().get_max_threads()? as u32;
         let memory_usage = ctx.get_current_session().get_memory_usage() as u64;
+        let join_spilled_rows = ctx.get_spill_progress_value().rows as u64;
+        let join_spilled_bytes = ctx.get_spill_progress_value().bytes as u64;
+
         // Result.
         let result_rows = ctx.get_result_progress_value().rows as u64;
         let result_bytes = ctx.get_result_progress_value().bytes as u64;
@@ -275,6 +282,8 @@ impl InterpreterQueryLog {
             result_bytes,
             cpu_usage,
             memory_usage,
+            join_spilled_bytes,
+            join_spilled_rows,
             client_info: "".to_string(),
             client_address,
             user_agent,
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs
index 2fe5abddb23e..92ae9eb074fe 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_state.rs
@@ -54,7 +54,7 @@ impl BuildSpillState {
         let tenant = ctx.get_tenant();
         let spill_config = SpillerConfig::create(query_spill_prefix(&tenant));
         let operator = DataOperator::instance().operator();
-        let spiller = Spiller::create(operator, spill_config, SpillerType::HashJoinBuild);
+        let spiller = Spiller::create(ctx, operator, spill_config, SpillerType::HashJoinBuild);
         Self {
             build_state,
             spill_coordinator,
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/probe_spill_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/probe_spill_state.rs
index 805963cfa4a7..d133d4a58ed1 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/probe_spill_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/probe_spill_state.rs
@@ -41,7 +41,7 @@ impl ProbeSpillState {
         let tenant = ctx.get_tenant();
         let spill_config = SpillerConfig::create(query_spill_prefix(&tenant));
         let operator = DataOperator::instance().operator();
-        let spiller = Spiller::create(operator, spill_config, SpillerType::HashJoinProbe);
+        let spiller = Spiller::create(ctx, operator, spill_config, SpillerType::HashJoinProbe);
         Self {
             probe_state,
             spiller,
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index bdec9424da76..d5e072fc51b0 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -319,10 +319,18 @@ impl TableContext for QueryContext {
         self.shared.write_progress.clone()
     }
 
+    fn get_spill_progress(&self) -> Arc<Progress> {
+        self.shared.spill_progress.clone()
+    }
+
     fn get_write_progress_value(&self) -> ProgressValues {
         self.shared.write_progress.as_ref().get_values()
     }
 
+    fn get_spill_progress_value(&self) -> ProgressValues {
+        self.shared.spill_progress.as_ref().get_values()
+    }
+
     fn get_result_progress(&self) -> Arc<Progress> {
         self.shared.result_progress.clone()
     }
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index d3dc580be835..d019942b10e6 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -58,6 +58,8 @@ pub struct QueryContextShared {
     pub(in crate::sessions) scan_progress: Arc<Progress>,
     /// write_progress for write/commit metrics of datablocks (uncompressed)
     pub(in crate::sessions) write_progress: Arc<Progress>,
+    /// Record how many bytes/rows have been spilled.
+    pub(in crate::sessions) spill_progress: Arc<Progress>,
     /// result_progress for metrics of result datablocks (uncompressed)
     pub(in crate::sessions) result_progress: Arc<Progress>,
     pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
@@ -129,6 +131,7 @@ impl QueryContextShared {
             status: Arc::new(RwLock::new("null".to_string())),
             user_agent: Arc::new(RwLock::new("null".to_string())),
             materialized_cte_tables: Arc::new(Default::default()),
+            spill_progress: Arc::new(Progress::create()),
         }))
     }
 
diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs
index 1797dda5200b..68a1b50de8a4 100644
--- a/src/query/service/src/spillers/spiller.rs
+++ b/src/query/service/src/spillers/spiller.rs
@@ -16,8 +16,11 @@ use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Display;
 use std::fmt::Formatter;
+use std::sync::Arc;
 
 use common_base::base::GlobalUniqName;
+use common_base::base::ProgressValues;
+use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_expression::arrow::deserialize_column;
 use common_expression::arrow::serialize_column;
@@ -25,6 +28,8 @@ use common_expression::DataBlock;
 use log::info;
 use opendal::Operator;
 
+use crate::sessions::QueryContext;
+
 /// Spiller type, currently only supports HashJoin
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub enum SpillerType {
@@ -61,6 +66,7 @@ impl SpillerConfig {
 /// 3. Serialization and deserialization input data
 /// 4. Interact with the underlying storage engine to write and read spilled data
 pub struct Spiller {
+    ctx: Arc<QueryContext>,
     operator: Operator,
     config: SpillerConfig,
     spiller_type: SpillerType,
@@ -78,8 +84,14 @@ pub struct Spiller {
 impl Spiller {
     /// Create a new spiller
-    pub fn create(operator: Operator, config: SpillerConfig, spiller_type: SpillerType) -> Self {
+    pub fn create(
+        ctx: Arc<QueryContext>,
+        operator: Operator,
+        config: SpillerConfig,
+        spiller_type: SpillerType,
+    ) -> Self {
         Self {
+            ctx,
             operator,
             config,
             spiller_type,
@@ -111,14 +123,6 @@ impl Spiller {
         self.spilled_partition_set.insert(*p_id);
         let unique_name = GlobalUniqName::unique();
         let location = format!("{}/{}", self.config.location_prefix, unique_name);
-        info!(
-            "{:?} spilled {:?} rows data into {:?}, partition id is {:?}, worker id is {:?}",
-            self.spiller_type,
-            data.num_rows(),
-            location,
-            p_id,
-            worker_id
-        );
         self.partition_location
             .entry(*p_id)
             .and_modify(|locs| {
@@ -143,6 +147,21 @@
             writer.write(data).await?;
         }
         writer.close().await?;
+        {
+            let progress_val = ProgressValues {
+                rows: data.num_rows(),
+                bytes: data.memory_size(),
+            };
+            self.ctx.get_spill_progress().incr(&progress_val);
+        }
+        info!(
+            "{:?} spilled {:?} rows data into {:?}, partition id is {:?}, worker id is {:?}",
+            self.spiller_type,
+            data.num_rows(),
+            location,
+            p_id,
+            worker_id
+        );
         Ok(())
     }
diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs
index 3cef845532f2..8b5849fdd042 100644
--- a/src/query/service/tests/it/spillers/spiller.rs
+++ b/src/query/service/tests/it/spillers/spiller.rs
@@ -13,9 +13,7 @@
 // limitations under the License.
 
 use common_base::base::tokio;
-use common_base::base::tokio::fs;
-use common_base::base::GlobalInstance;
-use common_config::InnerConfig;
+use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_expression::types::DataType;
 use common_expression::types::Int32Type;
@@ -24,15 +22,23 @@ use common_expression::types::NumberScalar;
 use common_expression::DataBlock;
 use common_expression::FromData;
 use common_expression::ScalarRef;
+use common_pipeline_core::query_spill_prefix;
 use common_storage::DataOperator;
 use databend_query::spillers::Spiller;
 use databend_query::spillers::SpillerConfig;
 use databend_query::spillers::SpillerType;
-use databend_query::GlobalServices;
+use databend_query::test_kits::TestFixture;
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
 async fn test_spill_with_partition() -> Result<()> {
-    let mut spiller = create_spiller().await?;
+    let fixture = TestFixture::new().await;
+    let ctx = fixture.ctx();
+    let tenant = ctx.get_tenant();
+    let spiller_config = SpillerConfig::create(query_spill_prefix(&tenant));
+    let operator = DataOperator::instance().operator();
+
+    let mut spiller = Spiller::create(ctx, operator, spiller_config, SpillerType::HashJoinBuild);
+
     spiller.partition_set = vec![0, 1, 2];
 
     // Generate data block: two columns, type is i32, 100 rows
@@ -44,7 +50,7 @@ async fn test_spill_with_partition() -> Result<()> {
     let res = spiller.spill_with_partition(&(0_u8), &data, 0).await;
     assert!(res.is_ok());
 
-    assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_hash_join_build_spill"));
+    assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_query_spill"));
 
     // Test read spilled data
     let data_blocks = spiller.read_spilled_data(&(0_u8)).await?;
@@ -65,28 +71,5 @@
             }
         }
     }
-    // Delete `_hash_join_build_spill` dir
-    fs::remove_dir_all("_data/_hash_join_build_spill").await?;
     Ok(())
 }
-
-#[cfg(debug_assertions)]
-async fn create_spiller() -> Result<Spiller> {
-    let thread_name = match std::thread::current().name() {
-        None => panic!("thread name is none"),
-        Some(thread_name) => thread_name.to_string(),
-    };
-
-    GlobalInstance::init_testing(&thread_name);
-    GlobalServices::init_with(InnerConfig::default()).await?;
-
-    let spiller_config = SpillerConfig {
-        location_prefix: "_hash_join_build_spill".to_string(),
-    };
-    let operator = DataOperator::instance().operator();
-    Ok(Spiller::create(
-        operator,
-        spiller_config,
-        SpillerType::HashJoinBuild,
-    ))
-}
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index 30e46b83af34..b9014274ed60 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -369,10 +369,18 @@ impl TableContext for CtxDelegation {
         self.ctx.get_write_progress()
     }
 
+    fn get_spill_progress(&self) -> Arc<Progress> {
+        self.ctx.get_spill_progress()
+    }
+
     fn get_write_progress_value(&self) -> ProgressValues {
         todo!()
     }
 
+    fn get_spill_progress_value(&self) -> ProgressValues {
+        todo!()
+    }
+
     fn get_result_progress(&self) -> Arc<Progress> {
         todo!()
     }
diff --git a/src/query/service/tests/it/storages/system.rs b/src/query/service/tests/it/storages/system.rs
index ac6e57cf1036..857bcce5614d 100644
--- a/src/query/service/tests/it/storages/system.rs
+++ b/src/query/service/tests/it/storages/system.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use common_base::base::tokio;
 use common_catalog::table::Table;
 use common_exception::Result;
+use common_expression::block_debug::box_render;
 use common_expression::block_debug::pretty_format_blocks;
 use common_meta_app::principal::AuthInfo;
 use common_meta_app::principal::AuthType;
@@ -284,7 +285,14 @@ async fn test_metrics_table() -> Result<()> {
     assert_eq!(block.num_columns(), 5);
     assert!(block.num_rows() >= 1);
 
-    let output = pretty_format_blocks(result.as_slice())?;
+    let output = box_render(
+        &Arc::new(source_plan.output_schema.into()),
+        result.as_slice(),
+        1000,
+        1024,
+        30,
+        true,
+    )?;
     assert!(output.contains("test_metrics_table_count"));
     assert!(output.contains("test_metrics_table_histogram"));
diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt
index 528092fae446..4e0d627aa1ed 100644
--- a/src/query/service/tests/it/storages/testdata/columns_table.txt
+++ b/src/query/service/tests/it/storages/testdata/columns_table.txt
@@ -152,6 +152,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
 | 'is_updatable'       | 'information_schema' | 'views'           | 'UInt8'            | 'TINYINT UNSIGNED' | '' | '' | 'NO'  | '' |
 | 'job_state'          | 'system'             | 'background_jobs' | 'Nullable(String)' | 'VARCHAR'          | '' | '' | 'YES' | '' |
 | 'job_type'           | 'system'             | 'background_jobs' | 'Nullable(String)' | 'VARCHAR'          | '' | '' | 'YES' | '' |
+| 'join_spilled_bytes' | 'system'             | 'query_log'       | 'UInt64'           | 'BIGINT UNSIGNED'  | '' | '' | 'NO'  | '' |
+| 'join_spilled_rows'  | 'system'             | 'query_log'       | 'UInt64'           | 'BIGINT UNSIGNED'  | '' | '' | 'NO'  | '' |
 | 'keywords'           | 'information_schema' | 'keywords'        | 'String'           | 'VARCHAR'          | '' | '' | 'NO'  | '' |
 | 'kind'               | 'system'             | 'metrics'         | 'String'           | 'VARCHAR'          | '' | '' | 'NO'  | '' |
 | 'labels'             | 'system'             | 'metrics'         | 'String'           | 'VARCHAR'          | '' | '' | 'NO'  | '' |
diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt
index f9aeb025d8ea..9d65acf7f406 100644
--- a/src/query/service/tests/it/storages/testdata/settings_table.txt
+++ b/src/query/service/tests/it/storages/testdata/settings_table.txt
@@ -50,7 +50,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: SystemSettings
 | 'retention_period'                   | '12'         | '12'         | 'SESSION' | 'Sets the retention period in hours.' | 'UInt64' |
 | 'sandbox_tenant'                     | ''           | ''           | 'SESSION' | 'Injects a custom 'sandbox_tenant' into this session. This is only for testing purposes and will take effect only when 'internal_enable_sandbox_tenant' is turned on.' | 'String' |
 | 'spilling_bytes_threshold_per_proc'  | '0'          | '0'          | 'SESSION' | 'Sets the maximum amount of memory in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
-| 'spilling_memory_ratio'              | '100'        | '100'        | 'SESSION' | 'Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
+| 'spilling_memory_ratio'              | '0'          | '0'          | 'SESSION' | 'Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
 | 'sql_dialect'                        | 'PostgreSQL' | 'PostgreSQL' | 'SESSION' | 'Sets the SQL dialect. Available values include "PostgreSQL", "MySQL", and "Hive".' | 'String' |
 | 'storage_fetch_part_num'             | '2'          | '2'          | 'SESSION' | 'Sets the number of partitions that are fetched in parallel from storage during query execution.' | 'UInt64' |
 | 'storage_io_max_page_bytes_for_read' | '524288'     | '524288'     | 'SESSION' | 'Sets the maximum byte size of data pages that can be read from storage in a single I/O operation.' | 'UInt64' |
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 1f8acf461f99..ef144c8c55f5 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -276,7 +276,7 @@ impl DefaultSettings {
                 display_in_show_settings: true,
             }),
             ("spilling_memory_ratio", DefaultSettingValue {
-                value: UserSettingValue::UInt64(100),
+                value: UserSettingValue::UInt64(0),
                 desc: "Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.",
                 possible_values: None,
                 display_in_show_settings: true,
diff --git a/src/query/storages/system/src/query_log_table.rs b/src/query/storages/system/src/query_log_table.rs
index 20c08ab8313f..224a3f18b2f7 100644
--- a/src/query/storages/system/src/query_log_table.rs
+++ b/src/query/storages/system/src/query_log_table.rs
@@ -108,6 +108,8 @@ pub struct QueryLogElement {
     pub result_bytes: u64,
     pub cpu_usage: u32,
     pub memory_usage: u64,
+    pub join_spilled_bytes: u64,
+    pub join_spilled_rows: u64,
 
     // Client.
     pub client_info: String,
@@ -171,6 +173,14 @@ impl SystemLogElement for QueryLogElement {
             TableField::new(
                 "written_bytes",
                 TableDataType::Number(NumberDataType::UInt64),
             ),
+            TableField::new(
+                "join_spilled_rows",
+                TableDataType::Number(NumberDataType::UInt64),
+            ),
+            TableField::new(
+                "join_spilled_bytes",
+                TableDataType::Number(NumberDataType::UInt64),
+            ),
             TableField::new(
                 "written_io_bytes",
                 TableDataType::Number(NumberDataType::UInt64),
             ),
@@ -321,6 +331,14 @@ impl SystemLogElement for QueryLogElement {
             .next()
             .unwrap()
             .push(Scalar::Number(NumberScalar::UInt64(self.written_bytes)).as_ref());
+        columns
+            .next()
+            .unwrap()
+            .push(Scalar::Number(NumberScalar::UInt64(self.join_spilled_rows)).as_ref());
+        columns
+            .next()
+            .unwrap()
+            .push(Scalar::Number(NumberScalar::UInt64(self.join_spilled_bytes)).as_ref());
         columns
             .next()
             .unwrap()
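
Two short sketches for reviewers follow. First, the counter these changes thread through the query: `common_base::base::Progress` is a shared rows/bytes accumulator, created once in `QueryContextShared` and bumped by the spiller after each successful write. A minimal illustration of that contract (the row/byte numbers are made up; `Progress`, `ProgressValues`, `incr`, and `get_values` are the same APIs used in the diff above):

```rust
use std::sync::Arc;

use common_base::base::Progress;
use common_base::base::ProgressValues;

fn main() {
    // One shared counter per query, as in QueryContextShared::spill_progress.
    let spill_progress = Arc::new(Progress::create());

    // Each successful spill adds its rows/bytes, mirroring what
    // Spiller::spill_with_partition does after writer.close().
    spill_progress.incr(&ProgressValues { rows: 100, bytes: 800 });
    spill_progress.incr(&ProgressValues { rows: 50, bytes: 400 });

    // get_spill_progress_value() reads the running totals that end up in
    // system.query_log as join_spilled_rows / join_spilled_bytes.
    let totals = spill_progress.get_values();
    assert_eq!(totals.rows, 150);
    assert_eq!(totals.bytes, 1200);
}
```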
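
Second, the metrics wiring: `build_operator` now registers opendal's `PrometheusClientLayer` against the global prometheus registry. A self-contained sketch of the same layering, assuming opendal 0.40 with the `layers-prometheus-client` feature, and substituting a locally owned `Registry` and the in-memory backend for `load_global_prometheus_registry` and the real storage service:

```rust
use opendal::layers::PrometheusClientLayer;
use opendal::services::Memory;
use opendal::Operator;
use prometheus_client::registry::Registry;

fn build() -> opendal::Result<Operator> {
    // Databend passes `&mut load_global_prometheus_registry()` here; a local
    // registry behaves the same way for illustration.
    let mut registry = Registry::default();

    let op = Operator::new(Memory::default())?
        // Requests through `op` are now counted and timed; the collected
        // metrics can later be encoded from `registry`.
        .layer(PrometheusClientLayer::new(&mut registry))
        .finish();

    Ok(op)
}
```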