diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index e1d4f96197716..c5059fdc8186c 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -21,10 +21,11 @@ use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone)] pub struct NatsSplitEnumerator { subject: String, split_id: SplitId, + client: async_nats::Client, } #[async_trait] @@ -36,13 +37,23 @@ impl SplitEnumerator for NatsSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> anyhow::Result { + let client = properties.common.build_client().await?; Ok(Self { subject: properties.common.subject, split_id: Arc::from("0"), + client, }) } async fn list_splits(&mut self) -> anyhow::Result> { + // Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash + let state = self.client.connection_state(); + if state != async_nats::connection::State::Connected { + return Err(anyhow::anyhow!( + "Nats connection status is not connected, current status is {:?}", + state + )); + } // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(),