From 8c14612f988fc01bd8da2ecc9646614e98bb79ed Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:04:47 +0800 Subject: [PATCH] feat: refactor access control to enable swapping implementation (#859) --- Cargo.lock | 22 +- Cargo.toml | 3 - libs/access-control/Cargo.toml | 17 +- .../access-control/src/{ => casbin}/access.rs | 82 +- .../src/{ => casbin}/adapter.rs | 3 +- libs/access-control/src/casbin/collab.rs | 160 ++++ libs/access-control/src/casbin/enforcer.rs | 829 ++++++++++++++++++ libs/access-control/src/casbin/mod.rs | 6 + .../src/casbin}/notification.rs | 6 +- .../src/casbin/workspace.rs} | 8 +- libs/access-control/src/enforcer.rs | 225 ----- libs/access-control/src/entity.rs | 24 + libs/access-control/src/lib.rs | 7 +- libs/access-control/src/metrics.rs | 4 +- libs/access-control/src/noops/collab.rs | 95 ++ libs/access-control/src/noops/mod.rs | 2 + libs/access-control/src/noops/workspace.rs | 60 ++ libs/access-control/src/request.rs | 2 +- libs/access-control/tests/enforce_test.rs | 585 ------------ libs/workspace-access/Cargo.toml | 14 - libs/workspace-access/src/lib.rs | 4 - services/appflowy-collaborate/Cargo.toml | 1 - .../src/actix_ws/server/rt_actor.rs | 34 +- services/appflowy-collaborate/src/api.rs | 4 +- .../appflowy-collaborate/src/application.rs | 24 +- .../src/client/client_msg_router.rs | 16 +- .../src/collab/access_control.rs | 174 +--- .../src/collab/notification.rs | 3 +- .../src/collab/storage.rs | 8 +- services/appflowy-collaborate/src/command.rs | 6 +- .../appflowy-collaborate/src/group/cmd.rs | 9 +- .../appflowy-collaborate/src/group/manager.rs | 11 +- .../appflowy-collaborate/src/pg_listener.rs | 2 +- .../appflowy-collaborate/src/rt_server.rs | 15 +- services/appflowy-collaborate/src/state.rs | 2 +- src/api/data_import.rs | 2 +- src/api/workspace.rs | 32 +- src/api/ws.rs | 4 +- src/application.rs | 57 +- src/biz/collab/access_control.rs | 16 +- src/biz/collab/ops.rs | 6 +- src/biz/pg_listener.rs | 2 +- src/biz/user/user_verify.rs | 1 - src/biz/workspace/access_control.rs | 16 +- src/biz/workspace/ops.rs | 16 +- src/config/config.rs | 11 + src/middleware/access_control_mw.rs | 7 - src/middleware/cors_mw.rs | 17 - src/middleware/mod.rs | 1 - src/state.rs | 13 +- 50 files changed, 1382 insertions(+), 1286 deletions(-) rename libs/access-control/src/{ => casbin}/access.rs (84%) rename libs/access-control/src/{ => casbin}/adapter.rs (99%) create mode 100644 libs/access-control/src/casbin/collab.rs create mode 100644 libs/access-control/src/casbin/enforcer.rs create mode 100644 libs/access-control/src/casbin/mod.rs rename libs/{workspace-access/src => access-control/src/casbin}/notification.rs (95%) rename libs/{workspace-access/src/access_control.rs => access-control/src/casbin/workspace.rs} (90%) delete mode 100644 libs/access-control/src/enforcer.rs create mode 100644 libs/access-control/src/entity.rs create mode 100644 libs/access-control/src/noops/collab.rs create mode 100644 libs/access-control/src/noops/mod.rs create mode 100644 libs/access-control/src/noops/workspace.rs delete mode 100644 libs/access-control/tests/enforce_test.rs delete mode 100644 libs/workspace-access/Cargo.toml delete mode 100644 libs/workspace-access/src/lib.rs delete mode 100644 src/middleware/cors_mw.rs diff --git a/Cargo.lock b/Cargo.lock index 3176237c3..251d201fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,10 +17,12 @@ dependencies = [ "lazy_static", "prometheus-client", "redis 0.25.4", + "serde", "sqlx", "tokio", "tokio-stream", "tracing", + "uuid", ] [[package]] @@ -680,7 +682,6 @@ dependencies = [ "url", "uuid", "validator", - "workspace-access", "workspace-template", "yrs", ] @@ -742,7 +743,6 @@ dependencies = [ "tracing-subscriber", "uuid", "validator", - "workspace-access", "workspace-template", "yrs", ] @@ -951,9 +951,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.11" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" +checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" dependencies = [ "bzip2", "deflate64", @@ -8433,20 +8433,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "workspace-access" -version = "0.1.0" -dependencies = [ - "access-control", - "app-error", - "async-trait", - "database-entity", - "serde", - "tokio", - "tracing", - "uuid", -] - [[package]] name = "workspace-template" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index de5fa657d..52571b302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,6 @@ gotrue-entity = { path = "libs/gotrue-entity" } infra = { path = "libs/infra" } authentication.workspace = true access-control.workspace = true -workspace-access.workspace = true app-error = { workspace = true, features = [ "sqlx_error", "actix_web_error", @@ -195,7 +194,6 @@ members = [ "libs/gotrue-entity", "admin_frontend", "libs/app-error", - "libs/workspace-access", "libs/workspace-template", "libs/encrypt", "libs/authentication", @@ -226,7 +224,6 @@ shared-entity = { path = "libs/shared-entity" } gotrue-entity = { path = "libs/gotrue-entity" } authentication = { path = "libs/authentication" } access-control = { path = "libs/access-control" } -workspace-access = { path = "libs/workspace-access" } app-error = { path = "libs/app-error" } async-trait = "0.1.77" prometheus-client = "0.22.0" diff --git a/libs/access-control/Cargo.toml b/libs/access-control/Cargo.toml index d93024939..39ff461ce 100644 --- a/libs/access-control/Cargo.toml +++ b/libs/access-control/Cargo.toml @@ -8,7 +8,11 @@ actix-http.workspace = true app-error.workspace = true anyhow.workspace = true async-trait.workspace = true -casbin = { version = "2.2.0", features = ["cached", "runtime-tokio", "incremental"] } +casbin = { version = "2.2.0", features = [ + "cached", + "runtime-tokio", + "incremental", +], optional = true } database.workspace = true database-entity.workspace = true futures-util.workspace = true @@ -17,8 +21,11 @@ prometheus-client.workspace = true redis.workspace = true sqlx = { workspace = true, default-features = false, features = ["postgres"] } tracing.workspace = true -tokio = { workspace = true, features = [ - "macros", - "time", -] } +tokio = { workspace = true, features = ["macros", "time"] } tokio-stream.workspace = true +uuid = { version = "1.8.0", features = ["v4"] } +serde = { version = "1.0.200", features = ["derive"] } + +[features] +default = ["casbin"] +casbin = ["dep:casbin"] diff --git a/libs/access-control/src/access.rs b/libs/access-control/src/casbin/access.rs similarity index 84% rename from libs/access-control/src/access.rs rename to libs/access-control/src/casbin/access.rs index 2d18c966a..000bec63e 100644 --- a/libs/access-control/src/access.rs +++ b/libs/access-control/src/casbin/access.rs @@ -1,6 +1,7 @@ +use super::adapter::PgAdapter; +use super::enforcer::{AFEnforcer, NoEnforceGroup}; use crate::act::{Action, ActionVariant, Acts}; -use crate::adapter::PgAdapter; -use crate::enforcer::{AFEnforcer, NoEnforceGroup}; +use crate::entity::ObjectType; use crate::metrics::{tick_metric, AccessControlMetrics}; use anyhow::anyhow; @@ -8,7 +9,6 @@ use app_error::AppError; use casbin::rhai::ImmutableString; use casbin::{CoreApi, DefaultModel, Enforcer, MgmtApi}; use database_entity::dto::{AFAccessLevel, AFRole}; -use lazy_static::lazy_static; use sqlx::PgPool; @@ -80,30 +80,19 @@ impl AccessControl { obj: ObjectType<'_>, act: ActionVariant<'_>, ) -> Result<(), AppError> { - if enable_access_control() { - let change = AccessControlChange::UpdatePolicy { - uid: *uid, - oid: obj.object_id().to_string(), - }; - self.enforcer.update_policy(uid, obj, act).await?; + let access_control_change = self.enforcer.update_policy(uid, obj, act).await?; + if let Some(change) = access_control_change { let _ = self.change_tx.send(change); - Ok(()) - } else { - Ok(()) } + Ok(()) } pub async fn remove_policy(&self, uid: &i64, obj: &ObjectType<'_>) -> Result<(), AppError> { - if enable_access_control() { - self.enforcer.remove_policy(uid, obj).await?; - let _ = self.change_tx.send(AccessControlChange::RemovePolicy { - uid: *uid, - oid: obj.object_id().to_string(), - }); - Ok(()) - } else { - Ok(()) + let access_control_change = self.enforcer.remove_policy(uid, obj).await?; + if let Some(change) = access_control_change { + let _ = self.change_tx.send(change); } + Ok(()) } pub async fn enforce( @@ -113,14 +102,10 @@ impl AccessControl { obj: ObjectType<'_>, act: ActionVariant<'_>, ) -> Result { - if enable_access_control() { - self - .enforcer - .enforce_policy(workspace_id, uid, obj, act) - .await - } else { - Ok(true) - } + self + .enforcer + .enforce_policy(workspace_id, uid, obj, act) + .await } } @@ -256,45 +241,6 @@ const GROUPING_FIELD_INDEX_ROLE: usize = 0; #[allow(dead_code)] const GROUPING_FIELD_INDEX_ACTION: usize = 1; -/// Represents the object type that is stored in the access control policy. -#[derive(Debug)] -pub enum ObjectType<'id> { - /// Stored as `workspace::` - Workspace(&'id str), - /// Stored as `collab::` - Collab(&'id str), -} - -impl ObjectType<'_> { - pub fn policy_object(&self) -> String { - match self { - ObjectType::Collab(s) => format!("collab::{}", s), - ObjectType::Workspace(s) => format!("workspace::{}", s), - } - } - - pub fn object_id(&self) -> &str { - match self { - ObjectType::Collab(s) => s, - ObjectType::Workspace(s) => s, - } - } -} - -lazy_static! { - static ref ENABLE_ACCESS_CONTROL: bool = { - match std::env::var("APPFLOWY_ACCESS_CONTROL") { - Ok(value) => value.eq_ignore_ascii_case("true") || value.eq("1"), - Err(_) => false, - } - }; -} - -#[inline] -pub fn enable_access_control() -> bool { - *ENABLE_ACCESS_CONTROL -} - pub(crate) async fn load_group_policies(enforcer: &mut Enforcer) -> Result<(), AppError> { // Grouping definition of access level to action. let af_access_levels = [ diff --git a/libs/access-control/src/adapter.rs b/libs/access-control/src/casbin/adapter.rs similarity index 99% rename from libs/access-control/src/adapter.rs rename to libs/access-control/src/casbin/adapter.rs index ff0d5298d..4e8ced220 100644 --- a/libs/access-control/src/adapter.rs +++ b/libs/access-control/src/casbin/adapter.rs @@ -1,7 +1,6 @@ -use crate::access::ObjectType; - use async_trait::async_trait; +use crate::entity::ObjectType; use crate::metrics::AccessControlMetrics; use casbin::Adapter; use casbin::Filter; diff --git a/libs/access-control/src/casbin/collab.rs b/libs/access-control/src/casbin/collab.rs new file mode 100644 index 000000000..75ffc8d22 --- /dev/null +++ b/libs/access-control/src/casbin/collab.rs @@ -0,0 +1,160 @@ +use app_error::AppError; +use async_trait::async_trait; +use database_entity::dto::AFAccessLevel; +use tracing::instrument; + +use crate::{ + act::{Action, ActionVariant}, + collab::{CollabAccessControl, RealtimeAccessControl}, + entity::ObjectType, +}; + +use super::access::AccessControl; + +#[derive(Clone)] +pub struct CollabAccessControlImpl { + access_control: AccessControl, +} + +impl CollabAccessControlImpl { + pub fn new(access_control: AccessControl) -> Self { + Self { access_control } + } +} + +#[async_trait] +impl CollabAccessControl for CollabAccessControlImpl { + async fn enforce_action( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + action: Action, + ) -> Result { + self + .access_control + .enforce( + workspace_id, + uid, + ObjectType::Collab(oid), + ActionVariant::FromAction(&action), + ) + .await + } + + async fn enforce_access_level( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + access_level: AFAccessLevel, + ) -> Result { + self + .access_control + .enforce( + workspace_id, + uid, + ObjectType::Collab(oid), + ActionVariant::FromAccessLevel(&access_level), + ) + .await + } + + #[instrument(level = "info", skip_all)] + async fn update_access_level_policy( + &self, + uid: &i64, + oid: &str, + level: AFAccessLevel, + ) -> Result<(), AppError> { + self + .access_control + .update_policy( + uid, + ObjectType::Collab(oid), + ActionVariant::FromAccessLevel(&level), + ) + .await?; + + Ok(()) + } + + #[instrument(level = "info", skip_all)] + async fn remove_access_level(&self, uid: &i64, oid: &str) -> Result<(), AppError> { + self + .access_control + .remove_policy(uid, &ObjectType::Collab(oid)) + .await?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct RealtimeCollabAccessControlImpl { + access_control: AccessControl, +} + +impl RealtimeCollabAccessControlImpl { + pub fn new(access_control: AccessControl) -> Self { + // let action_by_oid = Arc::new(DashMap::new()); + // let mut sub = access_control.subscribe_change(); + // let weak_action_by_oid = Arc::downgrade(&action_by_oid); + // tokio::spawn(async move { + // while let Ok(change) = sub.recv().await { + // match weak_action_by_oid.upgrade() { + // None => break, + // Some(action_by_oid) => match change { + // AccessControlChange::UpdatePolicy { uid, oid } => {}, + // AccessControlChange::RemovePolicy { uid, oid } => {}, + // }, + // } + // } + // }); + Self { access_control } + } + + async fn can_perform_action( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + required_action: Action, + ) -> Result { + let is_permitted = self + .access_control + .enforce( + workspace_id, + uid, + ObjectType::Collab(oid), + ActionVariant::FromAction(&required_action), + ) + .await?; + + Ok(is_permitted) + } +} + +#[async_trait] +impl RealtimeAccessControl for RealtimeCollabAccessControlImpl { + async fn can_write_collab( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + ) -> Result { + self + .can_perform_action(workspace_id, uid, oid, Action::Write) + .await + } + + async fn can_read_collab( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + ) -> Result { + self + .can_perform_action(workspace_id, uid, oid, Action::Read) + .await + } +} diff --git a/libs/access-control/src/casbin/enforcer.rs b/libs/access-control/src/casbin/enforcer.rs new file mode 100644 index 000000000..3d63bfd44 --- /dev/null +++ b/libs/access-control/src/casbin/enforcer.rs @@ -0,0 +1,829 @@ +use super::access::{ + load_group_policies, AccessControlChange, POLICY_FIELD_INDEX_OBJECT, POLICY_FIELD_INDEX_SUBJECT, +}; +use crate::act::ActionVariant; +use crate::entity::ObjectType; +use crate::metrics::MetricsCalState; +use crate::request::{GroupPolicyRequest, PolicyRequest, WorkspacePolicyRequest}; +use anyhow::anyhow; +use app_error::AppError; +use async_trait::async_trait; +use casbin::{CoreApi, Enforcer, MgmtApi}; +use std::sync::atomic::Ordering; +use tokio::sync::RwLock; +use tracing::{event, instrument, trace}; + +#[async_trait] +pub trait EnforcerGroup { + /// Get the group id of the user. + /// User might belong to multiple groups. So return the highest permission group id. + async fn get_enforce_group_id(&self, uid: &i64) -> Option; +} + +pub struct AFEnforcer { + enforcer: RwLock, + pub(crate) metrics_state: MetricsCalState, + enforce_group: T, +} + +impl AFEnforcer +where + T: EnforcerGroup, +{ + pub async fn new(mut enforcer: Enforcer, enforce_group: T) -> Result { + load_group_policies(&mut enforcer).await?; + Ok(Self { + enforcer: RwLock::new(enforcer), + metrics_state: MetricsCalState::new(), + enforce_group, + }) + } + + /// Update policy for a user. + /// If the policy is already exist, then it will return Ok(false). + /// + /// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`], + /// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`], + #[instrument(level = "debug", skip_all, err)] + pub async fn update_policy( + &self, + uid: &i64, + obj: ObjectType<'_>, + act: ActionVariant<'_>, + ) -> Result, AppError> { + validate_obj_action(&obj, &act)?; + + let policies = act + .policy_acts() + .into_iter() + .map(|act| vec![uid.to_string(), obj.policy_object(), act.to_string()]) + .collect::>>(); + let number_of_updated_policies = policies.len(); + + trace!("[access control]: add policy:{:?}", policies); + self + .enforcer + .write() + .await + .add_policies(policies) + .await + .map_err(|e| AppError::Internal(anyhow!("fail to add policy: {e:?}")))?; + + if number_of_updated_policies > 0 { + Ok(Some(AccessControlChange::UpdatePolicy { + uid: *uid, + oid: obj.object_id().to_string(), + })) + } else { + Ok(None) + } + } + + /// Returns policies that match the filter. + pub async fn remove_policy( + &self, + uid: &i64, + object_type: &ObjectType<'_>, + ) -> Result, AppError> { + let mut enforcer = self.enforcer.write().await; + self + .remove_with_enforcer(uid, object_type, &mut enforcer) + .await + } + + /// 1. **Workspace Policy**: Initially, it checks if the user has permission at the workspace level. If the user + /// has permission to perform the action on the workspace, the function returns `true` without further checks. + /// + /// 2. **Group Policy**: (If applicable) If the workspace policy check fails (`false`), the function will then + /// evaluate group-level policies. + /// + /// 3. **Object-Specific Policy**: If both previous checks fail, the function finally evaluates the policy + /// specific to the object itself. + /// + /// ## Parameters: + /// - `workspace_id`: The identifier for the workspace containing the object. + /// - `uid`: The user ID of the user attempting the action. + /// - `obj`: The type of object being accessed, encapsulated within an `ObjectType`. + /// - `act`: The action being attempted, encapsulated within an `ActionVariant`. + /// + /// ## Returns: + /// - `Ok(true)`: If the user is authorized to perform the action based on any of the evaluated policies. + /// - `Ok(false)`: If none of the policies authorize the user to perform the action. + /// - `Err(AppError)`: If an error occurs during policy enforcement. + /// + #[instrument(level = "debug", skip_all)] + pub async fn enforce_policy( + &self, + workspace_id: &str, + uid: &i64, + obj: ObjectType<'_>, + act: ActionVariant<'_>, + ) -> Result { + self + .metrics_state + .total_read_enforce_result + .fetch_add(1, Ordering::Relaxed); + + // 1. First, check workspace-level permissions. + let workspace_policy_request = WorkspacePolicyRequest::new(workspace_id, uid, &obj, &act); + let policy = workspace_policy_request.to_policy(); + let mut result = self + .enforcer + .read() + .await + .enforce(policy) + .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; + + // 2. Fallback to group policy if workspace-level check fails. + if !result { + if let Some(guid) = self.enforce_group.get_enforce_group_id(uid).await { + let policy_request = GroupPolicyRequest::new(&guid, &obj, &act); + result = self + .enforcer + .read() + .await + .enforce(policy_request.to_policy()) + .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; + } + } + + // 3. Finally, enforce object-specific policy if previous checks fail. + if !result { + let policy_request = PolicyRequest::new(*uid, &obj, &act); + let policy = policy_request.to_policy(); + result = self + .enforcer + .read() + .await + .enforce(policy) + .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; + } + + Ok(result) + } + + #[inline] + async fn remove_with_enforcer( + &self, + uid: &i64, + object_type: &ObjectType<'_>, + enforcer: &mut Enforcer, + ) -> Result, AppError> { + let policies_for_user_on_object = + policies_for_subject_with_given_object(uid, object_type, enforcer).await; + + // if there are no policies for the user on the object, return early. + if policies_for_user_on_object.is_empty() { + return Ok(None); + } + + event!( + tracing::Level::INFO, + "[access control]: remove policy:user={}, object={}, policies={:?}", + uid, + object_type.policy_object(), + policies_for_user_on_object + ); + + enforcer + .remove_policies(policies_for_user_on_object) + .await + .map_err(|e| AppError::Internal(anyhow!("error enforce: {e:?}")))?; + + Ok(Some(AccessControlChange::RemovePolicy { + uid: *uid, + oid: object_type.object_id().to_string(), + })) + } +} + +fn validate_obj_action(obj: &ObjectType<'_>, act: &ActionVariant) -> Result<(), AppError> { + match (obj, act) { + (ObjectType::Workspace(_), ActionVariant::FromRole(_)) + | (ObjectType::Collab(_), ActionVariant::FromAccessLevel(_)) => Ok(()), + _ => Err(AppError::Internal(anyhow!( + "invalid object type and action type combination: object={:?}, action={:?}", + obj, + act.to_enforce_act() + ))), + } +} +#[inline] +async fn policies_for_subject_with_given_object( + subject: T, + object_type: &ObjectType<'_>, + enforcer: &Enforcer, +) -> Vec> { + let subject = subject.to_string(); + let object_type_id = object_type.policy_object(); + let policies_related_to_object = + enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![object_type_id]); + + policies_related_to_object + .into_iter() + .filter(|p| p[POLICY_FIELD_INDEX_SUBJECT] == subject) + .collect::>() +} + +pub struct NoEnforceGroup; +#[async_trait] +impl EnforcerGroup for NoEnforceGroup { + async fn get_enforce_group_id(&self, _uid: &i64) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use crate::{ + act::{Action, ActionVariant}, + casbin::{ + access::{casbin_model, cmp_role_or_level}, + enforcer::NoEnforceGroup, + }, + entity::ObjectType, + }; + use async_trait::async_trait; + use casbin::{prelude::*, rhai::ImmutableString}; + use database_entity::dto::{AFAccessLevel, AFRole}; + + use super::{AFEnforcer, EnforcerGroup}; + + pub struct TestEnforceGroup { + guid: String, + } + #[async_trait] + impl EnforcerGroup for TestEnforceGroup { + async fn get_enforce_group_id(&self, _uid: &i64) -> Option { + Some(self.guid.clone()) + } + } + + async fn test_enforcer(enforce_group: T) -> AFEnforcer + where + T: EnforcerGroup, + { + let model = casbin_model().await.unwrap(); + let mut enforcer = casbin::Enforcer::new(model, MemoryAdapter::default()) + .await + .unwrap(); + + enforcer.add_function( + "cmpRoleOrLevel", + |r: ImmutableString, p: ImmutableString| cmp_role_or_level(r.as_str(), p.as_str()), + ); + AFEnforcer::new(enforcer, enforce_group).await.unwrap() + } + #[tokio::test] + async fn collab_group_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the collab + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), + ) + .await + .unwrap(); + + // when the user is the owner of the collab, then the user should have access to the collab + for action in [Action::Write, Action::Read] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result); + } + } + + #[tokio::test] + async fn workspace_group_policy_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Member), + ) + .await + .unwrap(); + + // test the user has permission to write and read the workspace + for action in [Action::Write, Action::Read] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + } + + #[tokio::test] + async fn workspace_owner_and_try_to_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Owner), + ) + .await + .unwrap(); + + for action in [Action::Write, Action::Read, Action::Delete] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + } + + #[tokio::test] + async fn workspace_member_collab_owner_try_to_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Member), + ) + .await + .unwrap(); + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), + ) + .await + .unwrap(); + + for action in [Action::Write, Action::Read, Action::Delete] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + } + + #[tokio::test] + async fn workspace_owner_collab_member_try_to_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Owner), + ) + .await + .unwrap(); + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::ReadAndWrite), + ) + .await + .unwrap(); + + for action in [Action::Write, Action::Read, Action::Delete] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + } + + #[tokio::test] + async fn workspace_member_collab_member_try_to_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Member), + ) + .await + .unwrap(); + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::ReadAndWrite), + ) + .await + .unwrap(); + + for action in [Action::Write, Action::Read] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&Action::Delete), + ) + .await + .unwrap(); + assert!(!result, "only the owner can perform delete") + } + + #[tokio::test] + async fn workspace_member_but_not_collab_member_and_try_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Member), + ) + .await + .unwrap(); + + // Although the user is not directly associated with the collab object, they are a member of the + // workspace containing it. Therefore, the system will evaluate their permissions based on the + // workspace policy as a fallback. + for action in [Action::Write, Action::Read] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&Action::Delete), + ) + .await + .unwrap(); + assert!(!result, "only the owner can perform delete") + } + + #[tokio::test] + async fn not_workspace_member_but_collab_owner_try_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), + ) + .await + .unwrap(); + + for action in [Action::Write, Action::Read, Action::Delete] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(result, "action={:?}", action); + } + } + + #[tokio::test] + async fn not_workspace_member_not_collab_member_and_try_full_access_collab_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // Since the user is not a member of the specified collaboration object, the access control system + // should check if the user has fallback permissions from being a member of the workspace. + // However, as the user is not a member of the workspace either, they should not have permission + // to perform the actions on the collaboration object. + // Therefore, for both actions, the expected result is `false`, indicating that the permission to + // perform the action is denied. + for action in [Action::Write, Action::Read] { + let result = enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAction(&action), + ) + .await + .unwrap(); + assert!(!result, "action={:?}", action); + } + } + + #[tokio::test] + async fn cmp_owner_role_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Owner), + ) + .await + .unwrap(); + + for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + } + } + + #[tokio::test] + async fn cmp_member_role_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Member), + ) + .await + .unwrap(); + + for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { + if role == AFRole::Owner { + assert!(!enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + + assert!(!enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + } else { + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + } + } + } + + #[tokio::test] + async fn cmp_guest_role_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + // add user as a member of the workspace + enforcer + .update_policy( + &uid, + ObjectType::Workspace(workspace_id), + ActionVariant::FromRole(&AFRole::Guest), + ) + .await + .unwrap(); + + for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { + if role == AFRole::Owner || role == AFRole::Member { + assert!(!enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + } else { + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromRole(&role), + ) + .await + .unwrap()); + } + } + } + + #[tokio::test] + async fn cmp_full_access_level_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), + ) + .await + .unwrap(); + + for level in [ + AFAccessLevel::ReadAndComment, + AFAccessLevel::ReadAndWrite, + AFAccessLevel::ReadOnly, + ] { + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&level), + ) + .await + .unwrap()); + } + } + + #[tokio::test] + async fn cmp_read_only_level_test() { + let enforcer = test_enforcer(NoEnforceGroup).await; + let uid = 1; + let workspace_id = "w1"; + let object_1 = "o1"; + + enforcer + .update_policy( + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&AFAccessLevel::ReadOnly), + ) + .await + .unwrap(); + + for level in [ + AFAccessLevel::ReadAndComment, + AFAccessLevel::ReadAndWrite, + AFAccessLevel::ReadOnly, + ] { + if matches!(level, AFAccessLevel::ReadOnly) { + assert!(enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&level), + ) + .await + .unwrap()); + } else { + assert!(!enforcer + .enforce_policy( + workspace_id, + &uid, + ObjectType::Collab(object_1), + ActionVariant::FromAccessLevel(&level), + ) + .await + .unwrap()); + } + } + } +} diff --git a/libs/access-control/src/casbin/mod.rs b/libs/access-control/src/casbin/mod.rs new file mode 100644 index 000000000..22e3bc5e1 --- /dev/null +++ b/libs/access-control/src/casbin/mod.rs @@ -0,0 +1,6 @@ +pub mod access; +mod adapter; +pub mod collab; +mod enforcer; +pub mod notification; +pub mod workspace; diff --git a/libs/workspace-access/src/notification.rs b/libs/access-control/src/casbin/notification.rs similarity index 95% rename from libs/workspace-access/src/notification.rs rename to libs/access-control/src/casbin/notification.rs index 38b29cf49..1f87fa83e 100644 --- a/libs/workspace-access/src/notification.rs +++ b/libs/access-control/src/casbin/notification.rs @@ -1,5 +1,6 @@ -use access_control::access::{AccessControl, ObjectType}; -use access_control::act::ActionVariant; +use super::access::AccessControl; +use crate::act::ActionVariant; +use crate::entity::ObjectType; use database_entity::dto::AFRole; use serde::Deserialize; use tokio::sync::broadcast; @@ -7,7 +8,6 @@ use tracing::error; use tracing::log::warn; use uuid::Uuid; -#[allow(dead_code)] pub fn spawn_listen_on_workspace_member_change( mut listener: broadcast::Receiver, access_control: AccessControl, diff --git a/libs/workspace-access/src/access_control.rs b/libs/access-control/src/casbin/workspace.rs similarity index 90% rename from libs/workspace-access/src/access_control.rs rename to libs/access-control/src/casbin/workspace.rs index 5ac44fc37..1054ce9b3 100644 --- a/libs/workspace-access/src/access_control.rs +++ b/libs/access-control/src/casbin/workspace.rs @@ -2,10 +2,10 @@ use async_trait::async_trait; use tracing::instrument; use uuid::Uuid; -use access_control::access::AccessControl; -use access_control::access::ObjectType; -use access_control::act::{Action, ActionVariant}; -use access_control::workspace::WorkspaceAccessControl; +use super::access::AccessControl; +use crate::act::{Action, ActionVariant}; +use crate::entity::ObjectType; +use crate::workspace::WorkspaceAccessControl; use app_error::AppError; use database_entity::dto::AFRole; diff --git a/libs/access-control/src/enforcer.rs b/libs/access-control/src/enforcer.rs deleted file mode 100644 index dee8b389a..000000000 --- a/libs/access-control/src/enforcer.rs +++ /dev/null @@ -1,225 +0,0 @@ -use crate::access::{ - load_group_policies, ObjectType, POLICY_FIELD_INDEX_OBJECT, POLICY_FIELD_INDEX_SUBJECT, -}; -use crate::act::ActionVariant; -use crate::metrics::MetricsCalState; -use crate::request::{GroupPolicyRequest, PolicyRequest, WorkspacePolicyRequest}; -use anyhow::anyhow; -use app_error::AppError; -use async_trait::async_trait; -use casbin::{CoreApi, Enforcer, MgmtApi}; -use std::sync::atomic::Ordering; -use std::time::Duration; -use tokio::sync::RwLock; -use tracing::{event, instrument, trace}; - -pub const ENFORCER_METRICS_TICK_INTERVAL: Duration = Duration::from_secs(120); - -#[async_trait] -pub trait EnforcerGroup { - /// Get the group id of the user. - /// User might belong to multiple groups. So return the highest permission group id. - async fn get_enforce_group_id(&self, uid: &i64) -> Option; -} - -pub struct AFEnforcer { - enforcer: RwLock, - pub(crate) metrics_state: MetricsCalState, - enforce_group: T, -} - -impl AFEnforcer -where - T: EnforcerGroup, -{ - pub async fn new(mut enforcer: Enforcer, enforce_group: T) -> Result { - load_group_policies(&mut enforcer).await?; - Ok(Self { - enforcer: RwLock::new(enforcer), - metrics_state: MetricsCalState::new(), - enforce_group, - }) - } - - /// Update policy for a user. - /// If the policy is already exist, then it will return Ok(false). - /// - /// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`], - /// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`], - #[instrument(level = "debug", skip_all, err)] - pub async fn update_policy( - &self, - uid: &i64, - obj: ObjectType<'_>, - act: ActionVariant<'_>, - ) -> Result<(), AppError> { - validate_obj_action(&obj, &act)?; - - let policies = act - .policy_acts() - .into_iter() - .map(|act| vec![uid.to_string(), obj.policy_object(), act.to_string()]) - .collect::>>(); - - trace!("[access control]: add policy:{:?}", policies); - self - .enforcer - .write() - .await - .add_policies(policies) - .await - .map_err(|e| AppError::Internal(anyhow!("fail to add policy: {e:?}")))?; - - Ok(()) - } - - /// Returns policies that match the filter. - pub async fn remove_policy( - &self, - uid: &i64, - object_type: &ObjectType<'_>, - ) -> Result<(), AppError> { - let mut enforcer = self.enforcer.write().await; - self - .remove_with_enforcer(uid, object_type, &mut enforcer) - .await - } - - /// 1. **Workspace Policy**: Initially, it checks if the user has permission at the workspace level. If the user - /// has permission to perform the action on the workspace, the function returns `true` without further checks. - /// - /// 2. **Group Policy**: (If applicable) If the workspace policy check fails (`false`), the function will then - /// evaluate group-level policies. - /// - /// 3. **Object-Specific Policy**: If both previous checks fail, the function finally evaluates the policy - /// specific to the object itself. - /// - /// ## Parameters: - /// - `workspace_id`: The identifier for the workspace containing the object. - /// - `uid`: The user ID of the user attempting the action. - /// - `obj`: The type of object being accessed, encapsulated within an `ObjectType`. - /// - `act`: The action being attempted, encapsulated within an `ActionVariant`. - /// - /// ## Returns: - /// - `Ok(true)`: If the user is authorized to perform the action based on any of the evaluated policies. - /// - `Ok(false)`: If none of the policies authorize the user to perform the action. - /// - `Err(AppError)`: If an error occurs during policy enforcement. - /// - #[instrument(level = "debug", skip_all)] - pub async fn enforce_policy( - &self, - workspace_id: &str, - uid: &i64, - obj: ObjectType<'_>, - act: ActionVariant<'_>, - ) -> Result { - self - .metrics_state - .total_read_enforce_result - .fetch_add(1, Ordering::Relaxed); - - // 1. First, check workspace-level permissions. - let workspace_policy_request = WorkspacePolicyRequest::new(workspace_id, uid, &obj, &act); - let policy = workspace_policy_request.to_policy(); - let mut result = self - .enforcer - .read() - .await - .enforce(policy) - .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; - - // 2. Fallback to group policy if workspace-level check fails. - if !result { - if let Some(guid) = self.enforce_group.get_enforce_group_id(uid).await { - let policy_request = GroupPolicyRequest::new(&guid, &obj, &act); - result = self - .enforcer - .read() - .await - .enforce(policy_request.to_policy()) - .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; - } - } - - // 3. Finally, enforce object-specific policy if previous checks fail. - if !result { - let policy_request = PolicyRequest::new(*uid, &obj, &act); - let policy = policy_request.to_policy(); - result = self - .enforcer - .read() - .await - .enforce(policy) - .map_err(|e| AppError::Internal(anyhow!("enforce: {e:?}")))?; - } - - Ok(result) - } - - #[inline] - async fn remove_with_enforcer( - &self, - uid: &i64, - object_type: &ObjectType<'_>, - enforcer: &mut Enforcer, - ) -> Result<(), AppError> { - let policies_for_user_on_object = - policies_for_subject_with_given_object(uid, object_type, enforcer).await; - - // if there are no policies for the user on the object, return early. - if policies_for_user_on_object.is_empty() { - return Ok(()); - } - - event!( - tracing::Level::INFO, - "[access control]: remove policy:user={}, object={}, policies={:?}", - uid, - object_type.policy_object(), - policies_for_user_on_object - ); - - enforcer - .remove_policies(policies_for_user_on_object) - .await - .map_err(|e| AppError::Internal(anyhow!("error enforce: {e:?}")))?; - - Ok(()) - } -} - -fn validate_obj_action(obj: &ObjectType<'_>, act: &ActionVariant) -> Result<(), AppError> { - match (obj, act) { - (ObjectType::Workspace(_), ActionVariant::FromRole(_)) - | (ObjectType::Collab(_), ActionVariant::FromAccessLevel(_)) => Ok(()), - _ => Err(AppError::Internal(anyhow!( - "invalid object type and action type combination: object={:?}, action={:?}", - obj, - act.to_enforce_act() - ))), - } -} -#[inline] -async fn policies_for_subject_with_given_object( - subject: T, - object_type: &ObjectType<'_>, - enforcer: &Enforcer, -) -> Vec> { - let subject = subject.to_string(); - let object_type_id = object_type.policy_object(); - let policies_related_to_object = - enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![object_type_id]); - - policies_related_to_object - .into_iter() - .filter(|p| p[POLICY_FIELD_INDEX_SUBJECT] == subject) - .collect::>() -} - -pub struct NoEnforceGroup; -#[async_trait] -impl EnforcerGroup for NoEnforceGroup { - async fn get_enforce_group_id(&self, _uid: &i64) -> Option { - None - } -} diff --git a/libs/access-control/src/entity.rs b/libs/access-control/src/entity.rs new file mode 100644 index 000000000..1bbfa3822 --- /dev/null +++ b/libs/access-control/src/entity.rs @@ -0,0 +1,24 @@ +/// Represents the object type that is stored in the access control policy. +#[derive(Debug)] +pub enum ObjectType<'id> { + /// Stored as `workspace::` + Workspace(&'id str), + /// Stored as `collab::` + Collab(&'id str), +} + +impl ObjectType<'_> { + pub fn policy_object(&self) -> String { + match self { + ObjectType::Collab(s) => format!("collab::{}", s), + ObjectType::Workspace(s) => format!("workspace::{}", s), + } + } + + pub fn object_id(&self) -> &str { + match self { + ObjectType::Collab(s) => s, + ObjectType::Workspace(s) => s, + } + } +} diff --git a/libs/access-control/src/lib.rs b/libs/access-control/src/lib.rs index 9e48cd6c1..b548b1b0c 100644 --- a/libs/access-control/src/lib.rs +++ b/libs/access-control/src/lib.rs @@ -1,8 +1,9 @@ -pub mod access; pub mod act; -mod adapter; +#[cfg(feature = "casbin")] +pub mod casbin; pub mod collab; -pub mod enforcer; +pub mod entity; pub mod metrics; +pub mod noops; mod request; pub mod workspace; diff --git a/libs/access-control/src/metrics.rs b/libs/access-control/src/metrics.rs index bdb925a8b..b3d376cc1 100644 --- a/libs/access-control/src/metrics.rs +++ b/libs/access-control/src/metrics.rs @@ -1,11 +1,13 @@ use prometheus_client::metrics::gauge::Gauge; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; +use std::time::Duration; -use crate::enforcer::ENFORCER_METRICS_TICK_INTERVAL; use prometheus_client::registry::Registry; use tokio::time::interval; +pub const ENFORCER_METRICS_TICK_INTERVAL: Duration = Duration::from_secs(120); + #[derive(Clone)] pub struct AccessControlMetrics { load_all_policies: Gauge, diff --git a/libs/access-control/src/noops/collab.rs b/libs/access-control/src/noops/collab.rs new file mode 100644 index 000000000..6045e3b04 --- /dev/null +++ b/libs/access-control/src/noops/collab.rs @@ -0,0 +1,95 @@ +use app_error::AppError; +use async_trait::async_trait; +use database_entity::dto::AFAccessLevel; + +use crate::{ + act::Action, + collab::{CollabAccessControl, RealtimeAccessControl}, +}; + +#[derive(Clone)] +pub struct CollabAccessControlImpl; + +impl CollabAccessControlImpl { + pub fn new() -> Self { + Self {} + } +} + +impl Default for CollabAccessControlImpl { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl CollabAccessControl for CollabAccessControlImpl { + async fn enforce_action( + &self, + _workspace_id: &str, + _uid: &i64, + _oid: &str, + _action: Action, + ) -> Result { + Ok(true) + } + + async fn enforce_access_level( + &self, + _workspace_id: &str, + _uid: &i64, + _oid: &str, + _access_level: AFAccessLevel, + ) -> Result { + Ok(true) + } + + async fn update_access_level_policy( + &self, + _uid: &i64, + _oid: &str, + _level: AFAccessLevel, + ) -> Result<(), AppError> { + Ok(()) + } + + async fn remove_access_level(&self, _uid: &i64, _oid: &str) -> Result<(), AppError> { + Ok(()) + } +} + +#[derive(Clone)] +pub struct RealtimeCollabAccessControlImpl; + +impl RealtimeCollabAccessControlImpl { + pub fn new() -> Self { + Self {} + } +} + +impl Default for RealtimeCollabAccessControlImpl { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl RealtimeAccessControl for RealtimeCollabAccessControlImpl { + async fn can_write_collab( + &self, + _workspace_id: &str, + _uid: &i64, + _oid: &str, + ) -> Result { + Ok(true) + } + + async fn can_read_collab( + &self, + _workspace_id: &str, + _uid: &i64, + _oid: &str, + ) -> Result { + Ok(true) + } +} diff --git a/libs/access-control/src/noops/mod.rs b/libs/access-control/src/noops/mod.rs new file mode 100644 index 000000000..e5137b0f0 --- /dev/null +++ b/libs/access-control/src/noops/mod.rs @@ -0,0 +1,2 @@ +pub mod collab; +pub mod workspace; diff --git a/libs/access-control/src/noops/workspace.rs b/libs/access-control/src/noops/workspace.rs new file mode 100644 index 000000000..6bbf58a24 --- /dev/null +++ b/libs/access-control/src/noops/workspace.rs @@ -0,0 +1,60 @@ +use async_trait::async_trait; +use uuid::Uuid; + +use crate::act::Action; +use crate::workspace::WorkspaceAccessControl; +use app_error::AppError; +use database_entity::dto::AFRole; + +#[derive(Clone)] +pub struct WorkspaceAccessControlImpl; + +impl WorkspaceAccessControlImpl { + pub fn new() -> Self { + Self {} + } +} + +impl Default for WorkspaceAccessControlImpl { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl WorkspaceAccessControl for WorkspaceAccessControlImpl { + async fn enforce_role( + &self, + _uid: &i64, + _workspace_id: &str, + _role: AFRole, + ) -> Result { + Ok(true) + } + + async fn enforce_action( + &self, + _uid: &i64, + _workspace_id: &str, + _action: Action, + ) -> Result { + Ok(true) + } + + async fn insert_role( + &self, + _uid: &i64, + _workspace_id: &Uuid, + _role: AFRole, + ) -> Result<(), AppError> { + Ok(()) + } + + async fn remove_user_from_workspace( + &self, + _uid: &i64, + _workspace_id: &Uuid, + ) -> Result<(), AppError> { + Ok(()) + } +} diff --git a/libs/access-control/src/request.rs b/libs/access-control/src/request.rs index 2f6fe1deb..c4d0f522c 100644 --- a/libs/access-control/src/request.rs +++ b/libs/access-control/src/request.rs @@ -1,5 +1,5 @@ -use crate::access::ObjectType; use crate::act::ActionVariant; +use crate::entity::ObjectType; pub struct GroupPolicyRequest<'a> { pub guid: &'a str, diff --git a/libs/access-control/tests/enforce_test.rs b/libs/access-control/tests/enforce_test.rs deleted file mode 100644 index be0580f8b..000000000 --- a/libs/access-control/tests/enforce_test.rs +++ /dev/null @@ -1,585 +0,0 @@ -use access_control::access::{casbin_model, cmp_role_or_level, ObjectType}; -use access_control::act::{Action, ActionVariant}; -use access_control::enforcer::{AFEnforcer, EnforcerGroup, NoEnforceGroup}; -use async_trait::async_trait; -use casbin::rhai::ImmutableString; -use casbin::{CoreApi, MemoryAdapter}; -use database_entity::dto::{AFAccessLevel, AFRole}; - -pub struct TestEnforceGroup { - guid: String, -} -#[async_trait] -impl EnforcerGroup for TestEnforceGroup { - async fn get_enforce_group_id(&self, _uid: &i64) -> Option { - Some(self.guid.clone()) - } -} - -async fn test_enforcer(enforce_group: T) -> AFEnforcer -where - T: EnforcerGroup, -{ - let model = casbin_model().await.unwrap(); - let mut enforcer = casbin::Enforcer::new(model, MemoryAdapter::default()) - .await - .unwrap(); - - enforcer.add_function( - "cmpRoleOrLevel", - |r: ImmutableString, p: ImmutableString| cmp_role_or_level(r.as_str(), p.as_str()), - ); - AFEnforcer::new(enforcer, enforce_group).await.unwrap() -} -#[tokio::test] -async fn collab_group_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the collab - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), - ) - .await - .unwrap(); - - // when the user is the owner of the collab, then the user should have access to the collab - for action in [Action::Write, Action::Read] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result); - } -} - -#[tokio::test] -async fn workspace_group_policy_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Member), - ) - .await - .unwrap(); - - // test the user has permission to write and read the workspace - for action in [Action::Write, Action::Read] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } -} - -#[tokio::test] -async fn workspace_owner_and_try_to_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Owner), - ) - .await - .unwrap(); - - for action in [Action::Write, Action::Read, Action::Delete] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } -} - -#[tokio::test] -async fn workspace_member_collab_owner_try_to_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Member), - ) - .await - .unwrap(); - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), - ) - .await - .unwrap(); - - for action in [Action::Write, Action::Read, Action::Delete] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } -} - -#[tokio::test] -async fn workspace_owner_collab_member_try_to_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Owner), - ) - .await - .unwrap(); - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::ReadAndWrite), - ) - .await - .unwrap(); - - for action in [Action::Write, Action::Read, Action::Delete] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } -} - -#[tokio::test] -async fn workspace_member_collab_member_try_to_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Member), - ) - .await - .unwrap(); - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::ReadAndWrite), - ) - .await - .unwrap(); - - for action in [Action::Write, Action::Read] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } - - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&Action::Delete), - ) - .await - .unwrap(); - assert!(!result, "only the owner can perform delete") -} - -#[tokio::test] -async fn workspace_member_but_not_collab_member_and_try_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Member), - ) - .await - .unwrap(); - - // Although the user is not directly associated with the collab object, they are a member of the - // workspace containing it. Therefore, the system will evaluate their permissions based on the - // workspace policy as a fallback. - for action in [Action::Write, Action::Read] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } - - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&Action::Delete), - ) - .await - .unwrap(); - assert!(!result, "only the owner can perform delete") -} - -#[tokio::test] -async fn not_workspace_member_but_collab_owner_try_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), - ) - .await - .unwrap(); - - for action in [Action::Write, Action::Read, Action::Delete] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(result, "action={:?}", action); - } -} - -#[tokio::test] -async fn not_workspace_member_not_collab_member_and_try_full_access_collab_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // Since the user is not a member of the specified collaboration object, the access control system - // should check if the user has fallback permissions from being a member of the workspace. - // However, as the user is not a member of the workspace either, they should not have permission - // to perform the actions on the collaboration object. - // Therefore, for both actions, the expected result is `false`, indicating that the permission to - // perform the action is denied. - for action in [Action::Write, Action::Read] { - let result = enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAction(&action), - ) - .await - .unwrap(); - assert!(!result, "action={:?}", action); - } -} - -#[tokio::test] -async fn cmp_owner_role_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Owner), - ) - .await - .unwrap(); - - for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - } -} - -#[tokio::test] -async fn cmp_member_role_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Member), - ) - .await - .unwrap(); - - for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { - if role == AFRole::Owner { - assert!(!enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - - assert!(!enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - } else { - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - } - } -} - -#[tokio::test] -async fn cmp_guest_role_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - // add user as a member of the workspace - enforcer - .update_policy( - &uid, - ObjectType::Workspace(workspace_id), - ActionVariant::FromRole(&AFRole::Guest), - ) - .await - .unwrap(); - - for role in [AFRole::Owner, AFRole::Member, AFRole::Guest] { - if role == AFRole::Owner || role == AFRole::Member { - assert!(!enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - } else { - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromRole(&role), - ) - .await - .unwrap()); - } - } -} - -#[tokio::test] -async fn cmp_full_access_level_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::FullAccess), - ) - .await - .unwrap(); - - for level in [ - AFAccessLevel::ReadAndComment, - AFAccessLevel::ReadAndWrite, - AFAccessLevel::ReadOnly, - ] { - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&level), - ) - .await - .unwrap()); - } -} - -#[tokio::test] -async fn cmp_read_only_level_test() { - let enforcer = test_enforcer(NoEnforceGroup).await; - let uid = 1; - let workspace_id = "w1"; - let object_1 = "o1"; - - enforcer - .update_policy( - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&AFAccessLevel::ReadOnly), - ) - .await - .unwrap(); - - for level in [ - AFAccessLevel::ReadAndComment, - AFAccessLevel::ReadAndWrite, - AFAccessLevel::ReadOnly, - ] { - if matches!(level, AFAccessLevel::ReadOnly) { - assert!(enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&level), - ) - .await - .unwrap()); - } else { - assert!(!enforcer - .enforce_policy( - workspace_id, - &uid, - ObjectType::Collab(object_1), - ActionVariant::FromAccessLevel(&level), - ) - .await - .unwrap()); - } - } -} diff --git a/libs/workspace-access/Cargo.toml b/libs/workspace-access/Cargo.toml deleted file mode 100644 index 728244cd9..000000000 --- a/libs/workspace-access/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "workspace-access" -version = "0.1.0" -edition = "2021" - -[dependencies] -access-control.workspace = true -app-error = { workspace = true, features = ["sqlx_error", "tokio_error"] } -async-trait = "0.1.77" -database-entity.workspace = true -tokio = { workspace = true, features = ["sync"] } -tracing = "0.1.40" -uuid = { version = "1.8.0", features = ["v4"] } -serde = { version = "1.0.200", features = ["derive"] } \ No newline at end of file diff --git a/libs/workspace-access/src/lib.rs b/libs/workspace-access/src/lib.rs deleted file mode 100644 index b3eaa12ce..000000000 --- a/libs/workspace-access/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod access_control; -pub mod notification; - -pub use access_control::*; diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index a05cfa3b2..c36ff8254 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -86,7 +86,6 @@ parking_lot = "0.12.1" lazy_static = "1.4.0" itertools = "0.12.0" validator = "0.16.1" -workspace-access.workspace = true rayon.workspace = true [dev-dependencies] diff --git a/services/appflowy-collaborate/src/actix_ws/server/rt_actor.rs b/services/appflowy-collaborate/src/actix_ws/server/rt_actor.rs index 30d1f0748..0710074a6 100644 --- a/services/appflowy-collaborate/src/actix_ws/server/rt_actor.rs +++ b/services/appflowy-collaborate/src/actix_ws/server/rt_actor.rs @@ -5,7 +5,6 @@ use tracing::{error, info, warn}; use crate::error::RealtimeError; use crate::CollaborationServer; -use access_control::collab::RealtimeAccessControl; use collab_rt_entity::user::UserDevice; use database::collab::CollabStorage; @@ -13,27 +12,21 @@ use crate::actix_ws::client::rt_client::{RealtimeClientWebsocketSinkImpl, Realti use crate::actix_ws::entities::{ClientMessage, ClientStreamMessage, Connect, Disconnect}; #[derive(Clone)] -pub struct RealtimeServerActor(pub CollaborationServer); +pub struct RealtimeServerActor(pub CollaborationServer); -impl RealtimeServer for RealtimeServerActor -where - S: CollabStorage + Unpin, - AC: RealtimeAccessControl + Unpin, -{ -} +impl RealtimeServer for RealtimeServerActor where S: CollabStorage + Unpin {} -impl Deref for RealtimeServerActor { - type Target = CollaborationServer; +impl Deref for RealtimeServerActor { + type Target = CollaborationServer; fn deref(&self) -> &Self::Target { &self.0 } } -impl Actor for RealtimeServerActor +impl Actor for RealtimeServerActor where S: 'static + Unpin, - AC: RealtimeAccessControl + Unpin, { type Context = Context; @@ -46,12 +39,11 @@ where ctx.set_mailbox_capacity(mail_box_size); } } -impl actix::Supervised for RealtimeServerActor +impl actix::Supervised for RealtimeServerActor where S: 'static + Unpin, - AC: RealtimeAccessControl + Unpin, { - fn restarting(&mut self, ctx: &mut Context>) { + fn restarting(&mut self, ctx: &mut Context>) { error!("realtime server is restarting"); ctx.set_mailbox_capacity(mail_box_size()); } @@ -67,10 +59,9 @@ fn mail_box_size() -> usize { } } -impl Handler for RealtimeServerActor +impl Handler for RealtimeServerActor where S: CollabStorage + Unpin, - AC: RealtimeAccessControl + Unpin, { type Result = ResponseFuture>; @@ -80,10 +71,9 @@ where } } -impl Handler for RealtimeServerActor +impl Handler for RealtimeServerActor where S: CollabStorage + Unpin, - AC: RealtimeAccessControl + Unpin, { type Result = ResponseFuture>; fn handle(&mut self, msg: Disconnect, _: &mut Context) -> Self::Result { @@ -91,10 +81,9 @@ where } } -impl Handler for RealtimeServerActor +impl Handler for RealtimeServerActor where S: CollabStorage + Unpin, - AC: RealtimeAccessControl + Unpin, { type Result = ResponseFuture>; @@ -112,10 +101,9 @@ where } } -impl Handler for RealtimeServerActor +impl Handler for RealtimeServerActor where S: CollabStorage + Unpin, - AC: RealtimeAccessControl + Unpin, { type Result = ResponseFuture>; diff --git a/services/appflowy-collaborate/src/api.rs b/services/appflowy-collaborate/src/api.rs index 36b6ba5cb..4c49d3ff9 100644 --- a/services/appflowy-collaborate/src/api.rs +++ b/services/appflowy-collaborate/src/api.rs @@ -26,7 +26,6 @@ use shared_entity::response::{AppResponse, AppResponseError}; use crate::actix_ws::client::RealtimeClient; use crate::actix_ws::entities::ClientStreamMessage; use crate::actix_ws::server::RealtimeServerActor; -use crate::collab::access_control::RealtimeCollabAccessControlImpl; use crate::collab::storage::CollabAccessControlStorage; use crate::compression::{ decompress, CompressionType, X_COMPRESSION_BUFFER_SIZE, X_COMPRESSION_TYPE, @@ -49,8 +48,7 @@ pub fn collab_scope() -> Scope { const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB -pub type RealtimeServerAddr = - Addr>; +pub type RealtimeServerAddr = Addr>; #[instrument(skip_all, err)] pub async fn establish_ws_connection_v1( diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 63b6ce629..10cdae715 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -2,27 +2,27 @@ use std::net::TcpListener; use std::sync::Arc; use std::time::Duration; +use access_control::casbin::collab::{CollabAccessControlImpl, RealtimeCollabAccessControlImpl}; +use access_control::casbin::notification::spawn_listen_on_workspace_member_change; +use access_control::casbin::workspace::WorkspaceAccessControlImpl; use actix::Supervisor; use actix_web::dev::Server; use actix_web::web::Data; use actix_web::{App, HttpServer}; use anyhow::{Context, Error}; +use database::collab::cache::CollabCache; use secrecy::ExposeSecret; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use tracing::{info, warn}; use crate::actix_ws::server::RealtimeServerActor; -use access_control::access::AccessControl; +use crate::collab::access_control::CollabStorageAccessControlImpl; +use access_control::casbin::access::AccessControl; use appflowy_ai_client::client::AppFlowyAIClient; -use database::collab::cache::CollabCache; -use workspace_access::notification::spawn_listen_on_workspace_member_change; -use workspace_access::WorkspaceAccessControlImpl; use crate::api::{collab_scope, ws_scope}; -use crate::collab::access_control::{ - CollabAccessControlImpl, CollabStorageAccessControlImpl, RealtimeCollabAccessControlImpl, -}; + use crate::collab::notification::spawn_listen_on_collab_member_change; use crate::collab::storage::CollabStorageImpl; use crate::command::{CLCommandReceiver, CLCommandSender}; @@ -69,9 +69,11 @@ pub async fn run_actix_server( let storage = state.collab_access_control_storage.clone(); // Initialize metrics that which are registered in the registry. - let realtime_server = CollaborationServer::<_, _>::new( + let realtime_server = CollaborationServer::<_>::new( storage.clone(), - RealtimeCollabAccessControlImpl::new(state.access_control.clone()), + Arc::new(RealtimeCollabAccessControlImpl::new( + state.access_control.clone(), + )), state.metrics.realtime_metrics.clone(), rt_cmd_recv, state.redis_connection_manager.clone(), @@ -132,8 +134,8 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result`, which is used to send updates to the connected client. /// - A `ReceiverStream`, which is used to receive authorized updates from the connected client. /// - pub fn init_client_communication( + pub fn init_client_communication( &mut self, workspace_id: &str, user: &RealtimeUser, object_id: &str, - access_control: Arc, + access_control: Arc, ) -> (UnboundedSenderSink, ReceiverStream) where T: Into + Send + Sync + 'static, - AC: RealtimeAccessControl, { let client_ws_sink = self.sink.clone(); let mut stream_rx = BroadcastStream::new(self.stream_tx.subscribe()); @@ -122,7 +121,7 @@ impl ClientMessageRouter { &stream_workspace_id, &user.uid, &message_object_id, - &access_control, + access_control.clone(), original_messages, ) .await; @@ -170,16 +169,13 @@ impl ClientMessageRouter { } #[inline] - async fn access_control( + async fn access_control( workspace_id: &str, uid: &i64, object_id: &str, - access_control: &Arc, + access_control: Arc, messages: Vec, - ) -> (Vec, Vec) - where - AC: RealtimeAccessControl, - { + ) -> (Vec, Vec) { let can_write = access_control .can_write_collab(workspace_id, uid, object_id) .await diff --git a/services/appflowy-collaborate/src/collab/access_control.rs b/services/appflowy-collaborate/src/collab/access_control.rs index d806eb01d..ca20c61a2 100644 --- a/services/appflowy-collaborate/src/collab/access_control.rs +++ b/services/appflowy-collaborate/src/collab/access_control.rs @@ -1,183 +1,23 @@ use async_trait::async_trait; +use database::collab::cache::CollabCache; use std::sync::Arc; -use tracing::instrument; -use access_control::access::ObjectType; -use access_control::access::{enable_access_control, AccessControl}; -use access_control::act::{Action, ActionVariant}; -use access_control::collab::{CollabAccessControl, RealtimeAccessControl}; +use access_control::act::Action; +use access_control::collab::CollabAccessControl; use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; -use database::collab::cache::CollabCache; use database::collab::CollabStorageAccessControl; use database_entity::dto::AFAccessLevel; #[derive(Clone)] -pub struct CollabAccessControlImpl { - access_control: AccessControl, -} - -impl CollabAccessControlImpl { - pub fn new(access_control: AccessControl) -> Self { - Self { access_control } - } -} - -#[async_trait] -impl CollabAccessControl for CollabAccessControlImpl { - async fn enforce_action( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - action: Action, - ) -> Result { - self - .access_control - .enforce( - workspace_id, - uid, - ObjectType::Collab(oid), - ActionVariant::FromAction(&action), - ) - .await - } - - async fn enforce_access_level( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - access_level: AFAccessLevel, - ) -> Result { - self - .access_control - .enforce( - workspace_id, - uid, - ObjectType::Collab(oid), - ActionVariant::FromAccessLevel(&access_level), - ) - .await - } - - #[instrument(level = "info", skip_all)] - async fn update_access_level_policy( - &self, - uid: &i64, - oid: &str, - level: AFAccessLevel, - ) -> Result<(), AppError> { - self - .access_control - .update_policy( - uid, - ObjectType::Collab(oid), - ActionVariant::FromAccessLevel(&level), - ) - .await?; - - Ok(()) - } - - #[instrument(level = "info", skip_all)] - async fn remove_access_level(&self, uid: &i64, oid: &str) -> Result<(), AppError> { - self - .access_control - .remove_policy(uid, &ObjectType::Collab(oid)) - .await?; - Ok(()) - } -} - -#[derive(Clone)] -pub struct RealtimeCollabAccessControlImpl { - access_control: AccessControl, -} - -impl RealtimeCollabAccessControlImpl { - pub fn new(access_control: AccessControl) -> Self { - // let action_by_oid = Arc::new(DashMap::new()); - // let mut sub = access_control.subscribe_change(); - // let weak_action_by_oid = Arc::downgrade(&action_by_oid); - // tokio::spawn(async move { - // while let Ok(change) = sub.recv().await { - // match weak_action_by_oid.upgrade() { - // None => break, - // Some(action_by_oid) => match change { - // AccessControlChange::UpdatePolicy { uid, oid } => {}, - // AccessControlChange::RemovePolicy { uid, oid } => {}, - // }, - // } - // } - // }); - Self { access_control } - } - - async fn can_perform_action( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - required_action: Action, - ) -> Result { - if enable_access_control() { - let is_permitted = self - .access_control - .enforce( - workspace_id, - uid, - ObjectType::Collab(oid), - ActionVariant::FromAction(&required_action), - ) - .await?; - - Ok(is_permitted) - } else { - Ok(true) - } - } -} - -#[async_trait] -impl RealtimeAccessControl for RealtimeCollabAccessControlImpl { - async fn can_write_collab( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - ) -> Result { - self - .can_perform_action(workspace_id, uid, oid, Action::Write) - .await - } - - async fn can_read_collab( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - ) -> Result { - self - .can_perform_action(workspace_id, uid, oid, Action::Read) - .await - } -} - -#[derive(Clone)] -pub struct CollabStorageAccessControlImpl { - pub collab_access_control: Arc, - pub workspace_access_control: Arc, +pub struct CollabStorageAccessControlImpl { + pub collab_access_control: Arc, + pub workspace_access_control: Arc, pub cache: CollabCache, } #[async_trait] -impl CollabStorageAccessControl - for CollabStorageAccessControlImpl -where - CollabAC: CollabAccessControl, - WorkspaceAC: WorkspaceAccessControl, -{ +impl CollabStorageAccessControl for CollabStorageAccessControlImpl { async fn update_policy( &self, uid: &i64, diff --git a/services/appflowy-collaborate/src/collab/notification.rs b/services/appflowy-collaborate/src/collab/notification.rs index a047c4cef..0549bdb6e 100644 --- a/services/appflowy-collaborate/src/collab/notification.rs +++ b/services/appflowy-collaborate/src/collab/notification.rs @@ -1,5 +1,6 @@ -use access_control::access::{AccessControl, ObjectType}; use access_control::act::ActionVariant; +use access_control::casbin::access::AccessControl; +use access_control::entity::ObjectType; use database::pg_row::AFCollabMemberRow; use database::workspace::select_permission; use serde::Deserialize; diff --git a/services/appflowy-collaborate/src/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs index 58416e5c8..43660945a 100644 --- a/services/appflowy-collaborate/src/collab/storage.rs +++ b/services/appflowy-collaborate/src/collab/storage.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use collab::entity::EncodedCollab; use collab_entity::CollabType; use collab_rt_entity::ClientCollabMessage; +use database::collab::cache::CollabCache; use itertools::{Either, Itertools}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sqlx::Transaction; @@ -15,11 +16,9 @@ use tracing::warn; use tracing::{error, instrument, trace}; use validator::Validate; -use crate::collab::access_control::CollabAccessControlImpl; use crate::command::{CLCommandSender, CollaborationCommand}; use crate::shared_state::RealtimeSharedState; use app_error::AppError; -use database::collab::cache::CollabCache; use database::collab::{ insert_into_af_collab_bulk_for_user, AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl, GetCollabOrigin, @@ -28,7 +27,6 @@ use database_entity::dto::{ AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab, QueryCollabParams, QueryCollabResult, SnapshotData, }; -use workspace_access::WorkspaceAccessControlImpl; use crate::collab::access_control::CollabStorageAccessControlImpl; use crate::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE}; @@ -38,9 +36,7 @@ use crate::metrics::CollabMetrics; use crate::snapshot::SnapshotControl; use crate::state::RedisConnectionManager; -pub type CollabAccessControlStorage = CollabStorageImpl< - CollabStorageAccessControlImpl, ->; +pub type CollabAccessControlStorage = CollabStorageImpl; /// A wrapper around the actual storage implementation that provides access control and caching. #[derive(Clone)] diff --git a/services/appflowy-collaborate/src/command.rs b/services/appflowy-collaborate/src/command.rs index eb9909b38..bfb3cab08 100644 --- a/services/appflowy-collaborate/src/command.rs +++ b/services/appflowy-collaborate/src/command.rs @@ -5,7 +5,6 @@ use crate::{ manager::GroupManager, }, }; -use access_control::collab::RealtimeAccessControl; use collab::entity::EncodedCollab; use collab_rt_entity::ClientCollabMessage; use dashmap::DashMap; @@ -38,13 +37,12 @@ pub enum CollaborationCommand { }, } -pub(crate) fn spawn_collaboration_command( +pub(crate) fn spawn_collaboration_command( mut command_recv: CLCommandReceiver, group_sender_by_object_id: &Arc>, - weak_groups: Weak>, + weak_groups: Weak>, ) where S: CollabStorage, - AC: RealtimeAccessControl, { let group_sender_by_object_id = group_sender_by_object_id.clone(); tokio::spawn(async move { diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 2dee91e45..f41aa08e0 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -8,7 +8,6 @@ use dashmap::DashMap; use futures_util::StreamExt; use tracing::{instrument, trace, warn}; -use access_control::collab::RealtimeAccessControl; use collab_rt_entity::user::RealtimeUser; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, SinkMessage}; use collab_rt_entity::{CollabAck, RealtimeMessage}; @@ -47,20 +46,18 @@ pub type GroupCommandReceiver = tokio::sync::mpsc::Receiver; /// in tokio multi-thread runtime. It will receive the group command from the receiver and handle the /// command. /// -pub struct GroupCommandRunner +pub struct GroupCommandRunner where - AC: RealtimeAccessControl, S: CollabStorage, { - pub group_manager: Arc>, + pub group_manager: Arc>, pub msg_router_by_user: Arc>, pub recv: Option, } -impl GroupCommandRunner +impl GroupCommandRunner where S: CollabStorage, - AC: RealtimeAccessControl, { pub async fn run(mut self, object_id: String, notify: Arc) { let mut receiver = self.recv.take().expect("Only take once"); diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 36bfa26ce..9f2543ca2 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -26,10 +26,10 @@ use crate::group::state::GroupManagementState; use crate::indexer::IndexerProvider; use crate::metrics::CollabRealtimeMetrics; -pub struct GroupManager { +pub struct GroupManager { state: GroupManagementState, storage: Arc, - access_control: Arc, + access_control: Arc, metrics_calculate: Arc, collab_redis_stream: Arc, control_event_stream: Arc>, @@ -39,15 +39,14 @@ pub struct GroupManager { indexer_provider: Arc, } -impl GroupManager +impl GroupManager where S: CollabStorage, - AC: RealtimeAccessControl, { #[allow(clippy::too_many_arguments)] pub async fn new( storage: Arc, - access_control: Arc, + access_control: Arc, metrics_calculate: Arc, collab_stream: CollabRedisStream, persistence_interval: Duration, @@ -110,7 +109,7 @@ where // Lock the group and subscribe the user to the group. if let Some(group) = self.state.get_mut_group(object_id).await { trace!("[realtime]: {} subscribe group:{}", user, object_id,); - let (sink, stream) = client_msg_router.init_client_communication::( + let (sink, stream) = client_msg_router.init_client_communication::( &group.workspace_id, user, object_id, diff --git a/services/appflowy-collaborate/src/pg_listener.rs b/services/appflowy-collaborate/src/pg_listener.rs index 696299e82..f49a64a3b 100644 --- a/services/appflowy-collaborate/src/pg_listener.rs +++ b/services/appflowy-collaborate/src/pg_listener.rs @@ -1,10 +1,10 @@ use crate::collab::notification::CollabMemberNotification; +use access_control::casbin::notification::WorkspaceMemberNotification; use anyhow::Error; use database::listener::PostgresDBListener; use database::pg_row::AFUserNotification; use sqlx::PgPool; use tokio::sync::broadcast; -use workspace_access::notification::WorkspaceMemberNotification; pub struct PgListeners { user_listener: UserListener, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index f973d6a56..c9640d41e 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -30,9 +30,9 @@ use crate::state::RedisConnectionManager; use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink}; #[derive(Clone)] -pub struct CollaborationServer { +pub struct CollaborationServer { /// Keep track of all collab groups - group_manager: Arc>, + group_manager: Arc>, connect_state: ConnectState, group_sender_by_object_id: Arc>, storage: Arc, @@ -41,15 +41,14 @@ pub struct CollaborationServer { enable_custom_runtime: bool, } -impl CollaborationServer +impl CollaborationServer where S: CollabStorage, - AC: RealtimeAccessControl, { #[allow(clippy::too_many_arguments)] pub async fn new( storage: Arc, - access_control: AC, + access_control: Arc, metrics: Arc, command_recv: CLCommandReceiver, redis_connection_manager: RedisConnectionManager, @@ -69,7 +68,6 @@ where } let connect_state = ConnectState::new(); - let access_control = Arc::new(access_control); let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager); let group_manager = Arc::new( GroupManager::new( @@ -285,12 +283,11 @@ fn spawn_handle_unindexed_collabs( )); } -fn spawn_period_check_inactive_group( - weak_groups: Weak>, +fn spawn_period_check_inactive_group( + weak_groups: Weak>, group_sender_by_object_id: &Arc>, ) where S: CollabStorage, - AC: RealtimeAccessControl, { let mut interval = interval(Duration::from_secs(20)); let cloned_group_sender_by_object_id = group_sender_by_object_id.clone(); diff --git a/services/appflowy-collaborate/src/state.rs b/services/appflowy-collaborate/src/state.rs index 48061224a..b78003078 100644 --- a/services/appflowy-collaborate/src/state.rs +++ b/services/appflowy-collaborate/src/state.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use access_control::casbin::access::AccessControl; use dashmap::DashMap; use futures_util::StreamExt; use sqlx::PgPool; use uuid::Uuid; -use access_control::access::AccessControl; use access_control::metrics::AccessControlMetrics; use app_error::AppError; use database::user::{select_all_uid_uuid, select_uid_from_uuid}; diff --git a/src/api/data_import.rs b/src/api/data_import.rs index 7dc25fb70..8fc9354bb 100644 --- a/src/api/data_import.rs +++ b/src/api/data_import.rs @@ -117,7 +117,7 @@ async fn import_data_handler( let workspace = create_empty_workspace( &state.pg_pool, - &state.workspace_access_control, + state.workspace_access_control.clone(), &state.collab_access_control_storage, &user_uuid, uid, diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 01f98409c..f51bafea9 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -232,7 +232,7 @@ async fn create_workspace_handler( let uid = state.user_cache.get_user_uid(&uuid).await?; let new_workspace = workspace::ops::create_workspace_for_user( &state.pg_pool, - &state.workspace_access_control, + state.workspace_access_control.clone(), &state.collab_access_control_storage, &uuid, uid, @@ -351,7 +351,7 @@ async fn post_accept_workspace_invite_handler( // Currently, when the server get restarted, the policy in access control will be lost. workspace::ops::accept_workspace_invite( &state.pg_pool, - &state.workspace_access_control, + state.workspace_access_control.clone(), user_uid, &user_uuid, &invite_id, @@ -369,7 +369,7 @@ async fn get_workspace_settings_handler( let uid = state.user_cache.get_user_uid(&user_uuid).await?; let settings = workspace::ops::get_workspace_settings( &state.pg_pool, - &state.workspace_access_control, + state.workspace_access_control.clone(), &workspace_id, &uid, ) @@ -389,7 +389,7 @@ async fn post_workspace_settings_handler( let uid = state.user_cache.get_user_uid(&user_uuid).await?; let settings = workspace::ops::update_workspace_settings( &state.pg_pool, - &state.workspace_access_control, + state.workspace_access_control.clone(), &workspace_id, &uid, data, @@ -435,7 +435,7 @@ async fn remove_workspace_member_handler( &state.pg_pool, &workspace_id, &member_emails, - &state.workspace_access_control, + state.workspace_access_control.clone(), ) .await?; @@ -482,7 +482,7 @@ async fn leave_workspace_handler( &state.pg_pool, &workspace_id, &user_uuid, - &state.workspace_access_control, + state.workspace_access_control.clone(), ) .await?; Ok(AppResponse::Ok().into()) @@ -508,7 +508,7 @@ async fn update_workspace_member_handler( &state.pg_pool, &workspace_id, &changeset, - &state.workspace_access_control, + state.workspace_access_control.clone(), ) .await?; } @@ -983,8 +983,12 @@ async fn add_collab_member_handler( ); } - biz::collab::ops::create_collab_member(&state.pg_pool, &payload, &state.collab_access_control) - .await?; + biz::collab::ops::create_collab_member( + &state.pg_pool, + &payload, + state.collab_access_control.clone(), + ) + .await?; Ok(Json(AppResponse::Ok())) } @@ -1009,7 +1013,7 @@ async fn update_collab_member_handler( &state.pg_pool, &user_uuid, &payload, - &state.collab_access_control, + state.collab_access_control.clone(), ) .await?; Ok(Json(AppResponse::Ok())) @@ -1030,8 +1034,12 @@ async fn remove_collab_member_handler( state: Data, ) -> Result>> { let payload = payload.into_inner(); - biz::collab::ops::delete_collab_member(&state.pg_pool, &payload, &state.collab_access_control) - .await?; + biz::collab::ops::delete_collab_member( + &state.pg_pool, + &payload, + state.collab_access_control.clone(), + ) + .await?; Ok(Json(AppResponse::Ok())) } diff --git a/src/api/ws.rs b/src/api/ws.rs index d23b4e409..51113e341 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -15,7 +15,6 @@ use tracing::{debug, error, instrument, trace}; use app_error::AppError; use appflowy_collaborate::actix_ws::client::rt_client::RealtimeClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; -use appflowy_collaborate::collab::access_control::RealtimeCollabAccessControlImpl; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use authentication::jwt::{authorization_from_token, UserUuid}; use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage}; @@ -31,8 +30,7 @@ pub fn ws_scope() -> Scope { } const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB -pub type RealtimeServerAddr = - Addr>; +pub type RealtimeServerAddr = Addr>; /// This function will not be used after the 0.5.0 of the client. #[instrument(skip_all, err)] diff --git a/src/application.rs b/src/application.rs index b93cd196b..003c238df 100644 --- a/src/application.rs +++ b/src/application.rs @@ -2,20 +2,32 @@ use std::net::TcpListener; use std::sync::Arc; use std::time::Duration; +use access_control::casbin::access::AccessControl; +use access_control::casbin::collab::{CollabAccessControlImpl, RealtimeCollabAccessControlImpl}; +use access_control::casbin::workspace::WorkspaceAccessControlImpl; +use access_control::collab::{CollabAccessControl, RealtimeAccessControl}; +use access_control::noops::collab::{ + CollabAccessControlImpl as NoOpsCollabAccessControlImpl, + RealtimeCollabAccessControlImpl as NoOpsRealtimeCollabAccessControlImpl, +}; +use access_control::noops::workspace::WorkspaceAccessControlImpl as NoOpsWorkspaceAccessControlImpl; +use access_control::workspace::WorkspaceAccessControl; use actix::Supervisor; use actix_identity::IdentityMiddleware; use actix_session::storage::RedisSessionStore; use actix_session::SessionMiddleware; use actix_web::cookie::Key; -use actix_web::middleware::NormalizePath; +use actix_web::middleware::{Condition, NormalizePath}; use actix_web::{dev::Server, web::Data, App, HttpServer}; use anyhow::{Context, Error}; +use appflowy_collaborate::collab::access_control::CollabStorageAccessControlImpl; use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; use aws_sdk_s3::operation::create_bucket::CreateBucketError; use aws_sdk_s3::types::{ BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration, }; use collab::lock::Mutex; +use database::collab::cache::CollabCache; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; use openssl::x509::X509; use secrecy::{ExposeSecret, Secret}; @@ -23,24 +35,18 @@ use sqlx::{postgres::PgPoolOptions, PgPool}; use tokio::sync::RwLock; use tracing::{error, info, warn}; -use access_control::access::{enable_access_control, AccessControl}; use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; -use appflowy_collaborate::collab::access_control::{ - CollabAccessControlImpl, CollabStorageAccessControlImpl, RealtimeCollabAccessControlImpl, -}; use appflowy_collaborate::collab::storage::CollabStorageImpl; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::indexer::IndexerProvider; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; -use database::collab::cache::CollabCache; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; use gotrue::grant::{Grant, PasswordGrant}; use snowflake::Snowflake; use tonic_proto::history::history_client::HistoryClient; -use workspace_access::WorkspaceAccessControlImpl; use crate::api::access_request::access_request_scope; use crate::api::ai::ai_completion_scope; @@ -125,17 +131,17 @@ pub async fn run_actix_server( let access_control = MiddlewareAccessControlTransform::new() .with_acs(WorkspaceMiddlewareAccessControl::new( state.pg_pool.clone(), - state.workspace_access_control.clone().into(), + state.workspace_access_control.clone(), )) .with_acs(CollabMiddlewareAccessControl::new( - state.collab_access_control.clone().into(), + state.collab_access_control.clone(), state.collab_cache.clone(), )); // Initialize metrics that which are registered in the registry. - let realtime_server = CollaborationServer::<_, _>::new( + let realtime_server = CollaborationServer::<_>::new( storage.clone(), - RealtimeCollabAccessControlImpl::new(state.access_control.clone()), + state.realtime_access_control.clone(), state.metrics.realtime_metrics.clone(), rt_cmd_recv, state.redis_connection_manager.clone(), @@ -159,7 +165,7 @@ pub async fn run_actix_server( .build(), ) // .wrap(DecryptPayloadMiddleware) - .wrap(access_control.clone()) + .wrap(Condition::new(config.access_control.is_enabled, access_control.clone())) .wrap(RequestIdMiddleware) .service(server_info_scope()) .service(user_scope()) @@ -267,7 +273,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result = if config.access_control.is_enabled { + Arc::new(CollabAccessControlImpl::new(access_control.clone())) + } else { + Arc::new(NoOpsCollabAccessControlImpl::new()) + }; + let workspace_access_control: Arc = + if config.access_control.is_enabled { + Arc::new(WorkspaceAccessControlImpl::new(access_control.clone())) + } else { + Arc::new(NoOpsWorkspaceAccessControlImpl::new()) + }; + let realtime_access_control: Arc = if config.access_control.is_enabled + { + Arc::new(RealtimeCollabAccessControlImpl::new(access_control)) + } else { + Arc::new(NoOpsRealtimeCollabAccessControlImpl::new()) + }; let collab_cache = CollabCache::new(redis_conn_manager.clone(), pg_pool.clone()); let collab_storage_access_control = CollabStorageAccessControlImpl { - collab_access_control: collab_access_control.clone().into(), - workspace_access_control: workspace_access_control.clone().into(), + collab_access_control: collab_access_control.clone(), + workspace_access_control: workspace_access_control.clone(), cache: collab_cache.clone(), }; let snapshot_control = SnapshotControl::new( @@ -338,11 +359,11 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result { - pub access_control: Arc, +pub struct CollabMiddlewareAccessControl { + pub access_control: Arc, collab_cache: CollabCache, skip_resources: Vec<(Method, ResourceDef)>, require_access_levels: Vec<(ResourceDef, HashMap)>, } -impl CollabMiddlewareAccessControl -where - AC: CollabAccessControl, -{ - pub fn new(access_control: Arc, collab_cache: CollabCache) -> Self { +impl CollabMiddlewareAccessControl { + pub fn new(access_control: Arc, collab_cache: CollabCache) -> Self { Self { skip_resources: vec![ // Skip access control when trying to create a collab @@ -79,10 +76,7 @@ where } #[async_trait] -impl MiddlewareAccessControl for CollabMiddlewareAccessControl -where - AC: CollabAccessControl, -{ +impl MiddlewareAccessControl for CollabMiddlewareAccessControl { fn resource(&self) -> AccessResource { AccessResource::Collab } diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index f81dea247..089ea8c1a 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -37,7 +37,7 @@ use super::publish_outline::collab_folder_to_published_outline; pub async fn create_collab_member( pg_pool: &PgPool, params: &InsertCollabMemberParams, - collab_access_control: &impl CollabAccessControl, + collab_access_control: Arc, ) -> Result<(), AppError> { params.validate()?; @@ -83,7 +83,7 @@ pub async fn upsert_collab_member( pg_pool: &PgPool, _user_uuid: &Uuid, params: &UpdateCollabMemberParams, - collab_access_control: &impl CollabAccessControl, + collab_access_control: Arc, ) -> Result<(), AppError> { params.validate()?; let mut transaction = pg_pool @@ -123,7 +123,7 @@ pub async fn get_collab_member( pub async fn delete_collab_member( pg_pool: &PgPool, params: &CollabMemberIdentify, - collab_access_control: &impl CollabAccessControl, + collab_access_control: Arc, ) -> Result<(), AppError> { params.validate()?; let mut transaction = pg_pool diff --git a/src/biz/pg_listener.rs b/src/biz/pg_listener.rs index 5ecd40530..5039aeaab 100644 --- a/src/biz/pg_listener.rs +++ b/src/biz/pg_listener.rs @@ -1,10 +1,10 @@ +use access_control::casbin::notification::WorkspaceMemberNotification; use anyhow::Error; use appflowy_collaborate::collab::notification::CollabMemberNotification; use database::listener::PostgresDBListener; use database::pg_row::AFUserNotification; use sqlx::PgPool; use tokio::sync::broadcast; -use workspace_access::notification::WorkspaceMemberNotification; pub struct PgListeners { user_listener: UserListener, diff --git a/src/biz/user/user_verify.rs b/src/biz/user/user_verify.rs index 714f57013..602e83d11 100644 --- a/src/biz/user/user_verify.rs +++ b/src/biz/user/user_verify.rs @@ -4,7 +4,6 @@ use anyhow::{Context, Result}; use sqlx::types::uuid; use tracing::{event, instrument, trace}; -use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; use database::user::{create_user, is_user_exist}; use database::workspace::select_workspace; diff --git a/src/biz/workspace/access_control.rs b/src/biz/workspace/access_control.rs index 652bc02ed..6a3457760 100644 --- a/src/biz/workspace/access_control.rs +++ b/src/biz/workspace/access_control.rs @@ -27,18 +27,15 @@ use crate::middleware::access_control_mw::{AccessResource, MiddlewareAccessContr use crate::state::UserCache; #[derive(Clone)] -pub struct WorkspaceMiddlewareAccessControl { +pub struct WorkspaceMiddlewareAccessControl { pub pg_pool: PgPool, - pub access_control: Arc, + pub access_control: Arc, skip_resources: Vec<(Method, ResourceDef)>, require_role_rules: Vec<(ResourceDef, HashMap)>, } -impl WorkspaceMiddlewareAccessControl -where - AC: WorkspaceAccessControl, -{ - pub fn new(pg_pool: PgPool, access_control: Arc) -> Self { +impl WorkspaceMiddlewareAccessControl { + pub fn new(pg_pool: PgPool, access_control: Arc) -> Self { Self { pg_pool, // Skip access control when the request matches the following resources @@ -102,10 +99,7 @@ where } #[async_trait] -impl MiddlewareAccessControl for WorkspaceMiddlewareAccessControl -where - AC: WorkspaceAccessControl, -{ +impl MiddlewareAccessControl for WorkspaceMiddlewareAccessControl { fn resource(&self) -> AccessResource { AccessResource::Workspace } diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs index 4090c3f1c..0f1e38fe5 100644 --- a/src/biz/workspace/ops.rs +++ b/src/biz/workspace/ops.rs @@ -64,7 +64,7 @@ pub async fn delete_workspace_for_user( /// object. pub async fn create_empty_workspace( pg_pool: &PgPool, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, collab_storage: &Arc, user_uuid: &Uuid, user_uid: i64, @@ -117,7 +117,7 @@ pub async fn create_empty_workspace( pub async fn create_workspace_for_user( pg_pool: &PgPool, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, collab_storage: &Arc, user_uuid: &Uuid, user_uid: i64, @@ -301,7 +301,7 @@ pub async fn open_workspace( pub async fn accept_workspace_invite( pg_pool: &PgPool, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, user_uid: i64, user_uuid: &Uuid, invite_id: &Uuid, @@ -527,7 +527,7 @@ pub async fn leave_workspace( pg_pool: &PgPool, workspace_id: &Uuid, user_uuid: &Uuid, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, ) -> Result<(), AppResponseError> { let email = database::user::select_email_from_user_uuid(pg_pool, user_uuid).await?; remove_workspace_members(pg_pool, workspace_id, &[email], workspace_access_control).await @@ -537,7 +537,7 @@ pub async fn remove_workspace_members( pg_pool: &PgPool, workspace_id: &Uuid, member_emails: &[String], - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, ) -> Result<(), AppResponseError> { let mut txn = pg_pool .begin() @@ -583,7 +583,7 @@ pub async fn update_workspace_member( pg_pool: &PgPool, workspace_id: &Uuid, changeset: &WorkspaceMemberChangeset, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, ) -> Result<(), AppError> { if let Some(role) = &changeset.role { upsert_workspace_member(pg_pool, workspace_id, &changeset.email, role.clone()).await?; @@ -610,7 +610,7 @@ pub async fn get_workspace_document_total_bytes( pub async fn get_workspace_settings( pg_pool: &PgPool, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, workspace_id: &Uuid, owner_uid: &i64, ) -> Result { @@ -631,7 +631,7 @@ pub async fn get_workspace_settings( pub async fn update_workspace_settings( pg_pool: &PgPool, - workspace_access_control: &impl WorkspaceAccessControl, + workspace_access_control: Arc, workspace_id: &Uuid, owner_uid: &i64, change: AFWorkspaceSettingsChange, diff --git a/src/config/config.rs b/src/config/config.rs index 199eb16f7..a5ddb4d5f 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -12,6 +12,7 @@ use infra::env_util::{get_env_var, get_env_var_opt}; #[derive(Clone, Debug)] pub struct Config { pub app_env: Environment, + pub access_control: AccessControlSetting, pub db_settings: DatabaseSetting, pub gotrue: GoTrueSetting, pub application: ApplicationSetting, @@ -27,6 +28,11 @@ pub struct Config { pub appflowy_web_url: Option, } +#[derive(serde::Deserialize, Clone, Debug)] +pub struct AccessControlSetting { + pub is_enabled: bool, +} + #[derive(serde::Deserialize, Clone, Debug)] pub struct MailerSetting { pub smtp_host: String, @@ -172,6 +178,11 @@ pub fn get_configuration() -> Result { app_env: get_env_var("APPFLOWY_ENVIRONMENT", "local") .parse() .context("fail to get APPFLOWY_ENVIRONMENT")?, + access_control: AccessControlSetting { + is_enabled: get_env_var("APPFLOWY_ACCESS_CONTROL", "false") + .parse() + .context("fail to get APPFLOWY_ACCESS_CONTROL")?, + }, db_settings: DatabaseSetting { pg_conn_opts: PgConnectOptions::from_str(&get_env_var( "APPFLOWY_DATABASE_URL", diff --git a/src/middleware/access_control_mw.rs b/src/middleware/access_control_mw.rs index 24fb9f29a..5a71caeac 100644 --- a/src/middleware/access_control_mw.rs +++ b/src/middleware/access_control_mw.rs @@ -1,6 +1,5 @@ use crate::api::workspace::{COLLAB_OBJECT_ID_PATH, WORKSPACE_ID_PATH}; use crate::state::AppState; -use access_control::access::enable_access_control; use actix_router::{Path, Url}; use actix_service::{forward_ready, Service, Transform}; use actix_web::dev::{ResourceDef, ServiceRequest, ServiceResponse}; @@ -116,12 +115,6 @@ where forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { - // If the access control is not enabled, skip the access control - if !enable_access_control() { - let fut = self.service.call(req); - return Box::pin(fut); - } - let path = req.match_pattern().map(|pattern| { // Create ResourceDef will cause memory leak, so we use the cache to store the ResourceDef let mut path = req.match_info().clone(); diff --git a/src/middleware/cors_mw.rs b/src/middleware/cors_mw.rs deleted file mode 100644 index faec4c280..000000000 --- a/src/middleware/cors_mw.rs +++ /dev/null @@ -1,17 +0,0 @@ -use actix_cors::Cors; -use actix_web::http; - -// Deprecated -// AppFlowy Cloud uses nginx to configure CORS -pub fn default_cors() -> Cors { - Cors::default() // allowed_origin return access-control-allow-origin: * by default - .allow_any_origin() - .send_wildcard() - .allowed_methods(vec!["GET", "POST", "PUT", "DELETE"]) - .allowed_headers(vec![ - actix_web::http::header::AUTHORIZATION, - actix_web::http::header::ACCEPT, - ]) - .allowed_header(http::header::CONTENT_TYPE) - .max_age(3600) -} diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 6b1181f0c..40f5c54f4 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -1,5 +1,4 @@ pub mod access_control_mw; -// pub mod cors_mw; pub mod encrypt_mw; pub mod metrics_mw; pub mod request_id; diff --git a/src/state.rs b/src/state.rs index a68480561..9380a7260 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,30 +1,29 @@ use std::sync::Arc; +use access_control::collab::{CollabAccessControl, RealtimeAccessControl}; +use access_control::workspace::WorkspaceAccessControl; use collab::lock::Mutex; use dashmap::DashMap; +use database::collab::cache::CollabCache; use secrecy::{ExposeSecret, Secret}; use sqlx::PgPool; use tokio::sync::RwLock; use tokio_stream::StreamExt; use uuid::Uuid; -use access_control::access::AccessControl; use access_control::metrics::AccessControlMetrics; use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; -use appflowy_collaborate::collab::access_control::CollabAccessControlImpl; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use appflowy_collaborate::indexer::IndexerProvider; use appflowy_collaborate::metrics::CollabMetrics; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollabRealtimeMetrics; -use database::collab::cache::CollabCache; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; use database::user::{select_all_uid_uuid, select_uid_from_uuid}; use gotrue::grant::{Grant, PasswordGrant}; use snowflake::Snowflake; use tonic_proto::history::history_client::HistoryClient; -use workspace_access::WorkspaceAccessControlImpl; use crate::api::metrics::{PublishedCollabMetrics, RequestMetrics}; use crate::biz::pg_listener::PgListeners; @@ -43,13 +42,13 @@ pub struct AppState { pub redis_connection_manager: RedisConnectionManager, pub collab_cache: CollabCache, pub collab_access_control_storage: Arc, - pub collab_access_control: CollabAccessControlImpl, - pub workspace_access_control: WorkspaceAccessControlImpl, + pub collab_access_control: Arc, + pub workspace_access_control: Arc, + pub realtime_access_control: Arc, pub bucket_storage: Arc, pub published_collab_store: Arc, pub bucket_client: AwsS3BucketClientImpl, pub pg_listeners: Arc, - pub access_control: AccessControl, pub metrics: AppMetrics, pub gotrue_admin: GoTrueAdmin, pub mailer: Mailer,