-
Notifications
You must be signed in to change notification settings - Fork 6
/
data_loaders.py
207 lines (174 loc) · 7.9 KB
/
data_loaders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
from __future__ import print_function
import numpy as np
import random
import math
import os
import torch
import torchvision
import torch.utils.data
def get_shapes(dataset):
    """
    Return (num_inputs, num_outputs, num_labels) for a supported dataset.

    dataset: name of the dataset used
    Raises NotImplementedError for unsupported dataset names.
    """
    # Table-driven lookup: one entry per supported dataset.
    known_shapes = {
        'HAR': (561, 6, 6),  # 561 features, 6 output units, 6 classes
    }
    if dataset not in known_shapes:
        raise NotImplementedError
    return known_shapes[dataset]
def load_data(dataset, seed):
    """
    Load the dataset from the drive and build train/test DataLoaders.

    The HAR dataset must be downloaded first with the provided scripts in /data.

    dataset: name of the dataset ('HAR' is the only supported value)
    seed: seed for randomness
    Returns (train_loader, test_loader).
    Raises NotImplementedError for unsupported dataset names.
    """

    def _read_features(path):
        # One row of whitespace-separated floats per line -> 2-D float32 array.
        # str.split() with no argument handles runs of spaces robustly.
        with open(path, 'r') as f:
            return np.array([row.split() for row in f], dtype=np.float32)

    def _read_labels(path):
        # First whitespace-separated token of each line -> 1-D int32 array.
        with open(path, 'r') as f:
            return np.array([row.split()[0] for row in f], dtype=np.int32)

    def seed_worker(worker_id):
        # Per-worker seeding helper for DataLoader workers.
        worker_seed = torch.initial_seed() % 2 ** 32
        np.random.seed(worker_seed)
        random.seed(worker_seed)

    g = torch.Generator()
    g.manual_seed(seed)
    # NOTE(review): seed_worker and g are prepared but never passed to the
    # DataLoaders below; with shuffle=False and the default num_workers=0 this
    # has no behavioral effect. Kept to preserve the original behavior exactly.

    if dataset == 'HAR':
        train_dir = os.path.join("data", "HAR", "train", "")
        test_dir = os.path.join("data", "HAR", "test", "")
        # Read dataset from disk, dealing with the text files' syntax.
        X = np.concatenate((_read_features(train_dir + "X_train.txt"),
                            _read_features(test_dir + "X_test.txt")))
        y = np.concatenate((_read_labels(train_dir + "y_train.txt"),
                            _read_labels(test_dir + "y_test.txt")))
        # Which datapoint belongs to which client (HAR subject ids 1..30).
        clients = np.concatenate((_read_labels(train_dir + "subject_train.txt"),
                                  _read_labels(test_dir + "subject_test.txt")))
        train_x_parts, train_y_parts = [], []
        test_x_parts, test_y_parts = [], []
        for client in range(1, 31):
            keep = clients == client
            x_client = X[keep]
            y_client = y[keep]
            # Boolean mask for a ~75/25 train/test split of this client's rows;
            # ceil + floor always sum to len(y_client).
            n = len(y_client)
            split = np.concatenate((np.ones(int(np.ceil(0.75 * n)), dtype=bool),
                                    np.zeros(int(np.floor(0.25 * n)), dtype=bool)))
            np.random.shuffle(split)
            # Attach the client id as column 0 of the training rows so that
            # assign_data() can route each example to its client.
            train_x_parts.append(np.insert(x_client[split], 0, client, axis=1))
            train_y_parts.append(y_client[split])
            test_x_parts.append(x_client[~split])
            test_y_parts.append(y_client[~split])
        # Single concatenation per array instead of quadratic np.append loops.
        X_train = np.concatenate(train_x_parts, axis=0)
        X_test = np.concatenate(test_x_parts, axis=0)
        y_train = np.concatenate(train_y_parts)
        y_test = np.concatenate(test_y_parts)
        tensor_train_X = torch.tensor(X_train, dtype=torch.float32)
        tensor_test_X = torch.tensor(X_test, dtype=torch.float32)
        # Labels are 1-based on disk; shift to 0-based for torch losses.
        tensor_train_y = torch.tensor(y_train, dtype=torch.int64) - 1
        tensor_test_y = torch.tensor(y_test, dtype=torch.int64) - 1
        train_dataset = torch.utils.data.TensorDataset(tensor_train_X, tensor_train_y)
        test_dataset = torch.utils.data.TensorDataset(tensor_test_X, tensor_test_y)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=100, shuffle=False)
        test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False)
    else:
        raise NotImplementedError
    return train_loader, test_loader
