Skip to content

Commit

Permalink
feat: telemetry report in pb (#15063)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Mar 8, 2024
1 parent bffaa38 commit 45cd2f5
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 99 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 4 additions & 78 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@
// limitations under the License.

pub mod manager;
pub mod pb_compatible;
pub mod report;

use std::time::SystemTime;

use risingwave_pb::telemetry::{
ReportBase as PbTelemetryReportBase, SystemCpu as PbSystemCpu, SystemData as PbSystemData,
SystemMemory as PbSystemMemory, SystemOs as PbSystemOs,
TelemetryNodeType as PbTelemetryNodeType,
};
use serde::{Deserialize, Serialize};
use sysinfo::System;
use thiserror_ext::AsReport;
Expand All @@ -31,7 +27,7 @@ use crate::util::resource_util::cpu::total_cpu_available;
use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};

/// Url of telemetry backend
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v1/report";
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";

/// Telemetry reporting interval in seconds, 6 hours
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;
Expand Down Expand Up @@ -70,19 +66,6 @@ pub struct TelemetryReportBase {
pub node_type: TelemetryNodeType,
}

impl From<TelemetryReportBase> for PbTelemetryReportBase {
fn from(val: TelemetryReportBase) -> Self {
PbTelemetryReportBase {
tracking_id: val.tracking_id,
session_id: val.session_id,
system_data: Some(val.system_data.into()),
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
}
}
}

pub trait TelemetryReport: Serialize {}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -140,11 +123,11 @@ impl Default for SystemData {
}

