Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(daemon): limit number of git children #9572

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ lazy_static = "1.4.0"
merge = "0.1.0"
mime = "0.3.16"
notify = "6.1.1"
num_cpus = "1.15.0"
once_cell = "1.17.1"
owo-colors = "3.5.0"
parking_lot = "0.12.1"
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = { version = "0.3.26" }
itertools = { workspace = true }
nibble_vec = "0.1.0"
notify = { workspace = true }
num_cpus = { workspace = true }
radix_trie = { workspace = true }
thiserror = "1.0.38"
tokio = { workspace = true, features = ["full", "time"] }
Expand Down
23 changes: 18 additions & 5 deletions crates/turborepo-filewatch/src/hash_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
debouncer::Debouncer,
globwatcher::{GlobError, GlobSet},
package_watcher::DiscoveryData,
scm_resource::SCMResource,
NotifyError, OptionalWatch,
};

Expand Down Expand Up @@ -155,7 +156,7 @@ struct Subscriber {
repo_root: AbsoluteSystemPathBuf,
package_discovery: watch::Receiver<Option<DiscoveryData>>,
query_rx: mpsc::Receiver<Query>,
scm: SCM,
scm: SCMResource,
next_version: AtomicUsize,
}

Expand Down Expand Up @@ -327,7 +328,7 @@ impl Subscriber {
Self {
repo_root,
package_discovery,
scm,
scm: SCMResource::new(scm),
query_rx,
next_version: AtomicUsize::new(0),
}
Expand Down Expand Up @@ -532,19 +533,31 @@ impl Subscriber {
let debouncer_copy = debouncer.clone();
tokio::task::spawn(async move {
debouncer_copy.debounce().await;
let scm_permit = scm.acquire_scm().await;
// We awkwardly copy the actual SCM instance since we're sending it to a
// different thread which requires it be 'static.
let scm_instance = scm_permit.clone();
// Package hashing involves blocking IO calls, so run on a blocking thread.
tokio::task::spawn_blocking(move || {
let blocking_handle = tokio::task::spawn_blocking(move || {
let telemetry = None;
let inputs = spec.inputs.as_inputs();
let result =
scm.get_package_file_hashes(&repo_root, &spec.package_path, &inputs, telemetry);
let result = scm_instance.get_package_file_hashes(
&repo_root,
&spec.package_path,
&inputs,
telemetry,
);
trace!("hashing complete for {:?}", spec);
let _ = tx.blocking_send(HashUpdate {
spec,
version,
result,
});
});
// We wait for the git task to finish so `scm_permit` only gets dropped once the
// resource is no longer being used.
// We should not shut down if a SCM task panics
blocking_handle.await.ok();
});
(version, debouncer)
}
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub mod globwatcher;
pub mod hash_watcher;
mod optional_watch;
pub mod package_watcher;
mod scm_resource;

pub use optional_watch::OptionalWatch;

Expand Down
78 changes: 78 additions & 0 deletions crates/turborepo-filewatch/src/scm_resource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::{ops::Deref, sync::Arc};

use tokio::sync::{Semaphore, SemaphorePermit};
use turborepo_scm::SCM;

#[derive(Debug, Clone)]
pub struct SCMResource {
scm: SCM,
semaphore: Arc<Semaphore>,
}

pub struct SCMPermit<'a> {
scm: &'a SCM,
_permit: SemaphorePermit<'a>,
}

impl SCMResource {
pub fn new(scm: SCM) -> Self {
// We want to only take at most NUM_CPUS - 3 for git processes.
// Accounting for the `turbo` process itself and the daemon this leaves one core
// available for the rest of the system.
let num_permits = num_cpus::get().saturating_sub(3).max(1);
Self::new_with_permits(scm, num_permits)
}

fn new_with_permits(scm: SCM, num_permits: usize) -> Self {
let semaphore = Arc::new(Semaphore::new(num_permits));
Self { scm, semaphore }
}

pub async fn acquire_scm(&self) -> SCMPermit {
let _permit = self
.semaphore
.acquire()
.await
.expect("semaphore should not be closed");
SCMPermit {
scm: &self.scm,
_permit,
}
}
}

impl<'a> Deref for SCMPermit<'a> {
type Target = SCM;

fn deref(&self) -> &Self::Target {
self.scm
}
}

#[cfg(test)]
mod test {
use tokio::sync::oneshot;

use super::*;

#[tokio::test]
async fn test_limits_access() {
let scm = SCMResource::new_with_permits(SCM::Manual, 1);
let scm_copy = scm.clone();
let permit_1 = scm.acquire_scm().await;
let (other_tx, mut other_rx) = oneshot::channel();
tokio::task::spawn(async move {
let _permit_2 = scm_copy.acquire_scm().await;
other_tx.send(()).ok();
});
assert!(
other_rx.try_recv().is_err(),
"other should not have gotten a scm permit"
);
drop(permit_1);
assert!(
other_rx.await.is_ok(),
"other should have gotten permit and exited"
);
}
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ merge = { workspace = true }
miette = { workspace = true, features = ["fancy"] }
nix = "0.26.2"
notify = { workspace = true }
num_cpus = "1.15.0"
num_cpus = { workspace = true }
owo-colors = { workspace = true }
path-clean = "1.0.1"
petgraph = { workspace = true }
Expand Down
Loading