Adding Workflow Interface FedProx Example with Synthetic Dataset #734

Open
wants to merge 31 commits into base: develop
31 commits
ab217e7
Adding Workflow Interface FedProx Example with Synthetic Dataset
ParthM-GitHub Feb 14, 2023
ba40762
Modified FedProx Example, tested following scenarios
ParthM-GitHub Feb 21, 2023
b56adfe
Uncommented few necessary lines
ParthM-GitHub Feb 21, 2023
bc3b19c
Updated FedProx example
ParthM-GitHub Feb 21, 2023
8fc8111
Updated FedProx example
ParthM-GitHub Feb 21, 2023
4eba802
Updated FedProx example
ParthM-GitHub Feb 21, 2023
29728d7
Update Workflow_Interface_401_FedProx_with_Synthetic_nonIID.ipynb
ParthM-GitHub Feb 22, 2023
2a3cc29
Changes as per numpy 1.24.x
ParthM-GitHub Feb 22, 2023
717a79b
Update README.md (#719)
grib0ed0v Feb 3, 2023
8f6e31a
Add files via upload
operepel Feb 9, 2023
936a707
Add files via upload
operepel Feb 9, 2023
f5afec2
Update CONTRIBUTING.md
operepel Feb 9, 2023
445dba4
Update CONTRIBUTING.md
operepel Feb 9, 2023
9e5b313
Update README.md
operepel Feb 9, 2023
bed9626
Fixed workflow interface notebook requirements (#729)
psfoley Feb 13, 2023
529e7e5
Update CONTRIBUTING.md
operepel Feb 14, 2023
f8fb971
Update CONTRIBUTING.md
operepel Feb 14, 2023
28c90a4
Fix CONTINUE_GLOBAL optimizer treatment (#711)
itrushkin Feb 14, 2023
013f4f1
Added following changes:
ParthM-GitHub Mar 1, 2023
cbd1568
Merge branch 'intel:develop' into fedprox_example
ParthM-GitHub Mar 1, 2023
2be8d78
Refactored WeightedAverage:
ParthM-GitHub Mar 9, 2023
051abf3
Added __init__.py file in tests/openfl/experimental folder.
ParthM-GitHub Mar 9, 2023
a9e2ca0
Fixing lint suggestions
ParthM-GitHub Mar 10, 2023
2346f4c
Fixing lint suggestions.
ParthM-GitHub Mar 10, 2023
9cba0ff
Fixing lint suggestions.
ParthM-GitHub Mar 10, 2023
6658023
Fixing lint suggestions.
ParthM-GitHub Mar 10, 2023
65e7b55
Adding optimizer.pth file for optimizer weighted average test case.
ParthM-GitHub Mar 10, 2023
ce024e0
Merge branch 'securefederatedai:develop' into fedprox_example
ParthM-GitHub Mar 24, 2023
b66b22f
Merge branch 'securefederatedai:develop' into fedprox_example
ParthM-GitHub Apr 10, 2023
6a7ab72
Merge branch 'securefederatedai:develop' into fedprox_example
ParthM-GitHub Jun 12, 2023
d98633d
Merge branch 'securefederatedai:develop' into fedprox_example
ParthM-GitHub Sep 7, 2023
7 changes: 7 additions & 0 deletions openfl/experimental/interface/keras/__init__.py
@@ -0,0 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimental.interface.keras package."""

from .aggregation_functions import WeightedAverage

__all__ = ["WeightedAverage", ]
7 changes: 7 additions & 0 deletions openfl/experimental/interface/keras/aggregation_functions/__init__.py
@@ -0,0 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimenal.interface.keras.aggregation_functions package."""

from .weighted_average import WeightedAverage

__all__ = ["WeightedAverage", ]
13 changes: 13 additions & 0 deletions openfl/experimental/interface/keras/aggregation_functions/weighted_average.py
@@ -0,0 +1,13 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimental.interface.keras.aggregation_functions.weighted_average package."""


class WeightedAverage:
    """Weighted average aggregation for Keras or TensorFlow."""

    def __init__(self) -> None:
        """
        WeightedAverage class for the Keras or TensorFlow library.
        """
        raise NotImplementedError("WeightedAverage for keras will be implemented in the future.")
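
A small illustration (not part of this diff) of the placeholder above: the Keras aggregation class is exported by the package __init__ but is still a stub, so constructing it raises NotImplementedError.

# Sketch only: demonstrates the current stub behavior of the Keras WeightedAverage.
from openfl.experimental.interface.keras import WeightedAverage

try:
    WeightedAverage()
except NotImplementedError as err:
    print(err)  # "WeightedAverage for keras will be implemented in the future."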
7 changes: 7 additions & 0 deletions openfl/experimental/interface/torch/__init__.py
@@ -0,0 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimental.interface.torch package."""

from .aggregation_functions import WeightedAverage

__all__ = ["WeightedAverage", ]
7 changes: 7 additions & 0 deletions openfl/experimental/interface/torch/aggregation_functions/__init__.py
@@ -0,0 +1,7 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimenal.interface.torch.aggregation_functions package."""

from .weighted_average import WeightedAverage

__all__ = ["WeightedAverage", ]
77 changes: 77 additions & 0 deletions openfl/experimental/interface/torch/aggregation_functions/weighted_average.py
@@ -0,0 +1,77 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl.experimental.interface.torch.aggregation_functions.weighted_average package."""

import collections
import numpy as np
import torch as pt


def weighted_average(tensors, weights):
"""Compute weighted average."""
return np.average(tensors, weights=weights, axis=0)


class WeightedAverage:
"""Weighted average aggregation."""

    def __call__(self, objects_list, weights_list) -> np.ndarray:
        """
        Compute the weighted average of models, optimizers, or loss/accuracy metrics.

        To take the weighted average of an optimizer, follow these steps:
        1. Call "_get_optimizer_state" (openfl.federated.task.runner_pt._get_optimizer_state),
           passing the optimizer to it, to obtain its state dictionary.
        2. Pass the list of optimizer state dictionaries to this callable.
        3. To set the averaged optimizer state dictionary back on the optimizer,
           call "_set_optimizer_state" (openfl.federated.task.runner_pt._set_optimizer_state)
           and pass the optimizer, the device, and the dictionary returned in step 2.

        Args:
            objects_list: List of objects over which the weighted average is computed.
                - List of model state dictionaries, or
                - List of metrics (loss or accuracy), or
                - List of optimizer state dictionaries (follow the three steps above).
            weights_list: Weight for each element in the list.

        Returns:
            dict: for a model or optimizer.
            float: for loss or accuracy metrics.
        """
        # Check the type of the first element of objects_list
        if type(objects_list[0]) in (dict, collections.OrderedDict):
            optimizer = False
            # If "__opt_state_needed" is present, optimizer state dictionaries were passed
            if "__opt_state_needed" in objects_list[0]:
                optimizer = True
                # Remove "__opt_state_needed" from every state dictionary in the list, and
                # check whether a weighted average of the optimizer can be taken.
                for tensor in objects_list:
                    error_msg = "Optimizer is stateless, WeightedAverage cannot be taken"
                    assert tensor.pop("__opt_state_needed") == "true", error_msg

            tmp_list = []
            # Keep the keys so the state dictionary can be rebuilt after averaging
            for tensor in objects_list:
                # Append the values of each state dictionary to the list.
                # If a value is a Tensor it needs to be detached first.
                tmp_list.append(np.array([value.detach() if isinstance(value, pt.Tensor) else value
                                          for value in tensor.values()], dtype=object))
            # Take the weighted average of the list of arrays;
            # new_params holds the weighted average of the arrays in tmp_list.
            new_params = weighted_average(tmp_list, weights_list)
            new_state = {}
            # Rebuild a state dictionary from the weighted-average parameters
            for i, k in enumerate(objects_list[0].keys()):
                if optimizer:
                    new_state[k] = new_params[i]
                else:
                    new_state[k] = pt.from_numpy(new_params[i].numpy())
            return new_state
        else:
            return weighted_average(objects_list, weights_list)
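
A minimal usage sketch (not part of this diff) of the aggregation flow described in the docstring above, assuming the helpers openfl.federated.task.runner_pt._get_optimizer_state and _set_optimizer_state behave as referenced there; the models, weights, and variable names below are illustrative only.

# Hedged sketch: averaging model state dicts, scalar metrics, and an optimizer state
# with the torch WeightedAverage defined above.
import torch
from openfl.experimental.interface.torch import WeightedAverage
from openfl.federated.task.runner_pt import _get_optimizer_state, _set_optimizer_state

agg = WeightedAverage()
weights = [0.5, 0.5]

# 1. Model state dictionaries: the result is a new state dict of averaged tensors.
model_a, model_b = torch.nn.Linear(4, 2), torch.nn.Linear(4, 2)
avg_state = agg([model_a.state_dict(), model_b.state_dict()], weights)
model_a.load_state_dict(avg_state)

# 2. Scalar metrics (loss or accuracy): the result is a single weighted value.
avg_loss = agg([0.42, 0.37], weights)

# 3. Optimizer state dictionaries, following the three steps in the docstring.
#    The optimizer must carry state (at least one step taken); otherwise
#    _get_optimizer_state marks it as stateless and the assertion above fires.
opt = torch.optim.SGD(model_a.parameters(), lr=0.01, momentum=0.9)
model_a(torch.randn(8, 4)).sum().backward()
opt.step()
opt_states = [_get_optimizer_state(opt) for _ in range(2)]
avg_opt_state = agg(opt_states, weights)
# Per step 3 of the docstring, the averaged dictionary would then be written back with:
# _set_optimizer_state(opt, "cpu", avg_opt_state)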
3 changes: 3 additions & 0 deletions tests/openfl/experimental/__init__.py
@@ -0,0 +1,3 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""tests.openfl.experimental package."""
Binary file added tests/openfl/experimental/optimizer.pth
125 changes: 125 additions & 0 deletions tests/openfl/experimental/test_weighted_average.py
@@ -0,0 +1,125 @@
# Copyright (C) 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Test for openfl.experimenal.interface.torch.aggregation_function module."""

from openfl.experimental.interface.torch import WeightedAverage
from openfl.federated.task.runner_pt import _get_optimizer_state

import torch as pt
import numpy as np

import pickle
from typing import List, Dict, Any
from copy import deepcopy


class Net(pt.nn.Module):
"""
Returns a simple model for test case
"""
def __init__(self) -> None:
super(Net, self).__init__()
self.linear1 = pt.nn.Linear(10, 20)
self.linear2 = pt.nn.Linear(20, 1)

def forward(self, x):
x = self.linear1(x)
x = self.linear2(x)
return x


def get_optimizer() -> Any:
"""
Get optimizer
"""
with open("optimizer.pth", "rb") as f:
return pickle.load(f)


def take_model_weighted_average(models_state_dicts_list: List[Dict],
weights_list: List[int]) -> Dict:
"""
Take models weighted average manually
"""
tmp_list = []
for model_state_dict in models_state_dicts_list:
tmp_list.append(np.array([value.detach() for value in model_state_dict.values()],
dtype=object))

new_params = np.average(tmp_list, weights=weights_list, axis=0)

new_state = {}
for i, k in enumerate(models_state_dicts_list[0].keys()):
new_state[k] = pt.from_numpy(new_params[i].numpy())
return new_state


def take_optimizer_weighted_average(optimizer_state_dicts_list: List[Dict],
weights_list: List[int]) -> Dict:
"""
Take models weighted average manually
"""
for optimizer_state_dict in optimizer_state_dicts_list:
assert optimizer_state_dict.pop("__opt_state_needed") == "true"

tmp_list = []
for optimizer_state_dict in optimizer_state_dicts_list:
tmp_list.append(np.array(list(optimizer_state_dict.values()), dtype=object))

new_params = np.average(tmp_list, weights=weights_list, axis=0)

new_state = {}
for i, k in enumerate(optimizer_state_dicts_list[0].keys()):
new_state[k] = new_params[i]
return new_state


def test_list_weighted_average():
"""
Test list weighted average
"""
float_element_list = [0.4, 0.21, 0.1, 0.03]
weights_list = [0.1, 0.25, 0.325, 0.325]

weighted_average = WeightedAverage()

averaged_loss_using_class = weighted_average(deepcopy(float_element_list),
weights_list)
averaged_loss_manually = np.average(deepcopy(float_element_list),
weights=weights_list, axis=0)

assert np.all(averaged_loss_using_class) == np.all(averaged_loss_manually)


def test_model_weighted_average():
"""
Test model weighted average
"""
model_state_dicts_list = [Net().state_dict() for _ in range(4)]
weights_list = [0.1, 0.25, 0.325, 0.325]

weighted_average = WeightedAverage()

averaged_model_using_class = weighted_average(deepcopy(model_state_dicts_list),
weights_list)
averaged_model_manually = take_model_weighted_average(deepcopy(model_state_dicts_list),
weights_list)

assert all(averaged_model_using_class) == all(averaged_model_manually)


def test_optimizer_weighted_average():
"""
Test optimizers weighted average
"""
optimizer_state_dicts_list = [_get_optimizer_state(get_optimizer()) for _ in range(4)]
weights_list = [0.1, 0.25, 0.325, 0.325]

weighted_average = WeightedAverage()

averaged_optimizer_using_class = weighted_average(deepcopy(optimizer_state_dicts_list),
weights_list)
averaged_optimizer_manually = take_optimizer_weighted_average(
deepcopy(optimizer_state_dicts_list), weights_list)

assert all(averaged_optimizer_using_class) == all(averaged_optimizer_manually)