From 3624c3880b48fa65df49ffb3c8973be578535d1d Mon Sep 17 00:00:00 2001
From: Ynon Flum
Date: Tue, 19 Nov 2024 15:44:07 +0200
Subject: [PATCH] Add PyTorch MNIST Workflow tutorial

---
 ...Prox_PyTorch_MNIST_Workflow_Tutorial.ipynb | 632 ++++++++++++++++++
 1 file changed, 632 insertions(+)
 create mode 100644 openfl-tutorials/Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb

diff --git a/openfl-tutorials/Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb b/openfl-tutorials/Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb
new file mode 100644
index 0000000000..b93a61565a
--- /dev/null
+++ b/openfl-tutorials/Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb
@@ -0,0 +1,632 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Federated FedProx PyTorch MNIST Tutorial\n",
+    "This notebook mirrors Federated_FedProx_PyTorch_MNIST_Tutorial using the workflow API.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Install dependencies if not already installed\n",
+    "!pip install torch torchvision"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import torch\n",
+    "import torch.nn as nn\n",
+    "import torch.nn.functional as F\n",
+    "\n",
+    "import torch.utils.data\n",
+    "import torchvision\n",
+    "import torchvision.transforms as transforms\n",
+    "\n",
+    "from openfl.utilities.optimizers.torch.fedprox import FedProxAdam\n",
+    "\n",
+    "from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n",
+    "from openfl.experimental.runtime import LocalRuntime\n",
+    "from openfl.experimental.placement import aggregator, collaborator\n",
+    "\n",
+    "def one_hot(labels, classes):\n",
+    "    return np.eye(classes)[labels]\n",
+    "\n",
+    "def cross_entropy(output, target):\n",
+    "    \"\"\"Binary cross-entropy loss function\"\"\"\n",
+    "    return F.binary_cross_entropy_with_logits(input=output, target=target.float())\n",
+    "\n",
+    "def get_optimizer(model):\n",
+    "    return FedProxAdam(model.parameters(), lr=1e-3, mu=0.01)\n",
+    "\n",
+    "# Aggregation algorithm: average each parameter across models, optionally weighted\n",
+    "def FedAvg(models, weights=None):\n",
+    "    new_model = models[0]\n",
+    "    new_state_dict = dict()\n",
+    "    for key in new_model.state_dict().keys():\n",
+    "        new_state_dict[key] = torch.from_numpy(\n",
+    "            np.average([model.state_dict()[key].numpy() for model in models],\n",
+    "                       axis=0,\n",
+    "                       weights=weights))\n",
+    "\n",
+    "    new_model.load_state_dict(new_state_dict)\n",
+    "    return new_model"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "After importing the required packages, the next step is setting up the dataset:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# MNIST is single-channel, so Normalize takes one mean and one std\n",
+    "transform = transforms.Compose(\n",
+    "    [transforms.ToTensor(),\n",
+    "     transforms.Normalize((0.5,), (0.5,))])\n",
+    "\n",
+    "mnist_train = torchvision.datasets.MNIST(\n",
+    "    \"./files/\",\n",
+    "    train=True,\n",
+    "    download=True,\n",
+    "    transform=transform,\n",
+    ")\n",
+    "\n",
+    "mnist_test = torchvision.datasets.MNIST(\n",
+    "    \"./files/\",\n",
+    "    train=False,\n",
+    "    download=True,\n",
+    "    transform=transform,\n",
+    ")\n",
+    "\n",
+    "class CustomDataset(torch.utils.data.Dataset):\n",
+    "    \"\"\"Dataset enumeration as tensors\"\"\"\n",
+    "    def __init__(self, images, labels):\n",
+    "        self.images = images\n",
+    "        self.labels = labels\n",
+    "\n",
+    "    def __len__(self):\n",
+    "        return len(self.images)\n",
+    "\n",
+    "    def __getitem__(self, idx):\n",
+    "        image = self.images[idx]\n",
+    "        label = self.labels[idx]\n",
+    "        return image, label"
+   ]
+  },
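+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "`CustomDataset` simply indexes a pair of pre-built tensors. A minimal sketch of how it plugs into a standard `DataLoader` (the variable names below are illustrative, not part of the tutorial's pipeline):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Wrap a few MNIST samples in CustomDataset and iterate over mini-batches\n",
+    "example_images = torch.stack([mnist_train[i][0] for i in range(8)])\n",
+    "example_labels = torch.tensor([mnist_train[i][1] for i in range(8)])\n",
+    "example_loader = torch.utils.data.DataLoader(\n",
+    "    CustomDataset(example_images, example_labels), batch_size=4, shuffle=True)\n",
+    "for images, labels in example_loader:\n",
+    "    print(images.shape, labels.shape)"
+   ]
+  },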
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now we are ready to define the dataset and model to perform federated learning on. The dataset should be composed of NumPy arrays. We start with a simple convolutional model that is trained on the MNIST dataset."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def one_hot(labels, classes):\n",
+    "    return np.eye(classes)[labels]\n",
+    "\n",
+    "transform = transforms.Compose(\n",
+    "    [transforms.ToTensor(),\n",
+    "     transforms.Normalize((0.5,), (0.5,))])\n",
+    "\n",
+    "trainset = torchvision.datasets.MNIST(root='./data', train=True,\n",
+    "                                      download=True, transform=transform)\n",
+    "\n",
+    "train_images, train_labels = trainset.data, np.array(trainset.targets)\n",
+    "train_images = torch.from_numpy(np.expand_dims(train_images, axis=1)).float()\n",
+    "train_labels = one_hot(train_labels, 10)\n",
+    "\n",
+    "validset = torchvision.datasets.MNIST(root='./data', train=False,\n",
+    "                                      download=True, transform=transform)\n",
+    "\n",
+    "valid_images, valid_labels = validset.data, np.array(validset.targets)\n",
+    "valid_images = torch.from_numpy(np.expand_dims(valid_images, axis=1)).float()\n",
+    "valid_labels = one_hot(valid_labels, 10)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# FedProx"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import openfl.native as fx\n",
+    "from openfl.federated import FederatedModel, FederatedDataSet\n",
+    "from openfl.utilities.optimizers.torch import FedProxOptimizer"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "feature_shape = train_images.shape[1]\n",
+    "classes = 10\n",
+    "\n",
+    "fl_data = FederatedDataSet(train_images, train_labels, valid_images, valid_labels,\n",
+    "                           batch_size=32, num_classes=classes)\n",
+    "\n",
+    "class Net(nn.Module):\n",
+    "    def __init__(self):\n",
+    "        super(Net, self).__init__()\n",
+    "        self.conv1 = nn.Conv2d(1, 16, 3)\n",
+    "        self.pool = nn.MaxPool2d(2, 2)\n",
+    "        self.conv2 = nn.Conv2d(16, 32, 3)\n",
+    "        self.fc1 = nn.Linear(32 * 5 * 5, 32)\n",
+    "        self.fc2 = nn.Linear(32, 84)\n",
+    "        self.fc3 = nn.Linear(84, 10)\n",
+    "\n",
+    "    def forward(self, x):\n",
+    "        x = self.pool(F.relu(self.conv1(x)))\n",
+    "        x = self.pool(F.relu(self.conv2(x)))\n",
+    "        x = x.view(x.size(0), -1)\n",
+    "        x = F.relu(self.fc1(x))\n",
+    "        x = F.relu(self.fc2(x))\n",
+    "        x = self.fc3(x)\n",
+    "        return F.log_softmax(x, dim=1)\n",
+    "\n",
+    "    def train_epoch(self, batch_generator):\n",
+    "        from openfl.federated.task import PyTorchTaskRunner\n",
+    "        # Snapshot the incoming global weights as detached copies, so the\n",
+    "        # proximal anchor does not move during local training\n",
+    "        self.optimizer.set_old_weights([p.clone().detach() for p in self.parameters()])\n",
+    "        return PyTorchTaskRunner.train_epoch(self, batch_generator)\n",
+    "\n",
+    "\n",
+    "optimizer = lambda x: FedProxOptimizer(x, lr=1e-3, mu=0.1)\n",
+    "\n",
+    "def cross_entropy(output, target):\n",
+    "    \"\"\"Binary cross-entropy metric\"\"\"\n",
+    "    return F.binary_cross_entropy_with_logits(input=output, target=target.float())"
+   ]
+  },
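+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "FedProx (Li et al., 2020) modifies each collaborator's local objective by adding a proximal term anchored at the global weights $w^t$ received at the start of the round:\n",
+    "\n",
+    "$$\\min_w \\; h_k(w) = F_k(w) + \\frac{\\mu}{2} \\lVert w - w^t \\rVert^2$$\n",
+    "\n",
+    "This is why `train_epoch` above calls `set_old_weights` before delegating to the task runner: the optimizer needs a snapshot of the incoming global weights to compute the proximal penalty."
+   ]
+  },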
"outputs": [], + "source": [ + "feature_shape = train_images.shape[1]\n", + "classes = 10\n", + "\n", + "fl_data = FederatedDataSet(train_images,train_labels,valid_images,valid_labels,batch_size=32,num_classes=classes)\n", + "\n", + "class Net(nn.Module):\n", + " def __init__(self):\n", + " super(Net, self).__init__()\n", + " self.conv1 = nn.Conv2d(1, 16, 3)\n", + " self.pool = nn.MaxPool2d(2, 2)\n", + " self.conv2 = nn.Conv2d(16, 32, 3)\n", + " self.fc1 = nn.Linear(32 * 5 * 5, 32)\n", + " self.fc2 = nn.Linear(32, 84)\n", + " self.fc3 = nn.Linear(84, 10)\n", + "\n", + " def forward(self, x):\n", + " x = self.pool(F.relu(self.conv1(x)))\n", + " x = self.pool(F.relu(self.conv2(x)))\n", + " x = x.view(x.size(0),-1)\n", + " x = F.relu(self.fc1(x))\n", + " x = F.relu(self.fc2(x))\n", + " x = self.fc3(x)\n", + " return F.log_softmax(x, dim=1)\n", + " \n", + " def train_epoch(self, batch_generator):\n", + " from openfl.federated.task import PyTorchTaskRunner\n", + " self.optimizer.set_old_weights([p for p in self.parameters()])\n", + " return PyTorchTaskRunner.train_epoch(self, batch_generator)\n", + "\n", + " \n", + "optimizer = lambda x: FedProxOptimizer(x, lr=1e-3, mu=0.1)\n", + "\n", + "def cross_entropy(output, target):\n", + " \"\"\"Binary cross-entropy metric\n", + " \"\"\"\n", + " return F.binary_cross_entropy_with_logits(input=output,target=target.float())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "#Create a federated model using the pytorch class, lambda optimizer function, and loss function\n", + "fl_model = FederatedModel(build_model=Net,optimizer=optimizer,loss_fn=cross_entropy,data_loader=fl_data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `FederatedModel` object is a wrapper around your Keras, Tensorflow or PyTorch model that makes it compatible with openfl. It provides built in federated training and validation functions that we will see used below. Using it's `setup` function, collaborator models and datasets can be automatically defined for the experiment. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "collaborator_models = fl_model.setup(num_collaborators=2)\n", + "collaborators = {'one':collaborator_models[0],'two':collaborator_models[1]}#, 'three':collaborator_models[2]}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Original MNIST dataset\n", + "print(f'Original training data size: {len(train_images)}')\n", + "print(f'Original validation data size: {len(valid_images)}\\n')\n", + "\n", + "print(f'{train_images.shape} {valid_images.shape} {train_labels.shape} {valid_labels.shape}\\n')\n", + "\n", + "#Collaborator one's data\n", + "print(f'Collaborator one\\'s training data size: {len(collaborator_models[0].data_loader.X_train)}')\n", + "print(f'Collaborator one\\'s validation data size: {len(collaborator_models[0].data_loader.X_valid)}\\n')\n", + "\n", + "#Collaborator two's data\n", + "print(f'Collaborator two\\'s training data size: {len(collaborator_models[1].data_loader.X_train)}')\n", + "print(f'Collaborator two\\'s validation data size: {len(collaborator_models[1].data_loader.X_valid)}\\n')\n", + "\n", + "#Collaborator three's data\n", + "#print(f'Collaborator three\\'s training data size: {len(collaborator_models[2].data_loader.X_train)}')\n", + "#print(f'Collaborator three\\'s validation data size: {len(collaborator_models[2].data_loader.X_valid)}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can see the current plan values by running the `fx.get_plan()` function" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + " #Get the current values of the plan. Each of these can be overridden\n", + "import json\n", + "print(json.dumps(fx.get_plan(), indent=4, sort_keys=True))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we are ready to run our experiment. 
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now we are ready to run our experiment. If we want to pass in custom plan settings, we can easily do that with the `override_config` parameter:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Run experiment, return trained FederatedModel\n",
+    "final_fl_model = fx.run_experiment(\n",
+    "    collaborators,\n",
+    "    {\n",
+    "        'aggregator.settings.rounds_to_train': 5,\n",
+    "        'collaborator.settings.opt_treatment': 'CONTINUE_GLOBAL',\n",
+    "    }\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Save final model\n",
+    "final_fl_model.save_native('final_pytorch_model')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# FedProxAdam"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "metadata": {
+    "code_folding": []
+   },
+   "outputs": [],
+   "source": [
+    "import numpy as np\n",
+    "import torch\n",
+    "import torch.nn as nn\n",
+    "import torch.nn.functional as F\n",
+    "import torch.optim as optim\n",
+    "\n",
+    "import torchvision\n",
+    "import torchvision.transforms as transforms\n",
+    "import openfl.native as fx\n",
+    "from openfl.federated import FederatedModel, FederatedDataSet\n",
+    "from openfl.utilities.optimizers.torch import FedProxAdam\n",
+    "import warnings\n",
+    "warnings.filterwarnings('ignore')\n",
+    "\n",
+    "classes = 10\n",
+    "collaborator_num = 5\n",
+    "NUM_USER = collaborator_num\n",
+    "\n",
+    "def one_hot(labels, classes):\n",
+    "    return np.eye(classes)[labels]\n",
+    "\n",
+    "\n",
+    "def softmax(x):\n",
+    "    ex = np.exp(x)\n",
+    "    sum_ex = np.sum(np.exp(x))\n",
+    "    return ex / sum_ex\n",
+    "\n",
+    "\n",
+    "def generate_synthetic(alpha, beta, iid):\n",
+    "\n",
+    "    dimension = 60\n",
+    "    NUM_CLASS = 10\n",
+    "\n",
+    "    # Log-normal sample counts give each user a different amount of data\n",
+    "    samples_per_user = np.random.lognormal(4, 2, (NUM_USER)).astype(int) + 50\n",
+    "\n",
+    "    X_split = [[] for _ in range(NUM_USER)]\n",
+    "    y_split = [[] for _ in range(NUM_USER)]\n",
+    "\n",
+    "    #### define the prior ####\n",
+    "    mean_W = np.random.normal(0, alpha, NUM_USER)\n",
+    "    mean_b = mean_W\n",
+    "    B = np.random.normal(0, beta, NUM_USER)\n",
+    "    mean_x = np.zeros((NUM_USER, dimension))\n",
+    "\n",
+    "    diagonal = np.zeros(dimension)\n",
+    "    for j in range(dimension):\n",
+    "        diagonal[j] = np.power((j + 1), -1.2)\n",
+    "    cov_x = np.diag(diagonal)\n",
+    "\n",
+    "    for i in range(NUM_USER):\n",
+    "        if iid == 1:\n",
+    "            mean_x[i] = np.ones(dimension) * B[i]  # shared mean across features\n",
+    "        else:\n",
+    "            mean_x[i] = np.random.normal(B[i], 1, dimension)\n",
+    "\n",
+    "    if iid == 1:\n",
+    "        W_global = np.random.normal(0, 1, (dimension, NUM_CLASS))\n",
+    "        b_global = np.random.normal(0, 1, NUM_CLASS)\n",
+    "\n",
+    "    for i in range(NUM_USER):\n",
+    "\n",
+    "        W = np.random.normal(mean_W[i], 1, (dimension, NUM_CLASS))\n",
+    "        b = np.random.normal(mean_b[i], 1, NUM_CLASS)\n",
+    "\n",
+    "        if iid == 1:\n",
+    "            W = W_global\n",
+    "            b = b_global\n",
+    "\n",
+    "        xx = np.random.multivariate_normal(\n",
+    "            mean_x[i], cov_x, samples_per_user[i])\n",
+    "        yy = np.zeros(samples_per_user[i])\n",
+    "\n",
+    "        for j in range(samples_per_user[i]):\n",
+    "            tmp = np.dot(xx[j], W) + b\n",
+    "            yy[j] = np.argmax(softmax(tmp))\n",
+    "\n",
+    "        X_split[i] = xx.tolist()\n",
+    "        y_split[i] = yy.tolist()\n",
+    "\n",
+    "    return X_split, y_split\n",
+    "\n",
+    "\n",
+    "class SyntheticFederatedDataset(FederatedDataSet):\n",
+    "    def __init__(self, batch_size=1, num_classes=None, **kwargs):\n",
+    "        X, y = generate_synthetic(0.0, 0.0, 0)\n",
+    "        X = [np.array([np.array(sample).astype(np.float32)\n",
+    "                       for sample in col]) for col in X]\n",
+    "        y = [np.array([np.array(int(sample)) for sample in col]) for col in y]\n",
+    "        # 90/10 train/validation split per collaborator\n",
+    "        self.X_train_all = [col[:int(0.9 * len(col))] for col in X]\n",
+    "        self.X_valid_all = [col[int(0.9 * len(col)):] for col in X]\n",
+    "        self.y_train_all = [col[:int(0.9 * len(col))] for col in y]\n",
+    "        self.y_valid_all = [col[int(0.9 * len(col)):] for col in y]\n",
+    "        super().__init__(self.X_train_all[0], self.y_train_all[0], self.X_valid_all[0],\n",
+    "                         self.y_valid_all[0], batch_size, num_classes)\n",
+    "\n",
+    "    def split(self, num_collaborators, shuffle=True, equally=False):\n",
+    "        return [\n",
+    "            FederatedDataSet(\n",
+    "                self.X_train_all[i],\n",
+    "                self.y_train_all[i],\n",
+    "                self.X_valid_all[i],\n",
+    "                self.y_valid_all[i],\n",
+    "                batch_size=self.batch_size,\n",
+    "                num_classes=self.num_classes\n",
+    "            ) for i in range(num_collaborators)\n",
+    "        ]\n",
+    "\n",
+    "\n",
+    "class Net(nn.Module):\n",
+    "    def __init__(self):\n",
+    "        super(Net, self).__init__()\n",
+    "        self.linear1 = nn.Linear(60, 100)\n",
+    "        self.linear2 = nn.Linear(100, 10)\n",
+    "\n",
+    "    def forward(self, x):\n",
+    "        # A nonlinearity between the layers; without it they collapse into one linear map\n",
+    "        x = F.relu(self.linear1(x))\n",
+    "        x = self.linear2(x)\n",
+    "        return x\n",
+    "\n",
+    "    def train_epoch(self, batch_generator):\n",
+    "        from openfl.federated.task import PyTorchTaskRunner\n",
+    "        self.optimizer.set_old_weights(\n",
+    "            [p.clone().detach() for p in self.parameters()])\n",
+    "        return PyTorchTaskRunner.train_epoch(self, batch_generator)\n",
+    "\n",
+    "\n",
+    "def optimizer(x): return FedProxAdam(x, lr=1e-3, mu=0.01)\n",
+    "\n",
+    "\n",
+    "def cross_entropy(output, target):\n",
+    "    \"\"\"Cross-entropy metric\"\"\"\n",
+    "    return F.cross_entropy(output, target)\n"
+   ]
+  },
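+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "`FedProxAdam` behaves like Adam with the proximal gradient term $\\mu (w - w^t)$ added; `set_old_weights` supplies the anchor $w^t$. A minimal single-step sanity sketch (the `probe` model below is illustrative only):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# One optimizer step with the proximal anchor set to the initial weights\n",
+    "torch.manual_seed(0)\n",
+    "probe = nn.Linear(60, 10)\n",
+    "opt = FedProxAdam(probe.parameters(), lr=1e-3, mu=0.01)\n",
+    "opt.set_old_weights([p.clone().detach() for p in probe.parameters()])\n",
+    "loss = F.cross_entropy(probe(torch.randn(4, 60)), torch.tensor([0, 1, 2, 3]))\n",
+    "loss.backward()\n",
+    "opt.step()"
+   ]
+  },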
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fl_data = SyntheticFederatedDataset(batch_size=32, num_classes=classes)\n",
+    "# Create a federated model using the PyTorch class, lambda optimizer function, and loss function\n",
+    "fl_model = FederatedModel(build_model=Net, optimizer=optimizer, loss_fn=cross_entropy, data_loader=fl_data)"
+   ]
+  },
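+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Because `generate_synthetic` draws a log-normal number of samples per user, the collaborator shards are intentionally unbalanced. A quick check of the per-collaborator training-set sizes (using the attributes defined in `SyntheticFederatedDataset` above):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print([len(y_col) for y_col in fl_data.y_train_all])"
+   ]
+  },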
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "collaborator_models = fl_model.setup(num_collaborators=collaborator_num,device='cpu')\n", + "collaborators = {f'col{i}':collaborator_models[i] for i in range(collaborator_num)}#, 'three':collaborator_models[2]}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "collaborator = 4\n", + "a = np.argmax(collaborators[f'col{collaborator}'].data_loader.y_valid)\n", + "import matplotlib.pyplot as plt\n", + "plt.hist(a)\n", + "collaborator_models[collaborator].data_loader.y_valid.shape" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can see the current plan values by running the `fx.get_plan()` function" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we are ready to run our experiment. If we want to pass in custom plan settings, we can easily do that with the `override_config` parameter" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Run experiment, return trained FederatedModel\n", + "final_fl_model = fx.run_experiment(\n", + " collaborators,\n", + " {\n", + " 'aggregator.settings.rounds_to_train': 20,\n", + " 'collaborator.settings.opt_treatment': 'CONTINUE_GLOBAL',\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Save final model\n", + "final_fl_model.save_native('final_pytorch_model')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}