Skip to content

Commit

Permalink
Parallel CloudVolume IO
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasturner1 committed Apr 26, 2018
1 parent 846e994 commit 4116010
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 23 deletions.
8 changes: 4 additions & 4 deletions synaptor/io/backends/cloudvolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from ...types.bbox import BBox3d


def read_cloud_volume_chunk(cv_name, bbox, mip=0):
def read_cloud_volume_chunk(cv_name, bbox, mip=0, parallel=1):
""" Reads a chunk of data specified by a bounding box """

cv = cloudvolume.CloudVolume(cv_name, mip=mip)
cv = cloudvolume.CloudVolume(cv_name, mip=mip, parallel=parallel)

#ensuring that we always read something
cv.fill_missing = True
Expand All @@ -32,10 +32,10 @@ def read_cloud_volume_chunk(cv_name, bbox, mip=0):
return cv[bbox.index()][:,:,:,0]


def write_cloud_volume_chunk(data, cv_name, bbox, mip=0):
def write_cloud_volume_chunk(data, cv_name, bbox, mip=0, parallel=1):
""" Writes a chunk of data specified by a bounding box """

cv = cloudvolume.CloudVolume(cv_name, mip=mip)
cv = cloudvolume.CloudVolume(cv_name, mip=mip, parallel=parallel)

cv[bbox.index()] = data.astype(cv.dtype)

Expand Down
42 changes: 28 additions & 14 deletions synaptor/proc_tasks/tasks_w_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@

def chunk_ccs_task(output_cvname, cleft_cvname, proc_dir_path,
chunk_begin, chunk_end,
cc_thresh, sz_thresh, mip=0):
cc_thresh, sz_thresh,
mip=0, parallel=1):

chunk_bounds = types.BBox3d(chunk_begin, chunk_end)


net_output = timed("Reading network output chunk: {}".format(chunk_bounds),
io.read_cloud_volume_chunk,
output_cvname, chunk_bounds, mip=mip)
output_cvname, chunk_bounds,
mip=mip, parallel=parallel)

