Skip to content

Commit

Permalink
refactor(config): update astarte device config
Browse files Browse the repository at this point in the history
Now the configuration information to connect a device to Astarte are
provided inside a `astarte-device-DEVICE_ID_HERE-conf` directory, in
a `config.json` file. Instead, all the information necessary to generate
samples to be sent to Astarte are provided via CLI or environment
variables.
Also the README have been update to explain how to configure the
application and build/run it.

Signed-off-by: Riccardo Gallo <[email protected]>
  • Loading branch information
rgallor committed Sep 3, 2024
1 parent 316d90d commit 0ca718d
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 63 deletions.
4 changes: 4 additions & 0 deletions .reuse/dep5
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ License: Apache-2.0
Files: interfaces/*
Copyright: 2024 SECO Mind Srl
License: Apache-2.0

Files: astarte-device-*-conf/*
Copyright: 2024 SECO Mind Srl
License: Apache-2.0
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ astarte-device-sdk = "0.8.2"
clap = { version = "=4.4.18", features = ["derive", "env", "string"] }
color-eyre = "0.6.3"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "sync", "macros", "signal"] }
tokio-stream = "0.1.15"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"]}
rand = "0.8.5"
serde_json = "1.0.124"
serde = { version = "1.0.207", features = ["derive"] }
56 changes: 55 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,61 @@ Copyright 2024 SECO Mind Srl
SPDX-License-Identifier: Apache-2.0
-->

# stream-rust-test
# Astarte Stream Rust Test
==========================

[![ci](https://github.com/astarte-platform/stream-rust-test/actions/workflows/ci.yaml/badge.svg)](https://github.com/astarte-platform/stream-rust-test/actions/workflows/ci.yaml)
[![codecov](https://codecov.io/gh/astarte-platform/stream-rust-test/graph/badge.svg?token=wW2Hsm5edX)](https://codecov.io/gh/astarte-platform/stream-rust-test)

Astarte Rust SDK based data stream test.

Requirements
============

* Astarte Device Rust SDK

Getting started
===============

## Choosing a Device ID

A Base64 url-encoded uuid should be used, you can use [astartectl](https://github.com/astarte-platform/astartectl#installation) to generate one:

```bash
astartectl utils device-id generate-random
```

## Configuring the application

Create a configuration directory `astarte-device-DEVICE_ID_HERE-conf` with a `config.json` file inside it containing:
- `realm`: the name of the Astarte realm.
- `device_id`: the id of the device you want to connect to Astarte.
- `pairing_url`: the URL of the Astarte Pairing endpoint. It should be something like `https://<api url>/pairing`.
- `credentials_secret` or `pairing_token`: the identifiers used to authenticate the device through Astarte. If both are
present, the credential secret will be used.
- `store_directory`: the directory specifying where persistent data will be saved.
- `interfaces_directory`: the directory where the astarte interfaces used by the device are saved.

## Build and run

Build the application using following commands:
```sh
cargo build --release
```

Then run the application either by running the `run.sh` script inside the `scripts` folder or with the CLI:
```sh
./target/release/stream-rust-test [OPTIONS]
```

The full list of options can be shown with the command:
```sh
cargo run -- -h
```

Several options are available:

- `--device` allows to set the device ID;
- `--function` allows to choose the data generation function (one between `sin`, `noisesin`, `randomspikessin`, `saw`, `rect`, `sinc`, `random`, `x` and a default one);
- `--interval` allows to set the sending interval;
- `--scale` allows to scale the generated result;
9 changes: 9 additions & 0 deletions astarte-device-DEVICE_ID_HERE-conf/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"realm": "REALM_HERE",
"device_id": "DEVICE_ID_HERE",
"pairing_url": "PAIRING_URL_HERE",
"credentials_secret": "CREDENTIALS_SECRET_HERE",
"astarte_ignore_ssl": true,
"interfaces_directory": "INTERFACES_DIRECTORY_HERE",
"store_directory": "STORE_DIRECTORY_HERE"
}
7 changes: 0 additions & 7 deletions scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ set -eEuo pipefail
export RUST_LOG="debug"

# Set application environment variables
export REALM=""
export DEVICE_ID=""
export CREDENTIALS_SECRET=""
#export PAIRING_TOKEN=""
export PAIRING_URL="http://api.astarte.localhost/pairing"
export STORE_DIR="/tmp/stream-rust-test/store"
export IGNORE_SSL_ERRORS="true"
export MATH_FUNCTION="sin"
export INTERVAL_BTW_SAMPLES=500
export SCALE=3
Expand Down
153 changes: 98 additions & 55 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,61 +10,62 @@ use astarte_device_sdk::transport::mqtt::{Credential, MqttConfig};
use astarte_device_sdk::{Client, DeviceClient, EventLoop};
use clap::Parser;
use color_eyre::eyre;
use color_eyre::eyre::bail;
use color_eyre::eyre::{bail, eyre};
use serde::Deserialize;
use std::path::PathBuf;
use std::time::SystemTime;
use stream_rust_test::math::{BaseValue, MathFunction};
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReadDirStream;
use tokio_stream::StreamExt;
use tracing::error;
use tracing::log::debug;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

/// Astarte device configuration.
#[derive(Debug, Clone, Parser)]
#[clap(version, about)]
pub struct Config {
#[derive(Debug, Clone, Deserialize)]
struct AstarteConfig {
/// Astarte realm
#[clap(long, env = "REALM")]
pub realm: String,
realm: String,
/// Device ID
#[clap(long, env = "DEVICE_ID")]
pub device_id: String,
device_id: String,
/// Device credential secret
#[clap(long, env = "CREDENTIALS_SECRET")]
pub credentials_secret: Option<String>,
#[serde(default)]
credentials_secret: Option<String>,
/// Device pairing token
#[clap(long, env = "PAIRING_TOKEN")]
pub pairing_token: Option<String>,
#[serde(default)]
pairing_token: Option<String>,
/// Astarte pairing url
#[clap(long, env = "PAIRING_URL")]
pub pairing_url: String,
pairing_url: String,
/// Astarte store directory
#[clap(long, env = "STORE_DIR")]
pub store_dir: String,
store_directory: PathBuf,
/// Flag to ignore Astarte SSL errors
#[clap(long, default_value = "true", env = "IGNORE_SSL_ERRORS")]
pub ignore_ssl_errors: bool,
astarte_ignore_ssl: bool,
/// Path to folder containing the Astarte Device interfaces
#[clap(long, default_value = PathBuf::from("interfaces").into_os_string())]
pub interfaces_folder: PathBuf,
interfaces_directory: PathBuf,
}

/// Configuration for the values to be sent to Astarte
#[derive(Debug, Clone, Parser)]
#[clap(version, about)]
struct Config {
/// Math function the device will use to send data to Astarte
#[clap(long, default_value = "default", env = "MATH_FUNCTION")]
pub math_function: MathFunction,
#[clap(short, long, default_value = "default", env = "MATH_FUNCTION")]
math_function: MathFunction,
/// Interface name to send data to
#[clap(
long,
default_value = "org.astarte-platform.genericsensors.Values",
env = "INTERFACE_NAME"
)]
pub interface_datastream_do: String,
interface_datastream_do: String,
/// Milliseconds the device must wait before sending data to Astarte
#[clap(long, default_value = "1000", env = "INTERVAL_BTW_SAMPLES")]
pub interval_btw_samples: u64,
#[clap(short, long, default_value = "1000", env = "INTERVAL_BTW_SAMPLES")]
interval_btw_samples: u64,
/// Scale for the generation of the data to send
#[clap(long, default_value = "1.0", env = "SCALE")]
pub scale: f64,
#[clap(short, long, default_value = "1.0", env = "SCALE")]
scale: f64,
}

#[tokio::main]
Expand All @@ -79,33 +80,43 @@ async fn main() -> eyre::Result<()> {
// time instant when the program starts its execution
let now = SystemTime::now();

// initialize configuration options
let cfg = Config::parse();
// initialize CLI configuration options
let cli_cfg = Config::parse();

debug!("Parsed config: {:#?}", cli_cfg);

debug!("Parsed config: {:#?}", cfg);
// Load astarte configuration
let astarte_cfg: AstarteConfig = load_astarte_cfg().await?;

debug!("Parsed Astarte config: {:#?}", astarte_cfg);

// define type of credential (pairing token or credential secret) to use to establish an MQTT
// connection with Astarte
let cred = if let Some(pairing) = cfg.pairing_token.as_deref() {
let cred = if let Some(pairing) = astarte_cfg.pairing_token.as_deref() {
Credential::paring_token(pairing)
} else if let Some(secret) = cfg.credentials_secret.as_deref() {
} else if let Some(secret) = astarte_cfg.credentials_secret.as_deref() {
Credential::secret(secret)
} else {
bail!("missing credential secret or pairing token");
};

// define MQTT configuration options
let mut mqtt_config = MqttConfig::new(cfg.realm, cfg.device_id, cred, cfg.pairing_url);

if cfg.ignore_ssl_errors {
let mut mqtt_config = MqttConfig::new(
astarte_cfg.realm,
astarte_cfg.device_id,
cred,
astarte_cfg.pairing_url,
);

if astarte_cfg.astarte_ignore_ssl {
mqtt_config.ignore_ssl_errors();
}

// connect to Astarte
let (client, mut connection) = DeviceBuilder::new()
.store_dir(cfg.store_dir)
.store_dir(astarte_cfg.store_directory.as_path())
.await?
.interface_directory(cfg.interfaces_folder.as_path())?
.interface_directory(astarte_cfg.interfaces_directory.as_path())?
.connect(mqtt_config)
.await?
.build();
Expand All @@ -116,14 +127,7 @@ async fn main() -> eyre::Result<()> {
tasks.spawn(async move { connection.handle_events().await.map_err(Into::into) });

// spawn task to send data to Astarte
tasks.spawn(send_data(
client,
now,
cfg.math_function,
cfg.interface_datastream_do,
cfg.interval_btw_samples,
cfg.scale,
));
tasks.spawn(send_data(client, now, cli_cfg));

// handle tasks termination
while let Some(res) = tasks.join_next().await {
Expand All @@ -144,25 +148,64 @@ async fn main() -> eyre::Result<()> {
Ok(())
}

async fn load_astarte_cfg() -> eyre::Result<AstarteConfig> {
// search the astarte-device-DEVICE_ID_HERE-conf with the DEVICE_ID specified by the user
// starting from the root of the project
let dirs = tokio::fs::read_dir(".").await?;
let dirs_stream = ReadDirStream::new(dirs);

let mut dirs = dirs_stream
.filter_map(|res| res.ok().map(|e| e.path()))
.filter(|path| {
if !path.is_dir() {
return false;
}

let name = path
.file_name()
.expect("failed to retrieve the folder name")
.to_string_lossy();

// true if the folder name starts and ends with the predefined values
name.starts_with("astarte-device-") && name.ends_with("-conf")
})
.collect::<Vec<_>>()
.await;

// if more folders are present, take only the first one
let Some(dir) = dirs.first_mut() else {
return Err(eyre!("No astarte devices config folder found"));
};

dir.push("config.json");

let file = tokio::fs::read_to_string(dir).await?;

// retrieve the astarte config information
let astarte_cfg: AstarteConfig = serde_json::from_str(&file)?;

Ok(astarte_cfg)
}

/// Send data to Astarte
async fn send_data(
client: DeviceClient<SqliteStore>,
now: SystemTime,
math_function: MathFunction,
interface_datastream_do: String,
interval_btw_samples: u64,
scale: f64,
cfg: Config,
) -> eyre::Result<()> {
let mut base_value = BaseValue::try_from_system_time(now, scale)?;
let mut base_value = BaseValue::try_from_system_time(now, cfg.scale)?;

debug!("sending data to Astarte with {math_function} math function");
debug!(
"sending data to Astarte with {} math function",
cfg.math_function
);

loop {
// Send data to Astarte
let value = math_function.compute(base_value.value());
let value = cfg.math_function.compute(base_value.value());

client
.send(&interface_datastream_do, "/test/value", value)
.send(&cfg.interface_datastream_do, "/test/value", value)
.await?;

debug!("data sent on endpoint /test/value, content: {value}");
Expand All @@ -171,6 +214,6 @@ async fn send_data(
base_value.update();

// Sleep interval secs
tokio::time::sleep(std::time::Duration::from_millis(interval_btw_samples)).await;
tokio::time::sleep(std::time::Duration::from_millis(cfg.interval_btw_samples)).await;
}
}

0 comments on commit 0ca718d

Please sign in to comment.