feat(metrics): simplify user error metrics (#15544)
fuyufjh authored Mar 11, 2024
1 parent f796432 commit e27d13c
Showing 25 changed files with 101 additions and 236 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

42 changes: 10 additions & 32 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1676,53 +1676,31 @@ def section_streaming_errors(outer_panels):
[
panels.timeseries_count(
"Compute Errors by Type",
"",
"Errors that happened during computation. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_compute_error')}) by (error_type, error_msg, fragment_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Errors by Type",
"",
[
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_source_error')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Reader Errors by Type",
"",
"Errors that happened during source data ingestion. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_source_reader_error_count')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
),
panels.target(
f"sum({metric('user_source_reader_error')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Sink by Connector",
"",
"Sink Errors by Type",
"Errors that happened during data sink out. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_sink_error')}) by (connector_name, executor_id, error_msg)",
"{{connector_name}}: {{error_msg}} ({{executor_id}})",
f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
],
),
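Note: with this change the panels group only by low-cardinality labels, so a single series looks like user_compute_error{error_type="...",executor_name="...",fragment_id="..."} and the detailed message is expected in the logs instead. A rough sketch of the reporting side that feeds the compute panel — the report() shape and the GLOBAL_ERROR_METRICS import are taken from the src/connector/src/parser/mod.rs hunk further down; the function name and the concrete label values are illustrative assumptions, not code from this commit:

use risingwave_common::metrics::GLOBAL_ERROR_METRICS;

// Sketch: bump the new 3-label compute-error counter
// (error_type, executor_name, fragment_id). Values are illustrative.
fn report_compute_error(fragment_id: u32) {
    GLOBAL_ERROR_METRICS.user_compute_error.report([
        "ExprError".to_owned(),  // error_type
        "Project".to_owned(),    // executor_name
        fragment_id.to_string(), // fragment_id
    ]);
}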
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -183,12 +183,16 @@ def section_overview(panels):
"Errors in the system group by type",
[
panels.target(
f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
"compute error {{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"parse error {{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"{metric('source_status_is_up')} == 0",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -27,6 +27,7 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
string sink_name = 8;
}

enum SinkPayloadFormat {
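Note: sink_name takes a previously unused field number, so the message stays wire-compatible — older peers ignore the field, and decoding a message written before this commit yields the proto3 default (an empty string). A minimal sketch with the prost-generated type (PbSinkParam is the alias used in src/connector/src/sink/mod.rs below; the exact import path is assumed, not shown in this commit):

use prost::Message;

// Sketch: bytes produced before this commit carry no field 8,
// so sink_name decodes to "".
// `PbSinkParam` = prost-generated `SinkParam`; import path assumed.
fn sink_name_or_default(bytes: &[u8]) -> String {
    let param = PbSinkParam::decode(bytes).expect("valid SinkParam bytes");
    param.sink_name // empty string when the writer predates this commit
}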
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -175,7 +175,7 @@ impl SourceExecutor {
self.source_ctrl_opts.clone(),
None,
ConnectorProperties::default(),
"NA".to_owned(), // FIXME: source name was not passed in batch plan
"NA".to_owned(), // source name was not passed in batch plan
));
let stream = self
.source
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -457,6 +457,7 @@ async fn main() {
.unwrap();
let sink_param = SinkParam {
sink_id: SinkId::new(1),
sink_name: cfg.sink.clone(),
properties,
columns: table_schema.get_sink_schema(),
downstream_pk: table_schema.pk_indices,
44 changes: 0 additions & 44 deletions src/common/src/error.rs
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};
use std::time::{Duration, SystemTime};

use thiserror::Error;
use thiserror_ext::Macro;
@@ -24,8 +22,6 @@ pub mod v2 {
pub use risingwave_error::*;
}

const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h

pub trait Error = std::error::Error + Send + Sync + 'static;
pub type BoxedError = Box<dyn Error>;

@@ -183,46 +179,6 @@ macro_rules! bail {
};
}

#[derive(Debug)]
pub struct ErrorSuppressor {
max_unique: usize,
unique: HashSet<String>,
last_reset_time: SystemTime,
}

impl ErrorSuppressor {
pub fn new(max_unique: usize) -> Self {
Self {
max_unique,
last_reset_time: SystemTime::now(),
unique: Default::default(),
}
}

pub fn suppress_error(&mut self, error: &str) -> bool {
self.try_reset();
if self.unique.contains(error) {
false
} else if self.unique.len() < self.max_unique {
self.unique.insert(error.to_string());
false
} else {
// We have exceeded the capacity.
true
}
}

pub fn max(&self) -> usize {
self.max_unique
}

fn try_reset(&mut self) {
if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION {
*self = Self::new(self.max_unique)
}
}
}

#[cfg(test)]
mod tests {
use std::convert::Into;
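Note: with ErrorSuppressor removed, deduplication of noisy error strings is no longer the metrics layer's job — metrics keep only bounded labels, and the full message goes to the (rate-limited) logs. A rough sketch of that split, assuming LogSuppresser (imported by parser/mod.rs below) exposes a check()-style rate-limit guard as used elsewhere in the codebase; the function itself is illustrative, not code from this commit:

use std::sync::LazyLock;

use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;

static LOG_SUPPRESSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

fn report_parse_error(err: &impl std::fmt::Display, source_id: u32, source_name: &str, fragment_id: u32) {
    // Detailed message: logs only, rate-limited so a hot loop cannot flood the output.
    if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
        tracing::error!(suppressed_count, error = %err, "failed to parse message");
    }
    // Metric: bounded labels only, no error_msg.
    GLOBAL_ERROR_METRICS.user_source_error.report([
        "source_parser".to_owned(),
        source_id.to_string(),
        source_name.to_owned(),
        fragment_id.to_string(),
    ]);
}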
47 changes: 11 additions & 36 deletions src/common/src/metrics/error_metrics.rs
@@ -80,13 +80,15 @@ impl<const N: usize> ErrorMetric<N> {

pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;

/// Metrics for counting errors in the system.
/// The detailed error messages are not supposed to be stored in the metrics, but in the logs.
///
/// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors.
#[derive(Clone)]
pub struct ErrorMetrics {
pub user_sink_error: ErrorMetricRef<3>,
pub user_compute_error: ErrorMetricRef<4>,
pub user_source_reader_error: ErrorMetricRef<5>,
pub user_source_error: ErrorMetricRef<5>,
pub cdc_source_error: ErrorMetricRef<3>,
pub user_sink_error: ErrorMetricRef<4>,
pub user_compute_error: ErrorMetricRef<3>,
pub user_source_error: ErrorMetricRef<4>,
}

impl ErrorMetrics {
@@ -95,40 +97,17 @@ impl ErrorMetrics {
user_sink_error: Arc::new(ErrorMetric::new(
"user_sink_error",
"Sink errors in the system, queryable by tags",
&["connector_name", "executor_id", "error_msg"],
&["error_type", "sink_id", "sink_name", "fragment_id"],
)),
user_compute_error: Arc::new(ErrorMetric::new(
"user_compute_error",
"Compute errors in the system, queryable by tags",
&["error_type", "error_msg", "executor_name", "fragment_id"],
)),
user_source_reader_error: Arc::new(ErrorMetric::new(
"user_source_reader_error",
"Source reader error count",
&[
"error_type",
"error_msg",
"executor_name",
"actor_id",
"source_id",
],
&["error_type", "executor_name", "fragment_id"],
)),
user_source_error: Arc::new(ErrorMetric::new(
"user_source_error_count",
"user_source_error",
"Source errors in the system, queryable by tags",
&[
"error_type",
"error_msg",
"executor_name",
"fragment_id",
"table_id",
],
)),
// cdc source is singleton, so we use source_id to identify the connector
cdc_source_error: Arc::new(ErrorMetric::new(
"cdc_source_error",
"CDC source errors in the system, queryable by tags",
&["connector_name", "source_id", "error_msg"],
&["error_type", "source_id", "source_name", "fragment_id"],
)),
}
}
@@ -137,19 +116,15 @@
vec![
&self.user_sink_error.desc,
&self.user_compute_error.desc,
&self.user_source_reader_error.desc,
&self.user_source_error.desc,
&self.cdc_source_error.desc,
]
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
vec![
self.user_sink_error.collect(),
self.user_compute_error.collect(),
self.user_source_reader_error.collect(),
self.user_source_error.collect(),
self.cdc_source_error.collect(),
]
}
}
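Note: ErrorMetric<N> is const-generic over the label count, so the schemas above also pin the arity of every report site — assuming report() takes a [String; N], which is what the call added in parser/mod.rs below suggests, a mismatched number of label values fails to compile rather than misreporting at runtime. Purely illustrative (the new doc comment asks for new error_type values rather than new metrics), and assuming ErrorMetric/ErrorMetricRef are importable alongside GLOBAL_ERROR_METRICS:

use std::sync::Arc;

// Illustrative only: a 2-label metric declares its schema once...
fn demo_arity() {
    let demo: ErrorMetricRef<2> = Arc::new(ErrorMetric::new(
        "user_demo_error",
        "Demo errors, queryable by tags",
        &["error_type", "fragment_id"],
    ));
    // ...and every report site must then supply exactly two values.
    demo.report(["SomeError".to_owned(), 42.to_string()]);
}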
6 changes: 4 additions & 2 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -219,8 +219,10 @@ mod tests {
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..Default::default()
};
let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
.await
.unwrap();
12 changes: 11 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -29,6 +29,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{Datum, Scalar, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::InstrumentStream;
@@ -712,7 +713,16 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
"failed to parse message, skipping"
);
}
parser.source_ctx().report_user_source_error(&error);

// report to error metrics
let context = parser.source_ctx();
GLOBAL_ERROR_METRICS.user_source_error.report([
// TODO(eric): output ConnectorError's variant as label
"source_parser".to_owned(),
context.source_info.source_id.to_string(),
context.source_info.source_name.clone(),
context.source_info.fragment_id.to_string(),
]);
}
}

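Note: once this branch fires, a parse failure surfaces in Prometheus as a counter keyed only by the four bounded labels; a hypothetical scraped sample (label names from the code above, values invented) would look roughly like:

user_source_error{error_type="source_parser",source_id="1001",source_name="kafka_source",fragment_id="5"} 3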
12 changes: 8 additions & 4 deletions src/connector/src/parser/plain_parser.rs
@@ -192,8 +192,10 @@ mod tests {
.map(|c| SourceColumnDesc::from(&c.column_desc))
.collect::<Vec<_>>();

let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..Default::default()
};
let source_ctx = Arc::new(source_ctx);
// format plain encode json parser
let parser = PlainParser::new(
@@ -343,8 +345,10 @@
.collect::<Vec<_>>();

// format plain encode json parser
let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::MysqlCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::MysqlCdc(Box::default()),
..Default::default()
};
let mut parser = PlainParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
columns.clone(),
10 changes: 10 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -146,12 +146,19 @@ pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
pub sink_id: SinkId,
pub sink_name: String,
pub properties: HashMap<String, String>,
pub columns: Vec<ColumnDesc>,
pub downstream_pk: Vec<usize>,
pub sink_type: SinkType,
pub format_desc: Option<SinkFormatDesc>,
pub db_name: String,

/// - For `CREATE SINK ... FROM ...`, the name of the source table.
/// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
///
/// See also `gen_sink_plan`.
// TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
pub sink_from_name: String,
}

@@ -171,6 +178,7 @@ impl SinkParam {
};
Self {
sink_id: SinkId::from(pb_param.sink_id),
sink_name: pb_param.sink_name,
properties: pb_param.properties,
columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
downstream_pk: table_schema
@@ -190,6 +198,7 @@ pub fn to_proto(&self) -> PbSinkParam {
pub fn to_proto(&self) -> PbSinkParam {
PbSinkParam {
sink_id: self.sink_id.sink_id,
sink_name: self.sink_name.clone(),
properties: self.properties.clone(),
table_schema: Some(TableSchema {
columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
@@ -217,6 +226,7 @@ impl From<SinkCatalog> for SinkParam {
.collect();
Self {
sink_id: sink_catalog.id,
sink_name: sink_catalog.name,
properties: sink_catalog.properties,
columns,
downstream_pk: sink_catalog.downstream_pk,
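Note: sink_name now flows through every representation of a sink's parameters — catalog, SinkParam, and the protobuf handed to the connector service. A small sketch of the invariant the hunks above establish (sink_catalog is an assumed in-scope SinkCatalog; the method names are the ones visible in the diff):

fn sink_name_round_trips(sink_catalog: SinkCatalog) {
    let param = SinkParam::from(sink_catalog); // From<SinkCatalog> copies the catalog name into sink_name
    // to_proto() forwards it verbatim into PbSinkParam.
    assert_eq!(param.sink_name, param.to_proto().sink_name);
}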