diff --git a/crates/core/c8y_api/src/smartrest/operations.rs b/crates/core/c8y_api/src/smartrest/operations.rs index aea0b52f50..8025a3ae82 100644 --- a/crates/core/c8y_api/src/smartrest/operations.rs +++ b/crates/core/c8y_api/src/smartrest/operations.rs @@ -1,48 +1,370 @@ +//! [Supported Operations API] +//! (https://thin-edge.github.io/thin-edge.io/operate/c8y/supported-operations/#supported-operations-api). +//! +//! This module should encompass loading and saving supported operations from and to the filesystem. +//! +//! This module works with a c8y operations directory, which by default is `/etc/tedge/operations/c8y`. This directory +//! contains operation files for the main device and directories that contain operation files for child devices. +//! +//! The names of files in these directories should be the same as names of supported c8y operations, for example +//! `c8y_Restart`. +//! +//! For known c8y operations, the file can be empty. For custom c8y operations, the file should contain a +//! [`OnMessageExec`] section describing how c8y mapper should convert this c8y operation to a local thin-edge command. + use crate::json_c8y_deserializer::C8yDeviceControlTopic; use crate::smartrest::error::OperationsError; use crate::smartrest::smartrest_serializer::declare_supported_operations; +use anyhow::ensure; +use anyhow::Context; +use camino::Utf8Path; +use camino::Utf8PathBuf; +use mqtt_channel::MqttMessage; use mqtt_channel::TopicFilter; use serde::Deserialize; use serde::Deserializer; +use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; use std::fs; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use tedge_api::substitution::Record; use tedge_config::TopicPrefix; +use tedge_utils::file; use tracing::error; use tracing::warn; use super::payload::SmartrestPayload; +use super::topic::C8yTopic; const DEFAULT_GRACEFUL_TIMEOUT: Duration = Duration::from_secs(3600); const DEFAULT_FORCEFUL_TIMEOUT: Duration = Duration::from_secs(60); +type ExternalId = String; +type ExternalIdRef = str; + +type OperationName = String; +type OperationNameRef = str; + +/// Used to hold and query supported operations for all devices. +pub struct SupportedOperations { + /// External ID of the main device. + /// + /// Required because when given an external id of the main device when creating an operation, we want to create it + /// in a main directory, instead of a subdirectory. + pub device_id: String, + + /// Base c8y operations directory for all devices. + /// + /// By default `/etc/tedge/operations/c8y`. Contains operation files for the main device, operation templates, and + /// directories with operation files for child devices. + pub base_ops_dir: Arc, + + /// Currently loaded set of supported operations for all registered devices by its external id. + pub operations_by_xid: HashMap, +} + +impl SupportedOperations { + /// Add an operation to the supported operation set for a given device. + /// + /// Creates and writes new operation file to the filesystem. + pub fn add_operation( + &self, + device_xid: &ExternalIdRef, + c8y_operation_name: &OperationNameRef, + ) -> Result<(), anyhow::Error> { + let ops_file = if device_xid == self.device_id { + self.base_ops_dir.join(c8y_operation_name) + } else { + self.base_ops_dir.join(device_xid).join(c8y_operation_name) + }; + + // Create directory for a device if it doesn't exist yet + file::create_directory_with_defaults(ops_file.parent().expect("should never fail"))?; + + // if a template for such operation already exists on the main device, that means we should symlink to it, + // because it should contain properties required for custom operation + let operations = self + .operations_by_xid + .get(&self.device_id) + .context("main device should be present")?; + + if let Some(template_name) = + operations.get_template_name_by_operation_name(c8y_operation_name) + { + let template_path = self.base_ops_dir.join(template_name); + file::create_symlink(template_path, &ops_file)?; + } else { + file::create_file_with_defaults(&ops_file, None)?; + }; + + Ok(()) + } + + /// Loads and saves a new supported operation set from a given directory. + /// + /// Matches the given directory and loads the operations to the device with an appropriate external id. + /// + /// All operation files from the given operation directory are loaded and set as the new supported operation set for + /// a given device. Invalid operation files are ignored. + /// + /// If the supported operation set changed, `Ok(true)` is returned to denote that this change should be sent to the + /// cloud. + /// + /// # Errors + /// + /// The function will return an error if the given device operations directory is not the same as or inside the base + /// operations directory or if the directory doesn't exist. + pub fn load_from_dir( + &mut self, + ops_dir: &Path, + bridge_config: &impl Record, + ) -> Result { + let device_xid = self.xid_from_path(ops_dir)?; + + self.load_all(&device_xid, bridge_config) + } + + /// Load an operation with a given name from the c8y device operation directory. + /// + /// Returns `Err` if the operation file doesn't exist. + /// + /// If the supported operation set changed, `Ok(true)` is returned to denote that this change should be sent to the + /// cloud. + pub fn load( + &mut self, + device_xid: &ExternalIdRef, + c8y_operation_name: &OperationNameRef, + bridge_config: &impl Record, + ) -> Result { + let ops_file = self.ops_file_name_for_device(device_xid, c8y_operation_name); + + let operation = get_operation(ops_file.as_std_path(), bridge_config)?; + + let current_operations = + if let Some(current_operations) = self.operations_by_xid.get_mut(device_xid) { + current_operations + } else { + self.operations_by_xid + .insert(device_xid.to_string(), Operations::default()); + self.operations_by_xid.get_mut(device_xid).unwrap() + }; + + let prev_operation = current_operations.insert_operation(operation); + + // even if the body of the operation is different, as long as it has the same name, supported operations message + // will be the same, so we don't need to resend + let modified = prev_operation.is_none(); + + Ok(modified) + } + + /// Loads all operations from a device c8y operations directory and saves a new supported operation set for a given + /// device. + /// + /// All operation files from the given operation directory are loaded and set as the new supported operation set for + /// a given device. Invalid operation files are ignored. + /// + /// If the supported operation set changed, `Ok(true)` is returned to denote that this change should be sent to the + /// cloud. + pub fn load_all( + &mut self, + device_xid: &ExternalIdRef, + bridge_config: &impl Record, + ) -> Result { + // load operations from the directory + let dir = self.base_ops_dir_for_device(device_xid); + let new_operations = Operations::try_new(dir, bridge_config)?; + + // the current supported operations set is empty + // TODO: simplify + let Some(current_operations) = self.operations_by_xid.get_mut(device_xid) else { + self.operations_by_xid + .insert(device_xid.to_string(), new_operations); + return Ok(true); + }; + + // current operation set is not empty but it's different + let modified = *current_operations != new_operations; + *current_operations = new_operations; + + Ok(modified) + } + + /// If a given directory is a c8y operations directory of a device, return that device's external id. + pub fn xid_from_path(&self, ops_dir: &Path) -> Result { + ensure!( + ops_dir.starts_with(self.base_ops_dir.as_std_path()), + format!( + "given path '{}' is not the same as or inside the base operations directory '{}'", + ops_dir.to_string_lossy(), + self.base_ops_dir + ) + ); + + ensure!( + ops_dir.is_dir(), + format!( + "given path '{}' does not point to a directory", + ops_dir.to_string_lossy(), + ) + ); + + let suffix = ops_dir.strip_prefix(self.base_ops_dir.as_std_path())?; + + // /etc/tedge/operations/c8y -> main device + // /etc/tedge/operations/c8y/directory -> child device + let device_xid = match suffix + .to_str() + .context("directory name should be a valid external id")? + { + "" => self.device_id.to_string(), + filename => filename.to_string(), + }; + + Ok(device_xid) + } + + /// Returns a directory path for c8y operations for the given device. + fn base_ops_dir_for_device(&self, device_xid: &ExternalIdRef) -> Utf8PathBuf { + if device_xid == self.device_id { + self.base_ops_dir.to_path_buf() + } else { + self.base_ops_dir.join(device_xid) + } + } + + /// Returns a path for c8y operation file for a given device + fn ops_file_name_for_device( + &self, + device_xid: &ExternalIdRef, + c8y_operation_name: &OperationNameRef, + ) -> Utf8PathBuf { + if device_xid == self.device_id { + self.base_ops_dir.join(c8y_operation_name).to_path_buf() + } else { + self.base_ops_dir.join(device_xid).join(c8y_operation_name) + } + } + + /// Create a [supported operations message][1] containing operations supported by the device. + /// + /// [1]: https://cumulocity.com/docs/smartrest/mqtt-static-templates/#114 + pub fn create_supported_operations( + &self, + device_xid: &ExternalIdRef, + c8y_prefix: &TopicPrefix, + ) -> Result { + let payload = self + .operations_by_xid + .get(device_xid) + .map(|o| o.create_smartrest_ops_message()) + .unwrap_or(Operations::default().create_smartrest_ops_message()); + + let topic = if device_xid != self.device_id { + C8yTopic::ChildSmartRestResponse(device_xid.into()).to_topic(c8y_prefix)? + } else { + C8yTopic::upstream_topic(c8y_prefix) + }; + + Ok(MqttMessage::new(&topic, payload.into_inner())) + } + + pub fn get_operation_handlers( + &self, + device_xid: &ExternalIdRef, + topic: &str, + prefix: &TopicPrefix, + ) -> Vec<(String, Operation)> { + let handlers = self + .operations_by_xid + .get(device_xid) + .map(|o| o.filter_by_topic(topic, prefix)) + .unwrap_or_default(); + + handlers + } + + pub fn get_json_custom_operation_topics(&self) -> Result { + Ok(self + .operations_by_xid + .values() + .flat_map(|o| o.operations.values()) + .filter(|operation| operation.on_fragment().is_some()) + .filter_map(|operation| operation.topic()) + .collect::>() + .try_into()?) + } + + pub fn get_smartrest_custom_operation_topics(&self) -> Result { + Ok(self + .operations_by_xid + .values() + .flat_map(|o| o.operations.values()) + .filter(|operation| operation.on_message().is_some()) + .filter_map(|operation| operation.topic()) + .collect::>() + .try_into()?) + } + + pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> { + self.operations_by_xid + .values() + .flat_map(|o| o.operations.values()) + .find(|o| { + o.template() + .is_some_and(|template| template == operation_template) + }) + } + + /// Return operation name if `tedge_cmd` matches + pub fn get_operation_name_by_workflow_operation(&self, command_name: &str) -> Option { + let matching_templates: Vec<&Operation> = self + .operations_by_xid + .get(&self.device_id) + .map(|o| { + o.templates + .iter() + .filter(|template| { + template + .workflow_operation() + .is_some_and(|operation| operation.eq(command_name)) + }) + .collect::>() + }) + .unwrap_or_default(); + + if matching_templates.len() > 1 { + warn!( + "Found more than one template with the same `workflow.operation` field. Picking {}", + matching_templates.first().unwrap().name + ); + } + + matching_templates + .first() + .and_then(|template| template.on_fragment()) + } +} + /// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory /// Each operation is a file name in one of the subdirectories /// The file name is the operation name #[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct Operations { - operations: Vec, + operations: BTreeMap, templates: Vec, } impl Operations { - pub fn add_operation(&mut self, operation: Operation) { - self.operations.push(operation); - // as we insert, we need to maintain order, because depending on if `Operations` is created - // by adding operations one by one or by directory scan, we can end up with a different order - // TODO: refactor to provide more sane API, potentially using a backing `BTreeMap` - self.operations - .sort_unstable_by(|o1, o2| o1.name.cmp(&o2.name)); - } - - pub fn remove_operation(&mut self, name: &str) -> Option { - self.operations.dedup(); - let pos = self.operations.iter().position(|op| op.name == name); - pos.map(|pos| self.operations.remove(pos)) + /// Inserts a new operation. + /// + /// If an operation under such name was already present, the old value is returned. + pub fn insert_operation(&mut self, operation: Operation) -> Option { + self.operations.insert(operation.name.clone(), operation) } /// Loads operations defined in the operations directory. @@ -59,33 +381,13 @@ impl Operations { self.templates.push(template); } - pub fn get_operations_list(&self) -> Vec { - let mut ops_name: Vec = Vec::default(); - for op in &self.operations { - ops_name.push(op.name.clone()); - } - - ops_name + pub fn get_operations_list(&self) -> Vec<&str> { + self.operations.keys().map(String::as_str).collect() } - pub fn matching_smartrest_template(&self, operation_template: &str) -> Option { - for op in self.operations.clone() { - if let Some(template) = op.template() { - if template.eq(operation_template) { - return Some(op); - } - } - } - None - } - - pub fn filter_by_topic( - &self, - topic_name: &str, - prefix: &TopicPrefix, - ) -> Vec<(String, Operation)> { + fn filter_by_topic(&self, topic_name: &str, prefix: &TopicPrefix) -> Vec<(String, Operation)> { let mut vec: Vec<(String, Operation)> = Vec::new(); - for op in self.operations.iter() { + for op in self.operations.values() { match (op.topic(), op.on_fragment()) { (None, Some(on_fragment)) if C8yDeviceControlTopic::name(prefix) == topic_name => { vec.push((on_fragment, op.clone())) @@ -101,38 +403,16 @@ impl Operations { pub fn topics_for_operations(&self) -> HashSet { self.operations - .iter() + .values() .filter_map(|operation| operation.topic()) .collect::>() } pub fn create_smartrest_ops_message(&self) -> SmartrestPayload { - let mut ops = self.get_operations_list(); - ops.sort(); - let ops = ops.iter().map(|op| op.as_str()).collect::>(); + let ops = self.get_operations_list(); declare_supported_operations(&ops) } - pub fn get_json_custom_operation_topics(&self) -> Result { - Ok(self - .operations - .iter() - .filter(|operation| operation.on_fragment().is_some()) - .filter_map(|operation| operation.topic()) - .collect::>() - .try_into()?) - } - - pub fn get_smartrest_custom_operation_topics(&self) -> Result { - Ok(self - .operations - .iter() - .filter(|operation| operation.on_message().is_some()) - .filter_map(|operation| operation.topic()) - .collect::>() - .try_into()?) - } - /// Return operation name if `workflow.operation` matches pub fn get_operation_name_by_workflow_operation(&self, command_name: &str) -> Option { let matching_templates: Vec<&Operation> = self @@ -390,7 +670,9 @@ fn get_operations( if details.is_valid_operation_handler() || details.is_supported_operation_file() { match path.extension() { - None => operations.add_operation(details), + None => { + operations.insert_operation(details); + } Some(extension) if extension.eq("template") => operations.add_template(details), Some(_) => { return Err(OperationsError::InvalidOperationName(path.to_owned())); @@ -585,15 +867,6 @@ mod tests { } } - impl Operation { - fn new(exec: OnMessageExec) -> Self { - Self { - name: "name".to_string(), - exec: Some(exec), - } - } - } - struct TestBridgeConfig { c8y_prefix: TopicPrefix, } @@ -696,7 +969,10 @@ mod tests { )] fn valid_custom_operation_handlers(toml: &str) { let exec: OnMessageExec = toml::from_str(toml).unwrap(); - let operation = Operation::new(exec); + let operation = Operation { + name: "operation".to_string(), + exec: Some(exec), + }; assert!(operation.is_valid_operation_handler()); } @@ -727,7 +1003,10 @@ mod tests { )] fn invalid_custom_operation_handlers(toml: &str) { let exec: OnMessageExec = toml::from_str(toml).unwrap(); - let operation = Operation::new(exec); + let operation = Operation { + name: "operation".to_string(), + exec: Some(exec), + }; assert!(!operation.is_valid_operation_handler()); } @@ -744,13 +1023,22 @@ mod tests { )] fn filter_by_topic_(toml1: &str, toml2: &str) { let exec: OnMessageExec = toml::from_str(toml1).unwrap(); - let operation1 = Operation::new(exec); + let operation1 = Operation { + name: "operation1".to_string(), + exec: Some(exec), + }; let exec: OnMessageExec = toml::from_str(toml2).unwrap(); - let operation2 = Operation::new(exec); + let operation2 = Operation { + name: "operation2".to_string(), + exec: Some(exec), + }; let ops = Operations { - operations: vec![operation1.clone(), operation2.clone()], + operations: BTreeMap::from([ + (operation1.name.clone(), operation1.clone()), + (operation2.name.clone(), operation2.clone()), + ]), ..Default::default() }; diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 75e77213b8..d54f198df7 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -33,6 +33,7 @@ use c8y_api::smartrest::operations::get_child_ops; use c8y_api::smartrest::operations::Operation; use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::operations::ResultFormat; +use c8y_api::smartrest::operations::SupportedOperations; use c8y_api::smartrest::smartrest_serializer::fail_operation_with_id; use c8y_api::smartrest::smartrest_serializer::fail_operation_with_name; use c8y_api::smartrest::smartrest_serializer::request_pending_operations; @@ -90,9 +91,6 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_uploader_ext::UploadRequest; use tedge_uploader_ext::UploadResult; -use tedge_utils::file::create_directory_with_defaults; -use tedge_utils::file::create_file_with_defaults; -use tedge_utils::file::create_symlink; use tedge_utils::file::FileError; use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; @@ -181,11 +179,9 @@ pub struct CumulocityConverter { pub(crate) device_topic_id: EntityTopicId, pub(crate) device_type: String, alarm_converter: AlarmConverter, - pub operations: Operations, operation_logs: OperationLogs, mqtt_publisher: LoggingSender, pub http_proxy: C8YHttpProxy, - pub children: HashMap, pub service_type: String, pub mqtt_schema: MqttSchema, pub entity_store: EntityStore, @@ -194,6 +190,7 @@ pub struct CumulocityConverter { // Keep active command IDs to avoid creation of multiple commands for an operation pub active_commands: HashSet, + supported_operations: SupportedOperations, pub operation_handler: OperationHandler, } @@ -219,9 +216,21 @@ impl CumulocityConverter { let prefix = &config.bridge_config.c8y_prefix; - let operations = Operations::try_new(&*config.ops_dir, &config.bridge_config)?; + let operations_by_xid = { + let mut operations = get_child_ops(&*config.ops_dir, &config.bridge_config)?; + operations.insert( + config.device_id.clone(), + Operations::try_new(&*config.ops_dir, &config.bridge_config)?, + ); + operations + }; + let operation_manager = SupportedOperations { + device_id: device_id.clone(), - let children = get_child_ops(&*config.ops_dir, &config.bridge_config)?; + base_ops_dir: Arc::clone(&config.ops_dir), + + operations_by_xid, + }; let alarm_converter = AlarmConverter::new(); @@ -266,10 +275,9 @@ impl CumulocityConverter { device_topic_id, device_type, alarm_converter, - operations, + supported_operations: operation_manager, operation_logs, http_proxy, - children, mqtt_publisher, service_type, mqtt_schema: mqtt_schema.clone(), @@ -808,14 +816,10 @@ impl CumulocityConverter { let target = self.entity_store.try_get_by_external_id(&entity_xid)?; match target.r#type { - EntityType::MainDevice => Ok(self.operations.filter_by_topic(topic, prefix)), - EntityType::ChildDevice => { - if let Some(operations) = self.children.get(device_xid) { - Ok(operations.filter_by_topic(topic, prefix)) - } else { - Ok(Vec::new()) - } - } + EntityType::MainDevice | EntityType::ChildDevice => Ok(self + .supported_operations + .get_operation_handlers(device_xid, topic, prefix)), + EntityType::Service => { warn!("operation for services are currently unsupported"); Ok(Vec::new()) @@ -997,7 +1001,10 @@ impl CumulocityConverter { payload: &str, template: &str, ) -> Result, CumulocityMapperError> { - if let Some(operation) = self.operations.matching_smartrest_template(template) { + if let Some(operation) = self + .supported_operations + .matching_smartrest_template(template) + { if let Some(command) = operation.command() { let script = ShellScript { command, @@ -1258,19 +1265,6 @@ impl CumulocityConverter { } let auto_registered_entities = self.entity_store.auto_register_entity(source)?; - for auto_registered_entity in &auto_registered_entities { - if auto_registered_entity.r#type == EntityType::ChildDevice { - self.children.insert( - self.entity_store - .get(source) - .expect("Entity should have been auto registered in the previous step") - .external_id - .as_ref() - .into(), - Operations::default(), - ); - } - } Ok(auto_registered_entities) } @@ -1444,7 +1438,7 @@ impl CumulocityConverter { } topic if self - .operations + .supported_operations .get_json_custom_operation_topics()? .accept_topic(topic) => { @@ -1452,7 +1446,7 @@ impl CumulocityConverter { } topic if self - .operations + .supported_operations .get_smartrest_custom_operation_topics()? .accept_topic(topic) => { @@ -1470,8 +1464,12 @@ impl CumulocityConverter { fn try_init_messages(&mut self) -> Result, ConversionError> { let mut messages = self.parse_base_inventory_file()?; - let supported_operations_message = - self.create_supported_operations(self.config.ops_dir.as_std_path())?; + self.supported_operations + .load_all(&self.config.device_id, &self.config.bridge_config)?; + let supported_operations_message = self.supported_operations.create_supported_operations( + &self.config.device_id, + &self.config.bridge_config.c8y_prefix, + )?; let device_data_message = self.inventory_device_type_update_message()?; @@ -1486,40 +1484,6 @@ impl CumulocityConverter { Ok(messages) } - fn create_supported_operations(&self, path: &Path) -> Result { - let is_child = is_child_operation_path(path); - let device = if is_child { - let external_id = get_child_external_id(path)?; - self.entity_store - .get_by_external_id(&EntityExternalId::from(external_id)) - .expect("should be registered") - } else { - self.entity_store - .get(self.entity_store.main_device()) - .expect("should be registered") - }; - - let operations = self - .operations_for_device(&device.topic_id) - .expect("should exist"); - - let prefix = &self.config.bridge_config.c8y_prefix; - - let topic = if is_child_operation_path(path) { - let child_id = get_child_external_id(path)?; - let child_external_id = Self::validate_external_id(&child_id)?; - - C8yTopic::ChildSmartRestResponse(child_external_id.into()).to_topic(prefix)? - } else { - C8yTopic::upstream_topic(prefix) - }; - - Ok(MqttMessage::new( - &topic, - operations.create_smartrest_ops_message().into_inner(), - )) - } - pub fn sync_messages(&mut self) -> Vec { let sync_messages: Vec = self.alarm_converter.sync(); self.alarm_converter = AlarmConverter::Synced; @@ -1530,32 +1494,24 @@ impl CumulocityConverter { &mut self, message: &DiscoverOp, ) -> Result, ConversionError> { - let needs_cloud_update = self.update_operations(&message.ops_dir)?; + let needs_cloud_update = self + .supported_operations + .load_from_dir(&message.ops_dir, &self.config.bridge_config)?; if needs_cloud_update { - Ok(Some(self.create_supported_operations(&message.ops_dir)?)) + let device_xid = self.supported_operations.xid_from_path(&message.ops_dir)?; + Ok(Some( + self.supported_operations.create_supported_operations( + &device_xid, + &self.config.bridge_config.c8y_prefix, + )?, + )) } else { Ok(None) } } } -// FIXME: this only extracts the final component of the path, the path prefix can be anything. this -// should be simplified -fn get_child_external_id(dir_path: &Path) -> Result { - match dir_path.file_name() { - Some(child_id) => { - let child_id = child_id.to_string_lossy().to_string(); - Ok(child_id) - } - // only returned when path is empty, e.g. "/", in practice this shouldn't ever be given as - // input - None => Err(ConversionError::DirPathComponentError { - dir: dir_path.to_owned(), - }), - } -} - fn create_get_pending_operations_message( prefix: &TopicPrefix, ) -> Result { @@ -1563,20 +1519,6 @@ fn create_get_pending_operations_message( Ok(MqttMessage::new(&topic, request_pending_operations())) } -fn is_child_operation_path(path: &Path) -> bool { - // a `path` can contains operations for the parent or for the child - // example paths: - // {cfg_dir}/operations/c8y/child_name/ - // {cfg_dir}/operations/c8y/ - // - // the difference between an operation for the child or for the parent - // is the existence of a directory after `operations/c8y` or not. - match path.file_name() { - Some(file_name) => !file_name.eq("c8y"), - None => false, - } -} - impl CumulocityConverter { /// Register on C8y an operation capability for a device. /// @@ -1591,70 +1533,44 @@ impl CumulocityConverter { c8y_operation_name: &str, ) -> Result, ConversionError> { let device = self.entity_store.try_get(target)?; - let ops_dir = match device.r#type { - EntityType::MainDevice => self.config.ops_dir.clone(), - EntityType::ChildDevice => { - let child_dir_name = device.external_id.as_ref(); - self.config.ops_dir.join(child_dir_name).into() - } - EntityType::Service => { - error!( - %target, - "{c8y_operation_name} operation for services are currently unsupported" - ); - return Err(ConversionError::UnexpectedError(anyhow!( - "{c8y_operation_name} operation for services are currently unsupported" - ))); - } - }; - - let ops_file = ops_dir.join(c8y_operation_name); - // Save the operation to the operation directory - create_directory_with_defaults(&*ops_dir)?; - if let Some(template_name) = self - .operations - .get_template_name_by_operation_name(c8y_operation_name) - { - create_symlink(self.config.ops_dir.join(template_name), &ops_file)?; - } else { - create_file_with_defaults(&ops_file, None)?; - }; + if let EntityType::Service = device.r#type { + error!( + %target, + "{c8y_operation_name} operation for services are currently unsupported" + ); + return Err(ConversionError::UnexpectedError(anyhow!( + "{c8y_operation_name} operation for services are currently unsupported" + ))); + } - let ops_dir = ops_dir.as_std_path(); + self.supported_operations + .add_operation(device.external_id.as_ref(), c8y_operation_name)?; - let need_cloud_update = match is_child_operation_path(ops_dir) { + let need_cloud_update = match device.r#type { // for devices other than the main device, dynamic update of supported operations via file events is // disabled, so we have to additionally load new operations from the c8y operations for that device - true => self.update_operations(ops_dir)?, + EntityType::ChildDevice => self + .supported_operations + .load_all(device.external_id.as_ref(), &self.config.bridge_config)?, // for main devices new operation files are loaded dynamically as they are created, so only register one // operation we need - false => { - let bridge_config = &self.config.bridge_config; - let operation = c8y_api::smartrest::operations::get_operation( - ops_file.as_std_path(), - bridge_config, - )?; - - let current_operations = self - .operations_for_device_mut(target) - .expect("entity should've been checked before that it's not a service"); + EntityType::MainDevice => self.supported_operations.load( + device.external_id.as_ref(), + c8y_operation_name, + &self.config.bridge_config, + )?, - let prev_operation = current_operations.remove_operation(&operation.name); - - // even if the body of the operation is different, as long as it has the same name, supported operations message - // will be the same, so we don't need to resend - let need_cloud_update = prev_operation.is_none(); - - current_operations.add_operation(operation); - - need_cloud_update - } + EntityType::Service => unreachable!("error returned earlier"), }; if need_cloud_update { - let cloud_update_operations_message = self.create_supported_operations(ops_dir)?; + let cloud_update_operations_message = + self.supported_operations.create_supported_operations( + device.external_id.as_ref(), + &self.config.bridge_config.c8y_prefix, + )?; return Ok(vec![cloud_update_operations_message]); } @@ -1662,31 +1578,6 @@ impl CumulocityConverter { Ok(vec![]) } - /// Loads and saves a new supported operation set for a given device. - /// - /// All operation files from the given operation directory are loaded and set as the new supported operation set for - /// a given device. Invalid operation files are ignored. - /// - /// If the supported operation set changed, `Ok(true)` is returned to denote that this change should be sent to the - /// cloud. - fn update_operations(&mut self, dir: &Path) -> Result { - let operations = Operations::try_new(dir, &self.config.bridge_config)?; - let current_operations = if is_child_operation_path(dir) { - let child_id = get_child_external_id(dir)?; - let Some(current_operations) = self.children.get_mut(&child_id) else { - self.children.insert(child_id, operations); - return Ok(true); - }; - current_operations - } else { - &mut self.operations - }; - let modified = *current_operations != operations; - *current_operations = operations; - - Ok(modified) - } - async fn register_restart_operation( &mut self, target: &EntityTopicId, @@ -1706,7 +1597,7 @@ impl CumulocityConverter { command_name: &str, ) -> Result, ConversionError> { if let Some(c8y_op_name) = self - .operations + .supported_operations .get_operation_name_by_workflow_operation(command_name) { match self.register_operation(target, &c8y_op_name) { @@ -1755,41 +1646,6 @@ impl CumulocityConverter { registration.push(self.request_software_list(target)); Ok(registration) } - - fn operations_for_device(&self, device: &EntityTopicId) -> Option<&Operations> { - let device = self.entity_store.get(device)?; - - match device.r#type { - EntityType::MainDevice => Some(&self.operations), - EntityType::ChildDevice => self.children.get(device.external_id.as_ref()), - EntityType::Service => None, - } - } - - /// Return `Operations` struct for a given entity. - /// - /// Returns `None` if an entity is a service, as services can't have operations. - /// If `Operations` wasn't yet created for a device, this function creates it. - fn operations_for_device_mut(&mut self, device: &EntityTopicId) -> Option<&mut Operations> { - let device = self - .entity_store - .get(device) - .expect("Entity should've been registered"); - - match device.r#type { - EntityType::MainDevice => Some(&mut self.operations), - EntityType::ChildDevice => { - let key = device.external_id.as_ref(); - // can't avoid the double hash+lookup because of borrow checker limitation - // https://www.reddit.com/r/rust/comments/qi3ye9/comment/hih04qs/ - if !self.children.contains_key(key) { - self.children.insert(key.to_string(), Operations::default()); - } - self.children.get_mut(key) - } - EntityType::Service => None, - } - } } /// Lists all the locally available child devices linked to this parent device.