Skip to content

Commit

Permalink
optional max distance in shortest paths
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Dec 9, 2024
1 parent 774dbd8 commit bd34954
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions configs/shortest_paths/coordinator.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
source = "https://www.cdc.gov/healthywater/swimming/"
host = "0.0.0.0:5000"
output_path = "data/shortest_paths"
max_distance = 4

[gossip]
addr = "0.0.0.0:5001"
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ pub struct ShortestPathCoordinatorConfig {
pub gossip: GossipConfig,
pub host: SocketAddr,
pub output_path: String,
pub max_distance: Option<u64>,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
Expand Down
24 changes: 21 additions & 3 deletions crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ impl Setup for ShortestPathSetup {
}

fn setup_round(&self, dht: &Self::DhtTables) {
let meta = dht.meta.get(()).unwrap();

dht.meta.set(
(),
Meta {
round: meta.round + 1,
round_had_changes: false,
},
);
Expand All @@ -92,18 +95,28 @@ impl Setup for ShortestPathSetup {
(),
Meta {
round_had_changes: true,
round: 0,
},
);
}
}

pub struct ShortestPathFinish;
pub struct ShortestPathFinish {
max_distance: Option<u64>,
}

impl Finisher for ShortestPathFinish {
type Job = ShortestPathJob;

fn is_finished(&self, dht: &ShortestPathTables) -> bool {
!dht.meta.get(()).unwrap().round_had_changes
let meta = dht.meta.get(()).unwrap();
if let Some(max_distance) = self.max_distance {
if meta.round >= max_distance {
return true;
}
}

!meta.round_had_changes
}
}

Expand Down Expand Up @@ -202,7 +215,12 @@ pub fn run(config: ShortestPathCoordinatorConfig) -> Result<()> {
tracing::info!("starting {} jobs", jobs.len());

let coordinator = build(&cluster.dht, cluster.workers.clone(), source);
let res = coordinator.run(jobs, ShortestPathFinish)?;
let res = coordinator.run(
jobs,
ShortestPathFinish {
max_distance: config.max_distance,
},
)?;

let output_path = Path::new(&config.output_path);

Expand Down
15 changes: 7 additions & 8 deletions crates/core/src/entrypoint/ampc/shortest_path/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use rustc_hash::FxHashMap;
use super::{
updated_nodes::{UpdatedNodes, UpdatedNodesKind},
worker::ShortestPathWorker,
DhtTable as _, Mapper, Meta, ShortestPathJob, ShortestPathTables,
DhtTable as _, Mapper, ShortestPathJob, ShortestPathTables,
};
use crate::{
ampc::{
Expand Down Expand Up @@ -237,13 +237,12 @@ impl Mapper for ShortestPathMapper {
dht.next()
.changed_nodes
.set(worker.shard(), new_changed_nodes.lock().unwrap().clone());
dht.next().meta.set(
(),
Meta {
round_had_changes: round_had_changes
.load(std::sync::atomic::Ordering::Relaxed),
},
);

let mut meta = dht.next().meta.get(()).unwrap();
meta.round_had_changes =
round_had_changes.load(std::sync::atomic::Ordering::Relaxed);

dht.next().meta.set((), meta);
}
ShortestPathMapper::UpdateChangedNodes => {
let all_changed_nodes: Vec<_> =
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/entrypoint/ampc/shortest_path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use self::worker::{RemoteShortestPathWorker, ShortestPathWorker};
)]
pub struct Meta {
round_had_changes: bool,
round: u64,
}

#[derive(bincode::Encode, bincode::Decode, Debug, Clone)]
Expand Down

0 comments on commit bd34954

Please sign in to comment.