Skip to content

Commit

Permalink
merge: #2957
Browse files Browse the repository at this point in the history
2957: feat(sdf, dal): Add ability to start a ChangeSet Approval Flow via WSEvents r=stack72 a=stack72



Co-authored-by: stack72 <[email protected]>
  • Loading branch information
si-bors-ng[bot] and stack72 authored Nov 16, 2023
2 parents fda500b + 9930a62 commit 4225da4
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 4 deletions.
61 changes: 58 additions & 3 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use chrono::{DateTime, Utc};
use postgres_types::ToSql;
use serde::{Deserialize, Serialize};
use si_data_nats::NatsError;
use si_data_pg::PgError;
use si_data_pg::{PgError, PgPoolError};
use strum::{Display, EnumString};
use telemetry::prelude::*;
use thiserror::Error;

use crate::standard_model::{object_option_from_row_option, objects_from_rows};
use crate::ws_event::{WsEvent, WsEventError, WsPayload};
use crate::{
pk, Action, ActionError, HistoryActor, HistoryEvent, HistoryEventError, LabelListError,
StandardModelError, Tenancy, Timestamp, TransactionsError, User, UserError, UserPk, Visibility,
WsEvent, WsEventError, WsPayload,
};
use crate::{ComponentError, DalContext, WsEventResult};

const CHANGE_SET_OPEN_LIST: &str = include_str!("queries/change_set/open_list.sql");
const CHANGE_SET_GET_BY_PK: &str = include_str!("queries/change_set/get_by_pk.sql");
const GET_ACTORS: &str = include_str!("queries/change_set/get_actors.sql");

const BEGIN_MERGE_FLOW: &str = include_str!("queries/change_set/begin_merge_flow.sql");

