diff --git a/lib/dal/src/change_set.rs b/lib/dal/src/change_set.rs index 3bdb09612d..bdcf418a55 100644 --- a/lib/dal/src/change_set.rs +++ b/lib/dal/src/change_set.rs @@ -1,16 +1,17 @@ use chrono::{DateTime, Utc}; +use postgres_types::ToSql; use serde::{Deserialize, Serialize}; use si_data_nats::NatsError; -use si_data_pg::PgError; +use si_data_pg::{PgError, PgPoolError}; use strum::{Display, EnumString}; use telemetry::prelude::*; use thiserror::Error; use crate::standard_model::{object_option_from_row_option, objects_from_rows}; -use crate::ws_event::{WsEvent, WsEventError, WsPayload}; use crate::{ pk, Action, ActionError, HistoryActor, HistoryEvent, HistoryEventError, LabelListError, StandardModelError, Tenancy, Timestamp, TransactionsError, User, UserError, UserPk, Visibility, + WsEvent, WsEventError, WsPayload, }; use crate::{ComponentError, DalContext, WsEventResult}; @@ -18,6 +19,8 @@ const CHANGE_SET_OPEN_LIST: &str = include_str!("queries/change_set/open_list.sq const CHANGE_SET_GET_BY_PK: &str = include_str!("queries/change_set/get_by_pk.sql"); const GET_ACTORS: &str = include_str!("queries/change_set/get_actors.sql"); +const BEGIN_MERGE_FLOW: &str = include_str!("queries/change_set/begin_merge_flow.sql"); + #[remain::sorted] #[derive(Error, Debug)] pub enum ChangeSetError { @@ -38,6 +41,8 @@ pub enum ChangeSetError { #[error(transparent)] Pg(#[from] PgError), #[error(transparent)] + PgPool(#[from] PgPoolError), + #[error(transparent)] SerdeJson(#[from] serde_json::Error), #[error(transparent)] StandardModel(#[from] StandardModelError), @@ -52,12 +57,13 @@ pub enum ChangeSetError { pub type ChangeSetResult = Result; #[remain::sorted] -#[derive(Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Clone, ToSql)] pub enum ChangeSetStatus { Abandoned, Applied, Closed, Failed, + NeedsApproval, Open, } @@ -113,6 +119,23 @@ impl ChangeSet { Utc::now().format("%Y-%m-%d-%H:%M").to_string() } + pub async fn begin_approval_flow(&mut self, ctx: &mut DalContext) -> ChangeSetResult<()> { + let row = ctx + .pg_pool() + .get() + .await? + .query_one( + BEGIN_MERGE_FLOW, + &[&self.pk, &ChangeSetStatus::NeedsApproval], + ) + .await?; + let updated_at: DateTime = row.try_get("timestamp_updated_at")?; + self.timestamp.updated_at = updated_at; + self.status = ChangeSetStatus::NeedsApproval; + + Ok(()) + } + #[instrument(skip(ctx))] pub async fn apply(&mut self, ctx: &mut DalContext) -> ChangeSetResult<()> { let actor = serde_json::to_value(ctx.history_actor())?; @@ -272,6 +295,30 @@ impl WsEvent { ) .await } + + pub async fn change_set_merge_vote( + ctx: &DalContext, + change_set_pk: ChangeSetPk, + user_pk: UserPk, + vote: bool, + ) -> WsEventResult { + WsEvent::new( + ctx, + WsPayload::ChangeSetMergeVote(ChangeSetMergeVotePayload { + change_set_pk, + user_pk, + vote, + }), + ) + .await + } + + pub async fn change_set_begin_approval_process( + ctx: &DalContext, + change_set_pk: ChangeSetPk, + ) -> WsEventResult { + WsEvent::new(ctx, WsPayload::ChangeSetBeginApprovalProcess(change_set_pk)).await + } } #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] @@ -280,3 +327,11 @@ pub struct ChangeSetAppliedPayload { change_set_pk: ChangeSetPk, user_pk: Option, } + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ChangeSetMergeVotePayload { + change_set_pk: ChangeSetPk, + user_pk: UserPk, + vote: bool, +} diff --git a/lib/dal/src/queries/change_set/begin_merge_flow.sql b/lib/dal/src/queries/change_set/begin_merge_flow.sql new file mode 100644 index 0000000000..039a765269 --- /dev/null +++ b/lib/dal/src/queries/change_set/begin_merge_flow.sql @@ -0,0 +1,4 @@ +UPDATE change_sets +SET status = $2, updated_at = now() +WHERE pk = $1 +RETURNING updated_at \ No newline at end of file diff --git a/lib/dal/src/ws_event.rs b/lib/dal/src/ws_event.rs index 018cc74917..c1621aca61 100644 --- a/lib/dal/src/ws_event.rs +++ b/lib/dal/src/ws_event.rs @@ -3,7 +3,7 @@ use si_data_nats::NatsError; use si_data_pg::PgError; use thiserror::Error; -use crate::change_set::ChangeSetAppliedPayload; +use crate::change_set::{ChangeSetAppliedPayload, ChangeSetMergeVotePayload}; use crate::component::ComponentCreatedPayload; use crate::pkg::ModuleImported; use crate::{ @@ -44,8 +44,10 @@ pub type WsEventResult = Result; #[allow(clippy::large_enum_variant)] pub enum WsPayload { ChangeSetApplied(ChangeSetAppliedPayload), + ChangeSetBeginApprovalProcess(ChangeSetPk), ChangeSetCanceled(ChangeSetPk), ChangeSetCreated(ChangeSetPk), + ChangeSetMergeVote(ChangeSetMergeVotePayload), ChangeSetWritten(ChangeSetPk), CheckedQualifications(QualificationCheckPayload), CodeGenerated(CodeGeneratedPayload), diff --git a/lib/sdf-server/src/server/service/change_set.rs b/lib/sdf-server/src/server/service/change_set.rs index 679e3edbd7..de4dce6e14 100644 --- a/lib/sdf-server/src/server/service/change_set.rs +++ b/lib/sdf-server/src/server/service/change_set.rs @@ -17,10 +17,12 @@ use crate::{server::state::AppState, service::pkg::PkgError}; pub mod add_action; pub mod apply_change_set; +mod begin_approval_process; pub mod create_change_set; pub mod get_change_set; pub mod get_stats; pub mod list_open_change_sets; +mod merge_vote; pub mod remove_action; pub mod update_selected_change_set; @@ -106,4 +108,9 @@ pub fn routes() -> Router { "/update_selected_change_set", post(update_selected_change_set::update_selected_change_set), ) + .route( + "/being_merge_flow", + post(begin_approval_process::begin_approval_process), + ) + .route("/merge_vote", post(merge_vote::merge_vote)) } diff --git a/lib/sdf-server/src/server/service/change_set/begin_approval_process.rs b/lib/sdf-server/src/server/service/change_set/begin_approval_process.rs new file mode 100644 index 0000000000..0f6345e287 --- /dev/null +++ b/lib/sdf-server/src/server/service/change_set/begin_approval_process.rs @@ -0,0 +1,49 @@ +use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient}; +use crate::server::tracking::track; +use crate::service::change_set::{ChangeSetError, ChangeSetResult}; +use axum::extract::OriginalUri; +use axum::Json; +use dal::{ChangeSet, Visibility, WsEvent}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BeginMergeFlow { + #[serde(flatten)] + pub visibility: Visibility, +} + +pub async fn begin_approval_process( + OriginalUri(original_uri): OriginalUri, + PosthogClient(posthog_client): PosthogClient, + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + Json(request): Json, +) -> ChangeSetResult> { + let mut ctx = builder.build(request_ctx.build(request.visibility)).await?; + + let mut change_set = ChangeSet::get_by_pk(&ctx, &ctx.visibility().change_set_pk) + .await? + .ok_or(ChangeSetError::ChangeSetNotFound)?; + change_set.begin_approval_flow(&mut ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + "begin_approval_process", + serde_json::json!({ + "how": "/change_set/begin_approval_process", + "change_set_pk": ctx.visibility().change_set_pk, + }), + ); + + WsEvent::change_set_begin_approval_process(&ctx, ctx.visibility().change_set_pk) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(Json(())) +} diff --git a/lib/sdf-server/src/server/service/change_set/merge_vote.rs b/lib/sdf-server/src/server/service/change_set/merge_vote.rs new file mode 100644 index 0000000000..102ecb92ad --- /dev/null +++ b/lib/sdf-server/src/server/service/change_set/merge_vote.rs @@ -0,0 +1,60 @@ +use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient}; +use crate::server::tracking::track; +use crate::service::change_set::{ChangeSetError, ChangeSetResult}; +use axum::extract::OriginalUri; +use axum::Json; +use dal::{HistoryActor, User, Visibility, WsEvent}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MergeVoteRequest { + pub approve: bool, + #[serde(flatten)] + pub visibility: Visibility, +} + +pub async fn merge_vote( + OriginalUri(original_uri): OriginalUri, + PosthogClient(posthog_client): PosthogClient, + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + Json(request): Json, +) -> ChangeSetResult> { + let ctx = builder.build(request_ctx.build(request.visibility)).await?; + + let user = match ctx.history_actor() { + HistoryActor::User(user_pk) => User::get_by_pk(&ctx, *user_pk) + .await? + .ok_or(ChangeSetError::InvalidUser(*user_pk))?, + + HistoryActor::SystemInit => return Err(ChangeSetError::InvalidUserSystemInit), + }; + + track( + &posthog_client, + &ctx, + &original_uri, + "merge_vote", + serde_json::json!({ + "how": "/change_set/merge_vote", + "change_set_pk": ctx.visibility().change_set_pk, + "user_pk": user.pk(), + "vote": request.approve, + }), + ); + + WsEvent::change_set_merge_vote( + &ctx, + ctx.visibility().change_set_pk, + user.pk(), + request.approve, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(Json(())) +}