Skip to content

Commit

Permalink
fix: Update with_options to include mqtt_common.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 11, 2024
1 parent 45f2c20 commit 09adecb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
7 changes: 3 additions & 4 deletions src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ impl SplitReader for MqttSplitReader {
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> Result<Self> {
let (client, eventloop) = properties.common.build_client(
source_ctx.source_info.actor_id,
source_ctx.source_info.fragment_id as u64,
)?;
let (client, eventloop) = properties
.common
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?;

let qos = properties.common.qos();

Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/with_options_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ fn common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("common.rs")
}

fn mqtt_common_mod_path() -> PathBuf {
connector_crate_path().join("src").join("mqtt_common.rs")
}

pub fn generate_with_options_yaml_source() -> String {
generate_with_options_yaml_inner(&source_mod_path())
}
Expand All @@ -63,6 +67,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String {
for entry in walkdir::WalkDir::new(path)
.into_iter()
.chain(walkdir::WalkDir::new(common_mod_path()))
.chain(walkdir::WalkDir::new(mqtt_common_mod_path()))
{
let entry = entry.expect("Failed to read directory entry");
if entry.path().extension() == Some("rs".as_ref()) {
Expand Down

0 comments on commit 09adecb

Please sign in to comment.