Skip to content

Commit

Permalink
refactor(connector): simplify and clean-up unused variants of `Connec…
Browse files Browse the repository at this point in the history
…torError` (#15031)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Feb 13, 2024
1 parent b05cbff commit befeba3
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 62 deletions.
38 changes: 7 additions & 31 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,22 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Parse error: {0}")]
Parse(&'static str),

#[error("Invalid parameter {name}: {reason}")]
InvalidParam { name: &'static str, reason: String },

#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),

#[error("Config error: {0}")]
Config(
#[source]
#[error("MySQL error: {0}")]
MySql(
#[from]
#[backtrace]
anyhow::Error,
mysql_async::Error,
),

#[error("Connection error: {0}")]
Connection(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Postgres error: {0}")]
Postgres(#[from] tokio_postgres::Error),

#[error("Pulsar error: {0}")]
Pulsar(
#[source]
#[backtrace]
anyhow::Error,
),

#[error(transparent)]
Internal(
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
),
}

pub type ConnectorResult<T> = Result<T, ConnectorError>;
5 changes: 2 additions & 3 deletions src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use futures_async_stream::try_stream;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName,
};

#[derive(Debug)]
Expand Down
25 changes: 6 additions & 19 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod postgres;

use std::collections::HashMap;

use anyhow::{anyhow, Context};
use anyhow::Context;
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand All @@ -32,13 +32,11 @@ use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use serde_derive::{Deserialize, Serialize};

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::mysql_row_to_owned_row;
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};

pub type ConnectorResult<T> = std::result::Result<T, ConnectorError>;

#[derive(Debug)]
pub enum CdcTableType {
Undefined,
Expand Down Expand Up @@ -77,10 +75,7 @@ impl CdcTableType {
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(with_properties, schema).await?,
)),
_ => bail!(ConnectorError::Config(anyhow!(
"invalid external table type: {:?}",
*self
))),
_ => bail!("invalid external table type: {:?}", *self),
}
}
}
Expand Down Expand Up @@ -405,19 +400,11 @@ impl MySqlExternalTableReader {
DataType::Date => Value::from(value.into_date().0),
DataType::Time => Value::from(value.into_time().0),
DataType::Timestamp => Value::from(value.into_timestamp().0),
_ => {
return Err(ConnectorError::Internal(anyhow!(
"unsupported primary key data type: {}",
ty
)))
}
_ => bail!("unsupported primary key data type: {}", ty),
};
Ok((pk.clone(), val))
ConnectorResult::Ok((pk.clone(), val))
} else {
Err(ConnectorError::Internal(anyhow!(
"primary key {} cannot be null",
pk
)))
bail!("primary key {} cannot be null", pk);
}
})
.try_collect()?;
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::NoTls;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
use anyhow::Context;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use async_trait::async_trait;
use futures::StreamExt;
Expand All @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::{bail, ensure};

use crate::error::ConnectorError;
use crate::parser::ParserConfig;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
Expand Down Expand Up @@ -398,10 +397,11 @@ impl PulsarIcebergReader {
fn build_iceberg_configs(&self) -> anyhow::Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let bucket =
self.props.iceberg_bucket.as_ref().ok_or_else(|| {
ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
})?;
let bucket = self
.props
.iceberg_bucket
.as_ref()
.context("Iceberg bucket is not configured")?;

iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
Expand Down

0 comments on commit befeba3

Please sign in to comment.