Skip to content

Commit

Permalink
fix(connector): file source do not panic when credential is wrong (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Aug 20, 2024
1 parent 747245f commit e383ad6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::marker::PhantomData;

use anyhow::anyhow;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::{self, BoxStream};
Expand Down Expand Up @@ -51,14 +52,22 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {

async fn list_splits(&mut self) -> ConnectorResult<Vec<OpendalFsSplit<Src>>> {
let empty_split: OpendalFsSplit<Src> = OpendalFsSplit::empty_split();
let prefix = self.prefix.as_deref().unwrap_or("/");

Ok(vec![empty_split])
match self.op.list(prefix).await {
Ok(_) => return Ok(vec![empty_split]),
Err(e) => {
return Err(anyhow!(e)
.context("fail to create source, please check your config.")
.into())
}
}
}
}

impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
let prefix = self.prefix.as_deref().unwrap_or("");
let prefix = self.prefix.as_deref().unwrap_or("/");

let object_lister = self
.op
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/source/list_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl<S: StateStore> FsListExecutor<S> {
.collect::<Vec<_>>();

let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect();
if res.is_empty() {
tracing::warn!("No items were listed from source.");
return Ok(StreamChunk::default());
}
Ok(StreamChunk::from_rows(
&res,
&[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
Expand Down

0 comments on commit e383ad6

Please sign in to comment.