#[remain::sorted]
#[derive(Error, Debug)]
pub enum ChangeSetError {
Expand All @@ -38,6 +41,8 @@ pub enum ChangeSetError {
#[error(transparent)]
Pg(#[from] PgError),
#[error(transparent)]
PgPool(#[from] PgPoolError),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
StandardModel(#[from] StandardModelError),
Expand All @@ -52,12 +57,13 @@ pub enum ChangeSetError {
pub type ChangeSetResult<T> = Result<T, ChangeSetError>;

#[remain::sorted]
#[derive(Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Clone)]
#[derive(Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Clone, ToSql)]
pub enum ChangeSetStatus {
Abandoned,
Applied,
Closed,
Failed,
NeedsApproval,
Open,
}

Expand Down Expand Up @@ -113,6 +119,23 @@ impl ChangeSet {
Utc::now().format("%Y-%m-%d-%H:%M").to_string()
}

pub async fn begin_approval_flow(&mut self, ctx: &mut DalContext) -> ChangeSetResult<()> {
let row = ctx
.pg_pool()
.get()
.await?
.query_one(
BEGIN_MERGE_FLOW,
&[&self.pk, &ChangeSetStatus::NeedsApproval],
)
.await?;
let updated_at: DateTime<Utc> = row.try_get("timestamp_updated_at")?;
self.timestamp.updated_at = updated_at;
self.status = ChangeSetStatus::NeedsApproval;

Ok(())
}

#[instrument(skip(ctx))]
pub async fn apply(&mut self, ctx: &mut DalContext) -> ChangeSetResult<()> {
let actor = serde_json::to_value(ctx.history_actor())?;
Expand Down Expand Up @@ -272,6 +295,30 @@ impl WsEvent {
)
.await
}

pub async fn change_set_merge_vote(
ctx: &DalContext,
change_set_pk: ChangeSetPk,
user_pk: UserPk,
vote: bool,
) -> WsEventResult<Self> {
WsEvent::new(
ctx,
WsPayload::ChangeSetMergeVote(ChangeSetMergeVotePayload {
change_set_pk,
user_pk,
vote,
}),
)
.await
}

pub async fn change_set_begin_approval_process(
ctx: &DalContext,
change_set_pk: ChangeSetPk,
) -> WsEventResult<Self> {
WsEvent::new(ctx, WsPayload::ChangeSetBeginApprovalProcess(change_set_pk)).await
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)]
Expand All @@ -280,3 +327,11 @@ pub struct ChangeSetAppliedPayload {
change_set_pk: ChangeSetPk,
user_pk: Option<UserPk>,
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ChangeSetMergeVotePayload {
change_set_pk: ChangeSetPk,
user_pk: UserPk,
vote: bool,
}
4 changes: 4 additions & 0 deletions lib/dal/src/queries/change_set/begin_merge_flow.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
UPDATE change_sets
SET status = $2, updated_at = now()
WHERE pk = $1
RETURNING updated_at
4 changes: 3 additions & 1 deletion lib/dal/src/ws_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use si_data_nats::NatsError;
use si_data_pg::PgError;
use thiserror::Error;

use crate::change_set::ChangeSetAppliedPayload;
use crate::change_set::{ChangeSetAppliedPayload, ChangeSetMergeVotePayload};
use crate::component::ComponentCreatedPayload;
use crate::pkg::ModuleImported;
use crate::{
Expand Down Expand Up @@ -44,8 +44,10 @@ pub type WsEventResult<T> = Result<T, WsEventError>;
#[allow(clippy::large_enum_variant)]
pub enum WsPayload {
ChangeSetApplied(ChangeSetAppliedPayload),
ChangeSetBeginApprovalProcess(ChangeSetPk),
ChangeSetCanceled(ChangeSetPk),
ChangeSetCreated(ChangeSetPk),
ChangeSetMergeVote(ChangeSetMergeVotePayload),
ChangeSetWritten(ChangeSetPk),
CheckedQualifications(QualificationCheckPayload),
CodeGenerated(CodeGeneratedPayload),
Expand Down
7 changes: 7 additions & 0 deletions lib/sdf-server/src/server/service/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ use crate::{server::state::AppState, service::pkg::PkgError};

pub mod add_action;
pub mod apply_change_set;
mod begin_approval_process;
pub mod create_change_set;
pub mod get_change_set;
pub mod get_stats;
pub mod list_open_change_sets;
mod merge_vote;
pub mod remove_action;
pub mod update_selected_change_set;

Expand Down Expand Up @@ -106,4 +108,9 @@ pub fn routes() -> Router<AppState> {
"/update_selected_change_set",
post(update_selected_change_set::update_selected_change_set),
)
.route(
"/being_merge_flow",
post(begin_approval_process::begin_approval_process),
)
.route("/merge_vote", post(merge_vote::merge_vote))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient};
use crate::server::tracking::track;
use crate::service::change_set::{ChangeSetError, ChangeSetResult};
use axum::extract::OriginalUri;
use axum::Json;
use dal::{ChangeSet, Visibility, WsEvent};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct BeginMergeFlow {
#[serde(flatten)]
pub visibility: Visibility,
}

pub async fn begin_approval_process(
OriginalUri(original_uri): OriginalUri,
PosthogClient(posthog_client): PosthogClient,
HandlerContext(builder): HandlerContext,
AccessBuilder(request_ctx): AccessBuilder,
Json(request): Json<BeginMergeFlow>,
) -> ChangeSetResult<Json<()>> {
let mut ctx = builder.build(request_ctx.build(request.visibility)).await?;

let mut change_set = ChangeSet::get_by_pk(&ctx, &ctx.visibility().change_set_pk)
.await?
.ok_or(ChangeSetError::ChangeSetNotFound)?;
change_set.begin_approval_flow(&mut ctx).await?;

track(
&posthog_client,
&ctx,
&original_uri,
"begin_approval_process",
serde_json::json!({
"how": "/change_set/begin_approval_process",
"change_set_pk": ctx.visibility().change_set_pk,
}),
);

WsEvent::change_set_begin_approval_process(&ctx, ctx.visibility().change_set_pk)
.await?
.publish_on_commit(&ctx)
.await?;

ctx.commit().await?;

Ok(Json(()))
}
60 changes: 60 additions & 0 deletions lib/sdf-server/src/server/service/change_set/merge_vote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient};
use crate::server::tracking::track;
use crate::service::change_set::{ChangeSetError, ChangeSetResult};
use axum::extract::OriginalUri;
use axum::Json;
use dal::{HistoryActor, User, Visibility, WsEvent};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MergeVoteRequest {
pub approve: bool,
#[serde(flatten)]
pub visibility: Visibility,
}

pub async fn merge_vote(
OriginalUri(original_uri): OriginalUri,
PosthogClient(posthog_client): PosthogClient,
HandlerContext(builder): HandlerContext,
AccessBuilder(request_ctx): AccessBuilder,
Json(request): Json<MergeVoteRequest>,
) -> ChangeSetResult<Json<()>> {
let ctx = builder.build(request_ctx.build(request.visibility)).await?;

let user = match ctx.history_actor() {
HistoryActor::User(user_pk) => User::get_by_pk(&ctx, *user_pk)
.await?
.ok_or(ChangeSetError::InvalidUser(*user_pk))?,

HistoryActor::SystemInit => return Err(ChangeSetError::InvalidUserSystemInit),
};

track(
&posthog_client,
&ctx,
&original_uri,
"merge_vote",
serde_json::json!({
"how": "/change_set/merge_vote",
"change_set_pk": ctx.visibility().change_set_pk,
"user_pk": user.pk(),
"vote": request.approve,
}),
);

WsEvent::change_set_merge_vote(
&ctx,
ctx.visibility().change_set_pk,
user.pk(),
request.approve,
)
.await?
.publish_on_commit(&ctx)
.await?;

ctx.commit().await?;

Ok(Json(()))
}

0 comments on commit 4225da4

Please sign in to comment.