Skip to content

Commit

Permalink
refactor(con): change retrieval of Astarte config
Browse files Browse the repository at this point in the history
Now it is only possible to specify the type of connection (mqtt or grpc)
through the ASTARTE_CONNECTION env variable. The application will first
try to retrieve the config from env; if an error occurs, it tries to
retrieve them from a config.toml (if a path has beem specified). The
merge operation for the 2 sources of config has been removed.

Signed-off-by: Riccardo Gallo <[email protected]>
  • Loading branch information
rgallor committed Nov 26, 2024
1 parent ba6c64d commit 6db96f7
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 190 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ clap = { version = "=4.4.18", features = ["derive", "env", "string"] }
color-eyre = "0.6.3"
toml = "0.8.12"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] }
tokio-stream = "0.1.15"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
rand = "0.8.5"
Expand Down
248 changes: 75 additions & 173 deletions src/astarte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use astarte_device_sdk::transport::mqtt::{Credential, Mqtt, MqttConfig};
use astarte_device_sdk::{Client, DeviceClient, DeviceConnection};
use clap::ValueEnum;
use color_eyre::eyre;
use color_eyre::eyre::{OptionExt, WrapErr};
use color_eyre::eyre::{eyre, OptionExt, WrapErr};
use serde::Deserialize;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
Expand Down Expand Up @@ -60,78 +60,67 @@ pub struct ConnectionConfigBuilder {
store_directory: Option<PathBuf>,
/// Astarte Device SDK config options
#[serde(rename = "mqtt", default)]
mqtt_config: MqttConfigBuilder,
/// Astarte Message Hub config options
mqtt_config: Option<MqttConfigBuilder>,
/// Astarte Message Hub endpoint
#[serde(rename = "grpc", default)]
grpc_config: GrpcConfigBuilder,
grpc_config: Option<GrpcConfigBuilder>,
}

