import numpy as np
import pandas as pd
from sklearn.datasets import make_circles, make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import SGD, Adam

from ipywidgets import interact, IntSlider, FloatSlider, Dropdown, Checkbox
import plotly.graph_objects as go
import plotly.express as px
Binary Classification Task on Circles Dataset
Step 0: Getting the Data
First, let's generate some data. We will use Scikit-learn's make_circles to create a dataset that isn't linearly separable. The dataset consists of two concentric circles, where the color of each point indicates which class it belongs to.
from sklearn.datasets import make_circles

X, y = make_circles(n_samples=1000, noise=0.2, factor=0.5, random_state=42)

data_fig = go.FigureWidget()
data_fig.add_trace(go.Scatter(x=X[y == 0, 0], y=X[y == 0, 1],
                              mode='markers', marker=dict(color='red'),
                              name='0'))
data_fig.add_trace(go.Scatter(x=X[y == 1, 0], y=X[y == 1, 1],
                              mode='markers', marker=dict(color='blue'),
                              name='1'))
data_fig.update_layout(width=500, height=500,
                       xaxis_range=[-2, 2], yaxis_range=[-2, 2],
                       xaxis_title='Feature 1',
                       yaxis_title='Feature 2',
                       title='make_circles Dataset')
data_fig.show()

print("The first 5 training datapoints:", X[:5])
print("The labels for the first 5 datapoints:", y[:5])
The first 5 training datapoints: [[ 0.36229708  0.28247097]
 [-0.27207715  0.23564621]
 [-0.64072517  0.54943623]
 [-0.56693828  0.24588771]
 [ 0.47106162 -0.88152647]]
The labels for the first 5 datapoints: [1 1 1 1 0]
Convert the data into PyTorch Tensor format and split it into training and test sets.
def make_tensors(X, y):
    from torch.utils.data import random_split
    # Wrap the numpy arrays as float features and integer class labels
    data = TensorDataset(torch.tensor(X, dtype=torch.float32),
                         torch.tensor(y, dtype=torch.long))
    # Fix the seed so the 80/20 split is reproducible
    torch.manual_seed(140)
    train_data, test_data = random_split(data, [0.8, 0.2])
    return train_data, test_data

training_data, test_data = make_tensors(X, y)
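Before moving on, it helps to peek at one element of the split (a quick sketch, not part of the pipeline itself) to confirm the dtypes and shapes PyTorch will see:

x0, y0 = training_data[0]  # one (features, label) pair from the training split
print(x0.shape, x0.dtype)  # torch.Size([2]) torch.float32
print(y0, y0.dtype)        # a scalar class label, torch.int64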
Step 1: Defining the Model
# Logistic Regression Model
class LogisticRegressionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(2, 1)  # 2 input features -> 1 logit

    def forward(self, x):
        p = torch.sigmoid(self.linear(x))
        # Return [P(y=0), P(y=1)] for each sample
        return torch.cat([1 - p, p], dim=1)
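As a quick smoke test (a sketch on random inputs, not part of the training pipeline), each output row should be [P(y=0), P(y=1)] and sum to 1:

demo_model = LogisticRegressionModel()
out = demo_model(torch.randn(4, 2))  # dummy batch of 4 points
print(out.shape)       # torch.Size([4, 2])
print(out.sum(dim=1))  # each row sums to 1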
Step 2: Define the Loss
Here we use cross-entropy loss, since we made the model return a probability for each class. One caveat: nn.CrossEntropyLoss expects raw logits and applies log-softmax internally, so feeding it probabilities re-normalizes them through a second softmax. Training still works for this demo, but in practice models usually return logits directly.

loss_fn = nn.CrossEntropyLoss()
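To make that concrete, here is a small sanity check (made-up input values, reusing loss_fn from above): nn.CrossEntropyLoss is exactly the mean negative log-likelihood of the log-softmax of whatever it receives:

inputs = torch.tensor([[0.9, 0.1], [0.2, 0.8]])  # made-up "probabilities"
labels = torch.tensor([0, 1])
manual = -torch.log_softmax(inputs, dim=1)[torch.arange(2), labels].mean()
print(torch.allclose(loss_fn(inputs, labels), manual))  # True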
Step 3: Optimize the Loss

def plot_decision_boundary_pytorch(model, num_points=100, probs=True):
    # Generate a grid of points covering the plot area
    xx, yy = torch.meshgrid(torch.linspace(-4, 4, num_points),
                            torch.linspace(-4, 4, num_points),
                            indexing='ij')
    grid = torch.cat([xx.reshape(-1, 1), yy.reshape(-1, 1)], dim=1)
    with torch.no_grad():
        preds = model(grid)
    num_classes = preds.shape[1]
    if num_classes > 2:  # support for multiclass
        preds = torch.argmax(preds, dim=1).reshape(xx.shape).T
        return go.Contour(x=xx[:, 0], y=yy[0], z=preds,
                          colorscale=[px.colors.qualitative.Plotly[i] for i in range(num_classes)],
                          opacity=0.5, showscale=False)
    else:  # Binary classification case (red/blue)
        if probs:
            preds = preds[:, 1].reshape(xx.shape).T
        else:
            preds = (preds[:, 1] > 0.5).float().reshape(xx.shape).T
        return go.Contour(x=xx[:, 0], y=yy[0], z=preds,
                          colorscale=[[0, 'red'], [1, 'blue']],
                          opacity=0.5, showscale=False)
def optimize_model(train_dataset,
                   test_dataset,
                   model, loss_fn,
                   pred_fig, loss_fig,
                   batch_size=64,
                   learning_rate=0.01,
                   nepochs=50,
                   sleep_time=0.2):
    import time

    # Create dataloaders for training and evaluation
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Define the optimizer (this is the update rule)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    test_loss_curve = []
    for epoch in range(nepochs):
        # Loop through all the batches
        for batch, (X, y) in enumerate(train_loader):
            # Zero the gradients to start the next step
            optimizer.zero_grad()
            # Compute prediction and loss
            pred = model(X)
            loss = loss_fn(pred, y)
            # Backpropagation (compute the gradient)
            loss.backward()
            # Update the parameters using the optimizer's update rule
            optimizer.step()

        # Evaluate the model on the test data
        # In practice, we often do this in batches too since the data is too big to fit in memory
        with torch.no_grad():
            test_loss_sum = 0.0
            for X_test, y_test in test_loader:
                test_pred = model(X_test)
                test_loss = loss_fn(test_pred, y_test)
                test_loss_sum += test_loss.item()
            num_test_batches = len(test_loader)
            test_loss_curve.append(test_loss_sum / num_test_batches)

        # Visualization Code
        boundary = plot_decision_boundary_pytorch(model, probs=True)
        pred_fig.data[-1].z = boundary.z
        loss_fig.data[0].x = np.arange(epoch + 1)
        loss_fig.data[0].y = test_loss_curve
        if sleep_time > 0:
            time.sleep(sleep_time)
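For intuition about what optimizer.step() is doing, here is a sketch of the plain-SGD update it generalizes (Adam adds per-parameter adaptive step sizes on top of this; the helper below is illustrative only and is not used in this notebook):

def manual_sgd_step(model, lr):
    # Equivalent to optim.SGD(model.parameters(), lr).step() after loss.backward()
    with torch.no_grad():
        for p in model.parameters():
            if p.grad is not None:
                p -= lr * p.grad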
from ipywidgets import HBox

pred_fig = go.FigureWidget(data=data_fig.data, layout=data_fig.layout)
loss_fig = go.FigureWidget()
loss_fig.add_trace(go.Scatter(x=[], y=[], mode='lines', name='Test Loss'))
loss_fig.update_layout(title='Test Loss', xaxis_title='Epochs', yaxis_title='Test Loss (BCE)')
model = LogisticRegressionModel()
boundary = plot_decision_boundary_pytorch(model, probs=True)
pred_fig.add_trace(boundary)
display(HBox([pred_fig, loss_fig]))
optimize_model(training_data, test_data, model, loss_fn, pred_fig, loss_fig, nepochs=50)
Go back to Step 1: Building a neural network
It's apparent that our linear decision boundary won't cut it for this data. Let's try to build a simple neural network that can distinguish between these two classes.
class NeuralNetworkModel1(nn.Module):
    def __init__(self):
        super().__init__()
        # Two hidden layers of 8 units each
        self.hidden1 = nn.Linear(2, 8)
        self.hidden2 = nn.Linear(8, 8)
        self.output = nn.Linear(8, 1)

    def forward(self, x):
        x = torch.tanh(self.hidden1(x))
        x = torch.tanh(self.hidden2(x))
        p = torch.sigmoid(self.output(x))
        # Same output convention as before: [P(y=0), P(y=1)]
        return torch.cat([1 - p, p], dim=1)
How we initialize the weights can have a big impact on how well our model trains. With logistic regression we can set all the weights to zero and be fine; with a neural network we cannot. Why? (Hint: if every weight starts at zero, every hidden unit computes the same output and receives the same gradient, so the units never differentiate.) In general, small starting weights are a good idea. Xavier initialization is a popular method for initializing weights in a neural network: it draws weights with mean 0 and variance 2/(fan_in + fan_out), where fan_in and fan_out are the number of input and output units of the layer. PyTorch provides this in torch.nn.init; the code below uses the uniform variant, xavier_uniform_, which has the same variance.
def initialize_weights(model):
    # model.modules() recurses into containers such as nn.ModuleList,
    # so this also works for the dynamically built networks later on
    # (model.children() would only see the top-level containers)
    for layer in model.modules():
        if isinstance(layer, nn.Linear):
            nn.init.xavier_uniform_(layer.weight)
            nn.init.zeros_(layer.bias)

model = NeuralNetworkModel1()
initialize_weights(model)
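As a quick check of the formula (a sketch on a throwaway layer, not part of the notebook's pipeline), the empirical standard deviation of Xavier-initialized weights should land near sqrt(2 / (fan_in + fan_out)):

probe = nn.Linear(8, 8)
nn.init.xavier_uniform_(probe.weight)
expected_std = (2.0 / (8 + 8)) ** 0.5  # about 0.354
print(probe.weight.std().item(), expected_std)  # empirical value is close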
Let's try running this model with the exact same optimization and visualization code we used before:
from ipywidgets import HBox

model = NeuralNetworkModel1()
initialize_weights(model)

pred_fig = go.FigureWidget(data=data_fig.data, layout=data_fig.layout)
loss_fig = go.FigureWidget()
loss_fig.add_trace(go.Scatter(x=[], y=[], mode='lines', name='Test Loss'))
loss_fig.update_layout(title='Test Loss', xaxis_title='Epochs', yaxis_title='Test Loss (BCE)')
boundary = plot_decision_boundary_pytorch(model, probs=True)
pred_fig.add_trace(boundary)
display(HBox([pred_fig, loss_fig]))
optimize_model(training_data, test_data, model, loss_fn, pred_fig, loss_fig,
               batch_size=16, learning_rate=0.001, nepochs=100, sleep_time=0)
Step 4: A Custom Design

# Class to dynamically create a neural network from a list of layer sizes
class CustomNeuralNetwork(nn.Module):
    def __init__(self, input_size, output_size, layers, activation_functions):
        super().__init__()
        self.layers = nn.ModuleList()
        self.activations = []

        # Create the hidden layers dynamically
        current_size = input_size
        for i, layer_size in enumerate(layers):
            self.layers.append(nn.Linear(current_size, layer_size))
            self.activations.append(activation_functions[i])
            current_size = layer_size

        # Add the final output layer; its sigmoid activation
        # (for binary classification) is applied explicitly in forward()
        self.layers.append(nn.Linear(current_size, output_size))

    def forward(self, x):
        for i, layer in enumerate(self.layers[:-1]):  # Apply activation for all but the last layer
            x = layer(x)
            x = self.apply_activation(x, self.activations[i])
        x = self.layers[-1](x)  # Final output layer
        p = torch.sigmoid(x)  # Apply sigmoid activation for output
        return torch.cat([1 - p, p], dim=1)

    @staticmethod
    def apply_activation(x, activation):
        if activation == 'relu':
            return torch.relu(x)
        elif activation == 'tanh':
            return torch.tanh(x)
        elif activation == 'sigmoid':
            return torch.sigmoid(x)
        else:
            return x
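A quick smoke test of the builder (a sketch with arbitrary sizes, not used in training below) confirms it reproduces the same output convention as NeuralNetworkModel1:

net = CustomNeuralNetwork(input_size=2, output_size=1,
                          layers=[8, 8], activation_functions=['tanh', 'tanh'])
print(net(torch.randn(4, 2)).shape)  # torch.Size([4, 2]): [P(y=0), P(y=1)] per row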
from ipywidgets import HBox

# A placeholder model just to draw the initial boundary;
# the interactive controls below build a CustomNeuralNetwork
model = NeuralNetworkModel1()
initialize_weights(model)

pred_fig = go.FigureWidget(data=data_fig.data, layout=data_fig.layout)
loss_fig = go.FigureWidget()
loss_fig.add_trace(go.Scatter(x=[], y=[], mode='lines', name='Test Loss'))
loss_fig.update_layout(title='Test Loss', xaxis_title='Epochs', yaxis_title='Test Loss (BCE)')
boundary = plot_decision_boundary_pytorch(model, probs=True)
pred_fig.add_trace(boundary)
display(HBox([pred_fig, loss_fig]))

@interact(n_layers=IntSlider(min=1, max=5, step=1, value=2, description="Layers"),
          neurons_per_layer=IntSlider(min=4, max=64, step=4, value=8, description="Neurons/Layer"),
          activation_fn=Dropdown(options=['relu', 'tanh', 'sigmoid'], value='tanh', description="Activation"),
          learning_rate=FloatSlider(min=0.001, max=0.1, step=0.001, value=0.01, description="Learning Rate"),
          batch_size=IntSlider(min=1, max=128, step=8, value=32, description="Batch Size"),
          epochs=IntSlider(min=10, max=200, step=10, value=10, description="Epochs"))
def update_model(n_layers, neurons_per_layer, activation_fn, learning_rate, batch_size, epochs):
    # Build the model from the widget values
    layers = [neurons_per_layer] * n_layers
    activation_functions = [activation_fn] * n_layers
    model = CustomNeuralNetwork(input_size=2, output_size=1, layers=layers, activation_functions=activation_functions)
    initialize_weights(model)
    optimize_model(training_data, test_data, model, loss_fn, pred_fig, loss_fig,
                   learning_rate=learning_rate,
                   batch_size=batch_size, nepochs=epochs, sleep_time=0)
Multiclass Classification on the Blobs Dataset
Step 0: Getting the Data

# Generate and normalize the dataset
def generate_blobs(n_samples, centers, std):
    X, y = make_blobs(n_samples=n_samples, centers=centers, cluster_std=std, random_state=42)
    # Standardize the features to zero mean and unit variance
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    return X, y
X, y = generate_blobs(n_samples=100, centers=3, std=1)

blob_data_fig = go.FigureWidget()
blob_data_fig.update_layout(width=500, height=500,
                            xaxis_range=[-3, 3], yaxis_range=[-3, 3],
                            xaxis_title='Feature 1',
                            yaxis_title='Feature 2',
                            title='make_blobs Dataset')
for i in np.unique(y):
    blob_data_fig.add_trace(go.Scatter(x=X[y == i, 0], y=X[y == i, 1],
                                       mode='markers', marker=dict(color=px.colors.qualitative.Plotly[i]),
                                       name=str(i)))

blob_data_fig.show()
Step 1: Defining the Model

# Define the Neural Network with dynamic output size
class BlobNN(nn.Module):
    def __init__(self, input_dim, num_classes, hidden_dim=16):
        super().__init__()
        self.hidden = nn.Linear(input_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        x = torch.relu(self.hidden(x))
        logits = self.output(x)
        # Softmax turns the logits into a probability distribution over classes
        # (the caveat about nn.CrossEntropyLoss and probabilities from Step 2 above applies here too)
        return torch.softmax(logits, dim=1)
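As before, a quick smoke test (a sketch on random inputs): with softmax, each output row is a full probability distribution over all classes rather than the two-column [1-p, p] trick:

net = BlobNN(input_dim=2, num_classes=3)
out = net(torch.randn(4, 2))
print(out.shape, out.sum(dim=1))  # torch.Size([4, 3]); each row sums to 1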
Step 2: Training and Visualizing the Model

model = BlobNN(2, 3)
initialize_weights(model)

blob_pred_fig = go.FigureWidget(data=blob_data_fig.data, layout=blob_data_fig.layout)
blob_loss_fig = go.FigureWidget()
blob_loss_fig.add_trace(go.Scatter(x=[], y=[], mode='lines', name='Test Loss'))
blob_loss_fig.update_layout(title='Test Loss', xaxis_title='Epochs', yaxis_title='Test Loss (CE)')
boundary = plot_decision_boundary_pytorch(model)
blob_pred_fig.add_trace(boundary)
display(HBox([blob_pred_fig, blob_loss_fig]))

@interact(n_samples=IntSlider(min=100, max=1000, step=100, value=500, description="Samples"),
          centers=IntSlider(min=2, max=5, step=1, value=3, description="Centers"),
          std=FloatSlider(min=0.5, max=5.0, step=0.5, value=1.0, description="Std Dev"),
          epochs=IntSlider(min=10, max=200, step=2, value=20, description="Epochs"),
          learning_rate=FloatSlider(min=0.001, max=0.1, step=0.001, value=0.01, description="Learning Rate"))
def update_model2(n_samples, centers, std, epochs, learning_rate):
    X, y = generate_blobs(n_samples=n_samples, centers=centers, std=std)
    # Clear and redraw the scatter traces for the freshly generated blobs
    blob_pred_fig.data = []
    for i in np.unique(y):
        blob_pred_fig.add_trace(go.Scatter(x=X[y == i, 0], y=X[y == i, 1],
                                           mode='markers', marker=dict(color=px.colors.qualitative.Plotly[i]),
                                           name=str(i)))
    train_data, test_data = make_tensors(X, y)
    model = BlobNN(2, centers)
    initialize_weights(model)
    boundary = plot_decision_boundary_pytorch(model)
    blob_pred_fig.add_trace(boundary)
    optimize_model(train_data, test_data, model, loss_fn, blob_pred_fig, blob_loss_fig,
                   nepochs=epochs, learning_rate=learning_rate, sleep_time=0)