(ccs, continuations,
cleft_info) = tasks.chunk_ccs_task(net_output,
Expand All @@ -31,7 +33,8 @@ def chunk_ccs_task(output_cvname, cleft_cvname, proc_dir_path,

timed("Writing cleft chunk: {}".format(chunk_bounds),
io.write_cloud_volume_chunk,
ccs, cleft_cvname, chunk_bounds, mip=mip)
ccs, cleft_cvname, chunk_bounds,
mip=mip, parallel=parallel)

timed("Writing chunk_continuations",
taskio.write_chunk_continuations,
Expand Down Expand Up @@ -75,7 +78,8 @@ def chunk_edges_task(img_cvname, cleft_cvname, seg_cvname,
chunk_begin, chunk_end, patchsz,
num_samples_per_cleft, dil_param,
proc_dir_path, wshed_cvname=None,
mip=0, img_mip=None, seg_mip=None):
mip=0, img_mip=None, seg_mip=None,
parallel=1):
"""
Runs tasks.chunk_edges_task after reading the relevant
cloud volume chunks and downsampling the cleft volume
Expand All @@ -99,18 +103,22 @@ def chunk_edges_task(img_cvname, cleft_cvname, seg_cvname,

img = timed("Reading img chunk at mip {}".format(img_mip),
io.read_cloud_volume_chunk,
img_cvname, chunk_bounds, mip=img_mip)
img_cvname, chunk_bounds,
mip=img_mip, parallel=parallel)
# clefts won't be downsampled - will do that myself later
clefts = timed("Reading cleft chunk at mip 0",
io.read_cloud_volume_chunk,
cleft_cvname, mip0_bounds, mip=0)
cleft_cvname, mip0_bounds,
mip=0, parallel=parallel)
seg = timed("Reading segmentation chunk at mip {}".format(seg_mip),
io.read_cloud_volume_chunk,
seg_cvname, chunk_bounds, mip=seg_mip)
seg_cvname, chunk_bounds, mip=seg_mip,
parallel=parallel)
if wshed_cvname is not None:
wshed = timed("Reading watershed chunk at mip {}".format(seg_mip),
io.read_cloud_volume_chunk,
wshed_cvname, chunk_bounds, mip=seg_mip)
wshed_cvname, chunk_bounds,
mip=seg_mip, parallel=parallel)
assert wshed.shape == seg.shape, "mismatched wshed basins"
else:
wshed = None
Expand Down Expand Up @@ -174,17 +182,20 @@ def merge_edges_task(voxel_res, dist_thr, size_thr, proc_dir_path):

def chunk_overlaps_task(seg_cvname, base_seg_cvname,
chunk_begin, chunk_end,
proc_dir_path, mip=0):
proc_dir_path, mip=0,
parallel=1):

chunk_bounds = types.BBox3d(chunk_begin, chunk_end)

seg_chunk = timed("Reading seg chunk",
io.read_cloud_volume_chunk,
seg_cvname, chunk_bounds, mip=mip)
seg_cvname, chunk_bounds,
mip=mip, parallel=parallel)

base_seg_chunk = timed("Reading base seg chunk",
io.read_cloud_volume_chunk,
base_seg_cvname, chunk_bounds, mip=mip)
base_seg_cvname, chunk_bounds,
mip=mip, parallel=parallel)

overlap_matrix = tasks.chunk_overlaps_task(seg_chunk, base_seg_chunk)

Expand All @@ -208,7 +219,8 @@ def merge_overlaps_task(proc_dir_path):


def remap_ids_task(cleft_in_cvname, cleft_out_cvname,
chunk_begin, chunk_end, proc_dir_path, mip=0):
chunk_begin, chunk_end, proc_dir_path,
mip=0, parallel=1):

chunk_bounds = types.BBox3d(chunk_begin, chunk_end)

Expand All @@ -222,11 +234,13 @@ def remap_ids_task(cleft_in_cvname, cleft_out_cvname,

cleft_chunk = timed("Reading cleft chunk",
io.read_cloud_volume_chunk,
cleft_in_cvname, chunk_bounds, mip=mip)
cleft_in_cvname, chunk_bounds,
mip=mip, parallel=parallel)

cleft_chunk = tasks.remap_ids_task(cleft_chunk, chunk_id_map,
dup_id_map, copy=False)

timed("Writing results",
io.write_cloud_volume_chunk,
cleft_chunk, cleft_out_cvname, chunk_bounds, mip=mip)
cleft_chunk, cleft_out_cvname, chunk_bounds,
mip=mip, parallel=parallel)
5 changes: 3 additions & 2 deletions tasks/chunk_ccs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
parser.add_argument("proc_dir_path")

# Processing Parameters
parser.add_argument("cc_thresh", type=float)
parser.add_argument("sz_thresh", type=int)
parser.add_argument("--chunk_begin", nargs="+", type=int, required=True)
parser.add_argument("--chunk_end", nargs="+", type=int, required=True)
parser.add_argument("--parallel", type=int, default=1)
parser.add_argument("--mip", type=int, default=0)
parser.add_argument("cc_thresh", type=float)
parser.add_argument("sz_thresh", type=int)

args = parser.parse_args()
print(vars(args))
Expand Down
7 changes: 4 additions & 3 deletions tasks/chunk_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
parser.add_argument("--wshed_cvname", default=None)

# Processing Parameters
parser.add_argument("num_samples_per_cleft", type=int)
parser.add_argument("dil_param", type=int)
parser.add_argument("--chunk_begin", nargs="+", type=int, required=True)
parser.add_argument("--chunk_end", nargs="+", type=int, required=True)
parser.add_argument("--patchsz", nargs="+", type=int, required=True)
parser.add_argument("num_samples_per_cleft", type=int)
parser.add_argument("dil_param", type=int)
parser.add_argument("--mip", type=int, default=0)
parser.add_argument("--parallel", type=int, default=1)
parser.add_argument("--img_mip", type=int, default=0)
parser.add_argument("--seg_mip", type=int, default=0)
parser.add_argument("--mip", type=int, default=0)

args = parser.parse_args()
print(vars(args))
Expand Down
1 change: 1 addition & 0 deletions tasks/chunk_overlaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# Processing Parameters
parser.add_argument("--chunk_begin", nargs="+", type=int, required=True)
parser.add_argument("--chunk_end", nargs="+", type=int, required=True)
parser.add_argument("--parallel", type=int, default=1)
parser.add_argument("--mip", type=int, default=0)

args = parser.parse_args()
Expand Down
1 change: 1 addition & 0 deletions tasks/remap_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# Processing Parameters
parser.add_argument("--chunk_begin", nargs="+", type=int, required=True)
parser.add_argument("--chunk_end", nargs="+", type=int, required=True)
parser.add_argument("--parallel", type=int, default=1)
parser.add_argument("--mip", type=int, default=0)

args = parser.parse_args()
Expand Down

0 comments on commit 4116010

Please sign in to comment.