Skip to content

Commit

Permalink
No functional changes; (re)ordering imports and fixing clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
svanharmelen committed Nov 10, 2024
1 parent fcc89d8 commit 9797f40
Show file tree
Hide file tree
Showing 21 changed files with 255 additions and 246 deletions.
5 changes: 4 additions & 1 deletion lib/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{path::PathBuf, time::Duration};
use std::path::PathBuf;

use tokio::time::Duration;

use crate::server::prelude::Config;

Expand All @@ -16,6 +18,7 @@ impl ClientBuilder {
}

/// Creates a `ClientBuilder` using a configuration file as the initial state.
#[expect(clippy::result_unit_err)]
pub fn from_config(path: impl Into<PathBuf>) -> Result<ClientBuilder, ()> {
Ok(ClientBuilder {
config: ClientConfig::load(&path.into())?,
Expand Down
26 changes: 14 additions & 12 deletions lib/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use std::{
collections::BTreeMap,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
};

use tokio::time::Duration;

use crate::{
core::config::Config,
crypto::SecurityPolicy,
Expand Down Expand Up @@ -82,17 +83,18 @@ impl ClientUserToken {
);
valid = false;
}
} else {
if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!("User token {} fails to provide both a certificate path and a private key path.", self.user);
valid = false;
}
} else if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!(
"User token {} fails to provide both a certificate path and a private key path.",
self.user
);
valid = false;
}
valid
}
Expand Down
3 changes: 2 additions & 1 deletion lib/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@
//!
//! ```no_run
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! use opcua::client::{ClientBuilder, IdentityToken, Session, DataChangeCallback, MonitoredItem};
//! use opcua::types::{
//! EndpointDescription, MessageSecurityMode, UserTokenPolicy, StatusCode,
//! NodeId, TimestampsToReturn, MonitoredItemCreateRequest, DataValue
//! };
//! use tokio::time::Duration;
//!
//! #[tokio::main]
//! async fn main() {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/client/retry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use tokio::time::Duration;

