diff --git a/Cargo.lock b/Cargo.lock index c028ec24b9e6..93f84a682617 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10245,6 +10245,7 @@ dependencies = [ "risingwave_error", "risingwave_license", "risingwave_pb", + "risingwave_telemetry_event", "rust_decimal", "rusty-fork", "rw_iter_util", @@ -11726,6 +11727,21 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_telemetry_event" +version = "2.1.0-alpha" +dependencies = [ + "jsonbb", + "madsim-tokio", + "prost 0.13.1", + "reqwest 0.12.4", + "risingwave_pb", + "thiserror", + "thiserror-ext", + "tracing", + "uuid", +] + [[package]] name = "risingwave_test_runner" version = "2.1.0-alpha" diff --git a/Cargo.toml b/Cargo.toml index 186f2baf1e1a..d7622bee9bb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "src/common/fields-derive", "src/common/heap_profiling", "src/common/metrics", + "src/common/telemetry_event", "src/compute", "src/connector", "src/connector/codec", @@ -241,6 +242,7 @@ risingwave_udf = { path = "./src/expr/udf" } risingwave_variables = { path = "./src/utils/variables" } risingwave_java_binding = { path = "./src/java_binding" } risingwave_jni_core = { path = "src/jni_core" } +risingwave_telemetry_event = { path = "./src/common/telemetry_event" } rw_futures_util = { path = "src/utils/futures_util" } rw_resource_util = { path = "src/utils/resource_util" } rw_iter_util = { path = "src/utils/iter_util" } diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 2cc1d81f1a38..cab8ef2129f0 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -90,6 +90,7 @@ risingwave_common_secret = { path = "./secret" } risingwave_error = { workspace = true } risingwave_license = { workspace = true } risingwave_pb = { workspace = true } +risingwave_telemetry_event = { workspace = true } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } rw_iter_util = { workspace = true } rw_resource_util = { workspace = true } diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 9cf469af9cd8..0e77adfb9802 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -17,18 +17,22 @@ pub mod pb_compatible; pub mod report; use std::env; -use std::time::SystemTime; use risingwave_pb::telemetry::PbTelemetryClusterType; +pub use risingwave_telemetry_event::{ + current_timestamp, post_telemetry_report_pb, report_event_common, request_to_telemetry_event, + TelemetryError, TelemetryResult, +}; use serde::{Deserialize, Serialize}; use sysinfo::System; -use thiserror_ext::AsReport; use crate::util::env_var::env_var_is_true_or; use crate::util::resource_util::cpu::total_cpu_available; use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes}; use crate::RW_VERSION; +type Result = core::result::Result; + pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE"; pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes"; @@ -50,21 +54,13 @@ pub fn telemetry_cluster_type_from_env_var() -> PbTelemetryClusterType { } /// Url of telemetry backend -pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; - +pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL; /// Telemetry reporting interval in seconds, 6 hours pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60; /// Environment Variable that is default to be true const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY"; -pub type TelemetryResult = core::result::Result; - -/// Telemetry errors are generally recoverable/ignorable. `String` is good enough. -pub type TelemetryError = String; - -type Result = core::result::Result; - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum TelemetryNodeType { Meta, @@ -148,40 +144,12 @@ impl Default for SystemData { } } -/// Sends a `POST` request of the telemetry reporting to a URL. -pub async fn post_telemetry_report_pb(url: &str, report_body: Vec) -> Result<()> { - let client = reqwest::Client::new(); - let res = client - .post(url) - .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf") - .body(report_body) - .send() - .await - .map_err(|err| format!("failed to send telemetry report, err: {}", err.as_report()))?; - if res.status().is_success() { - Ok(()) - } else { - Err(format!( - "telemetry response is error, url {}, status {}", - url, - res.status() - )) - } -} - /// check whether telemetry is enabled in environment variable pub fn telemetry_env_enabled() -> bool { // default to be true env_var_is_true_or(TELEMETRY_ENV_ENABLE, true) } -pub fn current_timestamp() -> u64 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Clock might go backward") - .as_secs() -} - pub fn report_scarf_enabled() -> bool { telemetry_env_enabled() && !matches!( diff --git a/src/common/src/telemetry/report.rs b/src/common/src/telemetry/report.rs index 38d1c4806563..0d12491269ac 100644 --- a/src/common/src/telemetry/report.rs +++ b/src/common/src/telemetry/report.rs @@ -12,21 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; -use prost::Message; -use risingwave_pb::telemetry::{ - EventMessage as PbEventMessage, PbTelemetryDatabaseObject, - TelemetryEventStage as PbTelemetryEventStage, +pub use risingwave_telemetry_event::{ + current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, }; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; use tokio::time::{interval, Duration}; use uuid::Uuid; -use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL}; +use super::{Result, TELEMETRY_REPORT_INTERVAL}; use crate::telemetry::pb_compatible::TelemetryToProtobuf; -use crate::telemetry::post_telemetry_report_pb; #[async_trait::async_trait] pub trait TelemetryInfoFetcher { @@ -47,8 +44,6 @@ pub trait TelemetryReportCreator { fn report_type(&self) -> &str; } -static TELEMETRY_TRACKING_ID: OnceLock = OnceLock::new(); - pub async fn start_telemetry_reporting( info_fetcher: Arc, report_creator: Arc, @@ -126,99 +121,3 @@ where }); (join_handle, shutdown_tx) } - -pub fn report_event_common( - event_stage: PbTelemetryEventStage, - event_name: &str, - catalog_id: i64, - connector_name: Option, - object: Option, - attributes: Option, // any json string - node: String, -) { - let event_tracking_id: String; - if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() { - event_tracking_id = tracking_id.to_string(); - } else { - tracing::info!("Telemetry tracking_id is not set, event reporting disabled"); - return; - } - - request_to_telemetry_event( - event_tracking_id, - event_stage, - event_name, - catalog_id, - connector_name, - object, - attributes, - node, - false, - ); -} - -fn request_to_telemetry_event( - tracking_id: String, - event_stage: PbTelemetryEventStage, - event_name: &str, - catalog_id: i64, - connector_name: Option, - object: Option, - attributes: Option, // any json string - node: String, - is_test: bool, -) { - let event = PbEventMessage { - tracking_id, - event_time_sec: current_timestamp(), - event_stage: event_stage as i32, - event_name: event_name.to_string(), - connector_name, - object: object.map(|c| c as i32), - catalog_id, - attributes: attributes.map(|a| a.to_string()), - node, - is_test, - }; - let report_bytes = event.encode_to_vec(); - - tokio::spawn(async move { - const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; - let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); - post_telemetry_report_pb(&url, report_bytes) - .await - .unwrap_or_else(|e| tracing::info!("{}", e)) - }); -} - -#[cfg(test)] -mod test { - - use super::*; - - #[ignore] - #[tokio::test] - async fn test_telemetry_report_event() { - let event_stage = PbTelemetryEventStage::CreateStreamJob; - let event_name = "test_feature"; - let catalog_id = 1; - let connector_name = Some("test_connector".to_string()); - let object = Some(PbTelemetryDatabaseObject::Source); - let attributes = None; - let node = "test_node".to_string(); - - request_to_telemetry_event( - "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), - event_stage, - event_name, - catalog_id, - connector_name, - object, - attributes, - node, - true, - ); - - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } -} diff --git a/src/common/telemetry_event/Cargo.toml b/src/common/telemetry_event/Cargo.toml new file mode 100644 index 000000000000..6007f1cdc331 --- /dev/null +++ b/src/common/telemetry_event/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "risingwave_telemetry_event" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +jsonbb = { workspace = true } +prost = { workspace = true } +reqwest = { version = "0.12.2", features = ["json"] } +risingwave_pb = { workspace = true } +thiserror = "1" +thiserror-ext = { workspace = true } +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", +] } +tracing = "0.1" +uuid = { version = "1", features = ["v4"] } diff --git a/src/common/telemetry_event/src/lib.rs b/src/common/telemetry_event/src/lib.rs new file mode 100644 index 000000000000..7a26164f9156 --- /dev/null +++ b/src/common/telemetry_event/src/lib.rs @@ -0,0 +1,131 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Move the Telemetry's Event Report functions here +/// Keep the stats report module in the common/ module +mod util; + +use std::sync::OnceLock; + +use prost::Message; +use risingwave_pb::telemetry::{ + EventMessage as PbEventMessage, PbTelemetryDatabaseObject, + TelemetryEventStage as PbTelemetryEventStage, +}; +pub use util::*; + +pub type TelemetryResult = core::result::Result; + +/// Telemetry errors are generally recoverable/ignorable. `String` is good enough. +pub type TelemetryError = String; + +pub static TELEMETRY_TRACKING_ID: OnceLock = OnceLock::new(); + +pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; + +pub fn report_event_common( + event_stage: PbTelemetryEventStage, + event_name: &str, + catalog_id: i64, + connector_name: Option, + object: Option, + attributes: Option, // any json string + node: String, +) { + let event_tracking_id: String; + if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() { + event_tracking_id = tracking_id.to_string(); + } else { + tracing::info!("Telemetry tracking_id is not set, event reporting disabled"); + return; + } + + request_to_telemetry_event( + event_tracking_id, + event_stage, + event_name, + catalog_id, + connector_name, + object, + attributes, + node, + false, + ); +} + +pub fn request_to_telemetry_event( + tracking_id: String, + event_stage: PbTelemetryEventStage, + event_name: &str, + catalog_id: i64, + connector_name: Option, + object: Option, + attributes: Option, // any json string + node: String, + is_test: bool, +) { + let event = PbEventMessage { + tracking_id, + event_time_sec: current_timestamp(), + event_stage: event_stage as i32, + event_name: event_name.to_string(), + connector_name, + object: object.map(|c| c as i32), + catalog_id, + attributes: attributes.map(|a| a.to_string()), + node, + is_test, + }; + let report_bytes = event.encode_to_vec(); + + tokio::spawn(async move { + const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; + let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); + post_telemetry_report_pb(&url, report_bytes) + .await + .unwrap_or_else(|e| tracing::info!("{}", e)) + }); +} + +#[cfg(test)] +mod test { + + use super::*; + + #[ignore] + #[tokio::test] + async fn test_telemetry_report_event() { + let event_stage = PbTelemetryEventStage::CreateStreamJob; + let event_name = "test_feature"; + let catalog_id = 1; + let connector_name = Some("test_connector".to_string()); + let object = Some(PbTelemetryDatabaseObject::Source); + let attributes = None; + let node = "test_node".to_string(); + + request_to_telemetry_event( + "7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(), + event_stage, + event_name, + catalog_id, + connector_name, + object, + attributes, + node, + true, + ); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } +} diff --git a/src/common/telemetry_event/src/util.rs b/src/common/telemetry_event/src/util.rs new file mode 100644 index 000000000000..f7e390792064 --- /dev/null +++ b/src/common/telemetry_event/src/util.rs @@ -0,0 +1,49 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::SystemTime; + +use thiserror_ext::AsReport; + +use crate::TelemetryError; + +type Result = core::result::Result; + +pub fn current_timestamp() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Clock might go backward") + .as_secs() +} + +/// Sends a `POST` request of the telemetry reporting to a URL. +pub async fn post_telemetry_report_pb(url: &str, report_body: Vec) -> Result<()> { + let client = reqwest::Client::new(); + let res = client + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf") + .body(report_body) + .send() + .await + .map_err(|err| format!("failed to send telemetry report, err: {}", err.as_report()))?; + if res.status().is_success() { + Ok(()) + } else { + Err(format!( + "telemetry response is error, url {}, status {}", + url, + res.status() + )) + } +} diff --git a/src/frontend/src/telemetry.rs b/src/frontend/src/telemetry.rs index 19777a28ca90..7c30d6ac0e43 100644 --- a/src/frontend/src/telemetry.rs +++ b/src/frontend/src/telemetry.rs @@ -14,9 +14,10 @@ use prost::Message; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator}; +use risingwave_common::telemetry::report::TelemetryReportCreator; use risingwave_common::telemetry::{ - current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, + current_timestamp, report_event_common, SystemData, TelemetryNodeType, TelemetryReportBase, + TelemetryResult, }; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use serde::{Deserialize, Serialize}; diff --git a/src/meta/src/telemetry.rs b/src/meta/src/telemetry.rs index a76d76c58c6e..b3858c4edde7 100644 --- a/src/meta/src/telemetry.rs +++ b/src/meta/src/telemetry.rs @@ -15,12 +15,10 @@ use prost::Message; use risingwave_common::config::MetaBackend; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::{ - report_event_common, TelemetryInfoFetcher, TelemetryReportCreator, -}; +use risingwave_common::telemetry::report::{TelemetryInfoFetcher, TelemetryReportCreator}; use risingwave_common::telemetry::{ - current_timestamp, telemetry_cluster_type_from_env_var, SystemData, TelemetryNodeType, - TelemetryReportBase, TelemetryResult, + current_timestamp, report_event_common, telemetry_cluster_type_from_env_var, SystemData, + TelemetryNodeType, TelemetryReportBase, TelemetryResult, }; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_pb::common::WorkerType; diff --git a/src/storage/compactor/src/telemetry.rs b/src/storage/compactor/src/telemetry.rs index 815b485f9e89..00104986a1da 100644 --- a/src/storage/compactor/src/telemetry.rs +++ b/src/storage/compactor/src/telemetry.rs @@ -14,9 +14,10 @@ use prost::Message; use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf; -use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator}; +use risingwave_common::telemetry::report::TelemetryReportCreator; use risingwave_common::telemetry::{ - current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult, + current_timestamp, report_event_common, SystemData, TelemetryNodeType, TelemetryReportBase, + TelemetryResult, }; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use serde::{Deserialize, Serialize}; diff --git a/src/stream/src/telemetry.rs b/src/stream/src/telemetry.rs index e64f71fa94e1..a22d69a20a40 100644 --- a/src/stream/src/telemetry.rs +++ b/src/stream/src/telemetry.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::telemetry::report::report_event_common; +use risingwave_common::telemetry::report_event_common; use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";