From 2a677c1a71d4efaf0393354a9d5277fc413e31ff Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 27 Sep 2023 16:58:30 +0800 Subject: [PATCH] feat: init workspace api --- apps/keck/src/server/api/blocks/mod.rs | 1 + apps/keck/src/server/api/blocks/workspace.rs | 60 +++++++++++++++++++- apps/keck/src/server/api/doc.rs | 1 + apps/keck/src/server/api/mod.rs | 10 ++++ libs/jwst-storage/src/storage/mod.rs | 24 +++++++- libs/jwst-storage/src/types.rs | 4 +- 6 files changed, 97 insertions(+), 3 deletions(-) diff --git a/apps/keck/src/server/api/blocks/mod.rs b/apps/keck/src/server/api/blocks/mod.rs index 6ead69c0..be6f7378 100644 --- a/apps/keck/src/server/api/blocks/mod.rs +++ b/apps/keck/src/server/api/blocks/mod.rs @@ -30,6 +30,7 @@ fn block_apis(router: Router) -> Router { fn workspace_apis(router: Router) -> Router { router + .route("/block/:workspace/init", post(workspace::init_workspace)) .route("/block/:workspace/client", get(clients::workspace_client)) .route("/block/:workspace/clients", get(clients::workspace_clients)) .route("/block/:workspace/history", get(history::history_workspace)) diff --git a/apps/keck/src/server/api/blocks/workspace.rs b/apps/keck/src/server/api/blocks/workspace.rs index 7601d09a..98181ad1 100644 --- a/apps/keck/src/server/api/blocks/workspace.rs +++ b/apps/keck/src/server/api/blocks/workspace.rs @@ -1,8 +1,13 @@ use axum::{ - extract::{Path, Query}, + extract::{BodyStream, Path, Query}, response::Response, }; +use futures::{ + future, + stream::{iter, StreamExt}, +}; use jwst_core::DocStorage; +use jwst_storage::JwstStorageError; use utoipa::IntoParams; use super::*; @@ -32,6 +37,59 @@ pub async fn get_workspace(Extension(context): Extension>, Path(wor } } +/// Init a `Workspace` by id +/// - Return 200 Ok and `Workspace`'s data if init success. +/// - Return 304 Not Modified if `Workspace` is exists. +/// - Return 500 Internal Server Error if init failed. +#[utoipa::path( + post, + tag = "Workspace", + context_path = "/api/block", + path = "/{workspace}/init", + params( + ("workspace", description = "workspace id"), + ), + request_body( + content = BodyStream, + content_type="application/octet-stream" + ), + responses( + (status = 200, description = "Workspace init success"), + (status = 304, description = "Workspace is exists"), + (status = 500, description = "Failed to init a workspace") + ) +)] +pub async fn init_workspace( + Extension(context): Extension>, + Path(workspace): Path, + body: BodyStream, +) -> Response { + info!("init_workspace: {}", workspace); + + let mut has_error = false; + let data = body + .take_while(|x| { + has_error = x.is_err(); + future::ready(x.is_ok()) + }) + .filter_map(|data| future::ready(data.ok())) + .flat_map(|buffer| iter(buffer)) + .collect::>() + .await; + + if has_error { + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } else if let Err(e) = context.init_workspace(workspace, data).await { + if matches!(e, JwstStorageError::WorkspaceExists(_)) { + return StatusCode::NOT_MODIFIED.into_response(); + } + warn!("failed to init workspace: {}", e.to_string()); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } else { + StatusCode::OK.into_response() + } +} + /// Create a `Workspace` by id /// - Return 200 Ok and `Workspace`'s data if init success or `Workspace` is /// exists. diff --git a/apps/keck/src/server/api/doc.rs b/apps/keck/src/server/api/doc.rs index 7130ce4a..e86d9308 100644 --- a/apps/keck/src/server/api/doc.rs +++ b/apps/keck/src/server/api/doc.rs @@ -16,6 +16,7 @@ use super::{ history::history_workspace, subscribe::subscribe_workspace, subscribe::subscribe_test_hook, + workspace::init_workspace, workspace::get_workspace, workspace::set_workspace, workspace::delete_workspace, diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index 07281905..92f384c3 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -84,6 +84,9 @@ impl Context { let webhook = self.webhook.clone(); let ws_id = workspace.id(); workspace.subscribe_doc(move |_, history| { + if history.is_empty() { + return; + } let webhook = webhook.read().unwrap(); if webhook.is_empty() { return; @@ -127,6 +130,13 @@ impl Context { .map(|w| self.register_webhook(w)) } + pub async fn init_workspace(&self, workspace_id: S, data: Vec) -> JwstStorageResult + where + S: AsRef, + { + self.storage.init_workspace(workspace_id, data).await + } + pub async fn create_workspace(&self, workspace_id: S) -> JwstStorageResult where S: AsRef, diff --git a/libs/jwst-storage/src/storage/mod.rs b/libs/jwst-storage/src/storage/mod.rs index 99a1c09f..05d6b11a 100644 --- a/libs/jwst-storage/src/storage/mod.rs +++ b/libs/jwst-storage/src/storage/mod.rs @@ -67,7 +67,7 @@ impl JwstStorage { Ok(storage) } - async fn db_migrate(&self) -> JwstStorageResult<()> { + async fn db_migrate(&self) -> JwstStorageResult { Migrator::up(&self.pool, None).await?; Ok(()) } @@ -143,6 +143,28 @@ impl JwstStorage { } } + pub async fn init_workspace(&self, workspace_id: S, data: Vec) -> JwstStorageResult + where + S: AsRef, + { + let workspace_id = workspace_id.as_ref(); + info!("init_workspace: {}", workspace_id); + if !self + .docs + .detect_workspace(workspace_id) + .await + .map_err(|_err| JwstStorageError::Crud(format!("failed to check workspace {}", workspace_id)))? + { + self.docs + .flush_workspace(workspace_id.into(), data) + .await + .map_err(|_err| JwstStorageError::Crud(format!("failed to init workspace {}", workspace_id)))?; + Ok(()) + } else { + Err(JwstStorageError::WorkspaceExists(workspace_id.into())) + } + } + pub async fn full_migrate(&self, workspace_id: String, update: Option>, force: bool) -> bool { let mut map = self.last_migrate.lock().await; let ts = map.entry(workspace_id.clone()).or_insert(Instant::now()); diff --git a/libs/jwst-storage/src/types.rs b/libs/jwst-storage/src/types.rs index 90cadee9..f71543ad 100644 --- a/libs/jwst-storage/src/types.rs +++ b/libs/jwst-storage/src/types.rs @@ -17,6 +17,8 @@ pub enum JwstStorageError { DocMerge(tokio::task::JoinError), #[error("workspace {0} not found")] WorkspaceNotFound(String), + #[error("workspace {0} exists")] + WorkspaceExists(String), #[error("jwst error")] Jwst(#[from] jwst_core::JwstError), #[error("failed to process blob")] @@ -29,4 +31,4 @@ pub enum JwstStorageError { DotEnvy(#[from] dotenvy::Error), } -pub type JwstStorageResult = Result; +pub type JwstStorageResult = Result;