Skip to content

Commit

Permalink
chore: using copy to improve write speed of batch collab api endpoint (
Browse files Browse the repository at this point in the history
…#862)

* chore: using copy to improve write speed of batch collab api endpoint

* chore: insert to redis

* chore: fix compile
  • Loading branch information
appflowy authored Oct 7, 2024
1 parent 9c0dffa commit 38ff187
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 93 deletions.
23 changes: 22 additions & 1 deletion libs/collab-rt-protocol/src/data_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use collab_entity::CollabType;
use tracing::instrument;

#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
pub async fn validate_encode_collab(
#[inline]
pub async fn spawn_blocking_validate_encode_collab(
object_id: &str,
data: &[u8],
collab_type: &CollabType,
Expand All @@ -31,3 +32,23 @@ pub async fn validate_encode_collab(
})
.await?
}

#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
#[inline]
pub fn validate_encode_collab(
object_id: &str,
data: &[u8],
collab_type: &CollabType,
) -> Result<(), Error> {
let encoded_collab = EncodedCollab::decode_from_bytes(data)?;
let collab = Collab::new_with_source(
CollabOrigin::Empty,
object_id,
DataSource::DocStateV1(encoded_collab.doc_state.to_vec()),
vec![],
false,
)?;

collab_type.validate_require_data(&collab)?;
Ok::<(), Error>(())
}
8 changes: 2 additions & 6 deletions libs/database/src/collab/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl CollabCache {
Ok(encode_collab)
}

pub async fn insert_encode_collab_in_disk(
pub async fn insert_encode_collab_to_disk(
&self,
workspace_id: &str,
uid: &i64,
Expand All @@ -193,11 +193,7 @@ impl CollabCache {
Ok(())
}

/// Insert the encoded collab data into the memory cache.
pub async fn insert_encode_collab_data_in_mem(
&self,
params: &CollabParams,
) -> Result<(), AppError> {
pub async fn insert_encode_collab_to_mem(&self, params: &CollabParams) -> Result<(), AppError> {
let timestamp = chrono::Utc::now().timestamp();
self
.mem_cache
Expand Down
16 changes: 8 additions & 8 deletions libs/database/src/collab/collab_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@ pub trait CollabStorage: Send + Sync + 'static {
/// if write_immediately is true, the data will be written to disk immediately. Otherwise, the data will
/// be scheduled to be written to disk later.
///
async fn insert_or_update_collab(
async fn queue_insert_or_update_collab(
&self,
workspace_id: &str,
uid: &i64,
params: CollabParams,
write_immediately: bool,
) -> AppResult<()>;

async fn insert_new_collab(
async fn batch_insert_new_collab(
&self,
workspace_id: &str,
uid: &i64,
params: CollabParams,
params: Vec<CollabParams>,
) -> AppResult<()>;

/// Insert a new collaboration in the storage.
Expand Down Expand Up @@ -185,7 +185,7 @@ where
self.as_ref().encode_collab_redis_query_state()
}

async fn insert_or_update_collab(
async fn queue_insert_or_update_collab(
&self,
workspace_id: &str,
uid: &i64,
Expand All @@ -194,19 +194,19 @@ where
) -> AppResult<()> {
self
.as_ref()
.insert_or_update_collab(workspace_id, uid, params, write_immediately)
.queue_insert_or_update_collab(workspace_id, uid, params, write_immediately)
.await
}

async fn insert_new_collab(
async fn batch_insert_new_collab(
&self,
workspace_id: &str,
uid: &i64,
params: CollabParams,
params: Vec<CollabParams>,
) -> AppResult<()> {
self
.as_ref()
.insert_new_collab(workspace_id, uid, params)
.batch_insert_new_collab(workspace_id, uid, params)
.await
}

Expand Down
4 changes: 2 additions & 2 deletions services/appflowy-collaborate/src/collab/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl StorageQueue {
trace!("queuing {} object to pending write queue", params.object_id,);
self
.collab_cache
.insert_encode_collab_data_in_mem(params)
.insert_encode_collab_to_mem(params)
.await?;

let seq = self
Expand Down Expand Up @@ -456,7 +456,7 @@ async fn write_pending_to_disk(
.execute(transaction.deref_mut())
.await?;
if let Err(_err) = collab_cache
.insert_encode_collab_in_disk(&record.workspace_id, &record.uid, params, &mut transaction)
.insert_encode_collab_to_disk(&record.workspace_id, &record.uid, params, &mut transaction)
.await
{
sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", savepoint_name))
Expand Down
45 changes: 34 additions & 11 deletions services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::shared_state::RealtimeSharedState;
use app_error::AppError;
use database::collab::cache::CollabCache;
use database::collab::{
AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl, GetCollabOrigin,
insert_into_af_collab_bulk_for_user, AppResult, CollabMetadata, CollabStorage,
CollabStorageAccessControl, GetCollabOrigin,
};
use database_entity::dto::{
AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab,
Expand Down Expand Up @@ -218,6 +219,26 @@ where
.await
.map_err(AppError::from)
}

async fn batch_insert_collabs(
&self,
workspace_id: &str,
uid: &i64,
params_list: Vec<CollabParams>,
) -> Result<(), AppError> {
let mut transaction = self.cache.pg_pool().begin().await?;
insert_into_af_collab_bulk_for_user(&mut transaction, uid, workspace_id, &params_list).await?;
transaction.commit().await?;

// update the mem cache without blocking the current task
let cache = self.cache.clone();
tokio::spawn(async move {
for params in params_list {
let _ = cache.insert_encode_collab_to_mem(&params).await;
}
});
Ok(())
}
}

#[async_trait]
Expand All @@ -230,7 +251,7 @@ where
(state.total_attempts, state.success_attempts)
}

async fn insert_or_update_collab(
async fn queue_insert_or_update_collab(
&self,
workspace_id: &str,
uid: &i64,
Expand Down Expand Up @@ -270,23 +291,25 @@ where
Ok(())
}

async fn insert_new_collab(
async fn batch_insert_new_collab(
&self,
workspace_id: &str,
uid: &i64,
params: CollabParams,
params_list: Vec<CollabParams>,
) -> AppResult<()> {
params.validate()?;

self
.check_write_workspace_permission(workspace_id, uid)
.await?;

// TODO(nathan): batch insert permission
for params in &params_list {
self
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
}
self
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
self
.queue_insert_collab(workspace_id, uid, params, WritePriority::High)
.batch_insert_collabs(workspace_id, uid, params_list)
.await?;
Ok(())
}
Expand Down
12 changes: 8 additions & 4 deletions services/appflowy-collaborate/src/collab/validator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use app_error::AppError;
use async_trait::async_trait;
use collab_rt_protocol::validate_encode_collab;
use collab_rt_protocol::spawn_blocking_validate_encode_collab;
use database_entity::dto::CollabParams;

#[async_trait]
Expand All @@ -11,8 +11,12 @@ pub trait CollabValidator {
#[async_trait]
impl CollabValidator for CollabParams {
async fn check_encode_collab(&self) -> Result<(), AppError> {
validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type)
.await
.map_err(|err| AppError::NoRequiredData(err.to_string()))
spawn_blocking_validate_encode_collab(
&self.object_id,
&self.encoded_collab_v1,
&self.collab_type,
)
.await
.map_err(|err| AppError::NoRequiredData(err.to_string()))
}
}
2 changes: 1 addition & 1 deletion services/appflowy-collaborate/src/group/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ where

self
.storage
.insert_or_update_collab(&self.workspace_id, &self.uid, params, write_immediately)
.queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, write_immediately)
.await?;
// Update the edit state on successful save
self.edit_state.tick();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, trace, warn};
use validator::Validate;

use app_error::AppError;
use collab_rt_protocol::validate_encode_collab;
use collab_rt_protocol::spawn_blocking_validate_encode_collab;
use database::collab::{
create_snapshot_and_maintain_limit, get_all_collab_snapshot_meta, latest_snapshot_time,
select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl SnapshotCommandRunner {
};

// Validate collab data before processing
let result = validate_encode_collab(
let result = spawn_blocking_validate_encode_collab(
&next_item.object_id,
&encoded_collab_v1,
&next_item.collab_type,
Expand Down
12 changes: 8 additions & 4 deletions src/api/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use actix_web::HttpRequest;
use appflowy_ai_client::dto::AIModel;
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use collab_rt_protocol::validate_encode_collab;
use collab_rt_protocol::spawn_blocking_validate_encode_collab;
use database_entity::dto::CollabParams;
use std::str::FromStr;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -74,9 +74,13 @@ pub trait CollabValidator {
#[async_trait]
impl CollabValidator for CollabParams {
async fn check_encode_collab(&self) -> Result<(), AppError> {
validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type)
.await
.map_err(|err| AppError::NoRequiredData(err.to_string()))
spawn_blocking_validate_encode_collab(
&self.object_id,
&self.encoded_collab_v1,
&self.collab_type,
)
.await
.map_err(|err| AppError::NoRequiredData(err.to_string()))
}
}

Expand Down
Loading

0 comments on commit 38ff187

Please sign in to comment.