diff --git a/openfl-tutorials/experimental/Workflow_Interface_401_FedProx_with_Synthetic_nonIID.ipynb b/openfl-tutorials/experimental/Workflow_Interface_401_FedProx_with_Synthetic_nonIID.ipynb new file mode 100644 index 0000000000..d6359cf96d --- /dev/null +++ b/openfl-tutorials/experimental/Workflow_Interface_401_FedProx_with_Synthetic_nonIID.ipynb @@ -0,0 +1,814 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Workflow Interface 401: Synthetic non-IID Dataset with FedProx Optimizer\n", + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_401_FedProx_with_Synthetic_nonIID.ipynb)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this OpenFL workflow interface tutorial, we shall learn how to implement FedProx and compare its performance with FedAvg algorithm using a Synthetic non-IID dataset. Reference: [Federated Optimization in Heterogeneous Networks](https://arxiv.org/pdf/1812.06127.pdf)." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Getting Started" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First we start by installing the necessary dependencies for the workflow interface" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install git+https://github.com/intel/openfl.git\n", + "!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n", + "!pip install matplotlib\n", + "!pip install seaborn\n", + "!pip install torch torchvision\n", + "\n", + "# Uncomment following lines if running in Google Colab\n", + "# import os\n", + "# os.environ[\"USERNAME\"] = \"colab\"" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next we import necessary libraries, and define Synthetic non-iid dataset as described in [Federated Optimization in Heterogeneous Networks](https://arxiv.org/pdf/1812.06127.pdf)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch as pt\n", + "import torch.utils.data as data\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "\n", + "import numpy as np\n", + "\n", + "import random\n", + "import collections\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "RANDOM_SEED = 10\n", + "batch_size = 10\n", + "\n", + "# Sets seed to reproduce the results\n", + "def set_seed(seed):\n", + " pt.manual_seed(seed)\n", + " pt.cuda.manual_seed_all(seed)\n", + " pt.use_deterministic_algorithms(True)\n", + " pt.backends.cudnn.deterministic = True\n", + " pt.backends.cudnn.benchmark = False\n", + " pt.backends.cudnn.enabled = False\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + "\n", + "set_seed(RANDOM_SEED)\n", + "\n", + "\n", + "def one_hot(labels, classes):\n", + " return np.eye(classes)[labels]\n", + "\n", + "\n", + "def softmax(x):\n", + " ex = np.exp(x)\n", + " sum_ex = np.sum(np.exp(x))\n", + " return ex / sum_ex\n", + "\n", + "\n", + "def generate_synthetic(alpha, beta, iid, num_collaborators, num_classes):\n", + " dimension = 60\n", + " NUM_CLASS = num_classes\n", + " NUM_USER = num_collaborators\n", + "\n", + " samples_per_user = np.random.lognormal(4, 2, (NUM_USER)).astype(int) + 50\n", + " num_samples = np.sum(samples_per_user)\n", + "\n", + " X_split = [[] for _ in range(NUM_USER)]\n", + " y_split = [[] for _ in range(NUM_USER)]\n", + "\n", + " #### define some eprior ####\n", + " mean_W = np.random.normal(0, alpha, NUM_USER)\n", + " mean_b = mean_W\n", + " B = np.random.normal(0, beta, NUM_USER)\n", + " mean_x = np.zeros((NUM_USER, dimension))\n", + "\n", + " diagonal = np.zeros(dimension)\n", + " for j in range(dimension):\n", + " diagonal[j] = np.power((j + 1), -1.2)\n", + " cov_x = np.diag(diagonal)\n", + "\n", + " for i in range(NUM_USER):\n", + " if iid == 1:\n", + " mean_x[i] = np.ones(dimension) * B[i] # all zeros\n", + " else:\n", + " mean_x[i] = np.random.normal(B[i], 1, dimension)\n", + "\n", + " if iid == 1:\n", + " W_global = np.random.normal(0, 1, (dimension, NUM_CLASS))\n", + " b_global = np.random.normal(0, 1, NUM_CLASS)\n", + "\n", + " for i in range(NUM_USER):\n", + "\n", + " W = np.random.normal(mean_W[i], 1, (dimension, NUM_CLASS))\n", + " b = np.random.normal(mean_b[i], 1, NUM_CLASS)\n", + "\n", + " if iid == 1:\n", + " W = W_global\n", + " b = b_global\n", + "\n", + " xx = np.random.multivariate_normal(\n", + " mean_x[i], cov_x, samples_per_user[i])\n", + " yy = np.zeros(samples_per_user[i])\n", + "\n", + " for j in range(samples_per_user[i]):\n", + " tmp = np.dot(xx[j], W) + b\n", + " yy[j] = np.argmax(softmax(tmp))\n", + "\n", + " X_split[i] = xx.tolist()\n", + " y_split[i] = yy.tolist()\n", + "\n", + " return X_split, y_split\n", + "\n", + "\n", + "class SyntheticFederatedDataset:\n", + " def __init__(self, num_collaborators, batch_size=1, num_classes=10, **kwargs):\n", + " self.batch_size = batch_size\n", + " X, y = generate_synthetic(0.0, 0.0, 0, num_collaborators, num_classes)\n", + " X = [np.array([np.array(sample).astype(np.float32)\n", + " for sample in col]) for col in X]\n", + " y = [np.array([np.array(one_hot(int(sample), num_classes))\n", + " for sample in col]) for col in y]\n", + " self.X_train_all = np.array([col[:int(0.9 * len(col))] for col in X], dtype=np.ndarray)\n", + " self.X_valid_all = np.array([col[int(0.9 * len(col)):] for col in X], dtype=np.ndarray)\n", + " self.y_train_all = np.array([col[:int(0.9 * len(col))] for col in y], dtype=np.ndarray)\n", + " self.y_valid_all = np.array([col[int(0.9 * len(col)):] for col in y], dtype=np.ndarray)\n", + "\n", + " def split(self, collaborators):\n", + " for i, collaborator in enumerate(collaborators):\n", + " collaborator.private_attributes = {\n", + " \"train_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_train_all[i]),\n", + " pt.from_numpy(self.y_train_all[i])\n", + " ), \n", + " batch_size=batch_size, shuffle=True\n", + " ),\n", + " \"test_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_valid_all[i]),\n", + " pt.from_numpy(self.y_valid_all[i])\n", + " ), \n", + " batch_size=batch_size, shuffle=True\n", + " )\n", + " }" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now that we have defined dataset class. Let define model, optimizer, and some helper functions like we would for any other deep learning experiment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.interface.aggregation_functions.weighted_average import weighted_average as wa\n", + "\n", + "\n", + "class Net(nn.Module):\n", + " \"\"\"\n", + " Model to train the dataset\n", + "\n", + " Args:\n", + " None\n", + " \n", + " Returns:\n", + " model: class Net object\n", + " \"\"\"\n", + " def __init__(self):\n", + " # Set RANDOM_STATE to reproduce same model\n", + " pt.set_rng_state(pt.manual_seed(RANDOM_SEED).get_state())\n", + " super(Net, self).__init__()\n", + " self.linear1 = nn.Linear(60, 100)\n", + " self.linear2 = nn.Linear(100, 10)\n", + "\n", + " def forward(self, x):\n", + " x = self.linear1(x)\n", + " x = self.linear2(x)\n", + " return x\n", + "\n", + "\n", + "def cross_entropy(output, target):\n", + " \"\"\"\n", + " cross-entropy metric\n", + "\n", + " Args:\n", + " output: model ouput,\n", + " target: target label\n", + "\n", + " Returns:\n", + " crossentropy_loss: float\n", + " \"\"\"\n", + " return F.cross_entropy(output, pt.max(target, 1)[1])\n", + "\n", + "\n", + "def compute_loss_and_acc(network, dataloader):\n", + " \"\"\"\n", + " Model test method\n", + "\n", + " Args:\n", + " network: class Net object (model)\n", + " dataloader: torch.utils.data.DataLoader\n", + "\n", + " Returns:\n", + " (accuracy,\n", + " loss,\n", + " correct,\n", + " dataloader_size)\n", + " \"\"\"\n", + " network.eval()\n", + " test_loss = 0\n", + " correct = 0\n", + " with pt.no_grad():\n", + " for data, target in dataloader:\n", + " output = network(data)\n", + " test_loss += cross_entropy(output, target).item()\n", + " tar = target.argmax(dim=1, keepdim=True)\n", + " pred = output.argmax(dim=1, keepdim=True)\n", + " correct += pred.eq(tar).sum().cpu().numpy()\n", + " dataloader_size = len(dataloader.dataset)\n", + " test_loss /= dataloader_size\n", + " accuracy = float(correct / dataloader_size)\n", + " return accuracy, test_loss, correct\n", + "\n", + "\n", + "def weighted_average(tensors, weights):\n", + " \"\"\"\n", + " Take weighted average of models / optimizers / loss / accuracy\n", + " Incase of taking weighted average of optimizer do the following steps:\n", + " 1. Call \"_get_optimizer_state\" (openfl.federated.task.runner_pt._get_optimizer_state)\n", + " pass optimizer to it, to take optimizer state dictionary.\n", + " 2. Pass optimizer state dictionaries list to here.\n", + " 3. To set the weighted average optimizer state dictionary back to optimizer,\n", + " call \"_set_optimizer_state\" (openfl.federated.task.runner_pt._set_optimizer_state)\n", + " and pass optimizer, device, and optimizer dictionary received in step 2.\n", + "\n", + " Args:\n", + " tensors: Models state_dict list or optimizers state_dict list or loss list or accuracy list\n", + " weights: Weight for each element in the list\n", + "\n", + " Returns:\n", + " dict: Incase model list / optimizer list OR\n", + " float: Incase of loss list or accuracy list\n", + " \"\"\"\n", + " # Check the type of first element of tensors list\n", + " if type(tensors[0]) in (dict, collections.OrderedDict):\n", + " optimizer = False\n", + " # If __opt_state_needed found then optimizer state dictionary is passed\n", + " if \"__opt_state_needed\" in tensors[0]:\n", + " optimizer = True\n", + " # Remove __opt_state_needed from all state dictionary in list\n", + " [tensor.pop(\"__opt_state_needed\") for tensor in tensors]\n", + " tmp_list = []\n", + " # Take keys in order to rebuild the state dictionary taking keys back up\n", + " input_state_dict_keys = tensors[0].keys()\n", + " for tensor in tensors:\n", + " # Append values of each state dictionary in list\n", + " # If type(value) is Tensor then it needs to be detached\n", + " tmp_list.append(np.array([value.detach() if type(value) is pt.Tensor else value for value in tensor.values()], dtype=object))\n", + " # Take weighted average of list of arrays\n", + " # new_params passed is weighted average of each array in tmp_list\n", + " new_params = wa(tmp_list, weights)\n", + " new_state = {}\n", + " # Take weighted average parameters and building a dictionary\n", + " [new_state.update({k:new_params[i]}) if optimizer else new_state.update({k:pt.from_numpy(new_params[i].numpy())}) \\\n", + " for i, k in enumerate(input_state_dict_keys)]\n", + " return new_state\n", + " else:\n", + " return wa(tensors, weights)" + ] + }, + { + "attachments": { + "image.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let us now define the Workflow for our experiment. Here we use the methodology as provided in [quickstart](https://github.com/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb), and define the workflow consisting of following steps:\n", + "-\t`start`: Start of the flow \n", + "-\t`compute_loss_and_accuracy`: Compute Train Loss and Test Accuracy on aggregated model. Performed *foreach collaborator* in Federation\n", + "-\t`gather_results_and_take_weighted_average`: Collect train loss, and test accuracy metrics for each collaborator and take weighted average to compute the *Aggregated* Train Loss and Test Accuracy. Performed on Aggregator\n", + "-\t`select_collaborators`: Randomly select *n_selected_collaborators* from the entire set of collaborators in Federation. Performed on Aggregator\n", + "-\t‘train_selected_collaborators` - Train selected collaborators on its individual datasets for *local_epoch* number of times. Performed on *n_selected_collaborators*\n", + "-\t`join`: Take weighted average of the model. Performed on Aggregator\n", + "-\t`end`: End of one round of flow. Flow can be run for *n_epochs* to obtain the desired results\n", + "\n", + "We also import the FedProxOptimizer from openfl.utilities.optimizer \n", + "\n", + "\n", + "![image.png](attachment:image.png)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n", + "from openfl.experimental.runtime import LocalRuntime\n", + "from openfl.experimental.placement import aggregator, collaborator\n", + "from openfl.utilities.optimizers.torch import FedProxOptimizer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class FedProxFlow(FLSpec):\n", + "\n", + " def __init__(self, model=None, optimizer=None, n_selected_collaborators=10, n_rounds=10, **kwargs):\n", + " super(FedProxFlow, self).__init__(**kwargs)\n", + " self.round_number = 1\n", + " self.n_selected_collaborators = n_selected_collaborators\n", + " self.n_rounds = n_rounds\n", + " self.loss_and_acc = {\"Train Loss\": [], \"Test Accuracy\": []}\n", + " if model is not None:\n", + " self.model = model\n", + " self.optimizer = optimizer\n", + " else:\n", + " self.model = Net()\n", + " self.optimizer = FedProxOptimizer(\n", + " self.model.parameters(), lr=learning_rate, mu=mu, weight_decay=weight_decay)\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " \"\"\"\n", + " Start of the flow. Call compute_loss_and_accuracy step for each collaborator\n", + " \"\"\"\n", + " print(f'\\nStarting round number {self.round_number} .... \\n')\n", + " self.collaborators = self.runtime.collaborators\n", + " self.next(self.compute_loss_and_accuracy, foreach='collaborators')\n", + "\n", + " @collaborator\n", + " def compute_loss_and_accuracy(self):\n", + " \"\"\"\n", + " Compute training accuracy, training loss, aggregated validation accuracy,\n", + " aggregated validation loss, \n", + " \"\"\"\n", + " # Compute Train Loss and Train Acc\n", + " self.training_accuracy, self.training_loss, _, = compute_loss_and_acc(\n", + " self.model, self.train_loader)\n", + " \n", + " # Compute Test Loss and Test Acc\n", + " self.agg_validation_score, self.agg_validation_loss, test_correct = compute_loss_and_acc(\n", + " self.model, self.test_loader)\n", + "\n", + " self.train_dataset_length = len(self.train_loader.dataset)\n", + " self.test_dataset_length = len(self.test_loader.dataset)\n", + "\n", + " print(\n", + " \" | Train Round: {:<5} : Train Loss {:<.6f}, Test Acc: {:<.6f} [{}/{}]\".format(\n", + " self.input,\n", + " self.round_number,\n", + " self.training_loss,\n", + " self.agg_validation_score,\n", + " test_correct, \n", + " self.test_dataset_length\n", + " )\n", + " )\n", + "\n", + " self.next(self.gather_results_and_take_weighted_average)\n", + "\n", + " @aggregator\n", + " def gather_results_and_take_weighted_average(self, inputs):\n", + " \"\"\"\n", + " Gather results of all collaborators computed in previous \n", + " step.\n", + " Compute train and test weightes, and compute weighted average of \n", + " aggregated training loss, and aggregated test accuracy\n", + " \"\"\"\n", + " # Calculate train_weights and test_weights\n", + " train_datasize, test_datasize = [], []\n", + " for input_ in inputs:\n", + " train_datasize.append(input_.train_dataset_length)\n", + " test_datasize.append(input_.test_dataset_length)\n", + "\n", + " self.train_weights, self.test_weights = [], []\n", + " for input_ in inputs:\n", + " self.train_weights.append(input_.train_dataset_length / sum(train_datasize))\n", + " self.test_weights.append(input_.test_dataset_length / sum(test_datasize))\n", + "\n", + " aggregated_model_accuracy_list, aggregated_model_loss_list = [], []\n", + " for input_ in inputs:\n", + " aggregated_model_loss_list.append(input_.training_loss)\n", + " aggregated_model_accuracy_list.append(input_.agg_validation_score)\n", + "\n", + " # Weighted average of training loss\n", + " self.aggregated_model_training_loss = weighted_average(aggregated_model_loss_list, self.train_weights)\n", + " # Weighted average of aggregated model accuracy\n", + " self.aggregated_model_test_accuracy = weighted_average(aggregated_model_accuracy_list, self.test_weights)\n", + "\n", + " # Store experiment results\n", + " self.loss_and_acc[\"Train Loss\"].append(self.aggregated_model_training_loss)\n", + " self.loss_and_acc[\"Test Accuracy\"].append(self.aggregated_model_test_accuracy)\n", + "\n", + " print(\n", + " \" | Train Round: {:<5} : Agg Train Loss {:<.6f}, Agg Test Acc: {:<.6f}\".format(\n", + " self.round_number,\n", + " self.aggregated_model_training_loss,\n", + " self.aggregated_model_test_accuracy\n", + " )\n", + " )\n", + "\n", + " self.next(self.select_collaborators)\n", + "\n", + " @aggregator\n", + " def select_collaborators(self):\n", + " \"\"\"\n", + " Randomly select n_selected_collaborators collaborator\n", + " \"\"\"\n", + " np.random.seed(self.round_number)\n", + " self.selected_collaborator_indices = np.random.choice(range(len(self.collaborators)), \\\n", + " self.n_selected_collaborators, replace=False)\n", + " self.selected_collaborators = [self.collaborators[idx] for idx in self.selected_collaborator_indices]\n", + "\n", + " self.next(self.train_selected_collaborators, foreach=\"selected_collaborators\")\n", + "\n", + " @collaborator\n", + " def train_selected_collaborators(self):\n", + " \"\"\"\n", + " Train selected collaborators\n", + " \"\"\"\n", + " self.model.train(mode=True)\n", + "\n", + " self.train_dataset_length = len(self.train_loader.dataset)\n", + "\n", + " # Rebuild the optimizer with global model parameters\n", + " self.optimizer = FedProxOptimizer(\n", + " self.model.parameters(), lr=learning_rate, mu=mu, weight_decay=weight_decay)\n", + " # Set global model parameters as old weights to enable computation of proximal term\n", + " self.optimizer.set_old_weights([p.clone().detach() for p in self.model.parameters()])\n", + "\n", + " for epoch in range(local_epoch):\n", + " train_loss = []\n", + " correct = 0\n", + " for data, target in self.train_loader:\n", + " self.optimizer.zero_grad()\n", + " output = self.model(data)\n", + " loss = cross_entropy(output, target)\n", + " loss.backward()\n", + " self.optimizer.step()\n", + " pred = output.argmax(dim=1, keepdim=True)\n", + " tar = target.argmax(dim=1, keepdim=True)\n", + " correct += pred.eq(tar).sum().cpu().numpy()\n", + " train_loss.append(loss.item())\n", + " training_accuracy = float(correct / self.train_dataset_length)\n", + " training_loss = np.mean(train_loss)\n", + " print(\n", + " \" | Train Round: {:<5} | Local Epoch: {:<3}: FedProx Optimization Train Loss {:<.6f}, Train Acc: {:<.6f} [{}/{}]\".format(\n", + " self.input,\n", + " self.round_number,\n", + " epoch,\n", + " training_loss,\n", + " training_accuracy,\n", + " correct, \n", + " len(self.train_loader.dataset)\n", + " )\n", + " )\n", + "\n", + " self.next(self.join)\n", + " \n", + " @aggregator\n", + " def join(self, inputs):\n", + " \"\"\"\n", + " Compute train dataset, and take weighted average of model.\n", + " \"\"\"\n", + " train_datasize = sum([input_.train_dataset_length for input_ in inputs])\n", + "\n", + " train_weights, model_state_dict_list = [], [] \n", + " for input_ in inputs:\n", + " train_weights.append(input_.train_dataset_length / train_datasize)\n", + " model_state_dict_list.append(input_.model.state_dict())\n", + "\n", + " avg_model_dict = weighted_average(model_state_dict_list, train_weights)\n", + " self.model.load_state_dict(avg_model_dict)\n", + "\n", + " self.next(self.internal_loop)\n", + "\n", + " @aggregator\n", + " def internal_loop(self):\n", + " \"\"\"\n", + " Check if training is finished for `self.n_rounds`\n", + " if finished move to end step. Otherwise, go back to start\n", + " step for next round of training.\n", + " \"\"\"\n", + " if self.round_number < self.n_rounds:\n", + " self.round_number += 1\n", + " self.next(self.start)\n", + " else:\n", + " self.next(self.end)\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " \"\"\"\n", + " This is the 'end' step.\n", + " \"\"\"\n", + " self.round_number += 1\n", + " print('This is end of the flow')" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Setup Federation\n", + "\n", + "In this step we define entities necessary to run the flow and assign the synthetic dataset as private atributes of collaborators." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "num_collaborators = 30\n", + "\n", + "# Setup aggregator\n", + "aggregator = Aggregator()\n", + "aggregator.private_attributes = {}\n", + "\n", + "# Setup collaborators with private attributes\n", + "collaborator_names = [f\"col{i}\" for i in range(num_collaborators)]\n", + "\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "\n", + "synthetic_federated_dataset = SyntheticFederatedDataset(\n", + " batch_size=batch_size, num_classes=10, num_collaborators=len(collaborators), seed=RANDOM_SEED)\n", + "synthetic_federated_dataset.split(collaborators)\n", + "\n", + "local_runtime = LocalRuntime(\n", + " aggregator=aggregator, collaborators=collaborators, backend=\"single_process\")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We define `loss_and_acc` dictionary to store the test results of our experiment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "loss_and_acc = {\n", + " \"FedProx\": {\n", + " \"Train Loss\": [], \"Test Accuracy\": []\n", + " },\n", + " \"FedAvg\": {\n", + " \"Train Loss\": [], \"Test Accuracy\": []\n", + " }\n", + "}" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Distribution\n", + "\n", + "Now that our Federation is setup and actors (Aggregator & Collaborators) are initialized, let us take a moment to analyze the *Synthetic non-IID dataset*. We check how the targets for individual collaborators are distributed across each of the classes by computing and plotting the heat-map distribution." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import seaborn as sns\n", + "from matplotlib.colors import LogNorm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "targets_for_collaborators = []\n", + "\n", + "for collab in collaborators:\n", + " # Train, and Test dataset is divided into 9:1 ratio\n", + " _, train_y = collab.private_attributes[\"train_loader\"].dataset[:]\n", + " _, test_y = collab.private_attributes[\"test_loader\"].dataset[:]\n", + " # Append train, and test into 1 tensor array\n", + " y = pt.cat((train_y, test_y))\n", + " targets = np.argmax(y.numpy(), axis = 1)\n", + " # Count number of samples for each class\n", + " frequency = np.zeros(10, dtype=np.int32)\n", + " for i, item in enumerate(targets):\n", + " frequency[item] += 1\n", + " targets_for_collaborators.append(frequency)\n", + "\n", + "result_arr = np.array(targets_for_collaborators).T.tolist()\n", + "fig, ax = plt.subplots(figsize=(20, 5))\n", + "ax = sns.heatmap(result_arr, annot=True, fmt=\"d\", annot_kws={\"fontsize\": 7}, ax=ax, norm=LogNorm(), cbar=False)\n", + "ax.set_title('Distribution of Classes in Dataset across Collaborators', fontsize=12)\n", + "ax.set_xlabel('Collaborator ID', fontsize=10)\n", + "ax.set_ylabel('Classes (0 - 9)', fontsize=10)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FedProx\n", + "\n", + "Now that we have flow and runtime defined, let's define our parameters and run the experiment with FedProxOptimizer (mu > 0)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Randomly select `n_selected_collaborators` collaborators\n", + "# Must be less than total collaborators\n", + "n_selected_collaborators = 10\n", + "n_epochs = 100\n", + "learning_rate = 0.01\n", + "weight_decay = 0.001\n", + "local_epoch = 20\n", + "\n", + "# Set `mu` to `1.0` for FedProx\n", + "mu = 1.0\n", + "\n", + "flflow = FedProxFlow(n_selected_collaborators=n_selected_collaborators, n_rounds=n_epochs, checkpoint=False)\n", + "flflow.runtime = local_runtime\n", + "\n", + "flflow.run()\n", + "loss_and_acc[\"FedProx\"][\"Train Loss\"] = flflow.loss_and_acc[\"Train Loss\"][:]\n", + "loss_and_acc[\"FedProx\"][\"Test Accuracy\"] = flflow.loss_and_acc[\"Test Accuracy\"][:]" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FedAvg\n", + "\n", + "Now that we have obtained FedProx results, let's define the parameters for FedAvg and run experiment. Note that for comparison we only change the parameter mu to 0.0 (i.e. FedProxOptimizer with mu = 0.0)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mu = 0.0\n", + "\n", + "flflow = FedProxFlow(n_selected_collaborators=n_selected_collaborators, n_rounds=n_epochs, checkpoint=False)\n", + "flflow.runtime = local_runtime\n", + "\n", + "flflow.run()\n", + "loss_and_acc[\"FedAvg\"][\"Train Loss\"] = flflow.loss_and_acc[\"Train Loss\"][:]\n", + "loss_and_acc[\"FedAvg\"][\"Test Accuracy\"] = flflow.loss_and_acc[\"Test Accuracy\"][:]" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Compare Results\n", + "\n", + "Now that we have obtained results for both the optimizers available we conclude the tutorial by comparing the Aggregated Training Loss and Aggregated Test Accuracy. Reference: Appendix C.3.2, Figure 6 of [Federated Optimization in Heterogeneous Networks](https://arxiv.org/pdf/1812.06127.pdf) for Synthetic (0,0) dataset." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(24,6))\n", + "fig.subplots_adjust(hspace=0.4, top=0.8)\n", + "\n", + "fedprox_loss = loss_and_acc[\"FedProx\"][\"Train Loss\"]\n", + "fedavg_loss = loss_and_acc[\"FedAvg\"][\"Train Loss\"]\n", + "ax1.plot(fedprox_loss,'gv-', label='FedProx (mu=1.0)')\n", + "ax1.plot(fedavg_loss,'rs-', label='FedAvg (mu=0.0)')\n", + "ax1.legend()\n", + "ax1.minorticks_on()\n", + "ax1.grid(which='major',linestyle='-',color='0.5')\n", + "ax1.grid(which='minor',linestyle='--',color='0.25')\n", + "ax1.set_title('Train Loss')\n", + "ax1.set_xlabel('Training Round')\n", + "ax1.set_ylabel('Training Loss')\n", + "\n", + "fedprox_accuracy = loss_and_acc[\"FedProx\"][\"Test Accuracy\"]\n", + "fedavg_accuracy = loss_and_acc[\"FedAvg\"][\"Test Accuracy\"]\n", + "ax2.plot(fedprox_accuracy,'gv-', label='FedProx (mu=1.0)')\n", + "ax2.plot(fedavg_accuracy, 'rs-', label='FedAvg (mu=0.0)')\n", + "ax2.legend()\n", + "ax2.minorticks_on()\n", + "ax2.grid(which='major',linestyle='-',color='0.5')\n", + "ax2.grid(which='minor',linestyle='--',color='0.25')\n", + "ax2.set_title('Test Accuracy')\n", + "ax2.set_xlabel('Training Round')\n", + "ax2.set_ylabel('Test Accuracy')\n", + "\n", + "fig.suptitle('Comparison of FedProx (mu > 0) and FedAvg (mu = 0)', fontsize='18')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env_fedprox_example", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "c96b31a6dd4c6365f3cc206f3a3aedb434a4eb5a8aa6c7dc735a6d54c4b635a9" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/openfl/experimental/interface/keras/__init__.py b/openfl/experimental/interface/keras/__init__.py new file mode 100644 index 0000000000..1d7d84eb7f --- /dev/null +++ b/openfl/experimental/interface/keras/__init__.py @@ -0,0 +1,7 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimental.interface.keras package.""" + +from .aggregation_functions import WeightedAverage + +__all__ = ["WeightedAverage", ] diff --git a/openfl/experimental/interface/keras/aggregation_functions/__init__.py b/openfl/experimental/interface/keras/aggregation_functions/__init__.py new file mode 100644 index 0000000000..94708487bc --- /dev/null +++ b/openfl/experimental/interface/keras/aggregation_functions/__init__.py @@ -0,0 +1,7 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimenal.interface.keras.aggregation_functions package.""" + +from .weighted_average import WeightedAverage + +__all__ = ["WeightedAverage", ] diff --git a/openfl/experimental/interface/keras/aggregation_functions/weighted_average.py b/openfl/experimental/interface/keras/aggregation_functions/weighted_average.py new file mode 100644 index 0000000000..326e57aece --- /dev/null +++ b/openfl/experimental/interface/keras/aggregation_functions/weighted_average.py @@ -0,0 +1,13 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimental.interface.keras.aggregation_functions.weighted_average package.""" + + +class WeightedAverage: + """Weighted average aggregation for keras or tensorflow.""" + + def __init__(self) -> None: + """ + WeightedAverage class for Keras or Tensorflow library. + """ + raise NotImplementedError("WeightedAverage for keras will be implemented in the future.") diff --git a/openfl/experimental/interface/torch/__init__.py b/openfl/experimental/interface/torch/__init__.py new file mode 100644 index 0000000000..969f47b43a --- /dev/null +++ b/openfl/experimental/interface/torch/__init__.py @@ -0,0 +1,7 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimental.interface.torch package.""" + +from .aggregation_functions import WeightedAverage + +__all__ = ["WeightedAverage", ] diff --git a/openfl/experimental/interface/torch/aggregation_functions/__init__.py b/openfl/experimental/interface/torch/aggregation_functions/__init__.py new file mode 100644 index 0000000000..2afa83b219 --- /dev/null +++ b/openfl/experimental/interface/torch/aggregation_functions/__init__.py @@ -0,0 +1,7 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimenal.interface.torch.aggregation_functions package.""" + +from .weighted_average import WeightedAverage + +__all__ = ["WeightedAverage", ] diff --git a/openfl/experimental/interface/torch/aggregation_functions/weighted_average.py b/openfl/experimental/interface/torch/aggregation_functions/weighted_average.py new file mode 100644 index 0000000000..a91cadfa0d --- /dev/null +++ b/openfl/experimental/interface/torch/aggregation_functions/weighted_average.py @@ -0,0 +1,77 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl.experimental.interface.torch.aggregation_functions.weighted_average package.""" + +import collections +import numpy as np +import torch as pt + + +def weighted_average(tensors, weights): + """Compute weighted average.""" + return np.average(tensors, weights=weights, axis=0) + + +class WeightedAverage: + """Weighted average aggregation.""" + + def __call__(self, objects_list, weights_list) -> np.ndarray: + """ + Compute weighted average of models, optimizers, loss, or accuracy metrics. + For taking weighted average of optimizer do the following steps: + 1. Call "_get_optimizer_state" (openfl.federated.task.runner_pt._get_optimizer_state) + pass optimizer to it, to take optimizer state dictionary. + 2. Pass optimizer state dictionaries list to here. + 3. To set the weighted average optimizer state dictionary back to optimizer, + call "_set_optimizer_state" (openfl.federated.task.runner_pt._set_optimizer_state) + and pass optimizer, device, and optimizer dictionary received in step 2. + + Args: + objects_list: List of objects for which weighted average is to be computed. + - List of Model state dictionaries , or + - List of Metrics (Loss or accuracy), or + - List of optimizer state dictionaries (following steps need to be performed) + 1. Obtain optimizer state dictionary by invoking "_get_optimizer_state" + (openfl.federated.task.runner_pt._get_optimizer_state). + 2. Create a list of optimizer state dictionary obtained in step - 1 + Invoke WeightedAverage on this list. + 3. Invoke "_set_optimizer_state" to set weighted average of optimizer + state back to optimizer (openfl.federated.task.runner_pt._set_optimizer_state). + weights_list: Weight for each element in the list. + + Returns: + dict: For model or optimizer + float: For Loss or Accuracy metrics + """ + # Check the type of first element of tensors list + if type(objects_list[0]) in (dict, collections.OrderedDict): + optimizer = False + # If __opt_state_needed found then optimizer state dictionary is passed + if "__opt_state_needed" in objects_list[0]: + optimizer = True + # Remove __opt_state_needed from all state dictionary in list, and + # check if weightedaverage of optimizer can be taken. + for tensor in objects_list: + error_msg = "Optimizer is stateless, WeightedAverage cannot be taken" + assert tensor.pop("__opt_state_needed") == "true", error_msg + + tmp_list = [] + # # Take keys in order to rebuild the state dictionary taking keys back up + for tensor in objects_list: + # Append values of each state dictionary in list + # If type(value) is Tensor then it needs to be detached + tmp_list.append(np.array([value.detach() if isinstance(value, pt.Tensor) else value + for value in tensor.values()], dtype=object)) + # Take weighted average of list of arrays + # new_params passed is weighted average of each array in tmp_list + new_params = weighted_average(tmp_list, weights_list) + new_state = {} + # Take weighted average parameters and building a dictionary + for i, k in enumerate(objects_list[0].keys()): + if optimizer: + new_state[k] = new_params[i] + else: + new_state[k] = pt.from_numpy(new_params[i].numpy()) + return new_state + else: + return weighted_average(objects_list, weights_list) diff --git a/tests/openfl/experimental/__init__.py b/tests/openfl/experimental/__init__.py new file mode 100644 index 0000000000..5e9b6e2622 --- /dev/null +++ b/tests/openfl/experimental/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""tests.openfl.experimental package.""" diff --git a/tests/openfl/experimental/optimizer.pth b/tests/openfl/experimental/optimizer.pth new file mode 100644 index 0000000000..455ca9bb56 Binary files /dev/null and b/tests/openfl/experimental/optimizer.pth differ diff --git a/tests/openfl/experimental/test_weighted_average.py b/tests/openfl/experimental/test_weighted_average.py new file mode 100644 index 0000000000..a8cf0d5894 --- /dev/null +++ b/tests/openfl/experimental/test_weighted_average.py @@ -0,0 +1,125 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""Test for openfl.experimenal.interface.torch.aggregation_function module.""" + +from openfl.experimental.interface.torch import WeightedAverage +from openfl.federated.task.runner_pt import _get_optimizer_state + +import torch as pt +import numpy as np + +import pickle +from typing import List, Dict, Any +from copy import deepcopy + + +class Net(pt.nn.Module): + """ + Returns a simple model for test case + """ + def __init__(self) -> None: + super(Net, self).__init__() + self.linear1 = pt.nn.Linear(10, 20) + self.linear2 = pt.nn.Linear(20, 1) + + def forward(self, x): + x = self.linear1(x) + x = self.linear2(x) + return x + + +def get_optimizer() -> Any: + """ + Get optimizer + """ + with open("optimizer.pth", "rb") as f: + return pickle.load(f) + + +def take_model_weighted_average(models_state_dicts_list: List[Dict], + weights_list: List[int]) -> Dict: + """ + Take models weighted average manually + """ + tmp_list = [] + for model_state_dict in models_state_dicts_list: + tmp_list.append(np.array([value.detach() for value in model_state_dict.values()], + dtype=object)) + + new_params = np.average(tmp_list, weights=weights_list, axis=0) + + new_state = {} + for i, k in enumerate(models_state_dicts_list[0].keys()): + new_state[k] = pt.from_numpy(new_params[i].numpy()) + return new_state + + +def take_optimizer_weighted_average(optimizer_state_dicts_list: List[Dict], + weights_list: List[int]) -> Dict: + """ + Take models weighted average manually + """ + for optimizer_state_dict in optimizer_state_dicts_list: + assert optimizer_state_dict.pop("__opt_state_needed") == "true" + + tmp_list = [] + for optimizer_state_dict in optimizer_state_dicts_list: + tmp_list.append(np.array(list(optimizer_state_dict.values()), dtype=object)) + + new_params = np.average(tmp_list, weights=weights_list, axis=0) + + new_state = {} + for i, k in enumerate(optimizer_state_dicts_list[0].keys()): + new_state[k] = new_params[i] + return new_state + + +def test_list_weighted_average(): + """ + Test list weighted average + """ + float_element_list = [0.4, 0.21, 0.1, 0.03] + weights_list = [0.1, 0.25, 0.325, 0.325] + + weighted_average = WeightedAverage() + + averaged_loss_using_class = weighted_average(deepcopy(float_element_list), + weights_list) + averaged_loss_manually = np.average(deepcopy(float_element_list), + weights=weights_list, axis=0) + + assert np.all(averaged_loss_using_class) == np.all(averaged_loss_manually) + + +def test_model_weighted_average(): + """ + Test model weighted average + """ + model_state_dicts_list = [Net().state_dict() for _ in range(4)] + weights_list = [0.1, 0.25, 0.325, 0.325] + + weighted_average = WeightedAverage() + + averaged_model_using_class = weighted_average(deepcopy(model_state_dicts_list), + weights_list) + averaged_model_manually = take_model_weighted_average(deepcopy(model_state_dicts_list), + weights_list) + + assert all(averaged_model_using_class) == all(averaged_model_manually) + + +def test_optimizer_weighted_average(): + """ + Test optimizers weighted average + """ + optimizer_state_dicts_list = [_get_optimizer_state(get_optimizer()) for _ in range(4)] + weights_list = [0.1, 0.25, 0.325, 0.325] + + weighted_average = WeightedAverage() + + averaged_optimizer_using_class = weighted_average(deepcopy(optimizer_state_dicts_list), + weights_list) + averaged_optimizer_manually = take_optimizer_weighted_average( + deepcopy(optimizer_state_dicts_list), weights_list) + + assert all(averaged_optimizer_using_class) == all(averaged_optimizer_manually)