From f254b61ec366ed893fb44388d4c9d408acc5878c Mon Sep 17 00:00:00 2001 From: SHANiTH K K <59169878+shanithkk@users.noreply.github.com> Date: Tue, 14 Nov 2023 22:38:53 +0530 Subject: [PATCH] feat: add pause and resume feature in workflow management (#80) * chore: add pause and resume feature in workflow management * chore: update userdata status field with boolean --- actions/workflow-invoker/src/lib.rs | 31 +- actions/workflow-invoker/src/types/data.rs | 18 +- actions/workflow-invoker/src/types/mod.rs | 3 +- actions/workflow-invoker/src/types/topic.rs | 10 - actions/workflow-management/Cargo.toml | 7 +- actions/workflow-management/src/lib.rs | 613 +++++++++++++++++- actions/workflow-management/src/types/mod.rs | 2 +- .../workflow-management/src/types/topic.rs | 10 +- 8 files changed, 646 insertions(+), 48 deletions(-) diff --git a/actions/workflow-invoker/src/lib.rs b/actions/workflow-invoker/src/lib.rs index f8c99b7f..c3440435 100644 --- a/actions/workflow-invoker/src/lib.rs +++ b/actions/workflow-invoker/src/lib.rs @@ -2,6 +2,7 @@ extern crate serde_json; use serde_derive::{Deserialize, Serialize}; use serde_json::{Error, Value}; +use types::UserData; mod types; use crate::types::update_with; #[cfg(test)] @@ -56,26 +57,30 @@ impl Action { self.context.as_mut().expect("Action not Initialized!") } - pub fn fetch_input(&mut self) -> Result, Error> { + pub fn fetch_input(&mut self) -> Result, Error> { let id = self.params.messages.clone()[0].topic.clone(); let data = self.get_context().get_document(&id)?; let parsed = serde_json::from_value::(data)?; Ok(parsed.data) } - pub fn invoke_trigger(&mut self, payload: &mut Vec) -> Result { + pub fn invoke_trigger(&mut self, payload: &mut Vec) -> Result { let mut failed_triggers = vec![]; - for message in payload.iter_mut() { + + for user in payload.iter_mut() { let data = serde_json::from_str::(&self.params.messages[0].value).unwrap(); - update_with(message, &data); + update_with(&mut user.input_data, &data); let trigger = self.params.polkadot_payout_trigger.clone(); - if self - .get_context() - .invoke_trigger(&trigger, &serde_json::json!({"data": message})) - .is_err() - { - failed_triggers.push(self.params.messages[0].value.clone()); + + if user.status { + if self + .get_context() + .invoke_trigger(&trigger, &serde_json::json!({"data": user.input_data})) + .is_err() + { + failed_triggers.push(self.params.messages[0].value.clone()); + } } } if !failed_triggers.is_empty() { @@ -132,7 +137,11 @@ mod tests { let workflow_db = action.connect_db(&action.params.db_url, &action.params.db_name); let workflow_management_db_context = Context::new(workflow_db, None); let doc = serde_json::json!({ - "data": [{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() }] + "data": [{ + "user_id" : "asdf", + "status" : true, + "input_data" :{ "url": "todo!()".to_string(), "validator": "todo!()".to_string(), "owner_key": "todo!()".to_string() } + }] }); let _ = workflow_management_db_context .insert_document(&doc, Some(action.params.messages[0].topic.clone())); diff --git a/actions/workflow-invoker/src/types/data.rs b/actions/workflow-invoker/src/types/data.rs index f3014ab7..300bba0a 100644 --- a/actions/workflow-invoker/src/types/data.rs +++ b/actions/workflow-invoker/src/types/data.rs @@ -1,9 +1,17 @@ use serde_derive::{Deserialize, Serialize}; +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Topic { + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_rev")] + pub rev: String, + pub data: Vec, +} #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct DbDatas { - pub endpoint: String, - pub validator: String, - pub key: String, +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] +pub struct UserData { + pub user_id: String, + pub status: bool, + pub input_data: serde_json::Value, } diff --git a/actions/workflow-invoker/src/types/mod.rs b/actions/workflow-invoker/src/types/mod.rs index 1d684afa..db682eee 100644 --- a/actions/workflow-invoker/src/types/mod.rs +++ b/actions/workflow-invoker/src/types/mod.rs @@ -3,9 +3,8 @@ pub mod source; pub mod topic; pub use message::{Era, Message}; pub use source::Source; -pub use topic::Topic; mod data; -pub use data::*; +pub use data::{Topic, UserData}; pub fn update_with(dest: &mut serde_json::Value, src: &serde_json::Value) { use serde_json::Value::{Null, Object}; diff --git a/actions/workflow-invoker/src/types/topic.rs b/actions/workflow-invoker/src/types/topic.rs index f2fec45a..ccbe8a15 100644 --- a/actions/workflow-invoker/src/types/topic.rs +++ b/actions/workflow-invoker/src/types/topic.rs @@ -1,14 +1,4 @@ use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct Topic { - #[serde(skip_serializing, rename(deserialize = "_id"))] - pub id: String, - #[serde(skip_serializing, rename(deserialize = "_rev"))] - pub rev: String, - pub data: Vec, -} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/actions/workflow-management/Cargo.toml b/actions/workflow-management/Cargo.toml index 51572291..96f40725 100644 --- a/actions/workflow-management/Cargo.toml +++ b/actions/workflow-management/Cargo.toml @@ -33,7 +33,10 @@ actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117' } reqwest = { version = "0.11", features = ["blocking", "json"] } bcrypt = "0.13.0" jsonwebtoken = "7.1" +chrono = { version = "0.4", features = ["serde"] } [dev-dependencies] -actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = ["mock_containers"] } -tokio = { version = "1.0.0", features = ["macros"] } \ No newline at end of file +actions-common = { git = "https://github.com/HugoByte/aurras", rev = '1f7e117', features = [ + "mock_containers", +] } +tokio = { version = "1.0.0", features = ["macros"] } diff --git a/actions/workflow-management/src/lib.rs b/actions/workflow-management/src/lib.rs index d59a525e..dfea79d6 100644 --- a/actions/workflow-management/src/lib.rs +++ b/actions/workflow-management/src/lib.rs @@ -4,16 +4,16 @@ use chesterfield::sync::{Client, Database}; use serde_derive::{Deserialize, Serialize}; use serde_json::{Error, Value}; mod types; -#[cfg(not(test))] + use jsonwebtoken::{decode, DecodingKey, Validation}; -#[cfg(not(test))] + use types::Claims; -use types::{Response, Topic}; +use types::{Response, Topic, UserData}; #[cfg(test)] use actions_common::Config; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] struct Input { __ow_method: String, #[serde(default = "empty_string")] @@ -30,12 +30,18 @@ struct Input { token: String, #[serde(default)] input: Value, + #[serde(default = "default_status")] + status: String, } fn empty_string() -> String { String::new() } +fn default_status() -> String { + String::from("active") +} + struct Action { params: Input, context: Option, @@ -92,8 +98,7 @@ impl Action { })) } - #[cfg(not(test))] - pub fn user_validate(&self) -> Result<(), Error> { + pub fn user_validate(&self) -> Result { let decoding_key = DecodingKey::from_secret("user_registration_token_secret_key".as_bytes()); let validation = Validation::default(); @@ -103,13 +108,12 @@ impl Action { let db = self.connect_db(&self.params.db_url, &"user_registration_db".to_string()); let context = Context::new(db, None); let _data = context.get_document(&uuid)?; - Ok(()) + Ok(uuid) } } pub fn add_data_to_db(&mut self) -> Result { - #[cfg(not(test))] - self.user_validate()?; + let user_id = self.user_validate()?; let mut db_input = self.params.input.clone(); db_input["token"] = serde_json::json!(self.params.token.clone()); @@ -120,17 +124,85 @@ impl Action { if context.get_document(&topic).is_err() { context.insert_document( &serde_json::json!({ - "data": [db_input] + "data": [{ + "user_id": user_id, + "status": true, + "input_data": db_input + }] }), Some(topic.to_string()), ) } else { let mut doc: Topic = serde_json::from_value(context.get_document(&topic)?)?; + let mut user_index = None; + for (index, user) in doc.data.iter().enumerate() { + if user.user_id == user_id { + user_index = Some(index); + } + } + match user_index { + Some(x) => doc.data[x].input_data = db_input, + None => { + let new_user = UserData { + user_id, + status: true, + input_data: db_input, + }; + doc.data.push(new_user); + } + } - doc.data.push(db_input); context.update_document(&topic, &doc.rev, &serde_json::to_value(doc.clone())?) } } + + pub fn update_subscription_status(&mut self) -> Result { + let user_id = self.user_validate()?; + let db = self.connect_db(&self.params.db_url, &self.params.workflow_management_db); + let context = Context::new(db, None); + let topic = self.params.topic.clone(); + + let mut doc: Topic = serde_json::from_value(context.get_document(&topic)?)?; + let mut user_index = None; + for (index, user) in doc.data.iter().enumerate() { + if user.user_id == user_id { + user_index = Some(index); + } + } + let status = self.params.status.clone() == "active".to_string(); + match user_index { + Some(x) => doc.data[x].status = status, + None => (), + } + + context.update_document(&topic, &doc.rev, &serde_json::to_value(doc.clone())?) + } + pub fn delete_subscription_status(&mut self) -> Result { + let user_id = self.user_validate()?; + let db = self.connect_db(&self.params.db_url, &self.params.workflow_management_db); + let context = Context::new(db, None); + let topic = self.params.topic.clone(); + + let mut doc: Topic = serde_json::from_value(context.get_document(&topic)?)?; + let mut user_index = None; + for (index, user) in doc.data.iter().enumerate() { + if user.user_id == user_id { + user_index = Some(index); + } + } + + match user_index { + Some(x) => { + doc.data.remove(x); + } + None => { + return Err(format!("User didnt subscribed this service",)) + .map_err(serde::de::Error::custom) + } + } + + context.update_document(&topic, &doc.rev, &serde_json::to_value(doc.clone())?) + } } pub fn main(args: Value) -> Result { @@ -154,6 +226,27 @@ pub fn main(args: Value) -> Result { })) } "get" => action.get_event_sources(), + "put" => { + action.update_subscription_status()?; + Ok(serde_json::json!({ + "statusCode": 200, + "headers": { "Content-Type": "application/json" }, + "body": { + "success": true + } + })) + } + "delete" => { + action.delete_subscription_status()?; + Ok(serde_json::json!({ + "statusCode": 200, + "headers": { "Content-Type": "application/json" }, + "body": { + "success": true + } + })) + } + method => Err(format!("method not supported document {}", method)) .map_err(serde::de::Error::custom), } @@ -164,8 +257,22 @@ mod tests { use super::*; use actions_common::mock_containers::CouchDB; use actions_common::Config; + use chrono::{Duration as ChronoDuration, Utc}; + use jsonwebtoken::{encode, EncodingKey, Header}; use tokio; use tokio::time::{sleep, Duration}; + use uuid::Uuid; + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + pub struct User { + name: String, + email: String, + password: String, + } + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + pub struct UserId { + pub user_id: String, + } #[tokio::test] async fn filter_topics_pass() { @@ -174,7 +281,196 @@ mod tests { .await .unwrap(); sleep(Duration::from_millis(5000)).await; + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + ..Default::default() + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + + let event_registration_db = + action.connect_db(&action.params.db_url, &action.params.event_registration_db); + let event_registration_db_context = Context::new(event_registration_db, None); + + event_registration_db_context + .insert_document( + &serde_json::json!({ + "name": "polkadot", + "trigger": "trigger" + }), + Some("event_id".to_string()), + ) + .unwrap(); + let workflow_db = + action.connect_db(&action.params.db_url, &action.params.workflow_management_db); + let workflow_management_db_context = Context::new(workflow_db, None); + let _res = action.add_data_to_db(); + let _res = action.add_data_to_db(); + let res_data = + workflow_management_db_context.get_document("418a8b8c-02b8-11ec-9a03-0242ac130003"); + let res = serde_json::from_value::(res_data.unwrap()); + assert!(res.is_ok()); + couchdb.delete().await.expect("Stopping Container Failed"); + } + + #[tokio::test] + async fn add_data_to_db() { + let config = Config::new(); + let couchdb = CouchDB::new("admin".to_string(), "password".to_string()) + .await + .unwrap(); + sleep(Duration::from_millis(5000)).await; + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + ..Default::default() + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + + let event_registration_db = + action.connect_db(&action.params.db_url, &action.params.event_registration_db); + let event_registration_db_context = Context::new(event_registration_db, None); + + event_registration_db_context + .insert_document( + &serde_json::json!({ + "name": "polkadot", + "trigger": "trigger" + }), + Some("event_id".to_string()), + ) + .unwrap(); + let workflow_db = + action.connect_db(&action.params.db_url, &action.params.workflow_management_db); + let workflow_management_db_context = Context::new(workflow_db, None); + let _res = action.add_data_to_db(); + let _res = action.add_data_to_db(); + let res_data = + workflow_management_db_context.get_document("418a8b8c-02b8-11ec-9a03-0242ac130003"); + let res = serde_json::from_value::(res_data.unwrap()); + assert!(res.is_ok()); + couchdb.delete().await.expect("Stopping Container Failed"); + } + + #[tokio::test] + async fn add_two_user_data_to_db() { + let config = Config::new(); + let couchdb = CouchDB::new("admin".to_string(), "password".to_string()) + .await + .unwrap(); + sleep(Duration::from_millis(5000)).await; + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + let _topic = "1234".to_string(); let _address = "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(); let _token = "1".to_string(); @@ -185,7 +481,75 @@ mod tests { address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), workflow_management_db: "workflow_management_db".to_string(), event_registration_db: "event_registration_db".to_string(), - auth_token: "1".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + ..Default::default() + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + + let event_registration_db = + action.connect_db(&action.params.db_url, &action.params.event_registration_db); + let event_registration_db_context = Context::new(event_registration_db, None); + + event_registration_db_context + .insert_document( + &serde_json::json!({ + "name": "polkadot", + "trigger": "trigger" + }), + Some("event_id".to_string()), + ) + .unwrap(); + let workflow_db = + action.connect_db(&action.params.db_url, &action.params.workflow_management_db); + let workflow_management_db_context = Context::new(workflow_db, None); + let _res = action.add_data_to_db(); + let _res = action.add_data_to_db(); + + let user = User { + name: "test".to_string(), + email: "test1@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), input: serde_json::json!({ @@ -193,9 +557,83 @@ mod tests { "owner_key": "".to_string(), "validator": "".to_string(), }), + ..Default::default() }); action.init(&config); + user_db_context + .insert_document(&uder_id_doc, Some("test1@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + let _res = action.add_data_to_db(); + + let res_data = + workflow_management_db_context.get_document("418a8b8c-02b8-11ec-9a03-0242ac130003"); + let res = serde_json::from_value::(res_data.unwrap()); + assert!(res.is_ok()); + couchdb.delete().await.expect("Stopping Container Failed"); + } + + #[tokio::test] + async fn update_subscription_status() { + let config = Config::new(); + let couchdb = CouchDB::new("admin".to_string(), "password".to_string()) + .await + .unwrap(); + sleep(Duration::from_millis(5000)).await; + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + status: "inactive".to_string(), + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + let event_registration_db = action.connect_db(&action.params.db_url, &action.params.event_registration_db); let event_registration_db_context = Context::new(event_registration_db, None); @@ -213,11 +651,162 @@ mod tests { action.connect_db(&action.params.db_url, &action.params.workflow_management_db); let workflow_management_db_context = Context::new(workflow_db, None); let _res = action.add_data_to_db(); + let _res = action.update_subscription_status(); + + let res_data = + workflow_management_db_context.get_document("418a8b8c-02b8-11ec-9a03-0242ac130003"); + let res = serde_json::from_value::(res_data.unwrap()); + assert!(res.is_ok()); + couchdb.delete().await.expect("Stopping Container Failed"); + } + + #[tokio::test] + async fn remove_workflow_subscription() { + let config = Config::new(); + let couchdb = CouchDB::new("admin".to_string(), "password".to_string()) + .await + .unwrap(); + sleep(Duration::from_millis(5000)).await; + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + status: "active".to_string(), + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + + let event_registration_db = + action.connect_db(&action.params.db_url, &action.params.event_registration_db); + let event_registration_db_context = Context::new(event_registration_db, None); + + event_registration_db_context + .insert_document( + &serde_json::json!({ + "name": "polkadot", + "trigger": "trigger" + }), + Some("event_id".to_string()), + ) + .unwrap(); + let workflow_db = + action.connect_db(&action.params.db_url, &action.params.workflow_management_db); + let workflow_management_db_context = Context::new(workflow_db, None); let _res = action.add_data_to_db(); + let _res = action.delete_subscription_status(); + let res_data = workflow_management_db_context.get_document("418a8b8c-02b8-11ec-9a03-0242ac130003"); let res = serde_json::from_value::(res_data.unwrap()); assert!(res.is_ok()); couchdb.delete().await.expect("Stopping Container Failed"); } + + #[tokio::test] + #[should_panic(expected = "User didnt subscribed this service")] + async fn remove_subscription_fails() { + let config = Config::new(); + let couchdb = CouchDB::new("admin".to_string(), "password".to_string()) + .await + .unwrap(); + sleep(Duration::from_millis(5000)).await; + let user = User { + name: "test".to_string(), + email: "test@example.com".to_string(), + password: "testpassword".to_string(), + }; + let user_id = Uuid::new_v4().to_string(); + let doc = serde_json::to_value(user.clone()).unwrap(); + let uder_id_doc = serde_json::to_value(UserId { + user_id: user_id.clone(), + }) + .unwrap(); + + let headers = Header::default(); + let encoding_key = + EncodingKey::from_secret("user_registration_token_secret_key".as_bytes()); + let now = Utc::now() + ChronoDuration::days(1); // Expires in 1 day + let claims = Claims { + sub: user_id.clone(), + exp: now.timestamp(), + }; + let user_token = encode(&headers, &claims, &encoding_key).unwrap(); + + let url = format!("http://admin:password@localhost:{}", couchdb.port()); + let mut action = Action::new(Input { + db_url: url.clone(), + db_name: "test".to_string(), + __ow_method: "post".to_string(), + address: "15ss3TDX2NLG31ugk6QN5zHhq2MUfiaPhePSjWwht6Dr9RUw".to_string(), + workflow_management_db: "workflow_management_db".to_string(), + event_registration_db: "event_registration_db".to_string(), + auth_token: user_token, + topic: "418a8b8c-02b8-11ec-9a03-0242ac130003".to_string(), + token: "akjDSIJGFIJHNSdmngknomlmxcgknhNDlnglnlkoNSDG".to_string(), + input: serde_json::json!({ + "url": "".to_string(), + "owner_key": "".to_string(), + "validator": "".to_string(), + }), + status: "active".to_string(), + }); + action.init(&config); + + let user_db = action.connect_db(&action.params.db_url, &"user_registration_db".to_string()); + let user_db_context = Context::new(user_db, None); + user_db_context + .insert_document(&uder_id_doc, Some("test@example.com".to_string())) + .unwrap(); + user_db_context + .insert_document(&doc, Some(user_id.clone())) + .unwrap(); + + action.add_data_to_db().unwrap(); + action.delete_subscription_status().unwrap(); + let res = action.delete_subscription_status(); + couchdb.delete().await.expect("Stopping Container Failed"); + res.unwrap(); + } } diff --git a/actions/workflow-management/src/types/mod.rs b/actions/workflow-management/src/types/mod.rs index 0bb29b63..a6405d99 100644 --- a/actions/workflow-management/src/types/mod.rs +++ b/actions/workflow-management/src/types/mod.rs @@ -3,4 +3,4 @@ mod response; mod topic; pub use body::{Body, Claims}; pub use response::Response; -pub use topic::{DbDatas, Topic}; +pub use topic::{Topic, UserData}; diff --git a/actions/workflow-management/src/types/topic.rs b/actions/workflow-management/src/types/topic.rs index 2f9c3e7e..24a9db85 100644 --- a/actions/workflow-management/src/types/topic.rs +++ b/actions/workflow-management/src/types/topic.rs @@ -6,13 +6,13 @@ pub struct Topic { pub id: String, #[serde(rename = "_rev")] pub rev: String, - pub data: Vec, + pub data: Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] -pub struct DbDatas { - pub endpoint: String, - pub validator: String, - pub key: String, +pub struct UserData { + pub user_id: String, + pub status: bool, + pub input_data: serde_json::Value, }