import copy
import matplotlib.pyplot as plt
import numpy as np
import torch


class ConstrainedSeedKMeans:
"""Constrained seed KMeans algorithm proposed by Basu et al. in 2002."""

    def __init__(self,
                 n_clusters=2,
                 *,
                 n_init=10,
                 max_iter=300,
                 tol=0.0001,
                 verbose=False,
                 invalid_label=-1):
"""Initialization a constrained seed kmeans estimator.
Args:
n_clusters: The number of clusters.
n_init: The number of times the algorithm will run in order to choose
the best result.
max_iter: The maximum number of iterations the algorithm will run.
tol: The convergence threshold of the algorithm. If the norm of a
matrix, which is the difference between two consective cluster
centers, is less than this threshold, we think the algorithm converges.
verbose: Whether to print intermediate results to console.
invalide_label: Spicial sign to indicate which samples are unlabeled.
If the y value of a sample equals to this value, then that sample
is a unlabeled one.
"""
self.n_clusters = n_clusters
self.n_init = n_init
self.max_iter = max_iter
self.tol = tol
self.verbose = verbose
        self.INVALID_LABEL = invalid_label

    def _check_params(self, X, y):
"""Check if the parameters of the algorithm and the inputs to it are valid."""
if type(X) not in (np.ndarray, torch.Tensor):
raise TypeError(f"Type of X can only take numpy.ndarray and "
f"torch.Tensor, but got {type(X)} instead.")
if type(y) not in (list, np.ndarray, torch.Tensor):
raise TypeError(f"Type of y can only take list, numpy.ndarray, and"
f"torch.Tensor, but got{type(y)} instead.")
if self.n_clusters > X.shape[0]:
raise ValueError(f"The number of clusters mube be less than the "
f"number of samples.")
if self.max_iter <= 0:
raise ValueError(f"The number of maximum iteration must larger than zero.")

    def _init_centroids(self, X, y):
        """Initialize cluster centers using the few labeled samples."""
if type(y) == np.ndarray:
pkg = np
elif type(y) == torch.Tensor:
pkg = torch
elif type(y) == list and type(X) == np.ndarray:
y = np.array(y)
pkg = np
elif type(y) == list and type(X) == torch.Tensor:
y = torch.Tensor(y)
pkg = torch
else:
raise TypeError('Data type is not supported, please check it again.')
y_unique = pkg.unique(y)
if self.INVALID_LABEL in y_unique:
n_seed_centroids = len(y_unique) - 1
else:
n_seed_centroids = len(y_unique)
        assert n_seed_centroids <= self.n_clusters, (
            "The number of seed centroids should be less than or equal to "
            "the total number of clusters.")
centers = pkg.empty((self.n_clusters, X.shape[1]), dtype=X.dtype)
# First, initialize seed centers using samples with label
for i in range(n_seed_centroids):
seed_samples = X[y == i]
centers[i] = seed_samples.mean(axis=0)
        # Then, initialize the remaining centers with random samples from X
unlabel_idxes = pkg.where(y == self.INVALID_LABEL)[0] # np.where returns a tuple
        # If all samples are labeled, k-means has nothing to update and
        # there is no need for clustering
if len(unlabel_idxes) == 0:
raise ValueError("All samples are labeled! No need for clustering!")
        if len(unlabel_idxes) < self.n_clusters - n_seed_centroids:
            # In this case, randomly select (n_clusters - n_seed_centroids)
            # distinct data points from the whole dataset
            idx = np.random.choice(X.shape[0],
                                   self.n_clusters - n_seed_centroids,
                                   replace=False)
            for i in range(n_seed_centroids, self.n_clusters):
                centers[i] = X[idx[i - n_seed_centroids]]
else:
for i in range(n_seed_centroids, self.n_clusters):
idx = np.random.choice(unlabel_idxes, 1, replace=False)
centers[i] = X[idx]
return centers, n_seed_centroids

    def _kmeans(self, X, y, init_centers):
        """KMeans algorithm implementation."""
indices = copy.copy(y)
if type(indices) == list:
indices = np.array(indices)
n_samples, n_features = X.shape[0], X.shape[1]
cur_centers = init_centers
new_centers = copy.deepcopy(init_centers)
# Main loop
for iter_ in range(self.max_iter):
            # First step in KMeans: calculate the closest centroid for each sample
for i in range(n_samples):
                # If this sample has a label, then we use the ground-truth
                # label as its cluster index
if y[i] != self.INVALID_LABEL:
continue
if type(X) == np.ndarray:
min_idx = np.linalg.norm(cur_centers - X[i], axis=1).argmin()
else:
min_idx = torch.norm(cur_centers - X[i], dim=1).argmin()
indices[i] = min_idx
            # Second step in KMeans: update each centroid
for i in range(self.n_clusters):
cluster_samples = X[indices == i]
# In the case that the cluster is empty, randomly choose
# a sample from X.
if cluster_samples.shape[0] == 0:
new_centers[i] = X[np.random.choice(n_samples, 1, replace=False)]
else:
new_centers[i] = cluster_samples.mean(axis=0)
            # Calculate the inertia at the current iteration
inertia = 0
for i in range(self.n_clusters):
if type(X) == np.ndarray:
inertia += np.linalg.norm(X[indices == i] - new_centers[i], axis=1).sum()
else:
inertia += torch.norm(X[indices == i] - new_centers[i], dim=1).sum().item()
if self.verbose:
print('Iteration {}, inertia: {}'.format(iter_, inertia))
# Check if KMeans converges
if type(X) == np.ndarray:
difference = np.linalg.norm(new_centers - cur_centers, ord='fro')
else:
difference = torch.norm(new_centers - cur_centers, p='fro')
if difference < self.tol:
if self.verbose:
print('Converged at iteration {}.\n'.format(iter_))
break
            # ATTENTION: avoid direct assignment like cur_centers = new_centers.
            # That would make cur_centers and new_centers point to the same
            # object in memory. To avoid this, create a new object.
cur_centers = copy.deepcopy(new_centers)
return new_centers, indices, inertia

    def fit(self, X, y):
        """Cluster the samples using the features and the few available labels.

        Args:
            X: numpy.ndarray or torch.Tensor with shape (n_samples, n_features).
            y: List, numpy.ndarray, or torch.Tensor with shape (n_samples,).
                For index i, if y[i] equals self.INVALID_LABEL, then X[i] is
                an unlabeled sample.
Returns:
self: The estimator itself.
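
        Example (an illustrative sketch; the toy arrays are assumptions, not
        part of the original script):
            >>> X = np.array([[0., 0.], [0., 1.], [5., 5.], [5., 6.]])
            >>> y = [0, -1, 1, -1]  # -1 marks the unlabeled samples
            >>> est = ConstrainedSeedKMeans(n_clusters=2, n_init=1).fit(X, y)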
"""
self._check_params(X, y)
_, n_seed_centroids = self._init_centroids(X, y)
if n_seed_centroids == self.n_clusters:
self.n_init = 1
# run constrained seed KMeans n_init times in order to choose the best one
best_inertia = None
best_centers, best_indices = None, None
for i in range(self.n_init):
init_centers, _ = self._init_centroids(X, y)
if self.verbose:
print('Initialization complete')
new_centers, indices, new_inertia = self._kmeans(X, y, init_centers)
if best_inertia is None or new_inertia < best_inertia:
best_inertia = new_inertia
best_centers = new_centers
best_indices = indices
self.inertia_ = best_inertia
self.cluster_centers_ = best_centers
self.indices = best_indices
return self

    def predict(self, X):
        """Predict the associated cluster index of samples.

        Args:
X: numpy.ndarray or torch.Tensor with shape (n_samples, n_features).
Returns:
            indices: The associated cluster index of each sample, with shape
                (n_samples,).
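
        Example (illustrative; assumes an estimator fitted as in `fit` above):
            >>> est.predict(np.array([[0., 0.5]]))  # -> array([0])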
"""
n_samples = X.shape[0]
indices = [-1 for _ in range(n_samples)]
for i in range(n_samples):
if type(X) == np.ndarray:
min_idx = np.linalg.norm(self.cluster_centers_ - X[i], axis=1).argmin()
else:
min_idx = torch.norm(self.cluster_centers_ - X[i], dim=1).argmin()
indices[i] = min_idx
if type(X) == np.ndarray:
return np.array(indices)
else:
return torch.tensor(indices)

    def fit_predict(self, X, y):
        """Convenience method: fit the estimator, then predict on X."""
return self.fit(X, y).predict(X)

    def transform(self, X):
        """Transform the input to the centroid space.

        Args:
X: numpy.ndarray or torch.Tensor with shape (n_samples, n_features).
Returns:
            output: The distances to each cluster center, with shape
                (n_samples, n_clusters).
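
        Example (illustrative; assumes an estimator fitted as in `fit` above;
        each row holds one sample's distance to every cluster center):
            >>> est.transform(np.array([[0., 0.5]]))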
"""
if type(X) == np.ndarray:
pkg = np
else:
pkg = torch
n_samples = X.shape[0]
output = pkg.empty((n_samples, self.n_clusters), dtype=X.dtype)
for i in range(n_samples):
if type(X) == np.ndarray:
output[i] = np.linalg.norm(self.cluster_centers_ - X[i], axis=1)
else:
output[i] = torch.norm(self.cluster_centers_ - X[i], dim=1)
return output

    def fit_transform(self, X, y):
        """Convenience method: fit the estimator, then transform X."""
return self.fit(X, y).transform(X)

    def score(self, X):
"""Opposite of the value of X on the K-means objective."""
        inertia = 0
        n_samples = X.shape[0]
        for i in range(n_samples):
            if type(X) == np.ndarray:
                inertia += np.linalg.norm(self.cluster_centers_ - X[i], axis=1).min()
            else:
                inertia += torch.norm(self.cluster_centers_ - X[i], dim=1).min().item()
        return -1 * inertia


def plot(X, estimator, name):
import pandas as pd
import seaborn as sns
df = pd.DataFrame()
df['dim1'] = X[:, 0]
df['dim2'] = X[:, 1]
if name == 'sklearn_kmeans':
df['y'] = estimator.labels_
else:
df['y'] = estimator.indices
plt.close()
plt.xlim(0.1, 0.9)
plt.ylim(0.0, 0.8)
sns.scatterplot(x='dim1', y='dim2', hue=df.y.tolist(),
palette=sns.color_palette('hls', 3), data=df)
    plt.savefig('./figures/{}.pdf'.format(name.upper()))


if __name__ == '__main__':
    # Load the watermelon-4.0 dataset from the book Machine Learning by Zhihua Zhou
dataset = np.genfromtxt('./watermelon_4.0.txt', delimiter=',')
    X = dataset[:, 1:]  # the first column contains IDs
    y = [-1 for _ in range(X.shape[0])]  # by default, all samples have no label
# 1. KMeans with no additional labels
# reference: Machine Learning book, chapter 9.4
kmeans = ConstrainedSeedKMeans(n_clusters=3, n_init=10, verbose=False)
kmeans.fit(X, y)
indices = kmeans.indices
plot(X, kmeans, name='kmeans')
# 2. Constrained seed KMeans with little labels
    # reference: Machine Learning book, chapter 13.6
seed_kmeans = ConstrainedSeedKMeans(n_clusters=3, n_init=10, verbose=False)
y[3], y[24] = 0, 0
y[11], y[19] = 1, 1
y[13], y[16] = 2, 2
seed_kmeans.fit(X, y)
plot(X, seed_kmeans, name='seed_kmeans')
    # 3. scikit-learn built-in KMeans
# from sklearn.cluster import KMeans
# sklearn_kmeans = KMeans(n_clusters=3)
# sklearn_kmeans.fit(X)
# plot(X, sklearn_kmeans, 'sklearn_kmeans')