diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 864d1de56c7be..cffeb5dfe5f65 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; +use anyhow::anyhow; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; @@ -51,14 +52,22 @@ impl SplitEnumerator for OpendalEnumerator { async fn list_splits(&mut self) -> ConnectorResult>> { let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); + let prefix = self.prefix.as_deref().unwrap_or("/"); - Ok(vec![empty_split]) + match self.op.list(prefix).await { + Ok(_) => return Ok(vec![empty_split]), + Err(e) => { + return Err(anyhow!(e) + .context("fail to create source, please check your config.") + .into()) + } + } } } impl OpendalEnumerator { pub async fn list(&self) -> ConnectorResult { - let prefix = self.prefix.as_deref().unwrap_or(""); + let prefix = self.prefix.as_deref().unwrap_or("/"); let object_lister = self .op diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 25b32c0a0e4b8..c11ba773648ba 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -99,6 +99,10 @@ impl FsListExecutor { .collect::>(); let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect(); + if res.is_empty() { + tracing::warn!("No items were listed from source."); + return Ok(StreamChunk::default()); + } Ok(StreamChunk::from_rows( &res, &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],