Skip to content

Commit

Permalink
telemetry for connection create and ref
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed Dec 2, 2024
1 parent 303ab11 commit 1ddd180
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 6 deletions.
8 changes: 7 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceTablePlan, TableJobType};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
Query, Statement,
Expand Down Expand Up @@ -113,7 +114,11 @@ pub async fn gen_sink_plan(

resolve_privatelink_in_with_option(&mut with_options)?;
let (mut resolved_with_options, connection_type, connector_conn_ref) =
resolve_connection_ref_and_secret_ref(with_options, session)?;
resolve_connection_ref_and_secret_ref(
with_options,
session,
TelemetryDatabaseObject::Sink,
)?;
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?;

// if not using connection, we don't need to check connector match connection type
Expand Down Expand Up @@ -781,6 +786,7 @@ fn bind_sink_format_desc(
resolve_connection_ref_and_secret_ref(
WithOptions::try_from(value.row_options.as_slice())?,
session,
TelemetryDatabaseObject::Sink,
)?;
ensure_connection_type_allowed(connection_type_flag, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
let (mut options, secret_refs) = props.into_parts();
Expand Down
15 changes: 12 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
get_delimiter, AstString, ColumnDef, CreateSourceStatement, Encode, Format,
FormatEncodeOptions, ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
Expand Down Expand Up @@ -315,8 +316,11 @@ pub(crate) async fn bind_columns_from_source(

let options_with_secret = match with_properties {
Either::Left(options) => {
let (sec_resolve_props, connection_type, _) =
resolve_connection_ref_and_secret_ref(options.clone(), session)?;
let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
options.clone(),
session,
TelemetryDatabaseObject::Source,
)?;
if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
Expand All @@ -336,6 +340,7 @@ pub(crate) async fn bind_columns_from_source(
resolve_connection_ref_and_secret_ref(
WithOptions::try_from(format_encode.row_options())?,
session,
TelemetryDatabaseObject::Source,
)?;
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;

Expand Down Expand Up @@ -1622,7 +1627,11 @@ pub async fn bind_create_source_or_table_with_connector(
resolve_privatelink_in_with_option(&mut with_properties)?;

let (with_properties, connection_type, connector_conn_ref) =
resolve_connection_ref_and_secret_ref(with_properties, session)?;
resolve_connection_ref_and_secret_ref(
with_properties,
session,
TelemetryDatabaseObject::Source,
)?;
ensure_connection_type_allowed(connection_type, &ALLOWED_CONNECTION_CONNECTOR)?;

// if not using connection, we don't need to check connector match connection type
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

#[allow(dead_code)] // please remove when used
pub(crate) fn report_event(
event_stage: PbTelemetryEventStage,
event_name: &str,
Expand Down
20 changes: 20 additions & 0 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
use risingwave_sqlparser::ast::{
ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue,
Expand All @@ -38,6 +39,7 @@ use super::OverwriteOptions;
use crate::error::{ErrorCode, Result as RwResult, RwError};
use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use crate::session::SessionImpl;
use crate::telemetry::report_event;
use crate::Binder;

mod options {
Expand Down Expand Up @@ -194,7 +196,9 @@ impl WithOptions {
pub(crate) fn resolve_connection_ref_and_secret_ref(
with_options: WithOptions,
session: &SessionImpl,
object: TelemetryDatabaseObject,
) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
let connector_name = with_options.get_connector();
let db_name: &str = session.database();
let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();

Expand All @@ -220,6 +224,22 @@ pub(crate) fn resolve_connection_ref_and_secret_ref(
)));
}
};

// report to telemetry
report_event(
PbTelemetryEventStage::CreateStreamJob,
"connection_ref",
0,
connector_name.clone(),
Some(object),
{
connection_params.as_ref().map(|cp| {
jsonbb::json!({
"connection_type": cp.connection_type().as_str_name().to_string()
})
})
},
);
}

let mut inner_secret_refs = {
Expand Down
22 changes: 21 additions & 1 deletion src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use risingwave_pb::meta::subscribe_response::{
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::telemetry::PbTelemetryEventStage;
use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::ActiveValue::Set;
Expand Down Expand Up @@ -83,7 +84,7 @@ use crate::manager::{
NotificationVersion, IGNORED_NOTIFICATION_VERSION,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::telemetry::{report_event, MetaTelemetryJobDesc};
use crate::{MetaError, MetaResult};

pub type Catalog = (
Expand Down Expand Up @@ -1538,6 +1539,25 @@ impl CatalogController {

txn.commit().await?;

{
// call meta telemetry here to report the connection creation
report_event(
PbTelemetryEventStage::Unspecified,
"connection_create",
pb_connection.get_id() as _,
{
pb_connection.info.as_ref().and_then(|info| match info {
ConnectionInfo::ConnectionParams(params) => {
Some(params.connection_type().as_str_name().to_string())
}
_ => None,
})
},
None,
None,
);
}

let version = self
.notify_frontend(
NotificationOperation::Add,
Expand Down

0 comments on commit 1ddd180

Please sign in to comment.