-
Notifications
You must be signed in to change notification settings - Fork 1
/
wdt.py
219 lines (195 loc) · 9.22 KB
/
wdt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import numpy as np
try:
import lib._wdt
fortran_lib = True
except ImportError:
print("No Fortran modules found, falling back on Python implementation.\nDid you run `python3 setup.py install`?")
fortran_lib = False
import math
import heapq
from scipy.misc import imread
import matplotlib.pyplot as plt
DIR_STRINGS = ["left", "down", "right", "up"]
DIRS = ((-1, 0), (0, -1), (1, 0), (0, 1))
def map_image_to_costs(image):
"""
Read image data and convert it to a marginal cost function,
a 2D array containing costs for moving through each pixel.
This cost field forms the input for the weighted distance transform
zero costs denote exits, infinite costs denote fully impenetrable obstacles.
In this example, we follow Mercurial standards: obstacles are in black, exits in green,
accessible space is in white, less accessible space has less white.
Adapt to your own needs.
:param image: String of image file or open file descriptor of image
:return: 2D array representing the cost field
"""
# Read image and convert to binary format
data = imread(image, mode='RGB') / 255.
# Exits are present in all green enough places ("G >> R and G")
exits = np.where(data[:, :, 1] - (data[:, :, 0] + data[:, :, 2]) / 2 > 1./3 )
# Obstacles are in black (so at least G and B must be zero)
obstacles = np.where(np.abs(data[:, :, 0] + data[:, :, 2]) < 1. / 256)
# Convert image to greyscale
grey_scales = np.dot(data[..., :3], [0.299, 0.587, 0.114])
# Boolean index array for places without exits and obstacles
space = np.ones(grey_scales.shape, dtype=np.bool)
space[obstacles] = False
space[exits] = False
# Cost field: Inversely proportional to greyscale values
cost_field = np.empty(data[:, :, 0].shape)
cost_field[obstacles] = np.inf
cost_field[exits] = 0
cost_field[space] = 1. / (255 * grey_scales[space])
return cost_field
def get_weighted_distance_transform(cost_field):
"""
Compute the weighted distance transform from the cost field using a fast marching algorithm.
We compute the distance transform with costs defined on a staggered grid for consistency.
This means that we use costs defined on the faces of cells, found by averaging the values of the two adjacent cells.
Starting from the exit, we march over all the pixels with the lowest weighted distance iteratively,
until we found values for all pixels in reach.
:param cost_field: nonnegative 2D array with cost in each cell/pixel, zero and infinity are allowed values.
:return: weighted distance transform field
"""
if fortran_lib:
# Fortran does not allow for infinite float.
nx, ny = cost_field.shape
# Float that is (probably far) higher than the highest reachable potential
obstacle_value = np.max(cost_field[cost_field < np.inf]) * nx * ny * 2
cost_field[cost_field == np.inf] = obstacle_value
# Run the Fortran module
wdt_field = lib._wdt.weighted_distance_transform(cost_field, nx, ny, obstacle_value)
wdt_field[wdt_field >= obstacle_value] = np.inf
return wdt_field
else:
# Run python implementation
return _wdt_python(cost_field)
def plot(field):
"""
Use Matplotlib to plot the weighted distance transform or cost field in a nice colourful graph
:param field: 2D array
:return: None
"""
plt.imshow(field)
plt.colorbar()
plt.show()
"""
Below follows the Python implementation. It is quite slow, and a slightly naive implementation
so it is probably best to just use the compiled Fortran code. Still, it might be usable for people with patience,
or for educational purposes.
"""
def _wdt_python(cost_field):
"""
See `get_weighted_distance_transform`
:param cost_field: 2D array
:return: Weighted distance transform array with same shape as `cost_field`
"""
nx, ny = cost_field.shape
# Cost for moving along horizontal lines
costs_x = np.ones([nx + 1, ny], order='F') * np.inf
costs_x[1:-1, :] = (cost_field[1:, :] + cost_field[:-1, :]) / 2
# Cost for moving along vertical lines
costs_y = np.ones([nx, ny + 1], order='F') * np.inf
costs_y[:, 1:-1] = (cost_field[:, 1:] + cost_field[:, :-1]) / 2
# Initialize locations (known/unknown/exit/obstacle)
weighted_distance_transform = np.ones_like(cost_field, order='F') * np.inf
exit_locs = np.where(cost_field == 0)
obstacle_locs = np.where(cost_field == np.inf)
weighted_distance_transform[exit_locs] = 0
# Initialize Cell structures
all_cells = {(i, j) for i in range(nx) for j in range(ny)}
known_cells = {cell for cell in zip(exit_locs[0], exit_locs[1])}
unknown_cells = all_cells - known_cells - {cell for cell in zip(obstacle_locs[0], obstacle_locs[1])}
new_candidate_cells = set()
for cell in known_cells:
new_candidate_cells |= _get_new_candidate_cells(cell, unknown_cells)
cand_heap = [(np.inf, cell) for cell in new_candidate_cells]
# Loop until all unknown cells have a distance value
while True:
# by repeatedly looping over the new candidate cells
for cell in new_candidate_cells:
# Compute a distance for each cell based on its neighbour cells
distance = _propagate_distance(cell, [costs_x, costs_y], weighted_distance_transform)
# Store this value in the heap (for fast lookup)
# Don't check whether we have the distance already in the heap; check on outcome
heapq.heappush(cand_heap, (distance, cell))
# See if the heap contains a good value and if so, add it to the field. If not, finish.
# Since we can store multiple distance values for one cell, we might need to pop a couple of times
while True:
min_distance, best_cell = heapq.heappop(cand_heap)
if weighted_distance_transform[best_cell] == np.inf:
# Got a good one: no assigned distance in wdt yet
break
elif min_distance == np.inf: # No more finite values; done
return weighted_distance_transform
# Good value found, add to the wdt and
weighted_distance_transform[best_cell] = min_distance
unknown_cells.remove(best_cell)
new_candidate_cells = _get_new_candidate_cells(best_cell, unknown_cells)
def _exists(index, nx, ny):
"""
Checks whether an index exists an array
:param index: 2D index tuple
:return: true if lower than tuple, false otherwise
"""
return (0 <= index[0] < nx) and (0 <= index[1] < ny)
def _get_new_candidate_cells(cell, unknown_cells):
"""
Compute the new candidate cells (cells for which we have no definite distance value yet
For more information on the algorithm: check fast marching method
:param cell: tuple of index; a new cell that has been added to the distance field
:param unknown_cells: set of tuples; all cells still unknown
:return: Set of new candidate cells for which to compute the distance
"""
new_candidate_cells = set()
for direction in DIRS:
nb_cell = (cell[0] + direction[0], cell[1] + direction[1])
if nb_cell in unknown_cells:
new_candidate_cells.add(nb_cell)
return new_candidate_cells
def _propagate_distance(cell, costs, wdt_field):
"""
Compute the weighted distance in a cell using costs and distances in other cells
:param cell: tuple, index of a candidate cell
:param costs: list of cost arrays in X and Y direction
:param wdt_field: the weighted distance transform field up until now
:return: a approximate distance based on the neighbour cells
"""
nx, ny = wdt_field.shape
# Find the minimal directions along a grid cell.
# Assume left and below are best, then overwrite with right and up if they are better
adjacent_distances = np.ones(4) * np.inf
pots_from_axis = [0, 0] # [x direction, y direction]
costs_from_axis = [np.inf, np.inf] #
for i, dir_s in enumerate(DIR_STRINGS):
# Direction for which we check the cost
normal = DIRS[i]
nb_cell = (cell[0] + normal[0], cell[1] + normal[1])
if not _exists(nb_cell, nx, ny):
continue
pot = wdt_field[nb_cell]
# distance in that neighbour field
if dir_s == 'left':
face_index = (nb_cell[0] + 1, nb_cell[1])
elif dir_s == 'down':
face_index = (nb_cell[0], nb_cell[1] + 1)
else:
face_index = nb_cell
# Left/right is x, up/down is y
cost = costs[i % 2][face_index]
# Proposed cost along this direction
adjacent_distances[i] = pot + cost
# If it is cheaper to go from the opposite direction
if adjacent_distances[i] < adjacent_distances[(i + 2) % 4]:
pots_from_axis[i % 2] = pot
costs_from_axis[i % 2] = cost
hor_pot, ver_pot = pots_from_axis
hor_cost, ver_cost = costs_from_axis
# Coefficients of quadratic equation (upwind discretization)
a = 1. / hor_cost ** 2 + 1. / ver_cost ** 2
b = -2 * (hor_pot / hor_cost ** 2 + ver_pot / ver_cost ** 2)
c = (hor_pot / hor_cost) ** 2 + (ver_pot / ver_cost) ** 2 - 1
D = b ** 2 - 4 * a * c
# Largest root represents upwind approximation
x_high = (2 * c) / (-b - math.sqrt(D))
return x_high