From 809d52265868a811450e024716f093663582ba33 Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Mon, 9 Oct 2023 16:13:12 -0500 Subject: [PATCH] fix(sdf): per builtin transaction, no dependent update for builtins Two changes: 1. Install each builtin on its own Postgres connection so they run truly concurrently. 2. Skip dependent values updates on builtin installation for near-instant builtin creation. I can't figure out a reason to execute dependent value update jobs on builtin creation. If there is a value that needs to exist on an attribute for a schema variant in the default context, we can pre-execute the function during authoring to produce a default value for that attribute, or explicitly set the default value. Otherwise, every value computed by a function will be properly computed when the component is created, and this is the context where we care about computing the most up to date values in any case. This makes builtin installation take about 30 seconds, which is a huge win to me versus any desire we might have to execute the dependent value update jobs for the schema variants themselves, before a component is created. --- lib/dal/src/attribute/value.rs | 16 ++++---- lib/dal/src/context.rs | 19 +++++++++ lib/dal/src/pkg/import.rs | 8 +--- lib/sdf-server/src/server/server.rs | 61 +++++++++++++++++------------ 4 files changed, 65 insertions(+), 39 deletions(-) diff --git a/lib/dal/src/attribute/value.rs b/lib/dal/src/attribute/value.rs index e0a4b04d9d..c3e3f0f7ea 100644 --- a/lib/dal/src/attribute/value.rs +++ b/lib/dal/src/attribute/value.rs @@ -776,7 +776,7 @@ impl AttributeValue { // TODO(fnichol): we might want to fire off a status even at this point, however we've // already updated the initial attribute value, so is there much value? - if propagate_dependent_values { + if propagate_dependent_values && !ctx.no_dependent_values() { ctx.enqueue_job(DependentValuesUpdate::new( ctx.access_builder(), *ctx.visibility(), @@ -857,12 +857,14 @@ impl AttributeValue { let new_attribute_value_id: AttributeValueId = row.try_get("new_attribute_value_id")?; - ctx.enqueue_job(DependentValuesUpdate::new( - ctx.access_builder(), - *ctx.visibility(), - vec![new_attribute_value_id], - )) - .await?; + if !ctx.no_dependent_values() { + ctx.enqueue_job(DependentValuesUpdate::new( + ctx.access_builder(), + *ctx.visibility(), + vec![new_attribute_value_id], + )) + .await?; + } Ok(new_attribute_value_id) } diff --git a/lib/dal/src/context.rs b/lib/dal/src/context.rs index e280c1e804..d2a3b6673d 100644 --- a/lib/dal/src/context.rs +++ b/lib/dal/src/context.rs @@ -66,6 +66,7 @@ impl ServicesContext { DalContextBuilder { services_context: self, blocking, + no_dependent_values: false, } } @@ -202,6 +203,9 @@ pub struct DalContext { /// This is useful to ensure child jobs of blocking jobs also block so there is no race-condition in the DAL. /// And also for SDF routes to block the HTTP request until the jobs get executed, so SDF tests don't race. blocking: bool, + /// Determines if we should not enqueue dependent value update jobs for attribute updates in + /// this context + no_dependent_values: bool, } impl DalContext { @@ -211,6 +215,7 @@ impl DalContext { DalContextBuilder { services_context, blocking, + no_dependent_values: false, } } @@ -230,6 +235,10 @@ impl DalContext { self.blocking } + pub fn no_dependent_values(&self) -> bool { + self.no_dependent_values + } + pub fn services_context(&self) -> ServicesContext { self.services_context.clone() } @@ -566,6 +575,9 @@ pub struct DalContextBuilder { /// This is useful to ensure child jobs of blocking jobs also block so there is no race-condition in the DAL. /// And also for SDF routes to block the HTTP request until the jobs get executed, so SDF tests don't race. blocking: bool, + /// Determines if we should not enqueue dependent value update jobs for attribute value + /// changes. + no_dependent_values: bool, } impl DalContextBuilder { @@ -579,6 +591,7 @@ impl DalContextBuilder { tenancy: Tenancy::new_empty(), visibility: Visibility::new_head(false), history_actor: HistoryActor::SystemInit, + no_dependent_values: self.no_dependent_values, }) } @@ -595,6 +608,7 @@ impl DalContextBuilder { tenancy: access_builder.tenancy, history_actor: access_builder.history_actor, visibility: Visibility::new_head(false), + no_dependent_values: self.no_dependent_values, }) } @@ -611,6 +625,7 @@ impl DalContextBuilder { tenancy: request_context.tenancy, visibility: request_context.visibility, history_actor: request_context.history_actor, + no_dependent_values: self.no_dependent_values, }) } @@ -647,6 +662,10 @@ impl DalContextBuilder { pub fn set_blocking(&mut self) { self.blocking = true; } + + pub fn set_no_dependent_values(&mut self) { + self.no_dependent_values = true; + } } #[remain::sorted] diff --git a/lib/dal/src/pkg/import.rs b/lib/dal/src/pkg/import.rs index 4e2d5864a3..ab1322cd75 100644 --- a/lib/dal/src/pkg/import.rs +++ b/lib/dal/src/pkg/import.rs @@ -1079,13 +1079,7 @@ async fn import_socket( ) .await?; } - (Some(func_unique_id), _, Some(_)) => { - dbg!( - "Input socket that is set by a function?", - func_unique_id, - socket_spec.inputs()? - ); - } + (Some(_), _, Some(_)) => {} _ => {} } diff --git a/lib/sdf-server/src/server/server.rs b/lib/sdf-server/src/server/server.rs index 575292f413..50052376a8 100644 --- a/lib/sdf-server/src/server/server.rs +++ b/lib/sdf-server/src/server/server.rs @@ -29,6 +29,7 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, signal, sync::{broadcast, mpsc, oneshot}, + task::{JoinError, JoinSet}, time, }; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; @@ -55,6 +56,8 @@ pub enum ServerError { Hyper(#[from] hyper::Error), #[error("error initializing the server")] Init, + #[error(transparent)] + Join(#[from] JoinError), #[error("jwt secret key error")] JwtSecretKey(#[from] dal::jwt_key::JwtKeyError), #[error(transparent)] @@ -79,7 +82,7 @@ pub enum ServerError { SiPkg(#[from] SiPkgError), #[error(transparent)] StatusReceiver(#[from] StatusReceiverError), - #[error("transactions error")] + #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error(transparent)] Uds(#[from] UdsIncomingStreamError), @@ -434,31 +437,37 @@ async fn install_builtins( let dal = &ctx; let client = &module_index_client.clone(); let modules = module_list.modules; - let handles: Vec<_> = modules - .iter() - .map(|item| { - let dal = dal.clone(); - let client = client.clone(); - let item = item.clone(); - tokio::spawn(async move { - install_builtin(&dal.clone(), &item, &client) - .await - .unwrap_or_else(|e| { - eprintln!("Error in install_builtin: {}", e); - }) - }) - }) - .collect(); + let total = modules.len(); + + let mut join_set = JoinSet::new(); + for module in modules { + let module = module.clone(); + let client = client.clone(); + let mut context_builder = ctx.services_context().into_builder(false); + context_builder.set_no_dependent_values(); + let mut ctx = context_builder.build_default().await?; + let workspace = Workspace::builtin(&ctx).await?; + ctx.update_tenancy(Tenancy::new(*workspace.pk())); + join_set.spawn(async move { + ( + module.name.to_owned(), + install_builtin(&ctx, &module, &client).await, + ) + }); + } - for handle in handles { - match handle.await { + let mut count: usize = 0; + while let Some(res) = join_set.join_next().await { + let (pkg_name, res) = res?; + match res { Ok(()) => { - println!("Pkg Install finished successfully"); + count += 1; + println!( + "Pkg {pkg_name} Install finished successfully. {count} of {total} installed.", + ); } - Err(_err) => { - println!("Pkg Install failed"); - let _ = Err::, ServerError>(ServerError::PkgInstall) - .expect("TODO: panic message"); + Err(err) => { + println!("Pkg {pkg_name} Install failed, {err}"); } } } @@ -478,8 +487,7 @@ async fn install_builtin( .await?; let pkg = SiPkg::load_from_bytes(module)?; - let pkg_name = pkg.metadata()?.name().to_owned(); - println!("Installing Pkg {}", pkg_name); + import_pkg_from_pkg( ctx, &pkg, @@ -491,6 +499,9 @@ async fn install_builtin( }), ) .await?; + + ctx.commit().await?; + Ok(()) }