example.py
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# import uDTW
from uDTW import uDTW
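
# Initialise Linear layers with Xavier-normal weights and zero biases.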
def weight_init(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        nn.init.constant_(m.bias, 0)
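
# Scaled sigmoid: maps any input into the open interval (b, a + b), so the
# predicted uncertainty stays strictly positive for a, b > 0.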
def sigmoid_ab(a, b, input):
    return a * torch.sigmoid(input) + b
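
# Thin nn.Module wrapper around sigmoid_ab so the scaling can be used as a layer.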
class Sigmoid(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, a, b, input):
        return sigmoid_ab(a, b, input)
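
# A small MLP that maps a sequence of shape (batch, length, dims) to one
# sigma (uncertainty) value per frame, shape (batch, length, 1).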
class SimpleSigmaNet(nn.Module):
    def __init__(self):
        super(SimpleSigmaNet, self).__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 10)
        self.sigmoid = Sigmoid()

    def forward(self, x, a, b):
        batch_size = x.shape[0]
        length = x.shape[1]
        # flatten to (batch * length, dims) so the layers act per frame
        x = x.view(batch_size * length, -1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        # restore the sequence layout and average down to one value per frame
        x = x.view(batch_size, length, -1).mean(2, keepdim=True)
        sigma = self.sigmoid(a, b, x)
        return sigma
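
# --- demo: train sigmanet so the uDTW loss on two random sequences decreases ---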
torch.manual_seed(0)

# create the sequences
batch_size, len_x, len_y, dims = 4, 6, 9, 10

# sequence x & y
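# (a DTW-style criterion can align sequences of different lengths: 6 vs. 9 here)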
if torch.cuda.is_available():
    x = torch.rand((batch_size, len_x, dims)).cuda()
    y = torch.rand((batch_size, len_y, dims)).cuda()
else:
    x = torch.rand((batch_size, len_x, dims))
    y = torch.rand((batch_size, len_y, dims))

# define parameters for the scaled sigmoid function
a = 1.5
b = 0.5
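# with a = 1.5 and b = 0.5, sigma lies in (b, a + b) = (0.5, 2.0)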

# a very simple network
sigmanet = SimpleSigmaNet()
sigmanet.apply(weight_init)
if torch.cuda.is_available():
    sigmanet.cuda()

# create the criterion object
# (gamma is the soft-min smoothing temperature, as in soft-DTW; smaller
# values approximate hard DTW more closely)
if torch.cuda.is_available():
    udtw = uDTW(use_cuda=True, gamma=0.01, normalize=True)
else:
    udtw = uDTW(use_cuda=False, gamma=0.01, normalize=True)

# set optimizer
optimizer = optim.SGD(sigmanet.parameters(), lr=0.5, momentum=0.9)

for epoch in range(10):
    optimizer.zero_grad()

    # predict per-frame uncertainties for both sequences
    sigma_x = sigmanet(x, a, b)
    sigma_y = sigmanet(y, a, b)

    # compute the loss value: loss_d is the distance term, loss_s the sigma
    # (uncertainty) term; normalise by the alignment-path area len_x * len_y
    loss_d, loss_s = udtw(x, y, sigma_x, sigma_y, beta=1)
    loss = (loss_d.mean() + loss_s.mean()) / (len_x * len_y)
    print('epoch ', epoch, ' | loss: ', '{:.10f}'.format(loss.item()))

    # aggregate and call backward()
    loss.backward()
    optimizer.step()
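
# To run this demo: python example.py (requires PyTorch and the uDTW
# package from this repository on the import path).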