
feat(metrics): simplify user error metrics #15544

Merged · 10 commits · Mar 11, 2024
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

42 changes: 10 additions & 32 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1676,53 +1676,31 @@ def section_streaming_errors(outer_panels):
         [
             panels.timeseries_count(
                 "Compute Errors by Type",
-                "",
+                "Errors that happened during computation. Check the logs for detailed error message.",
                 [
-                    panels.target(
-                        f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
-                    ),
                     panels.target(
-                        f"sum({metric('user_compute_error')}) by (error_type, error_msg, fragment_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
+                        f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
+                        "{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
                     ),
                 ],
             ),
             panels.timeseries_count(
                 "Source Errors by Type",
-                "",
-                [
-                    panels.target(
-                        f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
-                    ),
-                    panels.target(
-                        f"sum({metric('user_source_error')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
-                    ),
-                ],
-            ),
-            panels.timeseries_count(
-                "Source Reader Errors by Type",
-                "",
+                "Errors that happened during source data ingestion. Check the logs for detailed error message.",
                 [
-                    panels.target(
-                        f"sum({metric('user_source_reader_error_count')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
-                    ),
                     panels.target(
-                        f"sum({metric('user_source_reader_error')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
-                        "{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
+                        f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
+                        "{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
                     ),
                 ],
             ),
             panels.timeseries_count(
-                "Sink by Connector",
-                "",
+                "Sink Errors by Type",
+                "Errors that happened during data sink out. Check the logs for detailed error message.",
                 [
                     panels.target(
-                        f"sum({metric('user_sink_error')}) by (connector_name, executor_id, error_msg)",
-                        "{{connector_name}}: {{error_msg}} ({{executor_id}})",
+                        f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
+                        "{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
                     ),
                 ],
             ),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -183,12 +183,16 @@ def section_overview(panels):
         "Errors in the system group by type",
         [
             panels.target(
-                f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
-                "compute error {{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
+                f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
+                "{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
             ),
             panels.target(
-                f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
-                "parse error {{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
+                f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
+                "{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
             ),
+            panels.target(
+                f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
+                "{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
+            ),
             panels.target(
                 f"{metric('source_status_is_up')} == 0",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -27,6 +27,7 @@ message SinkParam {
   string db_name = 5;
   string sink_from_name = 6;
   catalog.SinkFormatDesc format_desc = 7;
+  string sink_name = 8;
 }
 
 enum SinkPayloadFormat {
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -175,7 +175,7 @@ impl SourceExecutor {
             self.source_ctrl_opts.clone(),
             None,
             ConnectorProperties::default(),
-            "NA".to_owned(), // FIXME: source name was not passed in batch plan
+            "NA".to_owned(), // source name was not passed in batch plan
         ));
         let stream = self
             .source
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -457,6 +457,7 @@ async fn main() {
         .unwrap();
     let sink_param = SinkParam {
         sink_id: SinkId::new(1),
+        sink_name: cfg.sink.clone(),
         properties,
         columns: table_schema.get_sink_schema(),
         downstream_pk: table_schema.pk_indices,
44 changes: 0 additions & 44 deletions src/common/src/error.rs
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
 use std::fmt::{Debug, Display, Formatter};
-use std::time::{Duration, SystemTime};
 
 use thiserror::Error;
 use thiserror_ext::Macro;
@@ -24,8 +22,6 @@ pub mod v2 {
     pub use risingwave_error::*;
 }
 
-const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h
-
 pub trait Error = std::error::Error + Send + Sync + 'static;
 pub type BoxedError = Box<dyn Error>;
 
@@ -183,46 +179,6 @@ macro_rules! bail {
     };
 }
 
-#[derive(Debug)]
-pub struct ErrorSuppressor {
-    max_unique: usize,
-    unique: HashSet<String>,
-    last_reset_time: SystemTime,
-}
-
-impl ErrorSuppressor {
-    pub fn new(max_unique: usize) -> Self {
-        Self {
-            max_unique,
-            last_reset_time: SystemTime::now(),
-            unique: Default::default(),
-        }
-    }
-
-    pub fn suppress_error(&mut self, error: &str) -> bool {
-        self.try_reset();
-        if self.unique.contains(error) {
-            false
-        } else if self.unique.len() < self.max_unique {
-            self.unique.insert(error.to_string());
-            false
-        } else {
-            // We have exceeded the capacity.
-            true
-        }
-    }
-
-    pub fn max(&self) -> usize {
-        self.max_unique
-    }
-
-    fn try_reset(&mut self) {
-        if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION {
-            *self = Self::new(self.max_unique)
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::convert::Into;
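Note (illustration, not part of the diff): `ErrorSuppressor` existed to bound the cardinality of the `error_msg` label; with error messages moved out of metric labels entirely, it has no remaining users, and log spam is bounded on the logging side instead. A minimal sketch of that pattern, assuming `LogSuppresser`'s API as it is used in `src/connector/src/parser/mod.rs` below; the static and function names here are made up:

    use std::sync::LazyLock;

    use risingwave_common::log::LogSuppresser;

    // Rate-limit the logs that carry the detailed error message; the metric
    // itself only carries low-cardinality labels such as `error_type`.
    static LOG_SUPPRESSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

    fn log_parse_error(error_msg: &str) {
        // `check()` is assumed to return Ok(n) when logging is allowed, with n
        // being the number of calls suppressed since the last allowed one.
        if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
            tracing::error!(suppressed_count, error_msg, "failed to parse message, skipping");
        }
    }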
47 changes: 11 additions & 36 deletions src/common/src/metrics/error_metrics.rs
@@ -80,13 +80,15 @@ impl<const N: usize> ErrorMetric<N> {
 
 pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;
 
+/// Metrics for counting errors in the system.
+/// The detailed error messages are not supposed to be stored in the metrics, but in the logs.
+///
+/// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors.
 #[derive(Clone)]
 pub struct ErrorMetrics {
-    pub user_sink_error: ErrorMetricRef<3>,
-    pub user_compute_error: ErrorMetricRef<4>,
-    pub user_source_reader_error: ErrorMetricRef<5>,
-    pub user_source_error: ErrorMetricRef<5>,
-    pub cdc_source_error: ErrorMetricRef<3>,
+    pub user_sink_error: ErrorMetricRef<4>,
+    pub user_compute_error: ErrorMetricRef<3>,
+    pub user_source_error: ErrorMetricRef<4>,
 }
 
 impl ErrorMetrics {
@@ -95,40 +97,17 @@ impl ErrorMetrics {
             user_sink_error: Arc::new(ErrorMetric::new(
                 "user_sink_error",
                 "Sink errors in the system, queryable by tags",
-                &["connector_name", "executor_id", "error_msg"],
+                &["error_type", "sink_id", "sink_name", "fragment_id"],
             )),
             user_compute_error: Arc::new(ErrorMetric::new(
                 "user_compute_error",
                 "Compute errors in the system, queryable by tags",
-                &["error_type", "error_msg", "executor_name", "fragment_id"],
-            )),
-            user_source_reader_error: Arc::new(ErrorMetric::new(
-                "user_source_reader_error",
-                "Source reader error count",
-                &[
-                    "error_type",
-                    "error_msg",
-                    "executor_name",
-                    "actor_id",
-                    "source_id",
-                ],
+                &["error_type", "executor_name", "fragment_id"],
             )),
             user_source_error: Arc::new(ErrorMetric::new(
-                "user_source_error_count",
+                "user_source_error",
                 "Source errors in the system, queryable by tags",
-                &[
-                    "error_type",
-                    "error_msg",
-                    "executor_name",
-                    "fragment_id",
-                    "table_id",
-                ],
-            )),
-            // cdc source is singleton, so we use source_id to identify the connector
-            cdc_source_error: Arc::new(ErrorMetric::new(
-                "cdc_source_error",
-                "CDC source errors in the system, queryable by tags",
-                &["connector_name", "source_id", "error_msg"],
+                &["error_type", "source_id", "source_name", "fragment_id"],
             )),
         }
     }
@@ -137,19 +116,15 @@ impl ErrorMetrics {
         vec![
             &self.user_sink_error.desc,
             &self.user_compute_error.desc,
-            &self.user_source_reader_error.desc,
             &self.user_source_error.desc,
-            &self.cdc_source_error.desc,
         ]
     }
 
     fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
        vec![
             self.user_sink_error.collect(),
             self.user_compute_error.collect(),
-            self.user_source_reader_error.collect(),
             self.user_source_error.collect(),
-            self.cdc_source_error.collect(),
         ]
     }
 }
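For illustration only (not from this diff): under the simplified scheme, a call site counts an error by reporting label values in the declared order, and a new kind of error should reuse one of the three metrics with a new `error_type` value instead of adding a metric. A sketch with made-up values, mirroring the `report` usage in the parser change below:

    use risingwave_common::metrics::GLOBAL_ERROR_METRICS;

    // Count one compute error: exactly 3 labels, matching ErrorMetricRef<3>
    // and the declared order (error_type, executor_name, fragment_id).
    fn report_division_by_zero(fragment_id: u32) {
        GLOBAL_ERROR_METRICS.user_compute_error.report([
            "division_by_zero".to_owned(), // error_type (hypothetical value)
            "ProjectExecutor".to_owned(),  // executor_name (hypothetical)
            fragment_id.to_string(),
        ]);
    }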
6 changes: 4 additions & 2 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -219,8 +219,10 @@ mod tests {
             }),
             protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
         };
-        let mut source_ctx = SourceContext::default();
-        source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
+        let source_ctx = SourceContext {
+            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
+            ..Default::default()
+        };
         let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
             .await
             .unwrap();
12 changes: 11 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -29,6 +29,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
 use risingwave_common::log::LogSuppresser;
+use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::types::{Datum, Scalar, ScalarImpl};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::tracing::InstrumentStream;
@@ -712,7 +713,16 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     "failed to parse message, skipping"
                 );
             }
-            parser.source_ctx().report_user_source_error(&error);
+
+            // report to error metrics
+            let context = parser.source_ctx();
+            GLOBAL_ERROR_METRICS.user_source_error.report([
+                // TODO(eric): output ConnectorError's variant as label
+                "source_parser".to_owned(),
+                context.source_info.source_id.to_string(),
+                context.source_info.source_name.clone(),
+                context.source_info.fragment_id.to_string(),
+            ]);
         }
     }
 

Review comment on the `TODO(eric)` line above, by fuyufjh (Member, Author), Mar 8, 2024: I will try in next PR. #15557
12 changes: 8 additions & 4 deletions src/connector/src/parser/plain_parser.rs
@@ -192,8 +192,10 @@ mod tests {
             .map(|c| SourceColumnDesc::from(&c.column_desc))
             .collect::<Vec<_>>();
 
-        let mut source_ctx = SourceContext::default();
-        source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
+        let source_ctx = SourceContext {
+            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
+            ..Default::default()
+        };
         let source_ctx = Arc::new(source_ctx);
         // format plain encode json parser
         let parser = PlainParser::new(
@@ -343,8 +345,10 @@ mod tests {
             .collect::<Vec<_>>();
 
         // format plain encode json parser
-        let mut source_ctx = SourceContext::default();
-        source_ctx.connector_props = ConnectorProperties::MysqlCdc(Box::default());
+        let source_ctx = SourceContext {
+            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
+            ..Default::default()
+        };
         let mut parser = PlainParser::new(
             SpecificParserConfig::DEFAULT_PLAIN_JSON,
             columns.clone(),
10 changes: 10 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -146,12 +146,19 @@ pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct SinkParam {
     pub sink_id: SinkId,
+    pub sink_name: String,
     pub properties: HashMap<String, String>,
     pub columns: Vec<ColumnDesc>,
     pub downstream_pk: Vec<usize>,
     pub sink_type: SinkType,
     pub format_desc: Option<SinkFormatDesc>,
     pub db_name: String,
+
+    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
+    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
+    ///
+    /// See also `gen_sink_plan`.
+    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
     pub sink_from_name: String,
 }
@@ -171,6 +178,7 @@ impl SinkParam {
         };
         Self {
             sink_id: SinkId::from(pb_param.sink_id),
+            sink_name: pb_param.sink_name,
             properties: pb_param.properties,
             columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
             downstream_pk: table_schema
@@ -190,6 +198,7 @@ impl SinkParam {
     pub fn to_proto(&self) -> PbSinkParam {
         PbSinkParam {
             sink_id: self.sink_id.sink_id,
+            sink_name: self.sink_name.clone(),
             properties: self.properties.clone(),
             table_schema: Some(TableSchema {
                 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
@@ -217,6 +226,7 @@ impl From<SinkCatalog> for SinkParam {
             .collect();
         Self {
             sink_id: sink_catalog.id,
+            sink_name: sink_catalog.name,
             properties: sink_catalog.properties,
             columns,
             downstream_pk: sink_catalog.downstream_pk,
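For illustration (not part of the diff): with `sink_name` now carried on `SinkParam`, a sink failure can be counted under the new 4-label set without putting the error message into labels. A sketch with a made-up `error_type` value and function name:

    use risingwave_common::metrics::GLOBAL_ERROR_METRICS;

    // Count one sink error: (error_type, sink_id, sink_name, fragment_id),
    // matching ErrorMetricRef<4> declared in error_metrics.rs.
    fn report_sink_error(param: &SinkParam, fragment_id: u32) {
        GLOBAL_ERROR_METRICS.user_sink_error.report([
            "sink_writer_error".to_owned(), // error_type (hypothetical value)
            param.sink_id.sink_id.to_string(),
            param.sink_name.clone(),
            fragment_id.to_string(),
        ]);
    }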