diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index d25658dfaf5e5..db773add56364 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -16,46 +16,22 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum ConnectorError { - #[error("Parse error: {0}")] - Parse(&'static str), - - #[error("Invalid parameter {name}: {reason}")] - InvalidParam { name: &'static str, reason: String }, - - #[error("Kafka error: {0}")] - Kafka(#[from] rdkafka::error::KafkaError), - - #[error("Config error: {0}")] - Config( - #[source] + #[error("MySQL error: {0}")] + MySql( + #[from] #[backtrace] - anyhow::Error, + mysql_async::Error, ), - #[error("Connection error: {0}")] - Connection( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error("MySQL error: {0}")] - MySql(#[from] mysql_async::Error), - #[error("Postgres error: {0}")] Postgres(#[from] tokio_postgres::Error), - #[error("Pulsar error: {0}")] - Pulsar( - #[source] - #[backtrace] - anyhow::Error, - ), - #[error(transparent)] - Internal( + Uncategorized( #[from] #[backtrace] anyhow::Error, ), } + +pub type ConnectorResult = Result; diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index 2c8ee00f67af9..d7c39e2a9aa8b 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -19,10 +19,9 @@ use futures_async_stream::try_stream; use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::cdc::external::{ - CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset, - SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName, }; #[derive(Debug)] diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 78c6c714e2bc6..f281d1ecea58a 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -17,7 +17,7 @@ mod postgres; use std::collections::HashMap; -use anyhow::{anyhow, Context}; +use anyhow::Context; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -32,13 +32,11 @@ use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use serde_derive::{Deserialize, Serialize}; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::mysql_row_to_owned_row; use crate::source::cdc::external::mock_external_table::MockExternalTableReader; use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; -pub type ConnectorResult = std::result::Result; - #[derive(Debug)] pub enum CdcTableType { Undefined, @@ -77,10 +75,7 @@ impl CdcTableType { Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(with_properties, schema).await?, )), - _ => bail!(ConnectorError::Config(anyhow!( - "invalid external table type: {:?}", - *self - ))), + _ => bail!("invalid external table type: {:?}", *self), } } } @@ -405,19 +400,11 @@ impl MySqlExternalTableReader { DataType::Date => Value::from(value.into_date().0), DataType::Time => Value::from(value.into_time().0), DataType::Timestamp => Value::from(value.into_timestamp().0), - _ => { - return Err(ConnectorError::Internal(anyhow!( - "unsupported primary key data type: {}", - ty - ))) - } + _ => bail!("unsupported primary key data type: {}", ty), }; - Ok((pk.clone(), val)) + ConnectorResult::Ok((pk.clone(), val)) } else { - Err(ConnectorError::Internal(anyhow!( - "primary key {} cannot be null", - pk - ))) + bail!("primary key {} cannot be null", pk); } }) .try_collect()?; diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index f8f0c9d402347..bd8a0b51c04e7 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -28,11 +28,11 @@ use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; use tokio_postgres::NoTls; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::postgres_row_to_owned_row; use crate::source::cdc::external::{ - CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig, - ExternalTableReader, SchemaTableName, + CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader, + SchemaTableName, }; #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 7181710b70868..139af839bd16d 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use arrow_array::{Int32Array, Int64Array, RecordBatch}; use async_trait::async_trait; use futures::StreamExt; @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::catalog::ROWID_PREFIX; use risingwave_common::{bail, ensure}; -use crate::error::ConnectorError; use crate::parser::ParserConfig; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; @@ -398,10 +397,11 @@ impl PulsarIcebergReader { fn build_iceberg_configs(&self) -> anyhow::Result> { let mut iceberg_configs = HashMap::new(); - let bucket = - self.props.iceberg_bucket.as_ref().ok_or_else(|| { - ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured")) - })?; + let bucket = self + .props + .iceberg_bucket + .as_ref() + .context("Iceberg bucket is not configured")?; iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string()); iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());