From ff7834427c50bc0328497544b2bda065186f479f Mon Sep 17 00:00:00 2001 From: baishen Date: Thu, 26 Dec 2024 14:19:15 +0800 Subject: [PATCH] feat(query): Add `external_block_rows` metrics (#17116) * feat(query): Add `external_block_rows` metrics * fix * add init_semaphore * add rows metrics --- .../base/src/runtime/metrics/histogram.rs | 5 ++++ src/common/base/src/runtime/metrics/mod.rs | 1 + .../base/src/runtime/metrics/registry.rs | 6 ++++ .../metrics/src/metrics/external_server.rs | 18 +++++++++-- .../src/pipelines/builders/builder_udf.rs | 9 +++++- .../transforms/transform_udf_server.rs | 30 +++++++++++++++---- 6 files changed, 60 insertions(+), 9 deletions(-) diff --git a/src/common/base/src/runtime/metrics/histogram.rs b/src/common/base/src/runtime/metrics/histogram.rs index 6ac594b3b981..f0b1bdcf479f 100644 --- a/src/common/base/src/runtime/metrics/histogram.rs +++ b/src/common/base/src/runtime/metrics/histogram.rs @@ -39,6 +39,11 @@ pub static BUCKET_MILLISECONDS: [f64; 15] = [ 300000.0, 600000.0, 1800000.0, ]; +pub static BUCKET_ROWS: [f64; 14] = [ + 1.0, 10.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0, 1000000.0, + 5000000.0, 10000000.0, 50000000.0, +]; + /// Histogram is a port of prometheus-client's Histogram. The only difference is that /// we can reset the histogram. #[derive(Debug)] diff --git a/src/common/base/src/runtime/metrics/mod.rs b/src/common/base/src/runtime/metrics/mod.rs index 3d1563c22be5..cc1e5e3a3670 100644 --- a/src/common/base/src/runtime/metrics/mod.rs +++ b/src/common/base/src/runtime/metrics/mod.rs @@ -34,6 +34,7 @@ pub use registry::register_gauge_family; pub use registry::register_histogram; pub use registry::register_histogram_family; pub use registry::register_histogram_family_in_milliseconds; +pub use registry::register_histogram_family_in_rows; pub use registry::register_histogram_family_in_seconds; pub use registry::register_histogram_in_milliseconds; pub use registry::register_histogram_in_seconds; diff --git a/src/common/base/src/runtime/metrics/registry.rs b/src/common/base/src/runtime/metrics/registry.rs index 97897ad043e3..485ab071e11a 100644 --- a/src/common/base/src/runtime/metrics/registry.rs +++ b/src/common/base/src/runtime/metrics/registry.rs @@ -38,6 +38,7 @@ use crate::runtime::metrics::family_metrics::FamilyHistogram as InnerFamilyHisto use crate::runtime::metrics::gauge::Gauge; use crate::runtime::metrics::histogram::Histogram; use crate::runtime::metrics::histogram::BUCKET_MILLISECONDS; +use crate::runtime::metrics::histogram::BUCKET_ROWS; use crate::runtime::metrics::histogram::BUCKET_SECONDS; use crate::runtime::metrics::process_collector::ProcessCollector; use crate::runtime::metrics::sample::MetricSample; @@ -309,6 +310,11 @@ where T: FamilyLabels { register_histogram_family(name, BUCKET_MILLISECONDS.iter().copied()) } +pub fn register_histogram_family_in_rows(name: &str) -> FamilyHistogram +where T: FamilyLabels { + register_histogram_family(name, BUCKET_ROWS.iter().copied()) +} + pub type FamilyGauge = Family>; pub type FamilyCounter = Family>; pub type FamilyHistogram = Family>; diff --git a/src/common/metrics/src/metrics/external_server.rs b/src/common/metrics/src/metrics/external_server.rs index e712d606ec0e..a0a1899d2d98 100644 --- a/src/common/metrics/src/metrics/external_server.rs +++ b/src/common/metrics/src/metrics/external_server.rs @@ -16,7 +16,8 @@ use std::sync::LazyLock; use std::time::Duration; use databend_common_base::runtime::metrics::register_counter_family; -use databend_common_base::runtime::metrics::register_histogram_family_in_seconds; +use databend_common_base::runtime::metrics::register_histogram_family_in_milliseconds; +use databend_common_base::runtime::metrics::register_histogram_family_in_rows; use databend_common_base::runtime::metrics::FamilyCounter; use databend_common_base::runtime::metrics::FamilyHistogram; @@ -28,12 +29,13 @@ const METRIC_RETRY: &str = "external_retry"; const METRIC_ERROR: &str = "external_error"; const METRIC_RUNNING_REQUESTS: &str = "external_running_requests"; const METRIC_REQUESTS: &str = "external_requests"; +const METRIC_EXTERNAL_BLOCK_ROWS: &str = "external_block_rows"; static REQUEST_EXTERNAL_DURATION: LazyLock> = - LazyLock::new(|| register_histogram_family_in_seconds(METRIC_REQUEST_EXTERNAL_DURATION)); + LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_REQUEST_EXTERNAL_DURATION)); static CONNECT_EXTERNAL_DURATION: LazyLock> = - LazyLock::new(|| register_histogram_family_in_seconds(METRIC_CONNECT_EXTERNAL_DURATION)); + LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_CONNECT_EXTERNAL_DURATION)); static RETRY_EXTERNAL: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_RETRY)); @@ -47,6 +49,9 @@ static RUNNING_REQUESTS_EXTERNAL: LazyLock> = static REQUESTS_EXTERNAL_EXTERNAL: LazyLock> = LazyLock::new(|| register_counter_family(METRIC_REQUESTS)); +static EXTERNAL_BLOCK_ROWS: LazyLock> = + LazyLock::new(|| register_histogram_family_in_rows(METRIC_EXTERNAL_BLOCK_ROWS)); + const LABEL_FUNCTION_NAME: &str = "function_name"; const LABEL_ERROR_KIND: &str = "error_kind"; @@ -64,6 +69,13 @@ pub fn record_request_external_duration(function_name: impl Into, durati .observe(duration.as_millis_f64()); } +pub fn record_request_external_block_rows(function_name: impl Into, rows: usize) { + let labels = &vec![(LABEL_FUNCTION_NAME, function_name.into())]; + EXTERNAL_BLOCK_ROWS + .get_or_create(labels) + .observe(rows as f64); +} + pub fn record_retry_external(function_name: impl Into, error_kind: impl Into) { let labels = &vec![ (LABEL_FUNCTION_NAME, function_name.into()), diff --git a/src/query/service/src/pipelines/builders/builder_udf.rs b/src/query/service/src/pipelines/builders/builder_udf.rs index 0bdf290f597e..7045374042ea 100644 --- a/src/query/service/src/pipelines/builders/builder_udf.rs +++ b/src/query/service/src/pipelines/builders/builder_udf.rs @@ -41,8 +41,15 @@ impl PipelineBuilder { )) }) } else { + let semaphore = TransformUdfServer::init_semaphore(self.ctx.clone())?; + let endpoints = TransformUdfServer::init_endpoints(self.ctx.clone(), &udf.udf_funcs)?; self.main_pipeline.try_add_async_transformer(|| { - TransformUdfServer::new(self.ctx.clone(), udf.udf_funcs.clone()) + TransformUdfServer::new( + self.ctx.clone(), + udf.udf_funcs.clone(), + semaphore.clone(), + endpoints.clone(), + ) }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs index f532004bfa0b..c6309c3d002a 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs @@ -34,6 +34,7 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchema; use databend_common_metrics::external_server::record_connect_external_duration; use databend_common_metrics::external_server::record_error_external; +use databend_common_metrics::external_server::record_request_external_block_rows; use databend_common_metrics::external_server::record_request_external_duration; use databend_common_metrics::external_server::record_retry_external; use databend_common_metrics::external_server::record_running_requests_external_finish; @@ -60,15 +61,20 @@ pub struct TransformUdfServer { } impl TransformUdfServer { - pub fn new(ctx: Arc, funcs: Vec) -> Result { + pub fn init_semaphore(ctx: Arc) -> Result> { let settings = ctx.get_settings(); - let connect_timeout = settings.get_external_server_connect_timeout_secs()?; - let request_timeout = settings.get_external_server_request_timeout_secs()?; - let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize; let request_max_threads = settings.get_external_server_request_max_threads()? as usize; - let retry_times = settings.get_external_server_request_retry_times()? as usize; let semaphore = Arc::new(Semaphore::new(request_max_threads)); + Ok(semaphore) + } + pub fn init_endpoints( + ctx: Arc, + funcs: &[UdfFunctionDesc], + ) -> Result>> { + let settings = ctx.get_settings(); + let connect_timeout = settings.get_external_server_connect_timeout_secs()?; + let request_timeout = settings.get_external_server_request_timeout_secs()?; let mut endpoints: BTreeMap> = BTreeMap::new(); for func in funcs.iter() { let server_addr = func.udf_type.as_server().unwrap(); @@ -79,6 +85,19 @@ impl TransformUdfServer { UDFFlightClient::build_endpoint(server_addr, connect_timeout, request_timeout)?; endpoints.insert(server_addr.clone(), endpoint); } + Ok(endpoints) + } + + pub fn new( + ctx: Arc, + funcs: Vec, + semaphore: Arc, + endpoints: BTreeMap>, + ) -> Result { + let settings = ctx.get_settings(); + let connect_timeout = settings.get_external_server_connect_timeout_secs()?; + let request_batch_rows = settings.get_external_server_request_batch_rows()? as usize; + let retry_times = settings.get_external_server_request_retry_times()? as usize; Ok(Self { ctx, @@ -222,6 +241,7 @@ impl AsyncTransform for TransformUdfServer { .map(|start| data_block.slice(start..start + batch_rows.min(rows - start))) .collect(); for func in self.funcs.iter() { + record_request_external_block_rows(func.func_name.clone(), rows); let server_addr = func.udf_type.as_server().unwrap(); let endpoint = self.endpoints.get(server_addr).unwrap(); let tasks: Vec<_> = batch_blocks