diff --git a/crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs b/crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs index 43da0e2c..51f3cc7a 100644 --- a/crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs +++ b/crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs @@ -28,6 +28,7 @@ use crate::ampc::{Coordinator, DefaultDhtTable, DhtConn}; use crate::config::ShortestPathCoordinatorConfig; use crate::distributed::cluster::Cluster; use crate::distributed::member::{Member, Service, ShardId}; +use crate::hyperloglog::HyperLogLog; use crate::webpage::url_ext::UrlExt; use crate::{webgraph, Result}; @@ -174,6 +175,21 @@ pub fn run(config: ShortestPathCoordinatorConfig) -> Result<()> { .build()? .block_on(setup_gossip(tokio_conf))?; + let sketch = cluster + .workers + .iter() + .map(|worker| worker.get_node_sketch()) + .fold(HyperLogLog::default(), |mut acc, sketch| { + acc.merge(&sketch); + acc + }); + + let num_nodes = sketch.size() as u64; + + for worker in cluster.workers.iter() { + worker.update_changed_nodes_precision(num_nodes); + } + let jobs: Vec<_> = cluster .workers .iter() diff --git a/crates/core/src/entrypoint/ampc/shortest_path/worker.rs b/crates/core/src/entrypoint/ampc/shortest_path/worker.rs index e097b474..b822130a 100644 --- a/crates/core/src/entrypoint/ampc/shortest_path/worker.rs +++ b/crates/core/src/entrypoint/ampc/shortest_path/worker.rs @@ -75,6 +75,11 @@ impl ShortestPathWorker { pub fn nodes_sketch(&self) -> &HyperLogLog<4096> { &self.nodes_sketch } + + pub fn update_changed_nodes_precision(&self, num_nodes: u64) { + let mut changed_nodes = self.changed_nodes().lock().unwrap(); + *changed_nodes = U64BloomFilter::new(num_nodes, 0.01); + } } #[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)] @@ -88,6 +93,17 @@ impl Message for GetNodeSketch { } } +#[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)] +pub struct UpdateChangedNodesPrecision(u64); + +impl Message for UpdateChangedNodesPrecision { + type Response = (); + + fn handle(self, worker: &ShortestPathWorker) -> Self::Response { + worker.update_changed_nodes_precision(self.0); + } +} + #[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)] pub struct BatchId2Node(Vec); @@ -109,7 +125,7 @@ impl Message for BatchId2Node { } } -impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch]); +impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch, UpdateChangedNodesPrecision]); #[derive(Clone)] pub struct RemoteShortestPathWorker { @@ -143,6 +159,10 @@ impl RemoteShortestPathWorker { pub fn get_node_sketch(&self) -> HyperLogLog<4096> { self.send(GetNodeSketch) } + + pub fn update_changed_nodes_precision(&self, num_nodes: u64) { + self.send(UpdateChangedNodesPrecision(num_nodes)); + } } impl RemoteWorker for RemoteShortestPathWorker {