pub(crate) struct ExponentialBackoff {
max_sleep: Duration,
Expand Down Expand Up @@ -26,7 +26,7 @@ impl Iterator for ExponentialBackoff {
return None;
}

let next_sleep = self.current_sleep.clone();
let next_sleep = self.current_sleep;
self.current_sleep = self.max_sleep.min(self.current_sleep * 2);
self.retry_count += 1;

Expand Down
28 changes: 10 additions & 18 deletions lib/src/client/session/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{path::PathBuf, str::FromStr, sync::Arc};

use chrono::Duration;
use tokio::{pin, select};

use crate::{
client::{
Expand Down Expand Up @@ -165,9 +164,8 @@ impl Client {
let server_endpoints = self
.get_server_endpoints_from_url(server_url)
.await
.map_err(|status_code| {
.inspect_err(|status_code| {
error!("Cannot get endpoints for server, error - {}", status_code);
status_code
})?;

// Find the server endpoint that matches the one desired
Expand All @@ -180,12 +178,11 @@ impl Client {
endpoint.security_mode,
)
.ok_or(StatusCode::BadTcpEndpointUrlInvalid)
.map_err(|status_code| {
.inspect_err(|_| {
error!(
"Cannot find matching endpoint for {}",
endpoint.endpoint_url.as_ref()
);
status_code
})?;

Ok(self
Expand Down Expand Up @@ -509,10 +506,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.get_server_endpoints_inner(&endpoint, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down Expand Up @@ -549,12 +546,7 @@ impl Client {
let response = channel.send(request, self.config.request_timeout).await?;
if let SupportedMessage::FindServersResponse(response) = response {
process_service_result(&response.response_header)?;
let servers = if let Some(servers) = response.servers {
servers
} else {
Vec::new()
};
Ok(servers)
Ok(response.servers.unwrap_or_default())
} else {
Err(process_unexpected_response(response))
}
Expand Down Expand Up @@ -588,10 +580,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.find_servers_inner(discovery_endpoint_url, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down Expand Up @@ -731,7 +723,7 @@ impl Client {

let Some(endpoint) = endpoints
.iter()
.filter(|e| self.is_supported_endpoint(*e))
.filter(|e| self.is_supported_endpoint(e))
.max_by(|a, b| a.security_level.cmp(&b.security_level))
else {
error!("Cannot find an endpoint that we call register server on");
Expand All @@ -753,10 +745,10 @@ impl Client {
let mut evt_loop = channel.connect().await?;

let send_fut = self.register_server_inner(server, &channel);
pin!(send_fut);
tokio::pin!(send_fut);

let res = loop {
select! {
tokio::select! {
r = evt_loop.poll() => {
if let TransportPollResult::Closed(e) = r {
return Err(e);
Expand Down
6 changes: 2 additions & 4 deletions lib/src/client/session/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::sync::Arc;

use tokio::{pin, select};

use crate::{
client::transport::{SecureChannelEventLoop, TransportPollResult},
types::{NodeId, StatusCode},
Expand Down Expand Up @@ -42,10 +40,10 @@ impl SessionConnector {
let mut event_loop = self.inner.channel.connect_no_retry().await?;

let activate_fut = self.ensure_and_activate_session();
pin!(activate_fut);
tokio::pin!(activate_fut);

let res = loop {
select! {
tokio::select! {
r = event_loop.poll() => {
if let TransportPollResult::Closed(c) = r {
return Err(c);
Expand Down
25 changes: 13 additions & 12 deletions lib/src/client/session/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::sync::Arc;

use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use tokio::{
sync::watch,
time::{interval, sleep_until, Duration, Instant, Interval, MissedTickBehavior},
};

use crate::{
client::{
Expand Down Expand Up @@ -57,23 +58,23 @@ enum SessionEventLoopState {
#[must_use = "The session event loop must be started for the session to work"]
pub struct SessionEventLoop {
inner: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
retry: SessionRetryPolicy,
keep_alive_interval: Duration,
trigger_publish_rx: watch::Receiver<Instant>,
}

impl SessionEventLoop {
pub(crate) fn new(
inner: Arc<Session>,
retry: SessionRetryPolicy,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
keep_alive_interval: Duration,
trigger_publish_rx: watch::Receiver<Instant>,
) -> Self {
Self {
inner,
retry,
trigger_publish_recv,
keep_alive_interval,
trigger_publish_rx,
}
}

Expand Down Expand Up @@ -183,7 +184,7 @@ impl SessionEventLoop {
))
}
SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
tokio::time::sleep_until(next_try.into()).await;
sleep_until(next_try).await;

match connector.try_connect().await {
Ok((channel, result)) => {
Expand All @@ -200,7 +201,7 @@ impl SessionEventLoop {
.boxed(),
SubscriptionEventLoop::new(
slf.inner.clone(),
slf.trigger_publish_recv.clone(),
slf.trigger_publish_rx.clone(),
)
.run()
.boxed(),
Expand Down Expand Up @@ -245,13 +246,13 @@ enum SessionTickEvent {
}

struct SessionIntervals {
keep_alive: tokio::time::Interval,
keep_alive: Interval,
}

impl SessionIntervals {
pub fn new(keep_alive_interval: Duration) -> Self {
let mut keep_alive = tokio::time::interval(keep_alive_interval);
keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut keep_alive = interval(keep_alive_interval);
keep_alive.set_missed_tick_behavior(MissedTickBehavior::Skip);

Self { keep_alive }
}
Expand Down
2 changes: 2 additions & 0 deletions lib/src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod client;
mod connect;
mod event_loop;
mod services;

#[expect(clippy::module_inception)]
mod session;

/// Information about the server endpoint, security policy, security mode and user identity that the session will
Expand Down
Loading

0 comments on commit 9797f40

Please sign in to comment.