Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Aug 13, 2024
1 parent 45dc1ac commit fe5754d
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,28 +181,32 @@ impl SourceReader {

#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Ok(res) => {
if matcher
.as_ref()
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
yield res
} else {
// Currrntly due to the lack of prefix list, we just skip the unmatched files.
continue;
loop {
// keep listing all files in the bucket
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Ok(res) => {
if matcher
.as_ref()
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
yield res
} else {
// Currrntly due to the lack of prefix list, we just skip the unmatched files.
continue;
}
}
Err(err) => {
tracing::error!(error = %err.as_report(), "list object fail");
return Err(err);
}
}
Err(err) => {
tracing::error!(error = %err.as_report(), "list object fail");
return Err(err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

Expand Down

0 comments on commit fe5754d

Please sign in to comment.