Skip to content

Commit

Permalink
Call spurt from subprocess to avoid fork issues (#455)
Browse files Browse the repository at this point in the history
* Call spurt from `subprocess` to avoid fork issues

* Manually make temp coh mask to prepare for combined masking

* fix test with mkdir
  • Loading branch information
scottstanie authored Oct 21, 2024
1 parent 72ce526 commit 53df4b8
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/dolphin/unwrap/_unwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def run(
ifg_filenames=ifg_filenames,
output_path=output_path,
temporal_coherence_file=temporal_coherence_file,
cor_filenames=cor_filenames,
# cor_filenames=cor_filenames,
mask_filename=mask_filename,
options=unwrap_options.spurt_options,
scratchdir=scratchdir,
Expand Down
164 changes: 78 additions & 86 deletions src/dolphin/unwrap/_unwrap_3d.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import shutil
import subprocess
from collections.abc import Sequence
from pathlib import Path

Expand All @@ -9,7 +10,7 @@
from opera_utils import get_dates
from scipy import ndimage, signal

from dolphin import io, utils
from dolphin import io
from dolphin._types import PathOrStr
from dolphin.workflows.config import SpurtOptions

Expand All @@ -25,112 +26,103 @@ def unwrap_spurt(
ifg_filenames: Sequence[PathOrStr],
output_path: PathOrStr,
temporal_coherence_file: PathOrStr,
cor_filenames: Sequence[PathOrStr] | None = None,
# cor_filenames: Sequence[PathOrStr] | None = None,
mask_filename: PathOrStr | None = None,
options: SpurtOptions = DEFAULT_OPTIONS,
scratchdir: PathOrStr | None = None,
) -> tuple[list[Path], list[Path]]:
"""Perform 3D unwrapping using `spurt`."""
from spurt.graph import Hop3Graph
from spurt.io import SLCStackReader
from spurt.workflows.emcf import (
GeneralSettings,
MergerSettings,
SolverSettings,
TilerSettings,
compute_phasediff_deciles,
get_bulk_offsets,
get_tiles,
merge_tiles,
unwrap_tiles,
"""Perform 3D unwrapping using `spurt` via subprocess call."""
# NOTE: we are working around spurt currently wanting "temporal_coherence.tif",
# and a temporal coherence threshold.
# we'll make our own mask of 0=bad, 1=good, then pass a threshold of 0.5
temp_coh = io.load_gdal(temporal_coherence_file, masked=True).filled(0)
# Mark the "bad" pixels (good=1, bad=0, following the unwrapper mask convention)
temp_coh_mask = temp_coh > options.temporal_coherence_threshold
if mask_filename:
nodata_mask = io.load_gdal(mask_filename).astype(bool)
# A good pixel has to be 1 in both masks
combined_mask = temp_coh_mask & nodata_mask
else:
combined_mask = temp_coh_mask
# We name it "temporal_coherence.tif" so spurt reads it.
# Also make it float32 as though it were temp coh
scratch_path = Path(scratchdir) if scratchdir else Path(output_path) / "scratch"
scratch_path.mkdir(exist_ok=True, parents=True)
combined_mask_filename = scratch_path / "temporal_coherence.tif"
io.write_arr(
arr=combined_mask.astype("float32"), output_name=combined_mask_filename
)

if existing_unw_files := sorted(Path(output_path).glob(f"*{UNW_SUFFIX}")):
logger.info(f"Found {len(existing_unw_files)} unwrapped files")
existing_ccl_files = sorted(Path(output_path).glob(f"*{CONNCOMP_SUFFIX}"))
return existing_unw_files, existing_ccl_files

if cor_filenames is not None:
assert len(ifg_filenames) == len(cor_filenames)
if mask_filename is not None:
# TODO: Combine this with the temporal coherence to pass one 0/1 mask
# This will still work for spurt, since it runs `> threshold`, which is
# always true once we set our desired pixels to 1 and undesired to 0
_mask = io.load_gdal(mask_filename)

if scratchdir is None:
scratchdir = Path(output_path) / "scratch"
gen_settings = GeneralSettings(
output_folder=output_path,
intermediate_folder=scratchdir,
**options.general_settings.model_dump(),
# Symlink the interferograms to the same scratch path so spurt finds everything
# expected in the one directory
for fn in ifg_filenames:
new_path = scratch_path / Path(fn).name
new_path.symlink_to(fn)

cmd = [
"python",
"-m",
"spurt.workflows.emcf",
"-i",
str(scratch_path),
"-o",
str(output_path),
"--tempdir",
str(scratch_path / "emcf_tmp"),
"-c",
str(0.5), # arbitrary, since we are passing a 0/1 file anyway
]
if not options.general_settings.use_tiles:
cmd.append("--singletile")

# Tiler Settings
cmd.extend(
[
"--pts-per-tile",
str(options.tiler_settings.target_points_per_tile),
]
)

tile_settings = TilerSettings(**options.tiler_settings.model_dump())
slv_settings = SolverSettings(**options.solver_settings.model_dump())
mrg_settings = MergerSettings(**options.merger_settings.model_dump())

# Using default Hop3Graph
# TODO: this is a weird hack.. if there are 15 dates, there are 14 interferograms
# the spurt cli expects one of the filenames to be all 0s? maybe?
# But also still expects them to be date1_date2.int.tif?
g_time = Hop3Graph(len(ifg_filenames) + 1)
logger.info(f"Using Hop3 Graph in time with { g_time.npoints } epochs.")

date_str_to_file = _map_date_str_to_file(ifg_filenames)
stack = SLCStackReader(
slc_files=date_str_to_file,
temp_coh_file=temporal_coherence_file,
temp_coh_threshold=options.temporal_coherence_threshold,
# Solver Settings
cmd.extend(
[
"-w",
str(options.solver_settings.t_worker_count),
"--s-workers",
str(options.solver_settings.s_worker_count),
"-b",
str(options.solver_settings.links_per_batch),
"--t-cost-type",
options.solver_settings.t_cost_type,
"--t-cost-scale",
str(int(options.solver_settings.t_cost_scale)),
"--unwrap-parallel-tiles",
str(options.solver_settings.num_parallel_tiles),
]
)
# Run the workflow
# Generate tiles
get_tiles(stack, gen_settings, tile_settings)

# Unwrap tiles
unwrap_tiles(stack, g_time, gen_settings, slv_settings)

# Compute overlap stats
compute_phasediff_deciles(gen_settings, mrg_settings)
# Merger Settings
cmd.extend(
[
"--merge-parallel-ifgs",
str(options.merger_settings.num_parallel_ifgs),
]
)

# Compute bulk offsets
get_bulk_offsets(stack, gen_settings, mrg_settings)
subprocess.run(cmd, check=True, text=True)

# Merge tiles and write output
unw_filenames = merge_tiles(stack, g_time, gen_settings, mrg_settings)
# TODO: What can we do for conncomps? Anything? Run snaphu?
# Return paths to output files
output_path = Path(output_path)
unw_filenames = sorted(output_path.glob("*[0-9].unw.tif"))
conncomp_filenames = _create_conncomps_from_mask(
temporal_coherence_file,
options.temporal_coherence_threshold,
unw_filenames=unw_filenames,
)
filled_masked_unw_regions(unw_filenames, ifg_filenames)

return unw_filenames, conncomp_filenames


def _map_date_str_to_file(
ifg_filenames: Sequence[PathOrStr], date_fmt: str = "%Y%m%d"
) -> dict[str, PathOrStr | None]:
# Then list individual SLCs
dates = [get_dates(f) for f in ifg_filenames]
if len({d[0] for d in dates}) > 1:
errmsg = "interferograms for spurt must be single reference."
raise ValueError(errmsg)

secondary_dates = [d[1] for d in dates]
first_date = dates[0][0].strftime(date_fmt)
date_strings = [utils.format_dates(d, fmt=date_fmt) for d in secondary_dates]

date_str_to_file: dict[str, PathOrStr | None] = dict(
zip(date_strings, ifg_filenames)
)
# first date - set to None
# None is special case for reference epoch
date_str_to_file[first_date] = None
return date_str_to_file


def _create_conncomps_from_mask(
temporal_coherence_file: PathOrStr,
temporal_coherence_threshold: float,
Expand Down

0 comments on commit 53df4b8

Please sign in to comment.