Skip to content

Commit

Permalink
merge: #2839
Browse files Browse the repository at this point in the history
2839: fix(sdf): builtins on transactions, skip dependent values update for builtins r=zacharyhamm a=zacharyhamm

Two changes:
  1. Install each builtin on its own Postgres connection so they run
     truly concurrently.
  2. Skip dependent values updates on builtin installation for
     near-instant builtin creation.

I can't figure out a reason to execute dependent value update jobs on
builtin creation. If there is a value that needs to exist on an
attribute for a schema variant in the default context, we can
pre-execute the function during authoring to produce a default value for
that attribute, or explicitly set the default value.  Otherwise, every
value computed by a function will be properly computed when the
component is created, and this is the context where we care about
computing the most up to date values in any case.

This makes builtin installation take about 30 seconds, which is a huge
win to me versus any desire we might have to execute the dependent value
update jobs for the schema variants themselves, before a component is
created.


Co-authored-by: Zachary Hamm <[email protected]>
  • Loading branch information
si-bors-ng[bot] and zacharyhamm authored Oct 10, 2023
2 parents ccfc9fe + 809d522 commit e905b40
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 39 deletions.
16 changes: 9 additions & 7 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ impl AttributeValue {
// TODO(fnichol): we might want to fire off a status even at this point, however we've
// already updated the initial attribute value, so is there much value?

if propagate_dependent_values {
if propagate_dependent_values && !ctx.no_dependent_values() {
ctx.enqueue_job(DependentValuesUpdate::new(
ctx.access_builder(),
*ctx.visibility(),
Expand Down Expand Up @@ -857,12 +857,14 @@ impl AttributeValue {

let new_attribute_value_id: AttributeValueId = row.try_get("new_attribute_value_id")?;

ctx.enqueue_job(DependentValuesUpdate::new(
ctx.access_builder(),
*ctx.visibility(),
vec![new_attribute_value_id],
))
.await?;
if !ctx.no_dependent_values() {
ctx.enqueue_job(DependentValuesUpdate::new(
ctx.access_builder(),
*ctx.visibility(),
vec![new_attribute_value_id],
))
.await?;
}

Ok(new_attribute_value_id)
}
Expand Down
19 changes: 19 additions & 0 deletions lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl ServicesContext {
DalContextBuilder {
services_context: self,
blocking,
no_dependent_values: false,
}
}

Expand Down Expand Up @@ -202,6 +203,9 @@ pub struct DalContext {
/// This is useful to ensure child jobs of blocking jobs also block so there is no race-condition in the DAL.
/// And also for SDF routes to block the HTTP request until the jobs get executed, so SDF tests don't race.
blocking: bool,
/// Determines if we should not enqueue dependent value update jobs for attribute updates in
/// this context
no_dependent_values: bool,
}

impl DalContext {
Expand All @@ -211,6 +215,7 @@ impl DalContext {
DalContextBuilder {
services_context,
blocking,
no_dependent_values: false,
}
}

Expand All @@ -230,6 +235,10 @@ impl DalContext {
self.blocking
}

pub fn no_dependent_values(&self) -> bool {
self.no_dependent_values
}

pub fn services_context(&self) -> ServicesContext {
self.services_context.clone()
}
Expand Down Expand Up @@ -566,6 +575,9 @@ pub struct DalContextBuilder {
/// This is useful to ensure child jobs of blocking jobs also block so there is no race-condition in the DAL.
/// And also for SDF routes to block the HTTP request until the jobs get executed, so SDF tests don't race.
blocking: bool,
/// Determines if we should not enqueue dependent value update jobs for attribute value
/// changes.
no_dependent_values: bool,
}

impl DalContextBuilder {
Expand All @@ -579,6 +591,7 @@ impl DalContextBuilder {
tenancy: Tenancy::new_empty(),
visibility: Visibility::new_head(false),
history_actor: HistoryActor::SystemInit,
no_dependent_values: self.no_dependent_values,
})
}

Expand All @@ -595,6 +608,7 @@ impl DalContextBuilder {
tenancy: access_builder.tenancy,
history_actor: access_builder.history_actor,
visibility: Visibility::new_head(false),
no_dependent_values: self.no_dependent_values,
})
}

Expand All @@ -611,6 +625,7 @@ impl DalContextBuilder {
tenancy: request_context.tenancy,
visibility: request_context.visibility,
history_actor: request_context.history_actor,
no_dependent_values: self.no_dependent_values,
})
}

Expand Down Expand Up @@ -647,6 +662,10 @@ impl DalContextBuilder {
pub fn set_blocking(&mut self) {
self.blocking = true;
}

pub fn set_no_dependent_values(&mut self) {
self.no_dependent_values = true;
}
}

#[remain::sorted]
Expand Down
8 changes: 1 addition & 7 deletions lib/dal/src/pkg/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,13 +1079,7 @@ async fn import_socket(
)
.await?;
}
(Some(func_unique_id), _, Some(_)) => {
dbg!(
"Input socket that is set by a function?",
func_unique_id,
socket_spec.inputs()?
);
}
(Some(_), _, Some(_)) => {}
_ => {}
}

