# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
from PIL import Image

def LoadImage(path, color_mode='RGB', channel_mean=None, modcrop=[0,0,0,0]):
    '''Load an image with PIL, convert it to the requested color space,
    and return it as a float32 numpy array scaled to [0, 1].
    Modified from Keras.preprocessing.image.load_img / img_to_array:
    https://github.com/fchollet/keras/blob/master/keras/preprocessing/image.py
    '''
    ## Load image
    img = Image.open(path)
    if color_mode == 'RGB':
        cimg = img.convert('RGB')
        x = np.asarray(cimg, dtype='float32')
    elif color_mode == 'YCbCr' or color_mode == 'Y':
        cimg = img.convert('YCbCr')
        x = np.asarray(cimg, dtype='float32')
        if color_mode == 'Y':
            x = x[:, :, 0:1]

    ## Scale to [0, 1]
    x *= 1.0 / 255.0

    ## Optional per-channel mean subtraction (assumes a 3-channel image)
    if channel_mean:
        x[:, :, 0] -= channel_mean[0]
        x[:, :, 1] -= channel_mean[1]
        x[:, :, 2] -= channel_mean[2]

    ## Optional border crop: modcrop = [top, bottom, left, right],
    ## applied only when all four values are nonzero
    if modcrop[0] * modcrop[1] * modcrop[2] * modcrop[3]:
        x = x[modcrop[0]:-modcrop[1], modcrop[2]:-modcrop[3], :]

    return x
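
# A minimal usage sketch for LoadImage (the file name below is illustrative,
# not part of the repo): load the luminance channel of a frame, then load it
# again with an 8-pixel crop from every border.
def _example_load_image():
    y = LoadImage('frame000.png', color_mode='Y')    # (H, W, 1), values in [0, 1]
    y_crop = LoadImage('frame000.png', color_mode='Y', modcrop=[8, 8, 8, 8])    # (H-16, W-16, 1)
    return y, y_crop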

# He (MSRA) initialization: variance scaled by fan-in (TF1 contrib API).
he_normal_init = tf.contrib.layers.variance_scaling_initializer(factor=2.0, mode='FAN_IN', uniform=False)

def BatchNorm(input, is_train, decay=0.999, name='BatchNorm'):
    '''Batch normalization with moving averages for inference.
    References:
    https://github.com/zsdonghao/tensorlayer/blob/master/tensorlayer/layers.py
    https://github.com/ry/tensorflow-resnet/blob/master/resnet.py
    http://stackoverflow.com/questions/38312668/how-does-one-do-inference-with-batch-normalization-with-tensor-flow
    '''
    from tensorflow.python.training import moving_averages
    from tensorflow.python.ops import control_flow_ops

    # Normalize over all axes except the channel axis.
    axis = list(range(len(input.get_shape()) - 1))
    fdim = input.get_shape()[-1:]

    with tf.variable_scope(name):
        beta = tf.get_variable('beta', fdim, initializer=tf.constant_initializer(value=0.0))
        gamma = tf.get_variable('gamma', fdim, initializer=tf.constant_initializer(value=1.0))
        moving_mean = tf.get_variable('moving_mean', fdim, initializer=tf.constant_initializer(value=0.0), trainable=False)
        # Initialized to zero; zero_debias=True in assign_moving_average corrects the bias.
        moving_variance = tf.get_variable('moving_variance', fdim, initializer=tf.constant_initializer(value=0.0), trainable=False)

        def mean_var_with_update():
            # Use batch statistics and update the moving averages as a side effect.
            batch_mean, batch_variance = tf.nn.moments(input, axis)
            update_moving_mean = moving_averages.assign_moving_average(moving_mean, batch_mean, decay, zero_debias=True)
            update_moving_variance = moving_averages.assign_moving_average(moving_variance, batch_variance, decay, zero_debias=True)
            with tf.control_dependencies([update_moving_mean, update_moving_variance]):
                return tf.identity(batch_mean), tf.identity(batch_variance)

        # Batch statistics at training time, moving averages at inference time.
        mean, variance = control_flow_ops.cond(is_train, mean_var_with_update, lambda: (moving_mean, moving_variance))

        return tf.nn.batch_normalization(input, mean, variance, beta, gamma, 1e-3)
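
# A minimal usage sketch for BatchNorm, assuming a TF1-style graph ('x' and
# 'is_train' are illustrative names). is_train must be a scalar boolean tensor,
# since BatchNorm feeds it to tf.cond.
def _example_batch_norm():
    x = tf.placeholder(tf.float32, [None, 7, 32, 32, 64])    # (b, t, h, w, c)
    is_train = tf.placeholder(tf.bool, [])
    h = BatchNorm(x, is_train, name='bn1')    # batch stats if True, moving averages if False
    return h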

def Conv3D(input, kernel_shape, strides, padding, name='Conv3d', W_initializer=he_normal_init, bias=True):
    with tf.variable_scope(name):
        # kernel_shape: (kt, kh, kw, in_channels, out_channels)
        W = tf.get_variable("W", kernel_shape, initializer=W_initializer)
        if bias is True:
            b = tf.get_variable("b", [kernel_shape[-1]], initializer=tf.constant_initializer(value=0.0))
        else:
            b = 0
        return tf.nn.conv3d(input, W, strides, padding) + b
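
# A minimal usage sketch for Conv3D (shapes are illustrative): a 3x3x3
# convolution from 64 to 64 channels with unit strides and 'SAME' padding.
def _example_conv3d():
    x = tf.placeholder(tf.float32, [None, 7, 32, 32, 64])    # (b, t, h, w, c)
    h = Conv3D(x, [3, 3, 3, 64, 64], [1, 1, 1, 1, 1], 'SAME', name='conv1')
    return h    # (b, 7, 32, 32, 64)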

def depth_to_space_3D(x, block_size):
    '''Apply 2D depth_to_space to every frame of a 5D tensor (b, t, h, w, c)
    by folding the time dimension into the batch dimension.'''
    ds_x = tf.shape(x)
    x = tf.reshape(x, [ds_x[0]*ds_x[1], ds_x[2], ds_x[3], ds_x[4]])

    y = tf.depth_to_space(x, block_size)

    ds_y = tf.shape(y)
    x = tf.reshape(y, [ds_x[0], ds_x[1], ds_y[1], ds_y[2], ds_y[3]])
    return x
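
# A minimal usage sketch for depth_to_space_3D (shapes are illustrative): with
# block_size R, each frame's R*R*c channels are rearranged into an upscaled
# (R*h, R*w, c) frame, i.e. per-frame sub-pixel (pixel-shuffle) upscaling.
def _example_depth_to_space_3d():
    R = 4
    x = tf.placeholder(tf.float32, [None, 7, 32, 32, R * R * 1])
    y = depth_to_space_3D(x, R)    # (b, 7, 128, 128, 1)
    return y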

def DynFilter3D(x, F, filter_size):
    '''
    3D dynamic filtering
    x: (b, t, h, w)
    F: (b, h, w, tower_depth, output_depth), where tower_depth = ft*fh*fw
    filter_size: (ft, fh, fw)
    '''
    # Build a fixed identity filter that expands each local (ft, fh, fw)
    # neighborhood of x into the channel dimension ("make tower").
    filter_localexpand_np = np.reshape(np.eye(np.prod(filter_size), np.prod(filter_size)), (filter_size[1], filter_size[2], filter_size[0], np.prod(filter_size)))
    filter_localexpand = tf.Variable(filter_localexpand_np, trainable=False, dtype='float32', name='filter_localexpand')

    x = tf.transpose(x, perm=[0, 2, 3, 1])    # (b, h, w, t)
    x_localexpand = tf.nn.conv2d(x, filter_localexpand, [1, 1, 1, 1], 'SAME')    # (b, h, w, ft*fh*fw)
    x_localexpand = tf.expand_dims(x_localexpand, axis=3)    # (b, h, w, 1, ft*fh*fw)

    # Per-pixel matrix product with the predicted filters.
    x = tf.matmul(x_localexpand, F)    # (b, h, w, 1, output_depth)
    x = tf.squeeze(x, axis=3)    # (b, h, w, output_depth)
    return x
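
# A minimal usage sketch for DynFilter3D (shapes are illustrative): filter a
# single frame with per-pixel 1x5x5 filters predicted elsewhere in the network;
# with output_depth = R*R, the result can be fed to tf.depth_to_space for RxR
# upscaling. F would typically be softmax-normalized over its ft*fh*fw axis.
def _example_dyn_filter_3d():
    R = 4
    x = tf.placeholder(tf.float32, [None, 1, 32, 32])    # (b, t=1, h, w)
    F = tf.placeholder(tf.float32, [None, 32, 32, 1 * 5 * 5, R * R])
    y = DynFilter3D(x, F, [1, 5, 5])    # (b, 32, 32, R*R)
    return y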

def Huber(y_true, y_pred, delta, axis=None):
    abs_error = tf.abs(y_pred - y_true)
    quadratic = tf.minimum(abs_error, delta)
    # The following expression is the same in value as
    # tf.maximum(abs_error - delta, 0), but importantly the gradient for the
    # expression when abs_error == delta is 0 (for tf.maximum it would be 1).
    # This is necessary to avoid doubling the gradient, since there is already a
    # nonzero contribution to the gradient from the quadratic term.
    linear = (abs_error - quadratic)
    losses = 0.5 * quadratic**2 + delta * linear
    return tf.reduce_mean(losses, axis=axis)
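
# A minimal usage sketch for Huber (tensors are illustrative): the loss is
# quadratic for |error| <= delta and linear beyond it.
def _example_huber():
    y_true = tf.placeholder(tf.float32, [None, 32, 32, 1])
    y_pred = tf.placeholder(tf.float32, [None, 32, 32, 1])
    return Huber(y_true, y_pred, delta=0.01)    # scalar mean loss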