Skip to content

Commit

Permalink
refactor: make telemetry event a separate crate (#18192)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Aug 22, 2024
1 parent 955c6bb commit 84b7b94
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 154 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"src/common/fields-derive",
"src/common/heap_profiling",
"src/common/metrics",
"src/common/telemetry_event",
"src/compute",
"src/connector",
"src/connector/codec",
Expand Down Expand Up @@ -241,6 +242,7 @@ risingwave_udf = { path = "./src/expr/udf" }
risingwave_variables = { path = "./src/utils/variables" }
risingwave_java_binding = { path = "./src/java_binding" }
risingwave_jni_core = { path = "src/jni_core" }
risingwave_telemetry_event = { path = "./src/common/telemetry_event" }
rw_futures_util = { path = "src/utils/futures_util" }
rw_resource_util = { path = "src/utils/resource_util" }
rw_iter_util = { path = "src/utils/iter_util" }
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ risingwave_common_secret = { path = "./secret" }
risingwave_error = { workspace = true }
risingwave_license = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_telemetry_event = { workspace = true }
rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
rw_iter_util = { workspace = true }
rw_resource_util = { workspace = true }
Expand Down
46 changes: 7 additions & 39 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ pub mod pb_compatible;
pub mod report;

use std::env;
use std::time::SystemTime;

use risingwave_pb::telemetry::PbTelemetryClusterType;
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, report_event_common, request_to_telemetry_event,
TelemetryError, TelemetryResult,
};
use serde::{Deserialize, Serialize};
use sysinfo::System;
use thiserror_ext::AsReport;

use crate::util::env_var::env_var_is_true_or;
use crate::util::resource_util::cpu::total_cpu_available;
use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
use crate::RW_VERSION;

type Result<T> = core::result::Result<T, TelemetryError>;

pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";
pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
Expand All @@ -50,21 +54,13 @@ pub fn telemetry_cluster_type_from_env_var() -> PbTelemetryClusterType {
}

/// Url of telemetry backend
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";

pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
/// Telemetry reporting interval in seconds, 6 hours
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;

/// Environment Variable that is default to be true
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";

pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;

/// Telemetry errors are generally recoverable/ignorable. `String` is good enough.
pub type TelemetryError = String;

type Result<T> = core::result::Result<T, TelemetryError>;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TelemetryNodeType {
Meta,
Expand Down Expand Up @@ -148,40 +144,12 @@ impl Default for SystemData {
}
}

/// Sends a `POST` request of the telemetry reporting to a URL.
pub async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
let client = reqwest::Client::new();
let res = client
.post(url)
.header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
.body(report_body)
.send()
.await
.map_err(|err| format!("failed to send telemetry report, err: {}", err.as_report()))?;
if res.status().is_success() {
Ok(())
} else {
Err(format!(
"telemetry response is error, url {}, status {}",
url,
res.status()
))
}
}

/// check whether telemetry is enabled in environment variable
pub fn telemetry_env_enabled() -> bool {
// default to be true
env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
}

pub fn current_timestamp() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock might go backward")
.as_secs()
}

pub fn report_scarf_enabled() -> bool {
telemetry_env_enabled()
&& !matches!(
Expand Down
109 changes: 4 additions & 105 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, OnceLock};
use std::sync::Arc;

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;

use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use super::{Result, TELEMETRY_REPORT_INTERVAL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
use crate::telemetry::post_telemetry_report_pb;

#[async_trait::async_trait]
pub trait TelemetryInfoFetcher {
Expand All @@ -47,8 +44,6 @@ pub trait TelemetryReportCreator {
fn report_type(&self) -> &str;
}

static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();

pub async fn start_telemetry_reporting<F, I>(
info_fetcher: Arc<I>,
report_creator: Arc<F>,
Expand Down Expand Up @@ -126,99 +121,3 @@ where
});
(join_handle, shutdown_tx)
}

pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
) {
let event_tracking_id: String;
if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
event_tracking_id = tracking_id.to_string();
} else {
tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
return;
}

request_to_telemetry_event(
event_tracking_id,
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
false,
);
}

fn request_to_telemetry_event(
tracking_id: String,
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
is_test: bool,
) {
let event = PbEventMessage {
tracking_id,
event_time_sec: current_timestamp(),
event_stage: event_stage as i32,
event_name: event_name.to_string(),
connector_name,
object: object.map(|c| c as i32),
catalog_id,
attributes: attributes.map(|a| a.to_string()),
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});
}

#[cfg(test)]
mod test {

use super::*;

#[ignore]
#[tokio::test]
async fn test_telemetry_report_event() {
let event_stage = PbTelemetryEventStage::CreateStreamJob;
let event_name = "test_feature";
let catalog_id = 1;
let connector_name = Some("test_connector".to_string());
let object = Some(PbTelemetryDatabaseObject::Source);
let attributes = None;
let node = "test_node".to_string();

request_to_telemetry_event(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
true,
);

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
32 changes: 32 additions & 0 deletions src/common/telemetry_event/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "risingwave_telemetry_event"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
jsonbb = { workspace = true }
prost = { workspace = true }
reqwest = { version = "0.12.2", features = ["json"] }
risingwave_pb = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
] }
tracing = "0.1"
uuid = { version = "1", features = ["v4"] }
Loading

0 comments on commit 84b7b94

Please sign in to comment.