impl ConnectionConfigBuilder {
/// Builder constructor
/// Init astarte config from env var if they have been set
///
/// Specify if the builder should use the Astarte Device SDK or the Astarte Message Hub
pub fn with_connection(astarte_connection: Option<AstarteConnection>) -> Self {
Self {
astarte_connection,
store_directory: None,
mqtt_config: MqttConfigBuilder::default(),
grpc_config: GrpcConfigBuilder::default(),
}
}
/// If an error is returned, it means that one or more environment variables have not been set
pub fn try_from_env(&mut self) -> eyre::Result<()> {
let con = env::var("ASTARTE_CONNECTION")
.map(|s| AstarteConnection::from_str(&s, true))?
.map_err(|err| eyre!(err))?;

Check warning on line 76 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L73-L76

Added lines #L73 - L76 were not covered by tests

/// Init astarte config from env var if they have been set
pub fn from_env(&mut self) {
// doesn't change it if it's been set from CLI
if self.astarte_connection.is_none() {
self.astarte_connection = env::var("ASTARTE_CONNECTION")
.ok()
.map(|s| AstarteConnection::from_str(&s, true))
.transpose()
.ok()
.unwrap_or_default();
}
self.store_directory = Some(env::var("ASTARTE_STORE_DIRECTORY").map(PathBuf::from)?);

Check warning on line 78 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L78

Added line #L78 was not covered by tests

self.store_directory = env::var("ASTARTE_STORE_DIRECTORY").ok().map(PathBuf::from);

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID").ok();
let realm = env::var("ASTARTE_REALM").ok();
let pairing_url = env::var("ASTARTE_PAIRING_URL").ok();
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse().unwrap_or_default())
.ok();
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")
.ok()
.map(Credential::secret)
.or_else(|| {
env::var("ASTARTE_PAIRING_TOKEN")
.ok()
.map(Credential::paring_token)
});

self.mqtt_config = MqttConfigBuilder {
device_id,
realm,
credential,
pairing_url,
astarte_ignore_ssl,
};
match con {
AstarteConnection::Mqtt => {
self.astarte_connection = Some(con);

Check warning on line 82 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L82

Added line #L82 was not covered by tests

// update the mqtt config info
let device_id = env::var("ASTARTE_DEVICE_ID")?;
let realm = env::var("ASTARTE_REALM")?;
let pairing_url = env::var("ASTARTE_PAIRING_URL")?;
let astarte_ignore_ssl = env::var("ASTARTE_IGNORE_SSL_ERRORS")
.map(|s| s.parse::<bool>().unwrap_or_default())?;
let credential = env::var("ASTARTE_CREDENTIALS_SECRET")

Check warning on line 90 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L85-L90

Added lines #L85 - L90 were not covered by tests
.map(Credential::secret)
.or_else(|_| env::var("ASTARTE_PAIRING_TOKEN").map(Credential::paring_token))?;

Check warning on line 92 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L92

Added line #L92 was not covered by tests

self.mqtt_config = Some(MqttConfigBuilder {
device_id,
realm,
credential,
pairing_url,
ignore_ssl_errors: astarte_ignore_ssl,

Check warning on line 99 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L94-L99

Added lines #L94 - L99 were not covered by tests
});
}
AstarteConnection::Grpc => {
self.astarte_connection = Some(con);

Check warning on line 103 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L103

Added line #L103 was not covered by tests

// update the mqtt config info
let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT").ok();
let endpoint = env::var("ASTARTE_MSGHUB_ENDPOINT")?;

Check warning on line 105 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L105

Added line #L105 was not covered by tests

self.grpc_config = GrpcConfigBuilder { endpoint };
// update the grpc config info
self.grpc_config = Some(GrpcConfigBuilder { endpoint });

Check warning on line 108 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L108

Added line #L108 was not covered by tests
}
}

Ok(())

Check warning on line 112 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L112

Added line #L112 was not covered by tests
}

/// Update the missing config values taking them from a config.toml file
pub async fn update_with_toml(&mut self, path: impl AsRef<Path>) {
pub async fn from_toml(&mut self, path: impl AsRef<Path>) {
match tokio::fs::read_to_string(&path).await {
Ok(file) => {

Check warning on line 118 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L116-L118

Added lines #L116 - L118 were not covered by tests
// retrieve the astarte config information from the config.toml file
match toml::from_str::<ConfigToml>(&file) {
Ok(toml_cfg) => {

Check warning on line 121 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L120-L121

Added lines #L120 - L121 were not covered by tests
// update the configs
self.merge(toml_cfg.astarte);
*self = toml_cfg.astarte;

Check warning on line 123 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L123

Added line #L123 was not covered by tests
}
Err(err) => {
error!("error deserializing astarte cfg from toml: {err}");

Check warning on line 126 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L125-L126

Added lines #L125 - L126 were not covered by tests
Expand All @@ -147,41 +136,6 @@ impl ConnectionConfigBuilder {
}
}

/// Merge two configs
///
/// Prioritize the already existing fields
fn merge(&mut self, other: ConnectionConfigBuilder) {
// doesn't change it if it's been set from CLI or from ENV
if self.astarte_connection.is_none() {
self.astarte_connection = other.astarte_connection;
}

self.store_directory = self.store_directory.take().or(other.store_directory);

// update the mqtt config info
let mqtt_config = &mut self.mqtt_config;

mqtt_config.device_id = mqtt_config.device_id.take().or(other.mqtt_config.device_id);
mqtt_config.realm = mqtt_config.realm.take().or(other.mqtt_config.realm);
mqtt_config.credential = mqtt_config
.credential
.take()
.or(other.mqtt_config.credential);
mqtt_config.pairing_url = mqtt_config
.pairing_url
.take()
.or(other.mqtt_config.pairing_url);
mqtt_config.astarte_ignore_ssl = mqtt_config
.astarte_ignore_ssl
.take()
.or(other.mqtt_config.astarte_ignore_ssl);

// update the grpc config info
let grpc_config = &mut self.grpc_config;

grpc_config.endpoint = grpc_config.endpoint.take().or(other.grpc_config.endpoint);
}

/// Build a complete Astarte configuration or return an error
pub async fn build(self) -> eyre::Result<(DeviceClient<SqliteStore>, SdkConnection)> {
let astarte_connection = self
Expand All @@ -198,7 +152,8 @@ impl ConnectionConfigBuilder {
match astarte_connection {
AstarteConnection::Mqtt => {
// define MQTT configuration options
let mqtt_cfg = self.mqtt_config.build()?;
let mqtt_cfg: MqttConfig =

Check warning on line 155 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L155

Added line #L155 was not covered by tests
self.mqtt_config.ok_or_eyre("invalid mqtt config")?.into();
debug!("parsed Astarte Device Sdk config: {:#?}", mqtt_cfg);

Check warning on line 157 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L157

Added line #L157 was not covered by tests

// connect to Astarte
Expand All @@ -207,7 +162,11 @@ impl ConnectionConfigBuilder {
Ok((client, SdkConnection::Mqtt(connection)))

Check warning on line 162 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L162

Added line #L162 was not covered by tests
}
AstarteConnection::Grpc => {
let grpc_cfg = self.grpc_config.build()?;
let grpc_endpoint = self.grpc_config.ok_or_eyre("invalid grpc config")?.endpoint;

Check warning on line 165 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L165

Added line #L165 was not covered by tests

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, grpc_endpoint)

Check warning on line 167 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L167

Added line #L167 was not covered by tests
.wrap_err("failed to create a gRPC config")?;

debug!("parsed Astarte Message Hub config: {:#?}", grpc_cfg);

Check warning on line 170 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L170

Added line #L170 was not covered by tests

let (client, connection) = builder.connect(grpc_cfg).await?.build().await;

Check warning on line 172 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L172

Added line #L172 was not covered by tests
Expand All @@ -227,58 +186,46 @@ pub enum SdkConnection {
}

/// Config for an MQTT connection to Astarte
#[derive(Debug, Default, Deserialize)]
///
/// The struct isn't really necessary, nevertheless we cannot deserialize the entire [MqttConfig]
/// struct without having the fields `keepalive`, `conn_timeout` and `bounded_channel_size`.
#[derive(Debug, Deserialize)]
struct MqttConfigBuilder {
/// Device ID
device_id: Option<String>,
device_id: String,
/// Astarte realm
realm: Option<String>,
realm: String,
/// Device credential
#[serde(flatten)]
credential: Option<Credential>,
credential: Credential,
/// Astarte pairing url
pairing_url: Option<String>,
pairing_url: String,
/// Flag to ignore Astarte SSL errors
astarte_ignore_ssl: Option<bool>,
ignore_ssl_errors: bool,
}

impl MqttConfigBuilder {
fn build(self) -> eyre::Result<MqttConfig> {
let device_id = self.device_id.ok_or_eyre("missing device id")?;
let realm = self.realm.ok_or_eyre("missing realm")?;
let credential = self
.credential
.ok_or_eyre("missing either a credential secret or a pairing token")?;
let pairing_url = self.pairing_url.ok_or_eyre("missing pairing url")?;
// if missing, set the ignore ssl error flat to false
let astarte_ignore_ssl = self.astarte_ignore_ssl.unwrap_or_default();

let mut mqtt_cfg = MqttConfig::new(realm, device_id, credential, pairing_url);

if astarte_ignore_ssl {
mqtt_cfg.ignore_ssl_errors();
impl From<MqttConfigBuilder> for MqttConfig {
fn from(value: MqttConfigBuilder) -> Self {

Check warning on line 208 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L208

Added line #L208 was not covered by tests
let mut cfg = MqttConfig::new(
value.realm,
value.device_id,
value.credential,
value.pairing_url,

Check warning on line 213 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L210-L213

Added lines #L210 - L213 were not covered by tests
);

if value.ignore_ssl_errors {
cfg.ignore_ssl_errors();

Check warning on line 217 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L216-L217

Added lines #L216 - L217 were not covered by tests
}

Ok(mqtt_cfg)
cfg

Check warning on line 220 in src/astarte.rs

View check run for this annotation

Codecov / codecov/patch

src/astarte.rs#L220

Added line #L220 was not covered by tests
}
}

/// Config for a gRPC connection to an Astarte Message Hub instance
#[derive(Debug, Default, Deserialize)]
struct GrpcConfigBuilder {
/// The Endpoint of the Astarte Message Hub
endpoint: Option<String>,
}

impl GrpcConfigBuilder {
fn build(self) -> eyre::Result<GrpcConfig> {
let endpoint = self.endpoint.ok_or_eyre("missing endpoint")?;

let grpc_cfg = GrpcConfig::from_url(STREAM_RUST_TEST_NODE_UUID, endpoint)
.wrap_err("failed to create a gRPC config")?;

Ok(grpc_cfg)
}
endpoint: String,
}

/// Send data to Astarte
Expand Down Expand Up @@ -319,39 +266,10 @@ mod test {
#[tokio::test]
async fn test_connection_config_builder_build_failures() {
// empty config builder cannot build successfully
let cfg_builder = ConnectionConfigBuilder::with_connection(None).build().await;
let cfg_builder = ConnectionConfigBuilder::default().build().await;
assert!(cfg_builder.is_err());

// config builder with only the connection to astarte specified cannot build successfully
let con = AstarteConnection::Mqtt;
let cfg_builder = ConnectionConfigBuilder::with_connection(Some(con))
.build()
.await;
assert!(cfg_builder.is_err());

// check that only the astarte connection is added to the configuration
let mut cfg_builder = ConnectionConfigBuilder::with_connection(None);
let toml_str = r#"
[astarte]
connection = "mqtt"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert!(cfg_builder.astarte_connection.is_some());

// check that the astarte connection is not updated with toml info if it already contains a
// value
let con = AstarteConnection::Mqtt;
let mut cfg_builder = ConnectionConfigBuilder::with_connection(Some(con));
let toml_str = r#"
[astarte]
connection = "grpc"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.astarte_connection, Some(con));

// define store dire for the next tests
// define store dir for the next tests
let mut tmp_dir = env::temp_dir();
tmp_dir.push("stream-rust-test-tests");
std::fs::create_dir_all(&tmp_dir).expect("failed to create store dir");
Expand All @@ -365,21 +283,5 @@ mod test {
};
let res = cfg_builder.build().await;
assert!(res.is_err());

// check that the store path is not updated with toml info if it already contains a value
let mut cfg_builder = ConnectionConfigBuilder {
astarte_connection: Some(AstarteConnection::Mqtt),
store_directory: Some(tmp_dir.clone()),
mqtt_config: Default::default(),
grpc_config: Default::default(),
};
let toml_str = r#"
[astarte]
connection = "grpc"
store_directory = "/tmp/stream-rust-test/store/"
"#;
let toml = toml::from_str::<ConfigToml>(toml_str).unwrap();
cfg_builder.merge(toml.astarte);
assert_eq!(cfg_builder.store_directory, Some(tmp_dir.clone()));
}
}
4 changes: 0 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

//! CLI configuration options

use crate::astarte::AstarteConnection;
use crate::math::MathFunction;
use clap::Parser;
use std::path::PathBuf;
Expand All @@ -15,9 +14,6 @@ use std::path::PathBuf;
#[derive(Debug, Clone, Parser)]
#[clap(version, about)]
pub struct Config {
/// Either use the Astarte Device SDK or the Astarte Message Hub
#[clap(short = 'c', long, env = "ASTARTE_CONNECTION")]
pub astarte_connection: Option<AstarteConnection>,
/// Path to the directory containing the Astarte configuration file config.toml
///
/// First, the Astarte configuration is taken from ENV vars, then from the config.toml if the
Expand Down
Loading

0 comments on commit 6db96f7

Please sign in to comment.