Skip to content

Commit

Permalink
refactor: do not output suppressed count when it's zero (#15333)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Feb 29, 2024
1 parent 7b90ed8 commit ff342fe
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 45 deletions.
19 changes: 14 additions & 5 deletions src/common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::atomic::{AtomicUsize, Ordering};

use governor::Quota;
Expand Down Expand Up @@ -46,10 +46,13 @@ impl LogSuppresser {

/// Check if the log should be suppressed.
/// If the log should be suppressed, return `Err(LogSuppressed)`.
/// Otherwise, return `Ok(usize)` with count of suppressed messages before.
pub fn check(&self) -> core::result::Result<usize, LogSuppressed> {
/// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check,
/// or `Ok(None)` if there's none.
pub fn check(&self) -> core::result::Result<Option<NonZeroUsize>, LogSuppressed> {
match self.rate_limiter.check() {
Ok(()) => Ok(self.suppressed_count.swap(0, Ordering::Relaxed)),
Ok(()) => Ok(NonZeroUsize::new(
self.suppressed_count.swap(0, Ordering::Relaxed),
)),
Err(_) => {
self.suppressed_count.fetch_add(1, Ordering::Relaxed);
Err(LogSuppressed)
Expand All @@ -72,10 +75,16 @@ mod tests {
use std::sync::LazyLock;
use std::time::Duration;

use tracing_subscriber::util::SubscriberInitExt;

use super::*;

#[tokio::test]
async fn demo() {
let _logger = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::ERROR)
.set_default();

let mut interval = tokio::time::interval(Duration::from_millis(100));
for _ in 0..100 {
interval.tick().await;
Expand All @@ -86,7 +95,7 @@ mod tests {
});

if let Ok(suppressed_count) = RATE_LIMITER.check() {
println!("failed to foo bar. suppressed_count = {}", suppressed_count);
tracing::error!(suppressed_count, "failed to foo bar");
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
"RisingWave supports decimal precision up to {}, but got {}. Will truncate. ({} suppressed)",
Decimal::MAX_PRECISION,
suppressed_count,
precision
);
suppressed_count,
"RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
Decimal::MAX_PRECISION,
precision
);
}
}
DataType::Decimal
Expand Down
22 changes: 16 additions & 6 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
}
Expand All @@ -45,8 +50,13 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
}
Expand Down Expand Up @@ -106,7 +116,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column_name = name,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand All @@ -125,7 +135,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column_name = name,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down
58 changes: 29 additions & 29 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
}
Expand All @@ -61,12 +61,12 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
}
Expand All @@ -80,12 +80,12 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
Expand All @@ -97,12 +97,12 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
Expand Down Expand Up @@ -147,10 +147,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse uuid column failed",
);
Expand Down Expand Up @@ -181,10 +181,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down Expand Up @@ -278,10 +278,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down

0 comments on commit ff342fe

Please sign in to comment.