def assign_data(train_data, bias, device, num_labels=10, num_workers=100, server_pc=100, p=0.1, dataset="HAR", seed=1):
    """
    Assign the data to the clients and to a small server-side dataset.

    train_data: dataloader of the training dataset; column 0 of every example
        is the 1-based client id attached by load_data(), the rest are features
    bias: degree of non-iid between the classes loaded by each client
        (kept for interface compatibility; unused for 'HAR')
    device: device used in training and inference
    num_labels: number of classes
    num_workers: number of benign and malicious clients used during training
        (kept for interface compatibility; 'HAR' always uses 30 clients)
    server_pc: number of data examples in the server dataset
    p: bias probability in server dataset
    dataset: name of the dataset
    seed: seed for the final random permutation of the clients
    Returns (server_data, server_label, each_worker_data, each_worker_label).
    Raises NotImplementedError for unsupported dataset names.
    """
    if dataset != "HAR":
        raise NotImplementedError
    # HAR has exactly 30 subjects, so there are always 30 worker buckets.
    each_worker_data = [[] for _ in range(30)]
    each_worker_label = [[] for _ in range(30)]
    server_data = []
    server_label = []

    # Per-class quota of server examples: class 1 gets server_pc * p, the rest
    # is spread evenly over the other classes, handing out the fractional
    # residue one extra example at a time; the last class absorbs the leftover
    # so the quotas sum to server_pc.
    samp_dis = [0 for _ in range(num_labels)]
    num1 = int(server_pc * p)
    samp_dis[1] = num1
    average_num = (server_pc - num1) / (num_labels - 1)
    resid = average_num - np.floor(average_num)
    sum_res = 0.
    for other_num in range(num_labels - 1):
        if other_num == 1:
            continue
        samp_dis[other_num] = int(average_num)
        sum_res += resid
        if sum_res >= 1.0:
            samp_dis[other_num] += 1
            sum_res -= 1
    samp_dis[num_labels - 1] = server_pc - np.sum(samp_dis[:num_labels - 1])

    server_counter = [0 for _ in range(num_labels)]
    for data, label in train_data:
        data = data.to(device)
        label = label.to(device)
        for x, y in zip(data, label):
            client_id = int(x[0].item()) - 1  # column 0 is the 1-based client id
            x = x[1:].reshape(1, -1)          # remaining columns are the features
            label_idx = int(y.item())         # hoisted: used three times below
            # Fill the server dataset first (up to its per-class quota), then
            # route the example to its owning client.
            if server_counter[label_idx] < samp_dis[label_idx]:
                server_data.append(x)
                server_label.append(y)
                server_counter[label_idx] += 1
            else:
                each_worker_data[client_id].append(x)
                each_worker_label[client_id].append(y)

    if server_pc != 0:
        server_data = torch.cat(server_data, dim=0)
        server_label = torch.stack(server_label, dim=0)
    else:
        # Empty server dataset with the right feature width for HAR.
        server_data = torch.empty(size=(0, 561)).to(device)
        server_label = torch.empty(size=(0, )).to(device)
    each_worker_data = [torch.cat(each_worker, dim=0) for each_worker in each_worker_data]
    each_worker_label = [torch.stack(each_worker, dim=0) for each_worker in each_worker_label]

    # Randomly permute the workers (fixed seed -> reproducible assignment).
    random_order = np.random.RandomState(seed=seed).permutation(30)
    each_worker_data = [each_worker_data[i] for i in random_order]
    each_worker_label = [each_worker_label[i] for i in random_order]
    return server_data, server_label, each_worker_data, each_worker_label