
Commit

Merge branch 'main' of github.com:datafuselabs/databend into improve_build_keys_state
Dousir9 committed Sep 26, 2023
2 parents bdc2e81 + 4784d7f commit e4cef93
Showing 17 changed files with 111 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -108,6 +108,7 @@ members = [
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false }
opendal = { version = "0.40", features = [
"layers-minitrace",
"layers-prometheus-client",
"services-ipfs",
"services-moka",
"services-redis",
7 changes: 6 additions & 1 deletion src/common/storage/src/operator.rs
@@ -38,9 +38,11 @@ use common_meta_app::storage::StorageParams;
use common_meta_app::storage::StorageRedisConfig;
use common_meta_app::storage::StorageS3Config;
use common_meta_app::storage::StorageWebhdfsConfig;
use common_metrics::load_global_prometheus_registry;
use opendal::layers::ImmutableIndexLayer;
use opendal::layers::LoggingLayer;
use opendal::layers::MinitraceLayer;
use opendal::layers::PrometheusClientLayer;
use opendal::layers::RetryLayer;
use opendal::layers::TimeoutLayer;
use opendal::raw::HttpClient;
@@ -108,7 +110,10 @@ pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
.layer(LoggingLayer::default())
// Add tracing
.layer(MinitraceLayer)
// TODO(liyz): add PrometheusClientLayer
// Add PrometheusClientLayer
.layer(PrometheusClientLayer::new(
&mut load_global_prometheus_registry(),
))
.finish();

Ok(op)
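Note: this resolves the earlier TODO by registering OpenDAL's PrometheusClientLayer against the global Prometheus registry. A minimal sketch of the resulting layer stack, assuming the opendal 0.40 API shown above; the local-filesystem backend, root path, and build_fs_operator are illustrative, not part of this commit:

use common_metrics::load_global_prometheus_registry;
use opendal::layers::LoggingLayer;
use opendal::layers::MinitraceLayer;
use opendal::layers::PrometheusClientLayer;
use opendal::services::Fs;
use opendal::Operator;

fn build_fs_operator() -> opendal::Result<Operator> {
    let mut builder = Fs::default();
    builder.root("/tmp/databend-spill");
    // Layers wrap every operation: logging, tracing, then metrics
    // recorded into the process-wide Prometheus registry.
    let op = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(MinitraceLayer)
        .layer(PrometheusClientLayer::new(
            &mut load_global_prometheus_registry(),
        ))
        .finish();
    Ok(op)
}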
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -112,7 +112,9 @@ pub trait TableContext: Send + Sync {
fn get_scan_progress(&self) -> Arc<Progress>;
fn get_scan_progress_value(&self) -> ProgressValues;
fn get_write_progress(&self) -> Arc<Progress>;
fn get_spill_progress(&self) -> Arc<Progress>;
fn get_write_progress_value(&self) -> ProgressValues;
fn get_spill_progress_value(&self) -> ProgressValues;
fn get_result_progress(&self) -> Arc<Progress>;
fn get_result_progress_value(&self) -> ProgressValues;
fn get_status_info(&self) -> String;
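The new accessors follow the trait's existing pattern: each counter is exposed both as a shared Arc<Progress> handle for writers and as a ProgressValues snapshot for readers. A hedged sketch of recording spilled rows and bytes through the new getter; record_spill is an illustrative helper, not part of this commit:

use std::sync::Arc;

use common_base::base::ProgressValues;
use common_catalog::table_context::TableContext;

// Illustrative helper: add one spilled block's rows/bytes to the shared counter.
fn record_spill(ctx: &Arc<dyn TableContext>, rows: usize, bytes: usize) {
    let progress_val = ProgressValues { rows, bytes };
    ctx.get_spill_progress().incr(&progress_val);
}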
9 changes: 9 additions & 0 deletions src/query/service/src/interpreters/interpreter_query_log.rs
@@ -105,6 +105,8 @@ impl InterpreterQueryLog {
let result_bytes = 0u64;
let cpu_usage = ctx.get_settings().get_max_threads()? as u32;
let memory_usage = ctx.get_current_session().get_memory_usage() as u64;
let join_spilled_rows = 0u64;
let join_spilled_bytes = 0u64;

// Client.
let client_address = match ctx.get_client_address() {
@@ -161,6 +163,8 @@
result_bytes,
cpu_usage,
memory_usage,
join_spilled_bytes,
join_spilled_rows,
client_info: "".to_string(),
client_address,
user_agent,
@@ -212,6 +216,9 @@ impl InterpreterQueryLog {
let cpu_usage = ctx.get_settings().get_max_threads()? as u32;
let memory_usage = ctx.get_current_session().get_memory_usage() as u64;

let join_spilled_rows = ctx.get_spill_progress_value().rows as u64;
let join_spilled_bytes = ctx.get_spill_progress_value().bytes as u64;

// Result.
let result_rows = ctx.get_result_progress_value().rows as u64;
let result_bytes = ctx.get_result_progress_value().bytes as u64;
@@ -275,6 +282,8 @@ impl InterpreterQueryLog {
result_bytes,
cpu_usage,
memory_usage,
join_spilled_bytes,
join_spilled_rows,
client_info: "".to_string(),
client_address,
user_agent,
@@ -54,7 +54,7 @@ impl BuildSpillState {
let tenant = ctx.get_tenant();
let spill_config = SpillerConfig::create(query_spill_prefix(&tenant));
let operator = DataOperator::instance().operator();
let spiller = Spiller::create(operator, spill_config, SpillerType::HashJoinBuild);
let spiller = Spiller::create(ctx, operator, spill_config, SpillerType::HashJoinBuild);
Self {
build_state,
spill_coordinator,
@@ -41,7 +41,7 @@ impl ProbeSpillState {
let tenant = ctx.get_tenant();
let spill_config = SpillerConfig::create(query_spill_prefix(&tenant));
let operator = DataOperator::instance().operator();
let spiller = Spiller::create(operator, spill_config, SpillerType::HashJoinProbe);
let spiller = Spiller::create(ctx, operator, spill_config, SpillerType::HashJoinProbe);
Self {
probe_state,
spiller,
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -319,10 +319,18 @@ impl TableContext for QueryContext {
self.shared.write_progress.clone()
}

fn get_spill_progress(&self) -> Arc<Progress> {
self.shared.spill_progress.clone()
}

fn get_write_progress_value(&self) -> ProgressValues {
self.shared.write_progress.as_ref().get_values()
}

fn get_spill_progress_value(&self) -> ProgressValues {
self.shared.spill_progress.as_ref().get_values()
}

fn get_result_progress(&self) -> Arc<Progress> {
self.shared.result_progress.clone()
}
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -58,6 +58,8 @@ pub struct QueryContextShared {
pub(in crate::sessions) scan_progress: Arc<Progress>,
/// write_progress for write/commit metrics of datablocks (uncompressed)
pub(in crate::sessions) write_progress: Arc<Progress>,
/// Record how many bytes/rows have been spilled.
pub(in crate::sessions) spill_progress: Arc<Progress>,
/// result_progress for metrics of result datablocks (uncompressed)
pub(in crate::sessions) result_progress: Arc<Progress>,
pub(in crate::sessions) error: Arc<Mutex<Option<ErrorCode>>>,
@@ -129,6 +131,7 @@ impl QueryContextShared {
status: Arc::new(RwLock::new("null".to_string())),
user_agent: Arc::new(RwLock::new("null".to_string())),
materialized_cte_tables: Arc::new(Default::default()),
spill_progress: Arc::new(Progress::create()),
}))
}

37 changes: 28 additions & 9 deletions src/query/service/src/spillers/spiller.rs
@@ -16,15 +16,20 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;

use common_base::base::GlobalUniqName;
use common_base::base::ProgressValues;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::arrow::deserialize_column;
use common_expression::arrow::serialize_column;
use common_expression::DataBlock;
use log::info;
use opendal::Operator;

use crate::sessions::QueryContext;

/// Spiller type, currently only supports HashJoin
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SpillerType {
@@ -61,6 +66,7 @@ impl SpillerConfig {
/// 3. Serialization and deserialization of input data
/// 4. Interact with the underlying storage engine to write and read spilled data
pub struct Spiller {
ctx: Arc<QueryContext>,
operator: Operator,
config: SpillerConfig,
spiller_type: SpillerType,
@@ -78,8 +84,14 @@ pub struct Spiller {

impl Spiller {
/// Create a new spiller
pub fn create(operator: Operator, config: SpillerConfig, spiller_type: SpillerType) -> Self {
pub fn create(
ctx: Arc<QueryContext>,
operator: Operator,
config: SpillerConfig,
spiller_type: SpillerType,
) -> Self {
Self {
ctx,
operator,
config,
spiller_type,
@@ -111,14 +123,6 @@ impl Spiller {
self.spilled_partition_set.insert(*p_id);
let unique_name = GlobalUniqName::unique();
let location = format!("{}/{}", self.config.location_prefix, unique_name);
info!(
"{:?} spilled {:?} rows data into {:?}, partition id is {:?}, worker id is {:?}",
self.spiller_type,
data.num_rows(),
location,
p_id,
worker_id
);
self.partition_location
.entry(*p_id)
.and_modify(|locs| {
@@ -143,6 +147,21 @@
writer.write(data).await?;
}
writer.close().await?;
{
let progress_val = ProgressValues {
rows: data.num_rows(),
bytes: data.memory_size(),
};
self.ctx.get_spill_progress().incr(&progress_val);
}
info!(
"{:?} spilled {:?} rows data into {:?}, partition id is {:?}, worker id is {:?}",
self.spiller_type,
data.num_rows(),
location,
p_id,
worker_id
);
Ok(())
}

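Note that the progress counter and the info! log now come after writer.close() succeeds, so partially written blocks are never counted. With the context threaded through, call sites construct the spiller as sketched below; make_build_spiller is illustrative and simply mirrors BuildSpillState::create earlier in this diff:

use std::sync::Arc;

use common_pipeline_core::query_spill_prefix;
use common_storage::DataOperator;

// Illustrative constructor mirroring BuildSpillState::create above.
fn make_build_spiller(ctx: Arc<QueryContext>) -> Spiller {
    let tenant = ctx.get_tenant();
    let spill_config = SpillerConfig::create(query_spill_prefix(&tenant));
    let operator = DataOperator::instance().operator();
    Spiller::create(ctx, operator, spill_config, SpillerType::HashJoinBuild)
}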
41 changes: 12 additions & 29 deletions src/query/service/tests/it/spillers/spiller.rs
@@ -13,9 +13,7 @@
// limitations under the License.

use common_base::base::tokio;
use common_base::base::tokio::fs;
use common_base::base::GlobalInstance;
use common_config::InnerConfig;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::Int32Type;
@@ -24,15 +22,23 @@ use common_expression::types::NumberScalar;
use common_expression::DataBlock;
use common_expression::FromData;
use common_expression::ScalarRef;
use common_pipeline_core::query_spill_prefix;
use common_storage::DataOperator;
use databend_query::spillers::Spiller;
use databend_query::spillers::SpillerConfig;
use databend_query::spillers::SpillerType;
use databend_query::GlobalServices;
use databend_query::test_kits::TestFixture;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_spill_with_partition() -> Result<()> {
let mut spiller = create_spiller().await?;
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();
let tenant = ctx.get_tenant();
let spiller_config = SpillerConfig::create(query_spill_prefix(&tenant));
let operator = DataOperator::instance().operator();

let mut spiller = Spiller::create(ctx, operator, spiller_config, SpillerType::HashJoinBuild);

spiller.partition_set = vec![0, 1, 2];

// Generate data block: two columns, type is i32, 100 rows
@@ -44,7 +50,7 @@ async fn test_spill_with_partition() -> Result<()> {
let res = spiller.spill_with_partition(&(0_u8), &data, 0).await;

assert!(res.is_ok());
assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_hash_join_build_spill"));
assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_query_spill"));

// Test read spilled data
let data_blocks = spiller.read_spilled_data(&(0_u8)).await?;
@@ -65,28 +71,5 @@
}
}
}
// Delete `_hash_join_build_spill` dir
fs::remove_dir_all("_data/_hash_join_build_spill").await?;
Ok(())
}

#[cfg(debug_assertions)]
async fn create_spiller() -> Result<Spiller> {
let thread_name = match std::thread::current().name() {
None => panic!("thread name is none"),
Some(thread_name) => thread_name.to_string(),
};

GlobalInstance::init_testing(&thread_name);
GlobalServices::init_with(InnerConfig::default()).await?;

let spiller_config = SpillerConfig {
location_prefix: "_hash_join_build_spill".to_string(),
};
let operator = DataOperator::instance().operator();
Ok(Spiller::create(
operator,
spiller_config,
SpillerType::HashJoinBuild,
))
}
8 changes: 8 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -369,10 +369,18 @@ impl TableContext for CtxDelegation {
self.ctx.get_write_progress()
}

fn get_spill_progress(&self) -> Arc<Progress> {
self.ctx.get_spill_progress()
}

fn get_write_progress_value(&self) -> ProgressValues {
todo!()
}

fn get_spill_progress_value(&self) -> ProgressValues {
todo!()
}

fn get_result_progress(&self) -> Arc<Progress> {
todo!()
}
10 changes: 9 additions & 1 deletion src/query/service/tests/it/storages/system.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_base::base::tokio;
use common_catalog::table::Table;
use common_exception::Result;
use common_expression::block_debug::box_render;
use common_expression::block_debug::pretty_format_blocks;
use common_meta_app::principal::AuthInfo;
use common_meta_app::principal::AuthType;
@@ -284,7 +285,14 @@ async fn test_metrics_table() -> Result<()> {
assert_eq!(block.num_columns(), 5);
assert!(block.num_rows() >= 1);

let output = pretty_format_blocks(result.as_slice())?;
let output = box_render(
&Arc::new(source_plan.output_schema.into()),
result.as_slice(),
1000,
1024,
30,
true,
)?;
assert!(output.contains("test_metrics_table_count"));
assert!(output.contains("test_metrics_table_histogram"));

@@ -152,6 +152,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemColumns
| 'is_updatable' | 'information_schema' | 'views' | 'UInt8' | 'TINYINT UNSIGNED' | '' | '' | 'NO' | '' |
| 'job_state' | 'system' | 'background_jobs' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
| 'job_type' | 'system' | 'background_jobs' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
| 'join_spilled_bytes' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
| 'join_spilled_rows' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
| 'keywords' | 'information_schema' | 'keywords' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'kind' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'labels' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -50,7 +50,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: SystemSettings
| 'retention_period' | '12' | '12' | 'SESSION' | 'Sets the retention period in hours.' | 'UInt64' |
| 'sandbox_tenant' | '' | '' | 'SESSION' | 'Injects a custom 'sandbox_tenant' into this session. This is only for testing purposes and will take effect only when 'internal_enable_sandbox_tenant' is turned on.' | 'String' |
| 'spilling_bytes_threshold_per_proc' | '0' | '0' | 'SESSION' | 'Sets the maximum amount of memory in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
| 'spilling_memory_ratio' | '100' | '100' | 'SESSION' | 'Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
| 'spilling_memory_ratio' | '0' | '0' | 'SESSION' | 'Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.' | 'UInt64' |
| 'sql_dialect' | 'PostgreSQL' | 'PostgreSQL' | 'SESSION' | 'Sets the SQL dialect. Available values include "PostgreSQL", "MySQL", and "Hive".' | 'String' |
| 'storage_fetch_part_num' | '2' | '2' | 'SESSION' | 'Sets the number of partitions that are fetched in parallel from storage during query execution.' | 'UInt64' |
| 'storage_io_max_page_bytes_for_read' | '524288' | '524288' | 'SESSION' | 'Sets the maximum byte size of data pages that can be read from storage in a single I/O operation.' | 'UInt64' |
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
@@ -276,7 +276,7 @@ impl DefaultSettings {
display_in_show_settings: true,
}),
("spilling_memory_ratio", DefaultSettingValue {
value: UserSettingValue::UInt64(100),
value: UserSettingValue::UInt64(0),
desc: "Sets the maximum memory ratio in bytes that an aggregator can use before spilling data to storage during query execution.",
possible_values: None,
display_in_show_settings: true,