From 9797f400ff74af54296e02bdb1d1566d7ec2dc54 Mon Sep 17 00:00:00 2001
From: Sander van Harmelen
Date: Wed, 6 Nov 2024 13:19:42 +0100
Subject: [PATCH] No functional changes; (re)ordering imports and fixing
 clippy warnings

---
 lib/src/client/builder.rs                     |  5 +-
 lib/src/client/config.rs                      | 26 ++---
 lib/src/client/mod.rs                         |  3 +-
 lib/src/client/retry.rs                       |  4 +-
 lib/src/client/session/client.rs              | 28 ++----
 lib/src/client/session/connect.rs             |  6 +-
 lib/src/client/session/event_loop.rs          | 25 ++---
 lib/src/client/session/mod.rs                 |  2 +
 lib/src/client/session/services/attributes.rs | 61 +++++-------
 lib/src/client/session/services/session.rs    | 26 +++--
 .../services/subscriptions/event_loop.rs      | 37 ++++----
 .../session/services/subscriptions/mod.rs     | 21 ++--
 .../session/services/subscriptions/service.rs | 15 ++-
 .../session/services/subscriptions/state.rs   | 20 ++--
 lib/src/client/session/session.rs             | 36 ++++---
 lib/src/client/transport/buffer.rs            |  2 +-
 lib/src/client/transport/channel.rs           | 95 ++++++++++---------
 lib/src/client/transport/core.rs              | 40 ++++----
 lib/src/client/transport/state.rs             | 12 ++-
 lib/src/client/transport/tcp.rs               | 35 ++++---
 lib/src/crypto/security_policy.rs             |  2 +-
 21 files changed, 255 insertions(+), 246 deletions(-)

diff --git a/lib/src/client/builder.rs b/lib/src/client/builder.rs
index e802f15c1..4ec91baac 100644
--- a/lib/src/client/builder.rs
+++ b/lib/src/client/builder.rs
@@ -1,4 +1,6 @@
-use std::{path::PathBuf, time::Duration};
+use std::path::PathBuf;
+
+use tokio::time::Duration;
 
 use crate::server::prelude::Config;
 
@@ -16,6 +18,7 @@ impl ClientBuilder {
     }
 
     /// Creates a `ClientBuilder` using a configuration file as the initial state.
+    #[expect(clippy::result_unit_err)]
     pub fn from_config(path: impl Into<PathBuf>) -> Result<ClientBuilder, ()> {
         Ok(ClientBuilder {
             config: ClientConfig::load(&path.into())?,
diff --git a/lib/src/client/config.rs b/lib/src/client/config.rs
index 33dccc03f..696c94c26 100644
--- a/lib/src/client/config.rs
+++ b/lib/src/client/config.rs
@@ -9,9 +9,10 @@ use std::{
     collections::BTreeMap,
     path::{Path, PathBuf},
     str::FromStr,
-    time::Duration,
 };
 
+use tokio::time::Duration;
+
 use crate::{
     core::config::Config,
     crypto::SecurityPolicy,
@@ -82,17 +83,18 @@ impl ClientUserToken {
                 );
                 valid = false;
             }
-        } else {
-            if self.cert_path.is_none() && self.private_key_path.is_none() {
-                error!(
-                    "User token {} fails to provide a password or certificate info.",
-                    self.user
-                );
-                valid = false;
-            } else if self.cert_path.is_none() || self.private_key_path.is_none() {
-                error!("User token {} fails to provide both a certificate path and a private key path.", self.user);
-                valid = false;
-            }
+        } else if self.cert_path.is_none() && self.private_key_path.is_none() {
+            error!(
+                "User token {} fails to provide a password or certificate info.",
+                self.user
+            );
+            valid = false;
+        } else if self.cert_path.is_none() || self.private_key_path.is_none() {
+            error!(
+                "User token {} fails to provide both a certificate path and a private key path.",
+                self.user
+            );
+            valid = false;
         }
         valid
     }
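Context for reviewers: `tokio::time::Duration` is a re-export of `std::time::Duration`, so the repeated `std::time` to `tokio::time` import swaps throughout this patch change nothing at runtime. A minimal standalone sketch (assuming only a `tokio` dependency, nothing from this codebase) showing both paths name the same type:

```rust
// tokio::time::Duration is `pub use std::time::Duration;`, so the two
// paths are interchangeable wherever a Duration is expected.
fn main() {
    let a: std::time::Duration = tokio::time::Duration::from_millis(250);
    let b: tokio::time::Duration = std::time::Duration::from_millis(250);
    assert_eq!(a, b); // same type, same value
}
```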
diff --git a/lib/src/client/mod.rs b/lib/src/client/mod.rs
index 517f84de7..dd731df95 100644
--- a/lib/src/client/mod.rs
+++ b/lib/src/client/mod.rs
@@ -37,12 +37,13 @@
 //!
 //! ```no_run
 //! use std::sync::Arc;
-//! use std::time::Duration;
+//!
 //! use opcua::client::{ClientBuilder, IdentityToken, Session, DataChangeCallback, MonitoredItem};
 //! use opcua::types::{
 //!     EndpointDescription, MessageSecurityMode, UserTokenPolicy, StatusCode,
 //!     NodeId, TimestampsToReturn, MonitoredItemCreateRequest, DataValue
 //! };
+//! use tokio::time::Duration;
 //!
 //! #[tokio::main]
 //! async fn main() {
diff --git a/lib/src/client/retry.rs b/lib/src/client/retry.rs
index ebb9fbe77..d09457b6e 100644
--- a/lib/src/client/retry.rs
+++ b/lib/src/client/retry.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use tokio::time::Duration;
 
 pub(crate) struct ExponentialBackoff {
     max_sleep: Duration,
@@ -26,7 +26,7 @@ impl Iterator for ExponentialBackoff {
             return None;
         }
 
-        let next_sleep = self.current_sleep.clone();
+        let next_sleep = self.current_sleep;
         self.current_sleep = self.max_sleep.min(self.current_sleep * 2);
         self.retry_count += 1;
 
diff --git a/lib/src/client/session/client.rs b/lib/src/client/session/client.rs
index fd11b279f..837dfbd3b 100644
--- a/lib/src/client/session/client.rs
+++ b/lib/src/client/session/client.rs
@@ -1,7 +1,6 @@
 use std::{path::PathBuf, str::FromStr, sync::Arc};
 
 use chrono::Duration;
-use tokio::{pin, select};
 
 use crate::{
     client::{
@@ -165,9 +164,8 @@ impl Client {
         let server_endpoints = self
             .get_server_endpoints_from_url(server_url)
             .await
-            .map_err(|status_code| {
+            .inspect_err(|status_code| {
                 error!("Cannot get endpoints for server, error - {}", status_code);
-                status_code
             })?;
 
         // Find the server endpoint that matches the one desired
@@ -180,12 +178,11 @@ impl Client {
             endpoint.security_mode,
         )
         .ok_or(StatusCode::BadTcpEndpointUrlInvalid)
-        .map_err(|status_code| {
+        .inspect_err(|_| {
             error!(
                 "Cannot find matching endpoint for {}",
                 endpoint.endpoint_url.as_ref()
             );
-            status_code
         })?;
 
         Ok(self
@@ -509,10 +506,10 @@ impl Client {
         let mut evt_loop = channel.connect().await?;
 
         let send_fut = self.get_server_endpoints_inner(&endpoint, &channel);
-        pin!(send_fut);
+        tokio::pin!(send_fut);
 
         let res = loop {
-            select! {
+            tokio::select! {
                 r = evt_loop.poll() => {
                     if let TransportPollResult::Closed(e) = r {
                         return Err(e);
@@ -549,12 +546,7 @@ impl Client {
         let response = channel.send(request, self.config.request_timeout).await?;
         if let SupportedMessage::FindServersResponse(response) = response {
             process_service_result(&response.response_header)?;
-            let servers = if let Some(servers) = response.servers {
-                servers
-            } else {
-                Vec::new()
-            };
-            Ok(servers)
+            Ok(response.servers.unwrap_or_default())
         } else {
             Err(process_unexpected_response(response))
         }
@@ -588,10 +580,10 @@ impl Client {
         let mut evt_loop = channel.connect().await?;
 
         let send_fut = self.find_servers_inner(discovery_endpoint_url, &channel);
-        pin!(send_fut);
+        tokio::pin!(send_fut);
 
         let res = loop {
-            select! {
+            tokio::select! {
                 r = evt_loop.poll() => {
                     if let TransportPollResult::Closed(e) = r {
                         return Err(e);
@@ -731,7 +723,7 @@ impl Client {
 
         let Some(endpoint) = endpoints
             .iter()
-            .filter(|e| self.is_supported_endpoint(*e))
+            .filter(|e| self.is_supported_endpoint(e))
             .max_by(|a, b| a.security_level.cmp(&b.security_level))
         else {
             error!("Cannot find an endpoint that we call register server on");
@@ -753,10 +745,10 @@ impl Client {
         let mut evt_loop = channel.connect().await?;
 
         let send_fut = self.register_server_inner(server, &channel);
-        pin!(send_fut);
+        tokio::pin!(send_fut);
 
         let res = loop {
-            tokio::select! {
                 r = evt_loop.poll() => {
                     if let TransportPollResult::Closed(e) = r {
                         return Err(e);
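The `map_err` to `inspect_err` rewrites above rely on `Result::inspect_err` (stable since Rust 1.76), which runs a side effect on the error by reference instead of requiring the closure to hand the error back. A short sketch with a hypothetical `fetch` function, not taken from this codebase:

```rust
fn fetch() -> Result<u32, String> {
    Err("connection refused".to_string())
}

fn main() {
    // Before: map_err must return the error so the Result keeps its type.
    let _ = fetch().map_err(|e| {
        eprintln!("fetch failed: {e}");
        e
    });

    // After: inspect_err only observes the error and passes the Result through.
    let _ = fetch().inspect_err(|e| eprintln!("fetch failed: {e}"));
}
```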
diff --git a/lib/src/client/session/connect.rs b/lib/src/client/session/connect.rs
index 1ea509840..cb9f1e4f9 100644
--- a/lib/src/client/session/connect.rs
+++ b/lib/src/client/session/connect.rs
@@ -1,7 +1,5 @@
 use std::sync::Arc;
 
-use tokio::{pin, select};
-
 use crate::{
     client::transport::{SecureChannelEventLoop, TransportPollResult},
     types::{NodeId, StatusCode},
@@ -42,10 +40,10 @@ impl SessionConnector {
         let mut event_loop = self.inner.channel.connect_no_retry().await?;
 
         let activate_fut = self.ensure_and_activate_session();
-        pin!(activate_fut);
+        tokio::pin!(activate_fut);
 
         let res = loop {
-            select! {
+            tokio::select! {
                 r = event_loop.poll() => {
                     if let TransportPollResult::Closed(c) = r {
                         return Err(c);
diff --git a/lib/src/client/session/event_loop.rs b/lib/src/client/session/event_loop.rs
index 75b1f3065..09f59f33c 100644
--- a/lib/src/client/session/event_loop.rs
+++ b/lib/src/client/session/event_loop.rs
@@ -1,9 +1,10 @@
-use std::{
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use std::sync::Arc;
 
 use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
+use tokio::{
+    sync::watch,
+    time::{interval, sleep_until, Duration, Instant, Interval, MissedTickBehavior},
+};
 
 use crate::{
     client::{
@@ -57,23 +58,23 @@ enum SessionEventLoopState {
 #[must_use = "The session event loop must be started for the session to work"]
 pub struct SessionEventLoop {
     inner: Arc<Session>,
-    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
     retry: SessionRetryPolicy,
     keep_alive_interval: Duration,
+    trigger_publish_rx: watch::Receiver<Instant>,
 }
 
 impl SessionEventLoop {
     pub(crate) fn new(
         inner: Arc<Session>,
         retry: SessionRetryPolicy,
-        trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
         keep_alive_interval: Duration,
+        trigger_publish_rx: watch::Receiver<Instant>,
     ) -> Self {
         Self {
             inner,
             retry,
-            trigger_publish_recv,
             keep_alive_interval,
+            trigger_publish_rx,
         }
     }
 
@@ -183,7 +184,7 @@ impl SessionEventLoop {
                 ))
             }
             SessionEventLoopState::Connecting(connector, mut backoff, next_try) => {
-                tokio::time::sleep_until(next_try.into()).await;
+                sleep_until(next_try).await;
 
                 match connector.try_connect().await {
                     Ok((channel, result)) => {
@@ -200,7 +201,7 @@ impl SessionEventLoop {
                             .boxed(),
                             SubscriptionEventLoop::new(
                                 slf.inner.clone(),
-                                slf.trigger_publish_recv.clone(),
+                                slf.trigger_publish_rx.clone(),
                             )
                             .run()
                             .boxed(),
@@ -245,13 +246,13 @@ enum SessionTickEvent {
 }
 
 struct SessionIntervals {
-    keep_alive: tokio::time::Interval,
+    keep_alive: Interval,
 }
 
 impl SessionIntervals {
     pub fn new(keep_alive_interval: Duration) -> Self {
-        let mut keep_alive = tokio::time::interval(keep_alive_interval);
-        keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+        let mut keep_alive = interval(keep_alive_interval);
+        keep_alive.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
         Self { keep_alive }
     }
diff --git a/lib/src/client/session/mod.rs b/lib/src/client/session/mod.rs
index d539c7ad7..d882d38d3 100644
--- a/lib/src/client/session/mod.rs
+++ b/lib/src/client/session/mod.rs
@@ -2,6 +2,8 @@ mod client;
 mod connect;
 mod event_loop;
 mod services;
+
+#[expect(clippy::module_inception)]
 mod session;
 
 /// Information about the server endpoint, security policy, security mode and user identity that the session will
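`#[expect(...)]`, used here and in `builder.rs`, was stabilized in Rust 1.81. It behaves like `#[allow(...)]` but additionally warns, via the `unfulfilled_lint_expectations` lint, when the suppressed lint no longer fires, so suppressions cannot silently outlive the code that needed them. A small illustration, independent of this codebase:

```rust
// clippy::needless_range_loop fires on the index loop below; `expect`
// suppresses it, and will itself warn if the loop is later rewritten
// so that the lint no longer triggers.
#[expect(clippy::needless_range_loop)]
fn sum(values: &[u32]) -> u32 {
    let mut total = 0;
    for i in 0..values.len() {
        total += values[i];
    }
    total
}

fn main() {
    println!("{}", sum(&[1, 2, 3]));
}
```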
diff --git a/lib/src/client/session/services/attributes.rs b/lib/src/client/session/services/attributes.rs
index 42a1688f7..ffc07ce64 100644
--- a/lib/src/client/session/services/attributes.rs
+++ b/lib/src/client/session/services/attributes.rs
@@ -18,25 +18,25 @@ use crate::{
 
 /// Enumeration used with Session::history_read()
 pub enum HistoryReadAction {
-    ReadEventDetails(ReadEventDetails),
-    ReadRawModifiedDetails(ReadRawModifiedDetails),
-    ReadProcessedDetails(ReadProcessedDetails),
-    ReadAtTimeDetails(ReadAtTimeDetails),
+    Event(ReadEventDetails),
+    RawModified(ReadRawModifiedDetails),
+    Processed(ReadProcessedDetails),
+    ReadAtTime(ReadAtTimeDetails),
 }
 
 impl From<HistoryReadAction> for ExtensionObject {
     fn from(action: HistoryReadAction) -> Self {
         match action {
-            HistoryReadAction::ReadEventDetails(v) => {
+            HistoryReadAction::Event(v) => {
                 Self::from_encodable(ObjectId::ReadEventDetails_Encoding_DefaultBinary, &v)
             }
-            HistoryReadAction::ReadRawModifiedDetails(v) => {
+            HistoryReadAction::RawModified(v) => {
                 Self::from_encodable(ObjectId::ReadRawModifiedDetails_Encoding_DefaultBinary, &v)
             }
-            HistoryReadAction::ReadProcessedDetails(v) => {
+            HistoryReadAction::Processed(v) => {
                 Self::from_encodable(ObjectId::ReadProcessedDetails_Encoding_DefaultBinary, &v)
             }
-            HistoryReadAction::ReadAtTimeDetails(v) => {
+            HistoryReadAction::ReadAtTime(v) => {
                 Self::from_encodable(ObjectId::ReadAtTimeDetails_Encoding_DefaultBinary, &v)
             }
         }
@@ -45,34 +45,34 @@ impl From<HistoryReadAction> for ExtensionObject {
 
 /// Enumeration used with Session::history_update()
 pub enum HistoryUpdateAction {
-    UpdateDataDetails(UpdateDataDetails),
-    UpdateStructureDataDetails(UpdateStructureDataDetails),
-    UpdateEventDetails(UpdateEventDetails),
-    DeleteRawModifiedDetails(DeleteRawModifiedDetails),
-    DeleteAtTimeDetails(DeleteAtTimeDetails),
-    DeleteEventDetails(DeleteEventDetails),
+    UpdateData(UpdateDataDetails),
+    UpdateStructureData(UpdateStructureDataDetails),
+    UpdateEvent(UpdateEventDetails),
+    DeleteRawModified(DeleteRawModifiedDetails),
+    DeleteAtTime(DeleteAtTimeDetails),
+    DeleteEvent(DeleteEventDetails),
 }
 
 impl From<&HistoryUpdateAction> for ExtensionObject {
     fn from(action: &HistoryUpdateAction) -> Self {
         match action {
-            HistoryUpdateAction::UpdateDataDetails(v) => {
+            HistoryUpdateAction::UpdateData(v) => {
                 Self::from_encodable(ObjectId::UpdateDataDetails_Encoding_DefaultBinary, v)
             }
-            HistoryUpdateAction::UpdateStructureDataDetails(v) => Self::from_encodable(
+            HistoryUpdateAction::UpdateStructureData(v) => Self::from_encodable(
                 ObjectId::UpdateStructureDataDetails_Encoding_DefaultBinary,
                 v,
             ),
-            HistoryUpdateAction::UpdateEventDetails(v) => {
+            HistoryUpdateAction::UpdateEvent(v) => {
                 Self::from_encodable(ObjectId::UpdateEventDetails_Encoding_DefaultBinary, v)
             }
-            HistoryUpdateAction::DeleteRawModifiedDetails(v) => {
+            HistoryUpdateAction::DeleteRawModified(v) => {
                 Self::from_encodable(ObjectId::DeleteRawModifiedDetails_Encoding_DefaultBinary, v)
             }
-            HistoryUpdateAction::DeleteAtTimeDetails(v) => {
+            HistoryUpdateAction::DeleteAtTime(v) => {
                 Self::from_encodable(ObjectId::DeleteAtTimeDetails_Encoding_DefaultBinary, v)
             }
-            HistoryUpdateAction::DeleteEventDetails(v) => {
+            HistoryUpdateAction::DeleteEvent(v) => {
                 Self::from_encodable(ObjectId::DeleteEventDetails_Encoding_DefaultBinary, v)
             }
         }
@@ -120,12 +120,7 @@ impl Session {
         if let SupportedMessage::ReadResponse(response) = response {
             session_debug!(self, "read(), success");
             process_service_result(&response.response_header)?;
-            let results = if let Some(results) = response.results {
-                results
-            } else {
-                Vec::new()
-            };
-            Ok(results)
+            Ok(response.results.unwrap_or_default())
         } else {
             session_error!(self, "read() value failed");
             Err(process_unexpected_response(response))
@@ -184,12 +179,7 @@ impl Session {
         if let SupportedMessage::HistoryReadResponse(response) = response {
             session_debug!(self, "history_read(), success");
             process_service_result(&response.response_header)?;
-            let results = if let Some(results) = response.results {
-                results
-            } else {
-                Vec::new()
-            };
-            Ok(results)
+            Ok(response.results.unwrap_or_default())
         } else {
             session_error!(self, "history_read() value failed");
             Err(process_unexpected_response(response))
@@ -282,12 +272,7 @@ impl Session {
         if let SupportedMessage::HistoryUpdateResponse(response) = response {
             session_debug!(self, "history_update(), success");
             process_service_result(&response.response_header)?;
-            let results = if let Some(results) = response.results {
-                results
-            } else {
-                Vec::new()
-            };
-            Ok(results)
+            Ok(response.results.unwrap_or_default())
         } else {
             session_error!(self, "history_update() failed {:?}", response);
             Err(process_unexpected_response(response))
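Each of the collapsed `if let Some(results) ... else Vec::new()` blocks reduces to `Option::unwrap_or_default()`, which is equivalent here because `Default` for `Vec<T>` is the empty vector:

```rust
fn main() {
    let some: Option<Vec<u32>> = Some(vec![1, 2, 3]);
    let none: Option<Vec<u32>> = None;

    // Same behaviour as `if let Some(v) = x { v } else { Vec::new() }`.
    assert_eq!(some.unwrap_or_default(), vec![1, 2, 3]);
    assert_eq!(none.unwrap_or_default(), Vec::<u32>::new());
}
```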
diff --git a/lib/src/client/session/services/session.rs b/lib/src/client/session/services/session.rs
index 8e09baaba..b006fc970 100644
--- a/lib/src/client/session/services/session.rs
+++ b/lib/src/client/session/services/session.rs
@@ -123,15 +123,23 @@ impl Session {
     /// * `Err(StatusCode)` - Request failed, [Status code](StatusCode) is the reason for failure.
     ///
     pub(crate) async fn activate_session(&self) -> Result<(), StatusCode> {
-        let secure_channel = trace_read_lock!(self.channel.secure_channel);
-
-        let (user_identity_token, user_token_signature) =
-            self.user_identity_token(&secure_channel)?;
-
-        let server_cert = secure_channel.remote_cert();
-        let server_nonce = secure_channel.remote_nonce_as_byte_string();
-
-        drop(secure_channel);
+        // Do this in a block to release the lock before awaiting any async calls.
+        let (server_cert, server_nonce, user_identity_token, user_token_signature) = {
+            let secure_channel = trace_read_lock!(self.channel.secure_channel);
+
+            let (user_identity_token, user_token_signature) =
+                self.user_identity_token(&secure_channel)?;
+
+            let server_cert = secure_channel.remote_cert();
+            let server_nonce = secure_channel.remote_nonce_as_byte_string();
+
+            (
+                server_cert,
+                server_nonce,
+                user_identity_token,
+                user_token_signature,
+            )
+        };
 
         let locale_ids = if self.session_info.preferred_locales.is_empty() {
             None
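The `activate_session` change swaps an explicit `drop(guard)` for a block scope, which makes the guard's lifetime structural rather than relying on a drop call someone might later move or delete; nothing borrowed from the lock survives past the block except the values deliberately returned from it. A standalone sketch of the pattern, using `std::sync::Mutex` rather than the `trace_read_lock!` macro this crate uses:

```rust
use std::sync::Mutex;

struct State {
    counter: Mutex<u64>,
}

async fn tick(state: &State) {
    // Scope the guard so it is provably released before the await below;
    // holding a guard across an .await risks deadlock and can make the
    // future non-Send.
    let snapshot = {
        let guard = state.counter.lock().unwrap();
        *guard
    }; // guard dropped here

    report(snapshot).await;
}

async fn report(value: u64) {
    println!("counter was {value}");
}

#[tokio::main]
async fn main() {
    let state = State {
        counter: Mutex::new(41),
    };
    tick(&state).await;
}
```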
diff --git a/lib/src/client/session/services/subscriptions/event_loop.rs b/lib/src/client/session/services/subscriptions/event_loop.rs
index 96f039bfa..0e3f521d9 100644
--- a/lib/src/client/session/services/subscriptions/event_loop.rs
+++ b/lib/src/client/session/services/subscriptions/event_loop.rs
@@ -1,6 +1,10 @@
-use std::{sync::Arc, time::Instant};
+use std::sync::Arc;
 
 use futures::{future::Either, stream::FuturesUnordered, Future, Stream, StreamExt};
+use tokio::{
+    sync::watch,
+    time::{sleep_until, Instant},
+};
 
 use crate::{
     client::{
@@ -26,9 +30,9 @@ pub enum SubscriptionActivity {
 /// and subscription keep-alive.
 pub struct SubscriptionEventLoop {
     session: Arc<Session>,
-    trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
    max_inflight_publish: usize,
     last_external_trigger: Instant,
+    trigger_publish_rx: watch::Receiver<Instant>,
     // This is true if the client has received a message BadTooManyPublishRequests
     // and is waiting for a response before making further requests.
     is_waiting_for_response: bool,
@@ -41,18 +45,15 @@ impl SubscriptionEventLoop {
     ///
     /// * `session` - A shared reference to an [AsyncSession].
     /// * `trigger_publish_recv` - A channel used to transmit external publish triggers.
-    /// This is used to trigger publish outside of the normal schedule, for example when
-    /// a new subscription is created.
-    pub fn new(
-        session: Arc<Session>,
-        trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
-    ) -> Self {
-        let last_external_trigger = trigger_publish_recv.borrow().clone();
+    ///   This is used to trigger publish outside of the normal schedule, for example when
+    ///   a new subscription is created.
+    pub fn new(session: Arc<Session>, trigger_publish_rx: watch::Receiver<Instant>) -> Self {
+        let last_external_trigger = *trigger_publish_rx.borrow();
         Self {
             max_inflight_publish: session.max_inflight_publish,
-            last_external_trigger,
-            trigger_publish_recv,
             session,
+            last_external_trigger,
+            trigger_publish_rx,
             is_waiting_for_response: false,
         }
     }
@@ -65,24 +66,22 @@ impl SubscriptionEventLoop {
             |(mut slf, mut futures)| async move {
                 // Store the next publish time, or None if there are no active subscriptions.
                 let mut next = slf.session.next_publish_time(false);
-                let mut recv: tokio::sync::watch::Receiver<Instant> =
-                    slf.trigger_publish_recv.clone();
+                let mut recv = slf.trigger_publish_rx.clone();
 
                 let res = loop {
                     // Future for the next periodic publish. We do not send publish requests if there
-                    // are no active subscriptions. In this case, simply return the non-terminating
-                    // future.
+                    // are no active subscriptions. In this case we return the non-terminating future.
                     let next_tick_fut = if let Some(next) = next {
                         if slf.is_waiting_for_response && !futures.is_empty() {
                             Either::Right(futures::future::pending::<()>())
                         } else {
-                            Either::Left(tokio::time::sleep_until(next.into()))
+                            Either::Left(sleep_until(next))
                        }
                     } else {
                         Either::Right(futures::future::pending::<()>())
                     };
-                    // If FuturesUnordered is empty, it will immediately yield `None`. We don't want that,
-                    // so if it is empty we return the non-terminating future.
+                    // If FuturesUnordered is empty, it will immediately yield `None`. We don't
+                    // want that, so if it is empty we return the non-terminating future.
                     let next_publish_fut = if futures.is_empty() {
                         Either::Left(futures::future::pending())
                     } else {
@@ -97,7 +96,7 @@ impl SubscriptionEventLoop {
                             // On an external trigger, we always publish.
                             futures.push(slf.static_publish());
                             next = slf.session.next_publish_time(true);
-                            slf.last_external_trigger = v.clone();
+                            slf.last_external_trigger = *v;
                         }
                     }
                     _ = next_tick_fut => {
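The event loop above keeps using the `Either`/`pending` idiom: when a branch of `select!` may be disabled (no active subscriptions, or an empty `FuturesUnordered`), both cases are unified into one future type, with the never-resolving `futures::future::pending()` standing in for "this branch never fires". A sketch of the idiom in isolation, assuming only `tokio` and `futures` as dependencies:

```rust
use futures::future::Either;
use tokio::time::{sleep, Duration};

async fn work() {
    sleep(Duration::from_millis(50)).await;
}

#[tokio::main]
async fn main() {
    // Some(..) arms a timer; None disables the branch entirely by
    // substituting a future that never completes.
    let deadline: Option<Duration> = Some(Duration::from_millis(10));

    let timeout_fut = match deadline {
        Some(d) => Either::Left(sleep(d)),
        None => Either::Right(futures::future::pending::<()>()),
    };

    tokio::select! {
        _ = timeout_fut => println!("timer fired"),
        _ = work() => println!("work finished first"),
    }
}
```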
diff --git a/lib/src/client/session/services/subscriptions/mod.rs b/lib/src/client/session/services/subscriptions/mod.rs
index 24ff936d0..23527c4cf 100644
--- a/lib/src/client/session/services/subscriptions/mod.rs
+++ b/lib/src/client/session/services/subscriptions/mod.rs
@@ -30,6 +30,9 @@ pub(crate) struct ModifyMonitoredItem {
     pub queue_size: u32,
 }
 
+type InnerDataValueCallback = Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync>;
+type InnerEventCallback = Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync>;
+
 /// A set of callbacks for notifications on a subscription.
 /// You may implement this on your own struct, or simply use [SubscriptionCallbacks]
 /// for a simple collection of closures.
@@ -47,8 +50,8 @@ pub trait OnSubscriptionNotification: Send + Sync {
 /// A convenient wrapper around a set of callback functions that implements [OnSubscriptionNotification]
 pub struct SubscriptionCallbacks {
     status_change: Box<dyn FnMut(StatusChangeNotification) + Send + Sync>,
-    data_value: Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync>,
-    event: Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync>,
+    data_value: InnerDataValueCallback,
+    event: InnerEventCallback,
 }
 
 impl SubscriptionCallbacks {
@@ -77,21 +80,21 @@ impl SubscriptionCallbacks {
 
 impl OnSubscriptionNotification for SubscriptionCallbacks {
     fn on_subscription_status_change(&mut self, notification: StatusChangeNotification) {
-        (&mut self.status_change)(notification);
+        (self.status_change)(notification);
     }
 
     fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
-        (&mut self.data_value)(notification, item);
+        (self.data_value)(notification, item);
     }
 
     fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
-        (&mut self.event)(event_fields, item);
+        (self.event)(event_fields, item);
     }
 }
 
 /// A wrapper around a data change callback that implements [OnSubscriptionNotification]
 pub struct DataChangeCallback {
-    data_value: Box<dyn FnMut(DataValue, &MonitoredItem) + Send + Sync>,
+    data_value: InnerDataValueCallback,
 }
 
 impl DataChangeCallback {
@@ -110,13 +113,13 @@ impl DataChangeCallback {
 
 impl OnSubscriptionNotification for DataChangeCallback {
     fn on_data_value(&mut self, notification: DataValue, item: &MonitoredItem) {
-        (&mut self.data_value)(notification, item);
+        (self.data_value)(notification, item);
     }
 }
 
 /// A wrapper around an event callback that implements [OnSubscriptionNotification]
 pub struct EventCallback {
-    event: Box<dyn FnMut(Option<Vec<Variant>>, &MonitoredItem) + Send + Sync>,
+    event: InnerEventCallback,
 }
 
 impl EventCallback {
@@ -137,7 +140,7 @@ impl EventCallback {
 
 impl OnSubscriptionNotification for EventCallback {
     fn on_event(&mut self, event_fields: Option<Vec<Variant>>, item: &MonitoredItem) {
-        (&mut self.event)(event_fields, item);
+        (self.event)(event_fields, item);
     }
 }
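Factoring the boxed-closure types into `InnerDataValueCallback`/`InnerEventCallback` is the usual fix for clippy's `type_complexity`, and the related `(&mut self.event)(...)` to `(self.event)(...)` change works because a call through `&mut self` already borrows the field mutably (clippy: `needless_borrow`). Both in one standalone sketch with a hypothetical `Notifier`:

```rust
// One alias keeps fields and signatures readable.
type Callback = Box<dyn FnMut(&str) + Send + Sync>;

struct Notifier {
    on_message: Callback,
}

impl Notifier {
    fn new(on_message: impl FnMut(&str) + Send + Sync + 'static) -> Self {
        Self {
            on_message: Box::new(on_message),
        }
    }

    fn notify(&mut self, msg: &str) {
        // `&mut self` already provides mutable access to the field, so the
        // explicit `(&mut self.on_message)(msg)` re-borrow is redundant.
        (self.on_message)(msg);
    }
}

fn main() {
    let mut n = Notifier::new(|m| println!("got: {m}"));
    n.notify("hello");
}
```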
diff --git a/lib/src/client/session/services/subscriptions/service.rs b/lib/src/client/session/services/subscriptions/service.rs
index 6115c38b2..3d38bba5e 100644
--- a/lib/src/client/session/services/subscriptions/service.rs
+++ b/lib/src/client/session/services/subscriptions/service.rs
@@ -1,7 +1,6 @@
-use std::{
-    collections::HashSet,
-    time::{Duration, Instant},
-};
+use std::collections::HashSet;
+
+use tokio::time::{Duration, Instant};
 
 use crate::{
     client::{
@@ -50,7 +49,7 @@ impl Session {
         process_service_result(&response.response_header)?;
         let subscription = Subscription::new(
             response.subscription_id,
-            Duration::from_millis(response.revised_publishing_interval.max(0.0).floor() as u64),
+            Duration::from_millis(response.revised_publishing_interval.max(0.0) as u64),
             response.revised_lifetime_count,
             response.revised_max_keep_alive_count,
             max_notifications_per_publish,
@@ -129,7 +128,7 @@ impl Session {
         max_notifications_per_publish: u32,
         priority: u8,
         publishing_enabled: bool,
-        callback: impl OnSubscriptionNotification + Send + Sync + 'static,
+        callback: impl OnSubscriptionNotification + 'static,
     ) -> Result<u32, StatusCode> {
         self.create_subscription_inner(
             publishing_interval,
@@ -940,8 +939,8 @@ impl Session {
 
         let items_to_create = subscription
             .monitored_items
-            .iter()
-            .map(|(_, item)| MonitoredItemCreateRequest {
+            .values()
+            .map(|item| MonitoredItemCreateRequest {
                 item_to_monitor: item.item_to_monitor().clone(),
                 monitoring_mode: item.monitoring_mode,
                 requested_parameters: MonitoringParameters {
diff --git a/lib/src/client/session/services/subscriptions/state.rs b/lib/src/client/session/services/subscriptions/state.rs
index cf916ade0..15e9cdb53 100644
--- a/lib/src/client/session/services/subscriptions/state.rs
+++ b/lib/src/client/session/services/subscriptions/state.rs
@@ -1,7 +1,6 @@
-use std::{
-    collections::HashMap,
-    time::{Duration, Instant},
-};
+use std::collections::HashMap;
+
+use tokio::time::{Duration, Instant};
 
 use crate::types::{
     DecodingOptions, MonitoringMode, NotificationMessage, SubscriptionAcknowledgement,
@@ -24,7 +23,7 @@ impl SubscriptionState {
     /// # Arguments
     ///
     /// * `min_publishing_interval` - The minimum accepted publishing interval, any lower values
-    /// will be set to this.
+    ///   will be set to this.
     pub(crate) fn new(min_publish_interval: Duration) -> Self {
         Self {
             subscriptions: HashMap::new(),
@@ -40,16 +39,13 @@ impl SubscriptionState {
             return None;
         }
 
-        let next = self
-            .subscriptions
+        self.subscriptions
             .values()
             .filter(|s| s.publishing_enabled())
             .map(|s| s.publishing_interval().max(self.min_publish_interval))
             .min()
-            .or_else(|| self.keep_alive_timeout)
-            .map(|e| self.last_publish + e);
-
-        next
+            .or(self.keep_alive_timeout)
+            .map(|e| self.last_publish + e)
     }
 
     pub(crate) fn set_last_publish(&mut self) {
@@ -68,7 +64,7 @@ impl SubscriptionState {
     }
 
     pub(crate) fn re_queue_acknowledgements(&mut self, acks: Vec<SubscriptionAcknowledgement>) {
-        self.acknowledgements.extend(acks.into_iter());
+        self.acknowledgements.extend(acks);
     }
 
     /// List of subscription IDs.
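Two idioms recur in these hunks: `.or(x)` replaces `.or_else(|| x)` when the fallback already exists (clippy: `unnecessary_lazy_evaluations`, since the closure buys nothing), and `.values()` replaces `.iter().map(|(_, v)| v)` on maps. Both in miniature:

```rust
use std::collections::HashMap;

fn main() {
    let primary: Option<u32> = None;
    let fallback: Option<u32> = Some(30);

    // or_else is for fallbacks that are expensive to compute; a plain
    // field or local is already computed, so `or` says the same thing.
    assert_eq!(primary.or_else(|| fallback), primary.or(fallback));

    let map = HashMap::from([("a", 1), ("b", 2)]);
    let sum_pairs: i32 = map.iter().map(|(_, v)| v).sum();
    let sum_values: i32 = map.values().sum();
    assert_eq!(sum_pairs, sum_values);
}
```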
diff --git a/lib/src/client/session/session.rs b/lib/src/client/session/session.rs
index 9402ac5aa..87f287ff0 100644
--- a/lib/src/client/session/session.rs
+++ b/lib/src/client/session/session.rs
@@ -1,12 +1,13 @@
-use std::{
-    sync::{
-        atomic::{AtomicU32, Ordering},
-        Arc,
-    },
-    time::{Duration, Instant},
+use std::sync::{
+    atomic::{AtomicU32, Ordering},
+    Arc,
 };
 
 use arc_swap::ArcSwap;
+use tokio::{
+    sync::watch,
+    time::{Duration, Instant},
+};
 
 use crate::{
     client::{
@@ -40,8 +41,8 @@ lazy_static! {
 ///
 pub struct Session {
     pub(super) channel: AsyncSecureChannel,
-    pub(super) state_watch_rx: tokio::sync::watch::Receiver<SessionState>,
-    pub(super) state_watch_tx: tokio::sync::watch::Sender<SessionState>,
+    pub(super) state_watch_rx: watch::Receiver<SessionState>,
+    pub(super) state_watch_tx: watch::Sender<SessionState>,
     pub(super) certificate_store: Arc<RwLock<CertificateStore>>,
     pub(super) session_id: Arc<ArcSwap<NodeId>>,
     pub(super) auth_token: Arc<ArcSwap<NodeId>>,
@@ -56,7 +57,7 @@ pub struct Session {
     pub(super) max_inflight_publish: usize,
     pub subscription_state: Mutex<SubscriptionState>,
     pub(super) monitored_item_handle: AtomicHandle,
-    pub(super) trigger_publish_tx: tokio::sync::watch::Sender<Instant>,
+    pub(super) trigger_publish_tx: watch::Sender<Instant>,
 }
 
 impl Session {
@@ -70,9 +71,8 @@ impl Session {
         config: &ClientConfig,
     ) -> (Arc<Self>, SessionEventLoop) {
         let auth_token: Arc<ArcSwap<NodeId>> = Default::default();
-        let (state_watch_tx, state_watch_rx) =
-            tokio::sync::watch::channel(SessionState::Disconnected);
-        let (trigger_publish_tx, trigger_publish_rx) = tokio::sync::watch::channel(Instant::now());
+        let (state_watch_tx, state_watch_rx) = watch::channel(SessionState::Disconnected);
+        let (trigger_publish_tx, trigger_publish_rx) = watch::channel(Instant::now());
 
         let session = Arc::new(Session {
             channel: AsyncSecureChannel::new(
@@ -115,8 +115,8 @@ impl Session {
             SessionEventLoop::new(
                 session,
                 session_retry_policy,
-                trigger_publish_rx,
                 config.keep_alive_interval,
+                trigger_publish_rx,
             ),
         )
     }
@@ -150,18 +150,12 @@ impl Session {
 
     async fn wait_for_state(&self, connected: bool) -> bool {
         let mut rx = self.state_watch_rx.clone();
 
-        let res = match rx
-            .wait_for(|s| {
-                connected && matches!(*s, SessionState::Connected)
-                    || !connected && matches!(*s, SessionState::Disconnected)
-            })
-            .await
-        {
-            Ok(_) => true,
-            Err(_) => false,
-        };
-
-        res
+        rx.wait_for(|s| {
+            connected && matches!(*s, SessionState::Connected)
+                || !connected && matches!(*s, SessionState::Disconnected)
+        })
+        .await
+        .is_ok()
     }
 
     /// The internal ID of the session, used to keep track of multiple sessions in the same program.
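The renamed `trigger_publish_tx`/`trigger_publish_rx` pair is a `tokio::sync::watch` channel: it stores only the latest value and wakes receivers when it changes, which matches the "poke the publish loop outside its schedule" semantics the session wants. A standalone sketch of that shape:

```rust
use tokio::sync::watch;
use tokio::time::{sleep, Duration, Instant};

#[tokio::main]
async fn main() {
    // Same construction as the session: the channel carries the Instant of
    // the most recent trigger.
    let (tx, mut rx) = watch::channel(Instant::now());

    let worker = tokio::spawn(async move {
        // changed() resolves whenever a new value is sent, and errors once
        // the sender is dropped, which ends the loop.
        while rx.changed().await.is_ok() {
            println!("triggered at {:?}", *rx.borrow());
        }
    });

    sleep(Duration::from_millis(10)).await;
    tx.send(Instant::now()).unwrap();

    drop(tx);
    worker.await.unwrap();
}
```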
diff --git a/lib/src/client/transport/buffer.rs b/lib/src/client/transport/buffer.rs
index 5c501a242..e11825d1e 100644
--- a/lib/src/client/transport/buffer.rs
+++ b/lib/src/client/transport/buffer.rs
@@ -108,7 +108,7 @@ impl SendBuffer {
         self.last_sent_sequence_number += chunks.len() as u32;
 
         // Send chunks
-        self.chunks.extend(chunks.into_iter());
+        self.chunks.extend(chunks);
         Ok(request_id)
     }
 }
diff --git a/lib/src/client/transport/channel.rs b/lib/src/client/transport/channel.rs
index 7273cc9dc..a1de87c19 100644
--- a/lib/src/client/transport/channel.rs
+++ b/lib/src/client/transport/channel.rs
@@ -1,7 +1,21 @@
-use std::{str::FromStr, sync::Arc, time::Duration};
+use std::{str::FromStr, sync::Arc};
+
+use arc_swap::{ArcSwap, ArcSwapOption};
+use tokio::{
+    sync::{mpsc, Mutex},
+    time::{sleep, Duration},
+};
 
 use crate::{
-    client::{session::SessionInfo, transport::core::TransportPollResult},
+    client::{
+        retry::SessionRetryPolicy,
+        session::SessionInfo,
+        transport::core::TransportPollResult,
+        transport::{
+            tcp::{TcpTransport, TransportConfiguration},
+            OutgoingMessage,
+        },
+    },
     core::{
         comms::secure_channel::{Role, SecureChannel},
         supported_message::SupportedMessage,
@@ -13,18 +27,9 @@ use crate::{
         SecurityTokenRequestType, StatusCode,
     },
 };
-use arc_swap::{ArcSwap, ArcSwapOption};
 
 use super::state::{Request, RequestSend, SecureChannelState};
 
-use crate::client::{
-    retry::SessionRetryPolicy,
-    transport::{
-        tcp::{TcpTransport, TransportConfiguration},
-        OutgoingMessage,
-    },
-};
-
 /// Wrapper around an open secure channel
 pub struct AsyncSecureChannel {
     session_info: SessionInfo,
@@ -33,7 +38,7 @@ pub struct AsyncSecureChannel {
     certificate_store: Arc<RwLock<CertificateStore>>,
     transport_config: TransportConfiguration,
     state: SecureChannelState,
-    issue_channel_lock: tokio::sync::Mutex<()>,
+    issue_channel_lock: Mutex<()>,
     request_send: ArcSwapOption<RequestSend>,
 }
 
@@ -66,7 +71,7 @@ impl AsyncSecureChannel {
 
         Self {
             transport_config,
-            issue_channel_lock: tokio::sync::Mutex::new(()),
+            issue_channel_lock: Mutex::new(()),
             state: SecureChannelState::new(ignore_clock_skew, secure_channel.clone(), auth_token),
             session_info,
             secure_channel,
@@ -133,7 +138,7 @@ impl AsyncSecureChannel {
                     break Err(s);
                 };
 
-                tokio::time::sleep(delay).await
+                sleep(delay).await
             }
         }
     }
@@ -201,7 +206,7 @@ impl AsyncSecureChannel {
 
     async fn create_transport(
         &self,
-    ) -> Result<(TcpTransport, tokio::sync::mpsc::Sender<OutgoingMessage>), StatusCode> {
+    ) -> Result<(TcpTransport, mpsc::Sender<OutgoingMessage>), StatusCode> {
         let endpoint_url = self.session_info.endpoint.endpoint_url.clone();
         info!("Connect");
         let security_policy =
@@ -214,39 +219,38 @@ impl AsyncSecureChannel {
                 self.session_info.endpoint.security_policy_uri.as_ref()
            );
             return Err(StatusCode::BadSecurityPolicyRejected);
-        } else {
-            let (cert, key) = {
-                let certificate_store = trace_write_lock!(self.certificate_store);
-                certificate_store.read_own_cert_and_pkey_optional()
-            };
-
-            {
-                let mut secure_channel = trace_write_lock!(self.secure_channel);
-                secure_channel.set_private_key(key);
-                secure_channel.set_cert(cert);
-                secure_channel.set_security_policy(security_policy);
-                secure_channel.set_security_mode(self.session_info.endpoint.security_mode);
-                let _ = secure_channel.set_remote_cert_from_byte_string(
-                    &self.session_info.endpoint.server_certificate,
-                );
-                info!("Security policy = {:?}", security_policy);
-                info!(
-                    "Security mode = {:?}",
-                    self.session_info.endpoint.security_mode
-                );
-            }
+        }
 
-            let (send, recv) = tokio::sync::mpsc::channel(self.transport_config.max_inflight);
-            let transport = TcpTransport::connect(
-                self.secure_channel.clone(),
-                recv,
-                self.transport_config.clone(),
-                endpoint_url.as_ref(),
-            )
-            .await?;
+        let (cert, key) = {
+            let certificate_store = trace_write_lock!(self.certificate_store);
+            certificate_store.read_own_cert_and_pkey_optional()
+        };
 
-            Ok((transport, send))
+        {
+            let mut secure_channel = trace_write_lock!(self.secure_channel);
+            secure_channel.set_private_key(key);
+            secure_channel.set_cert(cert);
+            secure_channel.set_security_policy(security_policy);
+            secure_channel.set_security_mode(self.session_info.endpoint.security_mode);
+            let _ = secure_channel
+                .set_remote_cert_from_byte_string(&self.session_info.endpoint.server_certificate);
+            debug!("security policy = {:?}", security_policy);
+            debug!(
+                "security mode = {:?}",
+                self.session_info.endpoint.security_mode
+            );
         }
+
+        let (send, recv) = mpsc::channel(self.transport_config.max_inflight);
+        let transport = TcpTransport::connect(
+            self.secure_channel.clone(),
+            recv,
+            self.transport_config.clone(),
+            endpoint_url.as_ref(),
+        )
+        .await?;
+
+        Ok((transport, send))
     }
 
     /// Close the secure channel, optionally wait for the channel to close.
@@ -262,7 +266,6 @@ impl AsyncSecureChannel {
             if let Some(request) = request {
                 if let Err(e) = request.send_no_response().await {
                     error!("Failed to send disconnect message, queue full: {e}");
-                    return;
                 }
             }
         }
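The `create_transport` rewrite above is a guard-clause refactor: the error case returns early, so the happy path loses a level of nesting instead of living inside an `else`. The shape in miniature (`parse_port` is a made-up stand-in, with 4840 being the OPC UA default port):

```rust
fn parse_port(input: &str) -> Result<u16, String> {
    // Bail out first ...
    if input.is_empty() {
        return Err("empty input".to_string());
    }

    // ... so no `else` is needed: reaching this point implies the guard
    // passed, and the main path stays unindented.
    input
        .parse::<u16>()
        .map_err(|e| format!("invalid port: {e}"))
}

fn main() {
    assert!(parse_port("").is_err());
    assert_eq!(parse_port("4840"), Ok(4840));
}
```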
diff --git a/lib/src/client/transport/core.rs b/lib/src/client/transport/core.rs
index 754231d16..23b941af2 100644
--- a/lib/src/client/transport/core.rs
+++ b/lib/src/client/transport/core.rs
@@ -1,17 +1,25 @@
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Instant;
+use std::{collections::HashMap, sync::Arc, time::Instant};
 
 use futures::future::Either;
 use parking_lot::RwLock;
+use tokio::{
+    sync::{mpsc, oneshot},
+    time::sleep_until,
+};
 
-use crate::core::comms::message_chunk::MessageIsFinalType;
-use crate::core::comms::{
-    chunker::Chunker, message_chunk::MessageChunk, message_chunk_info::ChunkInfo,
-    secure_channel::SecureChannel, tcp_codec::Message,
+use crate::{
+    core::{
+        comms::{
+            chunker::Chunker,
+            message_chunk::{MessageChunk, MessageIsFinalType},
+            message_chunk_info::ChunkInfo,
+            secure_channel::SecureChannel,
+            tcp_codec::Message,
+        },
+        supported_message::SupportedMessage,
+    },
+    types::StatusCode,
 };
-use crate::core::supported_message::SupportedMessage;
-use crate::types::StatusCode;
 
 use super::buffer::SendBuffer;
 
@@ -22,14 +30,14 @@ struct MessageChunkWithChunkInfo {
 }
 
 pub(crate) struct MessageState {
-    callback: tokio::sync::oneshot::Sender<Result<SupportedMessage, StatusCode>>,
+    callback: oneshot::Sender<Result<SupportedMessage, StatusCode>>,
     chunks: Vec<MessageChunkWithChunkInfo>,
     deadline: Instant,
 }
 
 pub(super) struct TransportState {
     /// Channel for outgoing requests. Will only be polled if the number of inflight requests is below the limit.
-    outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
+    outgoing_recv: mpsc::Receiver<OutgoingMessage>,
     /// State of pending requests
     message_states: HashMap<u32, MessageState>,
     /// Maximum number of inflight requests, or None if unlimited.
@@ -52,14 +60,14 @@ pub enum TransportPollResult {
 
 pub(crate) struct OutgoingMessage {
     pub request: SupportedMessage,
-    pub callback: Option<tokio::sync::oneshot::Sender<Result<SupportedMessage, StatusCode>>>,
+    pub callback: Option<oneshot::Sender<Result<SupportedMessage, StatusCode>>>,
     pub deadline: Instant,
 }
 
 impl TransportState {
     pub fn new(
         secure_channel: Arc<RwLock<SecureChannel>>,
-        outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
+        outgoing_recv: mpsc::Receiver<OutgoingMessage>,
         max_pending_incoming: usize,
         max_inflight: usize,
     ) -> Self {
@@ -82,7 +90,7 @@ impl TransportState {
         // Check for any messages that have timed out, and get the time until the next message
         // times out
         let timeout_fut = match self.next_timeout() {
-            Some(t) => Either::Left(tokio::time::sleep_until(t.into())),
+            Some(t) => Either::Left(sleep_until(t.into())),
             None => Either::Right(futures::future::pending::<()>()),
         };
 
@@ -93,9 +101,7 @@ impl TransportState {
                 continue;
            }
             outgoing = self.outgoing_recv.recv() => {
-                let Some(outgoing) = outgoing else {
-                    return None;
-                };
+                let outgoing = outgoing?;
                 let request_id = send_buffer.next_request_id();
                 if let Some(callback) = outgoing.callback {
                     self.message_states.insert(request_id, MessageState {
diff --git a/lib/src/client/transport/state.rs b/lib/src/client/transport/state.rs
index 9bd8eb59d..f404eb5fd 100644
--- a/lib/src/client/transport/state.rs
+++ b/lib/src/client/transport/state.rs
@@ -3,7 +3,8 @@ use std::{
     time::{Duration, Instant},
 };
 
-use tokio::sync::mpsc::error::SendTimeoutError;
+use arc_swap::ArcSwap;
+use tokio::sync::mpsc::{self, error::SendTimeoutError};
 
 use crate::{
     client::{session::process_unexpected_response, transport::OutgoingMessage},
@@ -18,9 +19,8 @@ use crate::{
         RequestHeader, SecurityTokenRequestType, StatusCode,
     },
 };
-use arc_swap::ArcSwap;
 
-pub(crate) type RequestSend = tokio::sync::mpsc::Sender<OutgoingMessage>;
+pub(crate) type RequestSend = mpsc::Sender<OutgoingMessage>;
 
 lazy_static! {
     static ref NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1);
@@ -75,6 +75,7 @@ impl Request {
 
     pub async fn send(self) -> Result<SupportedMessage, StatusCode> {
         let (cb_send, cb_recv) = tokio::sync::oneshot::channel();
+        // trace!("Sending request: {:?}", self.payload);
         let message = OutgoingMessage {
             request: self.payload,
             callback: Some(cb_send),
@@ -88,7 +89,10 @@ impl Request {
         }
 
         match cb_recv.await {
-            Ok(r) => r,
+            Ok(r) => {
+                // trace!("Received response: {:?}", r);
+                r
+            }
             // Should not really happen, would mean something panicked.
             Err(_) => Err(StatusCode::BadConnectionClosed),
         }
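The `let outgoing = outgoing?;` change uses the `?` operator on `Option`, which works in any function (or async block) returning `Option` and short-circuits with `None`, replacing the `let ... else { return None }` boilerplate:

```rust
fn first_even(values: &[u32]) -> Option<u32> {
    // Before:
    // let Some(first) = values.iter().copied().find(|v| v % 2 == 0) else {
    //     return None;
    // };
    let first = values.iter().copied().find(|v| v % 2 == 0)?;
    Some(first * 10)
}

fn main() {
    assert_eq!(first_even(&[1, 4, 5]), Some(40));
    assert_eq!(first_even(&[1, 3, 5]), None);
}
```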
diff --git a/lib/src/client/transport/tcp.rs b/lib/src/client/transport/tcp.rs
index 5166dd8e0..78691761f 100644
--- a/lib/src/client/transport/tcp.rs
+++ b/lib/src/client/transport/tcp.rs
@@ -1,21 +1,30 @@
 use std::sync::Arc;
 
-use super::buffer::SendBuffer;
-use super::core::{OutgoingMessage, TransportPollResult, TransportState};
-use crate::core::comms::{
-    secure_channel::SecureChannel,
-    tcp_codec::{Message, TcpCodec},
-    tcp_types::HelloMessage,
-    url::hostname_port_from_url,
-};
-use crate::core::supported_message::SupportedMessage;
-use crate::types::{encoding::BinaryEncoder, StatusCode};
 use futures::StreamExt;
 use parking_lot::RwLock;
-use tokio::io::{AsyncWriteExt, ReadHalf, WriteHalf};
-use tokio::net::TcpStream;
+use tokio::{
+    io::{AsyncWriteExt, ReadHalf, WriteHalf},
+    net::TcpStream,
+    sync::mpsc,
+};
 use tokio_util::codec::FramedRead;
 
+use crate::{
+    core::{
+        comms::{
+            secure_channel::SecureChannel,
+            tcp_codec::{Message, TcpCodec},
+            tcp_types::HelloMessage,
+            url::hostname_port_from_url,
+        },
+        supported_message::SupportedMessage,
+    },
+    types::{encoding::BinaryEncoder, StatusCode},
+};
+
+use super::buffer::SendBuffer;
+use super::core::{OutgoingMessage, TransportPollResult, TransportState};
+
 #[derive(Debug, Clone, Copy)]
 enum TransportCloseState {
     Open,
@@ -48,7 +57,7 @@ impl TcpTransport {
     /// calling `run` on the returned transport in order to actually send and receive messages.
     pub async fn connect(
         secure_channel: Arc<RwLock<SecureChannel>>,
-        outgoing_recv: tokio::sync::mpsc::Receiver<OutgoingMessage>,
+        outgoing_recv: mpsc::Receiver<OutgoingMessage>,
         config: TransportConfiguration,
         endpoint_url: &str,
     ) -> Result<Self, StatusCode> {
diff --git a/lib/src/crypto/security_policy.rs b/lib/src/crypto/security_policy.rs
index 298b9d182..55c3fc240 100644
--- a/lib/src/crypto/security_policy.rs
+++ b/lib/src/crypto/security_policy.rs
@@ -563,7 +563,7 @@ impl SecurityPolicy {
         let mut their_signature = vec![0u8; their_key.size()];
         self.asymmetric_sign(&their_key, data, their_signature.as_mut_slice())?;
         trace!(
-            "Using their_key, signature should be {:?}",
+            "Using their key, signature should be {:?}",
             &their_signature
         );
 }
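Taken together, the reordered import blocks in this patch follow the three-group convention: `std` first, then external crates, then `crate::`/`super::` items, each group separated by a blank line. rustfmt can enforce this layout with the `group_imports = "StdExternalCrate"` option, which is still unstable (nightly-only) as of this patch's date. An illustrative layout, with a hypothetical `config` module standing in for crate-internal items:

```rust
// Group 1: standard library
use std::collections::HashMap;

// Group 2: external crates
use tokio::time::Duration;

// Group 3: this crate
use crate::config::Settings;

// Hypothetical stand-in for a crate-internal module.
mod config {
    pub struct Settings {
        pub max_inflight: usize,
    }
}

fn main() {
    let settings = Settings { max_inflight: 8 };
    let mut intervals: HashMap<&str, Duration> = HashMap::new();
    intervals.insert("keep_alive", Duration::from_secs(10));
    println!("{} / {}", settings.max_inflight, intervals.len());
}
```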