Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix actions #27

Merged
merged 7 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ jobs:
python -m pip install pre-commit tox
- name: run pre-commit
run: pre-commit run -a
- name: mypy (sdk)
- name: mypy
run: |
cd compute_sdk
tox -e mypy
- name: mypy (endpoint)
run: |
cd compute_endpoint
cd flox
tox -e mypy

safety-check:
Expand All @@ -39,7 +35,7 @@ jobs:
- name: install requirements
run: |
python -m pip install --upgrade pip setuptools
python -m pip install './flox'
python -m pip install '.[all]'
python -m pip install safety
- name: run safety check
run: safety check
Expand Down
4 changes: 2 additions & 2 deletions docs/getting_started/flock.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ The ``flox.flock`` module contains the code needed to define your own ``Flock``
1. interactive mode
2. file mode

Interactive mode involves creating a ``NetworkX.DiGraph()`` object directly and then passing that into the ``Flock`` constructor. This is **not** recommended.
Interactive mode involves creating a ``NetworkX.DiGraph()`` object directly and then passing that into the ``Flock`` constructor. This is **not** recommended.

The recommended approach is ***file mode***. In this mode, you define the Flock network using a supported file type (e.g., `*.yaml`) and simply use it to create the Flock instance.

```python
from flox.flock import Flock

f = "my_flock.yaml"
flock = Flock.from_yaml(f)
flock = Flock.from_yaml(f)
```
2 changes: 1 addition & 1 deletion docs/getting_started/strategies/callbacks.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Strategy Callbacks

## How are Strategies defined in FLoX?
FLoX was designed to support modularity to enable creative and novel solutions for FL research. Therefore, in FLoX, we define a base ``Strategy`` class which serves as a class of callbacks. Classes that extend this base class (e.g., `FedAvg` extends `Strategy`) can implement their own unique logic which is seamlessly incorporated into the FL process.
FLoX was designed to support modularity to enable creative and novel solutions for FL research. Therefore, in FLoX, we define a base ``Strategy`` class which serves as a class of callbacks. Classes that extend this base class (e.g., `FedAvg` extends `Strategy`) can implement their own unique logic which is seamlessly incorporated into the FL process.

```python
from flox.strategies import Strategy
Expand Down
8 changes: 4 additions & 4 deletions docs/getting_started/strategies/custom.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Defining Your Own Custom Strategies

FLoX was designed with customizability in mind. FL is a new research area that invites countless questions about how to
best perform FL. Additionally, the best FL approach will vary depending on the data, network connectivity, other
FLoX was designed with customizability in mind. FL is a new research area that invites countless questions about how to
best perform FL. Additionally, the best FL approach will vary depending on the data, network connectivity, other
requirements, etc. As such, we aimed to make defining original Strategies to be as pain-free as possible.

Implementing a custom ``Strategy`` simply requires defining a new class that extends/subclasses the ``Strategy`` protocol
Implementing a custom ``Strategy`` simply requires defining a new class that extends/subclasses the ``Strategy`` protocol
(as seen above). The ``Strategy`` protocol provides a handful of callbacks for you to inject custom logic to adjust how the
FL process runs.
FL process runs.

As an example, let's use our source code for the implementation of ``FedProx`` as an example.

Expand Down
2 changes: 1 addition & 1 deletion docs/getting_started/strategies/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Some prominent examples from the literature of what we consider "Strategies" inc
* FedAvg with Proximal Term (`FedProx`)[^fedprox]

## What _exactly_ do Strategies do?
In a nutshell, a lot. Federated Learning is a complex process with tasks being done on the worker nodes and the aggregator node(s). Thus, Strategies can touch a lot of different parts of the entire logic of an FL process.
In a nutshell, a lot. Federated Learning is a complex process with tasks being done on the worker nodes and the aggregator node(s). Thus, Strategies can touch a lot of different parts of the entire logic of an FL process.

### Model Parameter Aggregation
...
Expand Down
14 changes: 7 additions & 7 deletions docs/getting_started/strategies/what.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# What Do Strategies Do?

## How do they run?
Below is a sequence diagram of how the `runner`, `aggregator`, and `worker` interact with each other during a
Below is a sequence diagram of how the `runner`, `aggregator`, and `worker` interact with each other during a

```mermaid
sequenceDiagram
Expand All @@ -13,15 +13,15 @@ sequenceDiagram
participant aggregator
participant worker
%% end

loop each round
runner->>aggregator: submit `aggregate_job()`

aggregator->>aggregator: Strategy.agg_before_round()
aggregator->>aggregator: Strategy.agg_worker_selection()
aggregator->>aggregator: Strategy.agg_before_share_params()
aggregator->>worker: submit `local_fit_job()`