/// Sends a `POST` request of the telemetry reporting to a URL.
async fn post_telemetry_report(url: &str, report_body: String) -> Result<()> {
async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
let client = reqwest::Client::new();
let res = client
.post(url)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
.body(report_body)
.send()
.await
Expand Down Expand Up @@ -173,63 +156,6 @@ pub fn current_timestamp() -> u64 {
.as_secs()
}

fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType {
match t {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}

impl From<TelemetryNodeType> for PbTelemetryNodeType {
fn from(val: TelemetryNodeType) -> Self {
match val {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}
}

impl From<Cpu> for PbSystemCpu {
fn from(val: Cpu) -> Self {
PbSystemCpu {
available: val.available,
}
}
}

impl From<Memory> for PbSystemMemory {
fn from(val: Memory) -> Self {
PbSystemMemory {
used: val.used as u64,
total: val.total as u64,
}
}
}

impl From<Os> for PbSystemOs {
fn from(val: Os) -> Self {
PbSystemOs {
name: val.name,
kernel_version: val.kernel_version,
version: val.version,
}
}
}

impl From<SystemData> for PbSystemData {
fn from(val: SystemData) -> Self {
PbSystemData {
memory: Some(val.memory.into()),
os: Some(val.os.into()),
cpu: Some(val.cpu.into()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
95 changes: 95 additions & 0 deletions src/common/src/telemetry/pb_compatible.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::telemetry::{
ReportBase as PbTelemetryReportBase, SystemCpu as PbSystemCpu, SystemData as PbSystemData,
SystemMemory as PbSystemMemory, SystemOs as PbSystemOs,
TelemetryNodeType as PbTelemetryNodeType,
};

use crate::telemetry::{Cpu, Memory, Os, SystemData, TelemetryNodeType, TelemetryReportBase};

pub trait TelemetryToProtobuf {
fn to_pb_bytes(self) -> Vec<u8>;
}

impl From<TelemetryReportBase> for PbTelemetryReportBase {
fn from(val: TelemetryReportBase) -> Self {
PbTelemetryReportBase {
tracking_id: val.tracking_id,
session_id: val.session_id,
system_data: Some(val.system_data.into()),
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
}
}
}

fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType {
match t {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}

impl From<TelemetryNodeType> for PbTelemetryNodeType {
fn from(val: TelemetryNodeType) -> Self {
match val {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}
}

impl From<Cpu> for PbSystemCpu {
fn from(val: Cpu) -> Self {
PbSystemCpu {
available: val.available,
}
}
}

impl From<Memory> for PbSystemMemory {
fn from(val: Memory) -> Self {
PbSystemMemory {
used: val.used as u64,
total: val.total as u64,
}
}
}

impl From<Os> for PbSystemOs {
fn from(val: Os) -> Self {
PbSystemOs {
name: val.name,
kernel_version: val.kernel_version,
version: val.version,
}
}
}

impl From<SystemData> for PbSystemData {
fn from(val: SystemData) -> Self {
PbSystemData {
memory: Some(val.memory.into()),
os: Some(val.os.into()),
cpu: Some(val.cpu.into()),
}
}
}
20 changes: 8 additions & 12 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;

use super::{
post_telemetry_report, Result, TelemetryReport, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL,
};
use super::{Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
use crate::telemetry::post_telemetry_report_pb;

#[async_trait::async_trait]
pub trait TelemetryInfoFetcher {
Expand All @@ -37,7 +37,7 @@ pub trait TelemetryReportCreator {
tracking_id: String,
session_id: String,
up_time: u64,
) -> Result<impl TelemetryReport>;
) -> Result<impl TelemetryToProtobuf>;

fn report_type(&self) -> &str;
}
Expand Down Expand Up @@ -85,20 +85,16 @@ where
}

// create a report and serialize to json
let report_json = match report_creator
let bin_report = match report_creator
.create_report(
tracking_id.clone(),
session_id.clone(),
begin_time.elapsed().as_secs(),
)
.await
.map(|r| serde_json::to_string(&r))
.map(TelemetryToProtobuf::to_pb_bytes)
{
Ok(Ok(report_json)) => report_json,
Ok(Err(_)) => {
tracing::error!("Telemetry failed to serialize report to json");
continue;
}
Ok(bin_report) => bin_report,
Err(e) => {
tracing::error!("Telemetry failed to create report {}", e);
continue;
Expand All @@ -108,7 +104,7 @@ where
let url =
(TELEMETRY_REPORT_URL.to_owned() + "/" + report_creator.report_type()).to_owned();

match post_telemetry_report(&url, report_json).await {
match post_telemetry_report_pb(&url, bin_report).await {
Ok(_) => tracing::info!("Telemetry post success, id {}", tracking_id),
Err(e) => tracing::error!("Telemetry post error, {}", e),
}
Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ itertools = "0.12"
maplit = "1.0.2"
pprof = { version = "0.13", features = ["flamegraph"] }
prometheus = { version = "0.13" }
prost = { workspace = true }
risingwave_batch = { workspace = true }
risingwave_common = { workspace = true }
risingwave_common_heap_profiling = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions src/compute/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase,
TelemetryResult,
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -54,7 +55,14 @@ pub(crate) struct ComputeTelemetryReport {
base: TelemetryReportBase,
}

impl TelemetryReport for ComputeTelemetryReport {}
impl TelemetryToProtobuf for ComputeTelemetryReport {
fn to_pb_bytes(self) -> Vec<u8> {
let pb_report = risingwave_pb::telemetry::ComputeReport {
base: Some(self.base.into()),
};
pb_report.encode_to_vec()
}
}

impl ComputeTelemetryReport {
pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ postgres-types = { version = "0.2.6" }
pretty-xmlish = "0.1.13"
pretty_assertions = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = { workspace = true }
rand = "0.8"
risingwave_batch = { workspace = true }
risingwave_common = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReport, TelemetryReportBase,
TelemetryResult,
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -54,7 +55,14 @@ pub(crate) struct FrontendTelemetryReport {
base: TelemetryReportBase,
}

impl TelemetryReport for FrontendTelemetryReport {}
impl TelemetryToProtobuf for FrontendTelemetryReport {
fn to_pb_bytes(self) -> Vec<u8> {
let pb_report = risingwave_pb::telemetry::FrontendReport {
base: Some(self.base.into()),
};
pb_report.encode_to_vec()
}
}

impl FrontendTelemetryReport {
pub(crate) fn new(tracking_id: String, session_id: String, up_time: u64) -> Self {
Expand Down
Loading

0 comments on commit 45cd2f5

Please sign in to comment.