Skip to content

Commit

Permalink
fix(connector): add additional check in nats list_splits (#14546)
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong authored Jan 12, 2024
1 parent 1afb0ec commit 2bec2ed
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/connector/src/source/nats/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ use super::source::{NatsOffset, NatsSplit};
use super::NatsProperties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone)]
pub struct NatsSplitEnumerator {
subject: String,
split_id: SplitId,
client: async_nats::Client,
}

#[async_trait]
Expand All @@ -36,13 +37,23 @@ impl SplitEnumerator for NatsSplitEnumerator {
properties: Self::Properties,
_context: SourceEnumeratorContextRef,
) -> anyhow::Result<NatsSplitEnumerator> {
let client = properties.common.build_client().await?;
Ok(Self {
subject: properties.common.subject,
split_id: Arc::from("0"),
client,
})
}

async fn list_splits(&mut self) -> anyhow::Result<Vec<NatsSplit>> {
// Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash
let state = self.client.connection_state();
if state != async_nats::connection::State::Connected {
return Err(anyhow::anyhow!(
"Nats connection status is not connected, current status is {:?}",
state
));
}
// TODO: to simplify the logic, return 1 split for first version
let nats_split = NatsSplit {
subject: self.subject.clone(),
Expand Down

0 comments on commit 2bec2ed

Please sign in to comment.