activate worker
worker->>worker: Strategy.wrk_on_recv_params()
worker->>worker: Strategy.wrk_before_train_step()
Expand All @@ -30,11 +30,11 @@ sequenceDiagram
worker->>worker: Strategy.wrk_before_submit_params()
worker->>aggregator: JobUpdate
deactivate worker
aggregator->>aggregator: Strategy.agg_collect_parameters()

aggregator->>aggregator: Strategy.agg_collect_parameters()
aggregator->>aggregator: Strategy.agg_param_aggregation()
aggregator->>aggregator: Strategy.agg_after_round()

aggregator->>runner: JobUpdate
end
```
10 changes: 5 additions & 5 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Welcome to FLoX

### Getting Started
FLoX is a simple, highly customizable, and easy-to-deploy framework for launching Federated Learning processes across a
decentralized network. It is designed to simulate FL workflows while also making it trivially easy to deploy them on
real-world devices (e.g., Internet-of-Things and edge devices). Built on top of _Globus Compute_ (formerly known as
_funcX_), FLoX is designed to run on anything that can be started as a Globus Compute Endpoint.
FLoX is a simple, highly customizable, and easy-to-deploy framework for launching Federated Learning processes across a
decentralized network. It is designed to simulate FL workflows while also making it trivially easy to deploy them on
real-world devices (e.g., Internet-of-Things and edge devices). Built on top of _Globus Compute_ (formerly known as
_funcX_), FLoX is designed to run on anything that can be started as a Globus Compute Endpoint.


### What can FLoX do?

FLoX is supports several state-of-the-art approaches for FL processes, including hierarchical and asynchronous FL.
FLoX is supports several state-of-the-art approaches for FL processes, including hierarchical and asynchronous FL.

| | 2-tier | Hierarhchical |
| --: |:----------------:|:-----------------:|
Expand Down
1 change: 0 additions & 1 deletion flox/backends/launcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
from flox.backends.launcher.impl_local import LocalLauncher
from flox.backends.launcher.impl_parsl import ParslLauncher


__all__ = ["Launcher", "GlobusComputeLauncher", "LocalLauncher", "ParslLauncher"]
11 changes: 10 additions & 1 deletion flox/backends/launcher/impl_base.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Any, Protocol

from flox.flock import FlockNode


class LauncherFunction(Protocol):
def __call__(self, node: FlockNode, *args: Any, **kwargs: Any) -> Any:
...


class Launcher(ABC):
"""
Base class for launching functions in an FL process.
"""

@abstractmethod
def __init__(self):
pass

@abstractmethod
def submit(self, fn, node: FlockNode, /, *args, **kwargs) -> Future:
def submit(
self, fn: LauncherFunction, node: FlockNode, /, *args, **kwargs
) -> Future:
raise NotImplementedError()

@abstractmethod
Expand Down
10 changes: 4 additions & 6 deletions flox/backends/launcher/impl_globus.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
from concurrent.futures import Future
from typing import Any, Callable

from flox.flock import FlockNode
from flox.backends.launcher.impl_base import Launcher
import globus_compute_sdk

from flox.backends.launcher.impl_base import Launcher, LauncherFunction
from flox.flock import FlockNode


class GlobusComputeLauncher(Launcher):
"""
Class that executes tasks on Globus Compute.
"""

_globus_compute_executor: globus_compute_sdk.Executor | None = None

def __init__(self):
super().__init__()
if self._globus_compute_executor is None:
self._globus_compute_executor = globus_compute_sdk.Executor()

def submit(
self, fn: Callable[[FlockNode, ...], Any], node: FlockNode, /, *args, **kwargs
self, fn: LauncherFunction, node: FlockNode, /, *args, **kwargs
) -> Future:
endpoint_id = node.globus_compute_endpoint
self._globus_compute_executor.endpoint_id = endpoint_id
Expand Down
9 changes: 6 additions & 3 deletions flox/backends/launcher/impl_local.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, Future
from concurrent.futures import Executor, Future, ProcessPoolExecutor, ThreadPoolExecutor

from flox.backends.launcher.impl_base import Launcher, LauncherFunction
from flox.flock import FlockNode
from flox.backends.launcher.impl_base import Launcher


class LocalLauncher(Launcher):
Expand All @@ -12,6 +12,7 @@ class LocalLauncher(Launcher):
def __init__(self, pool: str, n_workers: int = 1):
super().__init__()
self.n_workers = n_workers
self.pool: Executor
if pool == "process":
self.pool = ProcessPoolExecutor(n_workers)
elif pool == "thread":
Expand All @@ -21,7 +22,9 @@ def __init__(self, pool: str, n_workers: int = 1):
"Illegal value for argument `pool`. Must be either 'pool' or 'thread'."
)

def submit(self, fn, node: FlockNode, /, *args, **kwargs) -> Future:
def submit(
self, fn: LauncherFunction, node: FlockNode, /, *args, **kwargs
) -> Future:
return self.pool.submit(fn, node, *args, **kwargs)

def collect(self):
Expand Down
11 changes: 7 additions & 4 deletions flox/backends/launcher/impl_parsl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from concurrent.futures import Future

from flox.backends.launcher.impl_base import Launcher, LauncherFunction
from flox.flock import FlockNode
from flox.backends.launcher.impl_base import Launcher


class ParslLauncher(Launcher):
Expand All @@ -12,8 +13,10 @@ def __init__(self):
super().__init__()
raise NotImplementedError(f"{self.__name__} yet implemented")

def submit(self, fn, node: FlockNode, /, *args, **kwargs) -> Future:
pass
def submit(
self, fn: LauncherFunction, node: FlockNode, /, *args, **kwargs
) -> Future:
raise NotImplementedError()

def collect(self):
pass
raise NotImplementedError()
11 changes: 7 additions & 4 deletions flox/backends/transfer/proxystore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import cast
from uuid import UUID

from proxystore.connectors.endpoint import EndpointConnector
from proxystore.store import Store
from proxystore.proxy import Proxy
from proxystore.store import Store

from flox.backends.transfer.base import BaseTransfer
from flox.flock import Flock
Expand All @@ -17,10 +20,10 @@ def __init__(self, flock: Flock, store: str = "endpoint", name: str = "default")
)

self.connector = EndpointConnector(
endpoints=[node.proxystore_endpoint for node in flock.nodes()]
endpoints=[cast(UUID, node.proxystore_endpoint) for node in flock.nodes()]
)
store = Store(name=name, connector=self.connector)
self.config = store.config()
store_instance = Store(name=name, connector=self.connector)
self.config = store_instance.config()

def report(
self, node_state, node_idx, node_kind, state_dict, history
Expand Down
4 changes: 2 additions & 2 deletions flox/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
FLoX includes utility functions to simplify the conversion from a standard, centralized PyTorch dataset to a
simulated, decentralized dataset.
"""
from flox.data.core import FloxDataset, FederatedSubsets
from flox.data.core import FederatedSubsets, FloxDataset
from flox.data.utils import fed_barplot, federated_split

__all__ = ["FloxDataset", "fed_barplot", "federated_split"]
__all__ = ["FloxDataset", "FederatedSubsets", "fed_barplot", "federated_split"]
22 changes: 16 additions & 6 deletions flox/data/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import auto, IntEnum
from typing import NewType, Union, TypeVar
from collections.abc import Mapping
from enum import IntEnum, auto
from typing import NewType, Union, get_args

from torch.utils.data import Dataset, Subset

Expand All @@ -16,11 +17,21 @@ class FloxDatasetKind(IntEnum):
def from_obj(obj) -> "FloxDatasetKind":
if isinstance(obj, Dataset):
return FloxDatasetKind.STANDARD
elif isinstance(obj, FederatedSubsets):
elif FloxDatasetKind.is_federated_dataset(obj):
return FloxDatasetKind.FEDERATED
else:
return FloxDatasetKind.INVALID

@staticmethod
def is_federated_dataset(obj) -> bool:
if not isinstance(obj, Mapping):
return False

return all(
isinstance(k, get_args(FlockNodeID)) and isinstance(v, (Dataset, Subset))
for k, v in obj.items()
)


def flox_compatible_data(obj) -> bool:
kind = FloxDatasetKind.from_obj(obj)
Expand All @@ -29,9 +40,8 @@ def flox_compatible_data(obj) -> bool:
return True


T_co = TypeVar("T_co", covariant=True)
FederatedSubsets = NewType(
"FederatedSubsets", dict[FlockNodeID, Union[Dataset[T_co], Subset[T_co]]]
"FederatedSubsets", Mapping[FlockNodeID, Union[Dataset, Subset]]
)


Expand All @@ -41,4 +51,4 @@ def __init__(self, state: NodeState, /, *args, **kwargs):
self.state = state


FloxDataset = NewType("FloxDataset", Union[MyFloxDataset, FederatedSubsets])
FloxDataset = Union[MyFloxDataset, FederatedSubsets]
1 change: 0 additions & 1 deletion flox/data/subsets.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

Loading
Loading