From 0d1861c71336fdcd377008a4e0cf5d905fc57957 Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Sun, 3 Mar 2024 11:30:44 -0500 Subject: [PATCH] chore: Fix dylint errors --- integration_tests/mqtt/create_source.sql | 8 ++++---- src/connector/src/sink/mqtt.rs | 5 +++-- src/connector/src/source/mqtt/enumerator/mod.rs | 3 ++- src/connector/src/source/mqtt/source/reader.rs | 3 ++- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/integration_tests/mqtt/create_source.sql b/integration_tests/mqtt/create_source.sql index a586ee0966860..925082841b3e5 100644 --- a/integration_tests/mqtt/create_source.sql +++ b/integration_tests/mqtt/create_source.sql @@ -9,7 +9,8 @@ CREATE TABLE mqtt_source_table WITH ( connector='mqtt', host='mqtt-server', - topic= 'test' + topic= 'test', + qos = '1' ) FORMAT PLAIN ENCODE JSON; @@ -23,7 +24,7 @@ WITH topic= 'test', type = 'append-only', force_append_only='true', - retain = 'true', + retain = 'false', qos = '1' ); @@ -39,5 +40,4 @@ VALUES (7, 'Posey'), (8, 'Waverly'); - -FLUSH; \ No newline at end of file +FLUSH; diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 031e3c77d3819..89626b99ef5b0 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -23,6 +23,7 @@ use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::ConnectionError; use serde_derive::Deserialize; use serde_with::serde_as; +use thiserror_ext::AsReport; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use with_options::WithOptions; @@ -171,11 +172,11 @@ impl MqttSinkWriter { if let ConnectionError::MqttState(rumqttc::v5::StateError::Io(err)) = err { if err.kind() != std::io::ErrorKind::ConnectionAborted { - tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err); + tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err.as_report()); std::thread::sleep(std::time::Duration::from_secs(1)); } } else { - tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err); + tracing::error!("[Sink] Failed to poll mqtt eventloop: {}", err.as_report()); std::thread::sleep(std::time::Duration::from_secs(1)); } } diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 98f3fcec498bb..1a88603cedde2 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use risingwave_common::bail; use rumqttc::v5::{ConnectionError, Event, Incoming}; use rumqttc::Outgoing; +use thiserror_ext::AsReport; use tokio::sync::RwLock; use super::source::MqttSplit; @@ -93,7 +94,7 @@ impl SplitEnumerator for MqttSplitEnumerator { tracing::error!( "[Enumerator] Failed to subscribe to topic {}: {}", topic, - err + err.as_report(), ); connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); cloned_client diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 6872b887ff8ca..41e2b45e8913d 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -18,6 +18,7 @@ use risingwave_common::bail; use rumqttc::v5::mqttbytes::v5::Filter; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::{ConnectionError, Event, Incoming}; +use thiserror_ext::AsReport; use super::message::MqttMessage; use super::MqttSplit; @@ -105,7 +106,7 @@ impl CommonSplitReader for MqttSplitReader { if let ConnectionError::Timeout(_) = e { continue; } - tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e); + tracing::error!("[Reader] Failed to poll mqtt eventloop: {}", e.as_report()); client .subscribe_many( splits