Expand Down
61 changes: 36 additions & 25 deletions lib/sdf-server/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio::{
io::{AsyncRead, AsyncWrite},
signal,
sync::{broadcast, mpsc, oneshot},
task::{JoinError, JoinSet},
time,
};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
Expand All @@ -55,6 +56,8 @@ pub enum ServerError {
Hyper(#[from] hyper::Error),
#[error("error initializing the server")]
Init,
#[error(transparent)]
Join(#[from] JoinError),
#[error("jwt secret key error")]
JwtSecretKey(#[from] dal::jwt_key::JwtKeyError),
#[error(transparent)]
Expand All @@ -79,7 +82,7 @@ pub enum ServerError {
SiPkg(#[from] SiPkgError),
#[error(transparent)]
StatusReceiver(#[from] StatusReceiverError),
#[error("transactions error")]
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error(transparent)]
Uds(#[from] UdsIncomingStreamError),
Expand Down Expand Up @@ -434,31 +437,37 @@ async fn install_builtins(
let dal = &ctx;
let client = &module_index_client.clone();
let modules = module_list.modules;
let handles: Vec<_> = modules
.iter()
.map(|item| {
let dal = dal.clone();
let client = client.clone();
let item = item.clone();
tokio::spawn(async move {
install_builtin(&dal.clone(), &item, &client)
.await
.unwrap_or_else(|e| {
eprintln!("Error in install_builtin: {}", e);
})
})
})
.collect();
let total = modules.len();

let mut join_set = JoinSet::new();
for module in modules {
let module = module.clone();
let client = client.clone();
let mut context_builder = ctx.services_context().into_builder(false);
context_builder.set_no_dependent_values();
let mut ctx = context_builder.build_default().await?;
let workspace = Workspace::builtin(&ctx).await?;
ctx.update_tenancy(Tenancy::new(*workspace.pk()));
join_set.spawn(async move {
(
module.name.to_owned(),
install_builtin(&ctx, &module, &client).await,
)
});
}

for handle in handles {
match handle.await {
let mut count: usize = 0;
while let Some(res) = join_set.join_next().await {
let (pkg_name, res) = res?;
match res {
Ok(()) => {
println!("Pkg Install finished successfully");
count += 1;
println!(
"Pkg {pkg_name} Install finished successfully. {count} of {total} installed.",
);
}
Err(_err) => {
println!("Pkg Install failed");
let _ = Err::<Result<()>, ServerError>(ServerError::PkgInstall)
.expect("TODO: panic message");
Err(err) => {
println!("Pkg {pkg_name} Install failed, {err}");
}
}
}
Expand All @@ -478,8 +487,7 @@ async fn install_builtin(
.await?;

let pkg = SiPkg::load_from_bytes(module)?;
let pkg_name = pkg.metadata()?.name().to_owned();
println!("Installing Pkg {}", pkg_name);

import_pkg_from_pkg(
ctx,
&pkg,
Expand All @@ -491,6 +499,9 @@ async fn install_builtin(
}),
)
.await?;

ctx.commit().await?;

Ok(())
}

Expand Down

0 comments on commit e905b40

Please sign in to comment.