From fe10bc02e74d38e463009042cececf311221cd9d Mon Sep 17 00:00:00 2001 From: Mikkel Denker Date: Wed, 4 Dec 2024 14:12:52 +0100 Subject: [PATCH] document ampc framework --- crates/core/src/ampc/coordinator.rs | 2 ++ crates/core/src/ampc/dht/mod.rs | 2 +- crates/core/src/ampc/dht/store.rs | 1 + crates/core/src/ampc/finisher.rs | 2 ++ crates/core/src/ampc/mapper.rs | 1 + crates/core/src/ampc/mod.rs | 27 +++++++++++++++++++++++++++ crates/core/src/ampc/setup.rs | 12 ++++++++++++ crates/core/src/ampc/worker.rs | 2 ++ 8 files changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/core/src/ampc/coordinator.rs b/crates/core/src/ampc/coordinator.rs index ad1b6b4a..37f6a1c3 100644 --- a/crates/core/src/ampc/coordinator.rs +++ b/crates/core/src/ampc/coordinator.rs @@ -24,6 +24,8 @@ use super::{DhtConn, Finisher, Job, JobScheduled, RemoteWorker, Setup, Worker, W use crate::{distributed::retry_strategy::ExponentialBackoff, Result}; use anyhow::anyhow; +/// A coordinator is responsible for scheduling jobs on workers and coordinating +/// between rounds of computation. pub struct Coordinator where J: Job, diff --git a/crates/core/src/ampc/dht/mod.rs b/crates/core/src/ampc/dht/mod.rs index e3910068..5dc6f0e8 100644 --- a/crates/core/src/ampc/dht/mod.rs +++ b/crates/core/src/ampc/dht/mod.rs @@ -21,7 +21,7 @@ //! with multiple shards. Each shard cluster //! is a Raft cluster, and each key is then routed to the correct //! cluster based on hash(key) % number_of_shards. The keys -//! are currently *not* rebalanced if the number of shards change, so +//! are currently *not* re-balanced if the number of shards change, so //! if an entire shard becomes unavailable or a new shard is added, all //! keys in the entire DHT is essentially lost as the //! keys might hash incorrectly. 
diff --git a/crates/core/src/ampc/dht/store.rs b/crates/core/src/ampc/dht/store.rs index 6457e8e7..cc41a150 100644 --- a/crates/core/src/ampc/dht/store.rs +++ b/crates/core/src/ampc/dht/store.rs @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/> + use std::collections::BTreeMap; use std::fmt::Debug; use std::io::Cursor; diff --git a/crates/core/src/ampc/finisher.rs b/crates/core/src/ampc/finisher.rs index 72a9dd92..0fd83c24 100644 --- a/crates/core/src/ampc/finisher.rs +++ b/crates/core/src/ampc/finisher.rs @@ -16,6 +16,8 @@ use super::prelude::Job; +/// A finisher is responsible for determining if the computation is finished +/// or if another round of computation is needed. pub trait Finisher { type Job: Job; diff --git a/crates/core/src/ampc/mapper.rs b/crates/core/src/ampc/mapper.rs index 6150cb1b..1de4a166 100644 --- a/crates/core/src/ampc/mapper.rs +++ b/crates/core/src/ampc/mapper.rs @@ -16,6 +16,7 @@ use super::{prelude::Job, DhtConn}; +/// A mapper is the specific computation to be run on the graph. pub trait Mapper: bincode::Encode + bincode::Decode + Send + Sync + Clone { type Job: Job; diff --git a/crates/core/src/ampc/mod.rs b/crates/core/src/ampc/mod.rs index 006377d7..c16ce05a 100644 --- a/crates/core/src/ampc/mod.rs +++ b/crates/core/src/ampc/mod.rs @@ -14,6 +14,33 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. +//! # Framework for Adaptive Massively Parallel Computation (AMPC). +//! +//! AMPC is a system for implementing large-scale distributed graph algorithms efficiently. +//! It provides a framework for parallel computation across clusters of machines. +//! +//! While similar in concept to MapReduce, AMPC uses a distributed hash table (DHT) as its +//! underlying data structure rather than the traditional map and reduce phases. This key +//! 
architectural difference enables more flexible and efficient computation patterns. +//! +//! The main advantage over MapReduce is that workers can dynamically access any keys in +//! the DHT during computation. This is in contrast to MapReduce where the keyspace must +//! be statically partitioned between reducers before computation begins. The dynamic +//! access pattern allows for more natural expression of graph algorithms in a distributed +//! setting. +//! +//! This is roughly inspired by +//! [Massively Parallel Graph Computation: From Theory to Practice](https://research.google/blog/massively-parallel-graph-computation-from-theory-to-practice/) +//! +//! ## Key concepts +//! +//! * **DHT**: A distributed hash table is used to store the result of the computation for +//! each round. +//! * **Worker**: A worker owns a subset of the overall graph and is responsible for +//! executing mappers on its portion of the graph and sending results to the DHT. +//! * **Mapper**: A mapper is the specific computation to be run on the graph. +//! * **Coordinator**: The coordinator is responsible for scheduling the jobs on the workers. + use self::{job::Job, worker::WorkerRef}; use crate::distributed::sonic; diff --git a/crates/core/src/ampc/setup.rs b/crates/core/src/ampc/setup.rs index 14ef5761..c0bda974 100644 --- a/crates/core/src/ampc/setup.rs +++ b/crates/core/src/ampc/setup.rs @@ -16,12 +16,24 @@ use super::DhtConn; +/// A setup is responsible for initializing the DHT before each round of computation. pub trait Setup { type DhtTables; + /// Setup initial state of the DHT. fn init_dht(&self) -> DhtConn; + + /// Setup state for a new round. + /// + /// This is called once for each round of computation. + /// The first round will run `setup_first_round` first + /// but will still call `setup_round` after that. #[allow(unused_variables)] // reason = "dht might be used by implementors" fn setup_round(&self, dht: &Self::DhtTables) {} + + /// Setup state for the first round. 
+ /// + /// This is called once before the first round of computation. fn setup_first_round(&self, dht: &Self::DhtTables) { self.setup_round(dht); } diff --git a/crates/core/src/ampc/worker.rs b/crates/core/src/ampc/worker.rs index 469656c0..5bde34c5 100644 --- a/crates/core/src/ampc/worker.rs +++ b/crates/core/src/ampc/worker.rs @@ -23,6 +23,8 @@ use crate::Result; use anyhow::anyhow; use tokio::net::ToSocketAddrs; +/// A worker is responsible for executing a mapper on its portion of the graph and +/// sending results to the DHT. pub trait Worker: Send + Sync { type Remote: RemoteWorker;