Skip to content

Commit

Permalink
chore: define the telemetry protocol in protobuf (#14952)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Feb 4, 2024
1 parent 266fe25 commit 42cc7c8
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 0 deletions.
95 changes: 95 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
syntax = "proto3";

package telemetry;

// MetaBackend enumerates the meta data backends that a report can name
// (see MetaReport.meta_backend).
enum MetaBackend {
// Zero value: what proto3 yields when the field has not been set.
META_BACKEND_UNSPECIFIED = 0;
META_BACKEND_MEMORY = 1;
META_BACKEND_ETCD = 2;
META_BACKEND_RDB = 3;
}

// TelemetryNodeType identifies which kind of node created a report
// (see ReportBase.node_type).
enum TelemetryNodeType {
// Zero value: what proto3 yields when the field has not been set.
TELEMETRY_NODE_TYPE_UNSPECIFIED = 0;
TELEMETRY_NODE_TYPE_META = 1;
TELEMETRY_NODE_TYPE_COMPUTE = 2;
TELEMETRY_NODE_TYPE_FRONTEND = 3;
TELEMETRY_NODE_TYPE_COMPACTOR = 4;
}

// SystemMemory is the memory part of SystemData.
// NOTE(review): the unit is not stated in this file (presumably bytes) -
// confirm against the code that fills the report.
message SystemMemory {
// used is the amount of memory in use
uint64 used = 1;
// total is the total amount of memory
uint64 total = 2;
}

// SystemOs is the operating system part of SystemData.
message SystemOs {
// name is the operating system name
string name = 1;
// version is the operating system version
string version = 2;
// kernel_version is the kernel version
string kernel_version = 3;
}

// SystemCpu is the CPU part of SystemData.
message SystemCpu {
// NOTE(review): presumably the number of CPU cores available to the node
// (fractional values allowed) - confirm against the code that fills it.
float available = 1;
}

// SystemData groups the memory, OS and CPU information of a node.
// It is carried in ReportBase.system_data.
message SystemData {
SystemMemory memory = 1;
SystemOs os = 2;
SystemCpu cpu = 3;
}

// NodeCount represents how many nodes of each type are in this cluster.
// The four fields correspond to the four node types of TelemetryNodeType.
message NodeCount {
uint32 meta = 1;
uint32 compute = 2;
uint32 frontend = 3;
uint32 compactor = 4;
}

// RwVersion represents the version of RisingWave
message RwVersion {
// rw_version is the Cargo package version of RisingWave
string rw_version = 1;
// git_sha is the Git commit SHA of RisingWave
string git_sha = 2;
}

// ReportBase holds the fields shared by every report type. It is embedded as
// the `base` field of MetaReport, ComputeReport, FrontendReport and
// CompactorReport.
message ReportBase {
// tracking_id is persistent in meta data
string tracking_id = 1;
// session_id is reset every time node restarts
string session_id = 2;
// system_data is hardware and os info
SystemData system_data = 3;
// up_time is how long the node has been running
// NOTE(review): the unit is not stated here (presumably seconds) - confirm.
uint64 up_time = 4;
// report_time is when the report is created
// NOTE(review): presumably a Unix timestamp in seconds - confirm.
uint64 report_time = 5;
// node_type is the node that creates the report
TelemetryNodeType node_type = 6;
}

// MetaReport is the report for a meta node: the common base plus
// cluster-level information.
message MetaReport {
// base is the part shared with all other report types
ReportBase base = 1;
// meta_backend is the backend of meta data
MetaBackend meta_backend = 2;
// node_count is the count of each node type
NodeCount node_count = 3;
// rw_version is the version of RisingWave
RwVersion rw_version = 4;
// This field represents the "number of running streaming jobs"
// and is used to indicate whether the cluster is active.
uint32 stream_job_count = 5;
}

// ComputeReport is the report for a compute node. It currently carries only
// the common base.
message ComputeReport {
ReportBase base = 1;
}

// FrontendReport is the report for a frontend node. It currently carries only
// the common base.
message FrontendReport {
ReportBase base = 1;
}

// CompactorReport is the report for a compactor node. It currently carries
// only the common base.
message CompactorReport {
ReportBase base = 1;
}
75 changes: 75 additions & 0 deletions src/common/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub mod report;

use std::time::SystemTime;

use risingwave_pb::telemetry::{
ReportBase as PbTelemetryReportBase, SystemCpu as PbSystemCpu, SystemData as PbSystemData,
SystemMemory as PbSystemMemory, SystemOs as PbSystemOs,
TelemetryNodeType as PbTelemetryNodeType,
};
use serde::{Deserialize, Serialize};
use sysinfo::System;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -65,6 +70,19 @@ pub struct TelemetryReportBase {
pub node_type: TelemetryNodeType,
}

impl From<TelemetryReportBase> for PbTelemetryReportBase {
fn from(val: TelemetryReportBase) -> Self {
PbTelemetryReportBase {
tracking_id: val.tracking_id,
session_id: val.session_id,
system_data: Some(val.system_data.into()),
up_time: val.up_time,
report_time: val.time_stamp,
node_type: from_telemetry_node_type(val.node_type) as i32,
}
}
}

/// Marker trait for telemetry report types: it declares no methods of its own
/// and only requires the implementor to be `Serialize`.
pub trait TelemetryReport: Serialize {}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -155,6 +173,63 @@ pub fn current_timestamp() -> u64 {
.as_secs()
}

/// Maps a [`TelemetryNodeType`] to the protobuf enum [`PbTelemetryNodeType`].
///
/// This delegates to the `From<TelemetryNodeType>` impl for
/// `PbTelemetryNodeType` so the variant mapping is written in exactly one
/// place and the two entry points cannot drift apart when a variant is added.
fn from_telemetry_node_type(t: TelemetryNodeType) -> PbTelemetryNodeType {
    PbTelemetryNodeType::from(t)
}

impl From<TelemetryNodeType> for PbTelemetryNodeType {
fn from(val: TelemetryNodeType) -> Self {
match val {
TelemetryNodeType::Meta => PbTelemetryNodeType::Meta,
TelemetryNodeType::Compute => PbTelemetryNodeType::Compute,
TelemetryNodeType::Frontend => PbTelemetryNodeType::Frontend,
TelemetryNodeType::Compactor => PbTelemetryNodeType::Compactor,
}
}
}

impl From<Cpu> for PbSystemCpu {
fn from(val: Cpu) -> Self {
PbSystemCpu {
available: val.available,
}
}
}

impl From<Memory> for PbSystemMemory {
fn from(val: Memory) -> Self {
PbSystemMemory {
used: val.used as u64,
total: val.total as u64,
}
}
}

impl From<Os> for PbSystemOs {
fn from(val: Os) -> Self {
PbSystemOs {
name: val.name,
kernel_version: val.kernel_version,
version: val.version,
}
}
}

impl From<SystemData> for PbSystemData {
fn from(val: SystemData) -> Self {
PbSystemData {
memory: Some(val.memory.into()),
os: Some(val.os.into()),
cpu: Some(val.cpu.into()),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"stream_plan",
"stream_service",
"task_service",
"telemetry",
"user",
];
let protos: Vec<String> = proto_files
Expand Down
6 changes: 6 additions & 0 deletions src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub mod java_binding;
#[cfg_attr(madsim, path = "sim/health.rs")]
pub mod health;
#[rustfmt::skip]
#[path = "sim/telemetry.rs"]
pub mod telemetry;
// Module body is loaded from `connector_service.serde.rs` via an explicit
// `#[path]`; the declaration is marked `#[rustfmt::skip]`.
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
#[rustfmt::skip]
Expand Down Expand Up @@ -151,6 +154,9 @@ pub mod backup_service_serde;
// Module body is loaded from `java_binding.serde.rs` via an explicit
// `#[path]`; the declaration is marked `#[rustfmt::skip]`.
#[rustfmt::skip]
#[path = "java_binding.serde.rs"]
pub mod java_binding_serde;
// Module body is loaded from `telemetry.serde.rs` via an explicit `#[path]`;
// the declaration is marked `#[rustfmt::skip]`.
// NOTE(review): presumably the generated serde impls for the `telemetry`
// messages, like the other `*_serde` modules - confirm against build.rs.
#[rustfmt::skip]
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
Expand Down

0 comments on commit 42cc7c8

Please sign in to comment.