diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 98dd597..a22fe94 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -34,6 +34,6 @@ jobs:
         flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
         # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
-    # - name: Test with pytest
-    #   run: |
-    #     pytest
+    - name: Test with pytest
+      run: |
+        pytest
diff --git a/ereg/registration.py b/ereg/registration.py
index 5c27ba1..1913cc1 100644
--- a/ereg/registration.py
+++ b/ereg/registration.py
@@ -179,7 +179,10 @@ def register(
         sitk.WriteTransform(self.transform, transform_file)
 
         # apply composite transform if provided
-        if self.parameters["composite_transform"] is not None:
+        self.parameters["composite_transform"] = self.parameters.get(
+            "composite_transform", None
+        )
+        if self.parameters.get("composite_transform"):
             self.logger.info("Applying composite transform.")
             transform_composite = sitk.ReadTransform(
                 self.parameters["composite_transform"]
@@ -188,7 +191,6 @@
                 transform_composite, self.transform
             )
 
-        if self.parameters["composite_transform"]:
             self.logger.info("Applying previous transforms.")
             current_transform = None
             for previous_transform in self.parameters["previous_transforms"]:
@@ -256,7 +258,8 @@ def resample_image(
         resampler = sitk.ResampleImageFilter()
         resampler.SetReferenceImage(target_image)
         interpolator_type = self.interpolator_type.get(
-            self.parameters["interpolator"]
+            self.parameters.get("interpolator", "linear").lower(),
+            sitk.sitkLinear,
         )
         resampler.SetInterpolator(interpolator_type)
         resampler.SetDefaultPixelValue(0)
@@ -388,248 +391,321 @@ def _register_image_and_get_transform(
                 physical_units *= target_image.GetSpacing()[dim]
 
         self.logger.info("Initializing registration.")
-        R = sitk.ImageRegistrationMethod()
-        metric = self.parameters["metric"].lower()
+        registration = sitk.ImageRegistrationMethod()
+        self.parameters["metric_parameters"] = self.parameters.get(
+            "metric_parameters", {}
+        )
+        metric = (
+            self.parameters["metric_parameters"].get("type", "mean_squares").lower()
+        )
         if (
             (metric == "mattesmutualinformation")
             or (metric == "mattes")
             or (metric == "mmi")
             or ("mattes" in metric)
         ):
-            R.SetMetricAsMattesMutualInformation(
-                numberOfHistogramBins=self.parameters["metric_parameters"][
-                    "histogram_bins"
-                ]
+            registration.SetMetricAsMattesMutualInformation(
+                numberOfHistogramBins=self.parameters["metric_parameters"].get(
+                    "histogram_bins", 50
+                ),
             )
         elif (
             (metric == "antsneighborhoodcorrelation")
             or (metric == "ants")
             or ("ants" in metric)
         ):
-            R.SetMetricAsANTSNeighborhoodCorrelation(
-                radius=self.parameters["metric_parameters"]["radius"]
+            registration.SetMetricAsANTSNeighborhoodCorrelation(
+                radius=self.parameters["metric_parameters"].get("radius", 5)
             )
         elif metric == "correlation":
-            R.SetMetricAsCorrelation()
+            registration.SetMetricAsCorrelation()
         elif metric == "demons":
-            R.SetMetricAsDemons(
-                intensityDifferenceThreshold=self.parameters["metric_parameters"][
-                    "intensityDifferenceThreshold"
-                ]
+            registration.SetMetricAsDemons(
+                intensityDifferenceThreshold=self.parameters["metric_parameters"].get(
+                    "intensityDifferenceThreshold", 0.001
+                ),
             )
         elif (
             (metric == "joint_histogram_mutual_information")
             or (metric == "joint")
             or ("joint" in metric)
         ):
-            R.SetMetricAsJointHistogramMutualInformation(
-                numberOfHistogramBins=self.parameters["metric_parameters"][
"histogram_bins" - ], - varianceForJointPDFSmoothing=self.parameters["metric_parameters"][ - "varianceForJointPDFSmoothing" - ], + registration.SetMetricAsJointHistogramMutualInformation( + numberOfHistogramBins=self.parameters["metric_parameters"].get( + "histogram_bins", 50 + ), + varianceForJointPDFSmoothing=self.parameters["metric_parameters"].get( + "varianceForJointPDFSmoothing", 1.5 + ), ) else: - R.SetMetricAsMeanSquares() + registration.SetMetricAsMeanSquares() sampling_strategy_parsed = { - "random": R.RANDOM, - "regular": R.REGULAR, - "none": R.NONE, + "random": registration.RANDOM, + "regular": registration.REGULAR, + "none": registration.NONE, } - R.SetMetricSamplingStrategy( - sampling_strategy_parsed[self.parameters["sampling_strategy"]] + registration.SetMetricSamplingStrategy( + sampling_strategy_parsed[ + self.parameters.get("sampling_strategy", "random").lower() + ] ) - R.SetMetricSamplingPercentagePerLevel(self.parameters["sampling_percentage"]) - - if self.parameters["optimizer"] == "regular_step_gradient_descent": - R.SetOptimizerAsRegularStepGradientDescent( - minStep=self.parameters["optimizer_parameters"]["min_step"], - numberOfIterations=self.parameters["optimizer_parameters"][ - "iterations" - ], - learningRate=self.parameters["optimizer_parameters"]["learningrate"], - # gradientMagnitudeTolerance=grad_tolerance, - relaxationFactor=self.parameters["optimizer_parameters"]["relaxation"], - gradientMagnitudeTolerance=self.parameters["optimizer_parameters"][ - "tolerance" - ], - estimateLearningRate=R.EachIteration, - maximumStepSizeInPhysicalUnits=self.parameters["optimizer_parameters"][ - "max_step" - ] + sampling_rate = self.parameters.get("sampling_percentage", 0.01) + if isinstance(sampling_rate, float): + registration.SetMetricSamplingPercentage(sampling_rate) + elif type(sampling_rate) in [np.ndarray, list]: + registration.SetMetricSamplingPercentagePerLevel(sampling_rate) + + # initialize some defaults + self.parameters["optimizer_parameters"] = self.parameters.get( + "optimizer_parameters", {} + ) + self.parameters["optimizer_parameters"]["type"] = self.parameters[ + "optimizer_parameters" + ].get("type", "regular_step_gradient_descent") + # set the optimizer parameters as either floats or integers + for key in self.parameters["optimizer_parameters"]: + if key not in ["type"]: + self.parameters["optimizer_parameters"][key] = float( + self.parameters["optimizer_parameters"][key] + ) + if key == "iterations": + self.parameters["optimizer_parameters"][key] = int( + self.parameters["optimizer_parameters"][key] + ) + if ( + self.parameters["optimizer_parameters"].get("type").lower() + == "regular_step_gradient_descent" + ): + registration.SetOptimizerAsRegularStepGradientDescent( + minStep=self.parameters["optimizer_parameters"].get("min_step", 1e-6), + numberOfIterations=self.parameters["optimizer_parameters"].get( + "iterations", 200 + ), + learningRate=self.parameters["optimizer_parameters"].get( + "learningrate", 1.0 + ), + relaxationFactor=self.parameters["optimizer_parameters"].get( + "relaxation", 0.5 + ), + gradientMagnitudeTolerance=self.parameters["optimizer_parameters"].get( + "tolerance", 1e-4 + ), + estimateLearningRate=registration.EachIteration, + maximumStepSizeInPhysicalUnits=self.parameters[ + "optimizer_parameters" + ].get("max_step", 1.0) * physical_units, ) - elif self.parameters["optimizer"] == "gradient_descent": - R.SetOptimizerAsGradientDescent( - learningRate=self.parameters["optimizer_parameters"]["learningrate"], - 
-                numberOfIterations=self.parameters["optimizer_parameters"][
-                    "iterations"
-                ],
-                convergenceMinimumValue=self.parameters["optimizer_parameters"][
-                    "convergence_minimum"
-                ],
-                convergenceWindowSize=self.parameters["optimizer_parameters"][
-                    "convergence_window_size"
-                ],
-                estimateLearningRate=R.EachIteration,
-                maximumStepSizeInPhysicalUnits=self.parameters["optimizer_parameters"][
-                    "max_step"
-                ]
+        elif (
+            self.parameters["optimizer_parameters"].get("type").lower()
+            == "gradient_descent"
+        ):
+            registration.SetOptimizerAsGradientDescent(
+                learningRate=self.parameters["optimizer_parameters"].get(
+                    "learningrate", 1.0
+                ),
+                numberOfIterations=self.parameters["optimizer_parameters"].get(
+                    "iterations", 200
+                ),
+                convergenceMinimumValue=self.parameters["optimizer_parameters"].get(
+                    "convergence_minimum", 1e-6
+                ),
+                convergenceWindowSize=self.parameters["optimizer_parameters"].get(
+                    "convergence_window_size", 10
+                ),
+                estimateLearningRate=registration.EachIteration,
+                maximumStepSizeInPhysicalUnits=self.parameters[
+                    "optimizer_parameters"
+                ].get("max_step", 1.0)
                 * physical_units,
             )
-        elif self.parameters["optimizer"] == "gradient_descent_line_search":
-            R.SetOptimizerAsGradientDescentLineSearch(
-                learningRate=self.parameters["optimizer_parameters"]["learningrate"],
-                numberOfIterations=self.parameters["optimizer_parameters"][
-                    "iterations"
-                ],
-                convergenceMinimumValue=self.parameters["optimizer_parameters"][
-                    "convergence_minimum"
-                ],
-                convergenceWindowSize=self.parameters["optimizer_parameters"][
-                    "convergence_window_size"
-                ],
-                lineSearchLowerLimit=self.parameters["optimizer_parameters"][
-                    "line_search_lower_limit"
-                ],
-                lineSearchUpperLimit=self.parameters["optimizer_parameters"][
-                    "line_search_upper_limit"
-                ],
-                lineSearchEpsilon=self.parameters["optimizer_parameters"][
-                    "line_search_epsilon"
-                ],
-                lineSearchMaximumIterations=self.parameters["optimizer_parameters"][
-                    "line_search_maximum_iterations"
-                ],
-                estimateLearningRate=R.EachIteration,
-                maximumStepSizeInPhysicalUnits=self.parameters["optimizer_parameters"][
-                    "max_step"
-                ]
+        elif (
+            self.parameters["optimizer_parameters"].get("type").lower()
+            == "gradient_descent_line_search"
+        ):
+            registration.SetOptimizerAsGradientDescentLineSearch(
+                learningRate=self.parameters["optimizer_parameters"].get(
+                    "learningrate", 1.0
+                ),
+                numberOfIterations=self.parameters["optimizer_parameters"].get(
+                    "iterations", 200
+                ),
+                convergenceMinimumValue=self.parameters["optimizer_parameters"].get(
+                    "convergence_minimum", 1e-6
+                ),
+                convergenceWindowSize=self.parameters["optimizer_parameters"].get(
+                    "convergence_window_size", 10
+                ),
+                lineSearchLowerLimit=self.parameters["optimizer_parameters"].get(
+                    "line_search_lower_limit", 0.0
+                ),
+                lineSearchUpperLimit=self.parameters["optimizer_parameters"].get(
+                    "line_search_upper_limit", 1.0
+                ),
+                lineSearchEpsilon=self.parameters["optimizer_parameters"].get(
+                    "line_search_epsilon", 0.01
+                ),
+                lineSearchMaximumIterations=self.parameters["optimizer_parameters"].get(
+                    "line_search_maximum_iterations", 20
+                ),
+                estimateLearningRate=registration.EachIteration,
+                maximumStepSizeInPhysicalUnits=self.parameters[
+                    "optimizer_parameters"
+                ].get("max_step", 1.0)
                 * physical_units,
             )
         elif (
-            self.parameters["optimizer"]
+            self.parameters["optimizer_parameters"].get("type").lower()
             == "Conjugate_step_gradient_descent_line_search"
         ):
-            R.SetOptimizerAsConjugateGradientLineSearch(
-                learningRate=self.parameters["optimizer_parameters"]["learningrate"],
-                numberOfIterations=self.parameters["optimizer_parameters"][
-                    "iterations"
-                ],
-                convergenceMinimumValue=self.parameters["optimizer_parameters"][
-                    "convergence_minimum"
-                ],
-                convergenceWindowSize=self.parameters["optimizer_parameters"][
-                    "convergence_window_size"
-                ],
-                lineSearchLowerLimit=self.parameters["optimizer_parameters"][
-                    "line_search_lower_limit"
-                ],
-                lineSearchUpperLimit=self.parameters["optimizer_parameters"][
-                    "line_search_upper_limit"
-                ],
-                lineSearchEpsilon=self.parameters["optimizer_parameters"][
-                    "line_search_epsilon"
-                ],
-                lineSearchMaximumIterations=self.parameters["optimizer_parameters"][
-                    "line_search_maximum_iterations"
-                ],
-                estimateLearningRate=R.EachIteration,
-                maximumStepSizeInPhysicalUnits=self.parameters["optimizer_parameters"][
-                    "max_step"
-                ]
+            registration.SetOptimizerAsConjugateGradientLineSearch(
+                learningRate=self.parameters["optimizer_parameters"].get(
+                    "learningrate", 1.0
+                ),
+                numberOfIterations=self.parameters["optimizer_parameters"].get(
+                    "iterations", 200
+                ),
+                convergenceMinimumValue=self.parameters["optimizer_parameters"].get(
+                    "convergence_minimum", 1e-6
+                ),
+                convergenceWindowSize=self.parameters["optimizer_parameters"].get(
+                    "convergence_window_size", 10
+                ),
+                lineSearchLowerLimit=self.parameters["optimizer_parameters"].get(
+                    "line_search_lower_limit", 0.0
+                ),
+                lineSearchUpperLimit=self.parameters["optimizer_parameters"].get(
+                    "line_search_upper_limit", 1.0
+                ),
+                lineSearchEpsilon=self.parameters["optimizer_parameters"].get(
+                    "line_search_epsilon", 0.01
+                ),
+                lineSearchMaximumIterations=self.parameters["optimizer_parameters"].get(
+                    "line_search_maximum_iterations", 20
+                ),
+                estimateLearningRate=registration.EachIteration,
+                maximumStepSizeInPhysicalUnits=self.parameters[
+                    "optimizer_parameters"
+                ].get("max_step", 1.0)
                 * physical_units,
             )
-        elif self.parameters["optimizer"] == "exhaustive":
-            R.SetOptimizerAsExhaustive(
-                numberOfSteps=self.parameters["optimizer_parameters"]["iterations"],
-                stepLength=self.parameters["optimizer_parameters"]["step_length"],
+        elif (
+            self.parameters["optimizer_parameters"].get("type").lower() == "exhaustive"
+        ):
+            registration.SetOptimizerAsExhaustive(
+                numberOfSteps=self.parameters["optimizer_parameters"].get(
+                    "iterations", 200
+                ),
+                stepLength=self.parameters["optimizer_parameters"].get(
+                    "step_length", 0.1
+                ),
             )
-        elif self.parameters["optimizer"] == "amoeba":
-            R.SetOptimizerAsAmoeba(
+        elif self.parameters["optimizer_parameters"].get("type").lower() == "amoeba":
+            registration.SetOptimizerAsAmoeba(
                 numberOfIterations=self.parameters["optimizer_parameters"][
                     "iterations"
                 ],
                 simplexDelta=self.parameters["optimizer_parameters"]["simplex_delta"],
             )
-        elif self.parameters["optimizer"] == "lbfgsb":
-            R.SetOptimizerAsLBFGSB(
-                numberOfIterations=self.parameters["optimizer_parameters"][
-                    "iterations"
-                ],
-                maximumNumberOfCorrections=self.parameters["optimizer_parameters"][
-                    "maximum_number_of_corrections"
-                ],
+        elif self.parameters["optimizer_parameters"].get("type").lower() == "lbfgsb":
+            registration.SetOptimizerAsLBFGSB(
+                numberOfIterations=self.parameters["optimizer_parameters"].get(
+                    "iterations", 200
+                ),
+                maximumNumberOfCorrections=self.parameters["optimizer_parameters"].get(
+                    "maximum_number_of_corrections", 5
+                ),
                 maximumNumberOfFunctionEvaluations=self.parameters[
                     "optimizer_parameters"
-                ]["maximum_number_of_function_evaluations"],
-                costFunctionConvergenceFactor=self.parameters["optimizer_parameters"][
-                    "cost_function_convergence_factor"
-                ],
].get("maximum_number_of_function_evaluations", 2000), + costFunctionConvergenceFactor=self.parameters[ + "optimizer_parameters" + ].get("cost_function_convergence_factor", 1e7), ) - elif self.parameters["optimizer"] == "lbfgs2": - R.SetOptimizerAsLBFGS2( - numberOfIterations=self.parameters["optimizer_parameters"][ - "iterations" - ], - solutionAccuracy=self.parameters["optimizer_parameters"][ - "solution_accuracy" - ], - hessianApproximateAccuracy=self.parameters["optimizer_parameters"][ - "hessian_approximate_accuracy" - ], - deltaConvergenceDistance=self.parameters["optimizer_parameters"][ - "delta_convergence_distance" - ], - deltaConvergenceTolerance=self.parameters["optimizer_parameters"][ - "delta_convergence_tolerance" - ], - lineSearchMaximumEvaluations=self.parameters["optimizer_parameters"][ - "line_search_maximum_evaluations" - ], - lineSearchMinimumStep=self.parameters["optimizer_parameters"][ - "line_search_minimum_step" - ], - lineSearchMaximumStep=self.parameters["optimizer_parameters"][ - "line_search_maximum_step" - ], - lineSearchAccuracy=self.parameters["optimizer_parameters"][ - "line_search_accuracy" - ], + elif self.parameters["optimizer_parameters"].get("type").lower() == "lbfgs2": + registration.SetOptimizerAsLBFGS2( + numberOfIterations=self.parameters["optimizer_parameters"].get( + "iterations", 200 + ), + solutionAccuracy=self.parameters["optimizer_parameters"].get( + "solution_accuracy", 1e-7 + ), + hessianApproximateAccuracy=self.parameters["optimizer_parameters"].get( + "hessian_approximate_accuracy", 1e-7 + ), + deltaConvergenceDistance=self.parameters["optimizer_parameters"].get( + "delta_convergence_distance", 1e-5 + ), + deltaConvergenceTolerance=self.parameters["optimizer_parameters"].get( + "delta_convergence_tolerance", 1e-4 + ), + lineSearchMaximumEvaluations=self.parameters[ + "optimizer_parameters" + ].get("line_search_maximum_evaluations", 20), + lineSearchMinimumStep=self.parameters["optimizer_parameters"].get( + "line_search_minimum_step", 1e-20 + ), + lineSearchMaximumStep=self.parameters["optimizer_parameters"].get( + "line_search_maximum_step", 1e20 + ), + lineSearchAccuracy=self.parameters["optimizer_parameters"].get( + "line_search_accuracy", 0.9 + ), ) - elif self.parameters["optimizer"] == "one_plus_one_evolutionary": - R.SetOptimizerAsOnePlusOneEvolutionary( - numberOfIterations=self.parameters["optimizer_parameters"][ - "iterations" - ], - epsilon=self.parameters["optimizer_parameters"]["epsilon"], - initialRadius=self.parameters["optimizer_parameters"]["initial_radius"], - growthFactor=self.parameters["optimizer_parameters"]["growth_factor"], - shrinkFactor=self.parameters["optimizer_parameters"]["shrink_factor"], + elif ( + self.parameters["optimizer_parameters"].get("type").lower() + == "one_plus_one_evolutionary" + ): + registration.SetOptimizerAsOnePlusOneEvolutionary( + numberOfIterations=self.parameters["optimizer_parameters"].get( + "iterations", 200 + ), + epsilon=self.parameters["optimizer_parameters"].get("epsilon", 1e-6), + initialRadius=self.parameters["optimizer_parameters"].get( + "initial_radius", 1.0 + ), + growthFactor=self.parameters["optimizer_parameters"].get( + "growth_factor", 2.0 + ), + shrinkFactor=self.parameters["optimizer_parameters"].get( + "shrink_factor", 0.7 + ), ) - elif self.parameters["optimizer"] == "powell": - R.SetOptimizerAsPowell( - numberOfIterations=self.parameters["optimizer_parameters"][ - "iterations" - ], - maximumLineIterations=self.parameters["optimizer_parameters"][ - 
"maximum_line_iterations" - ], - stepLength=self.parameters["optimizer_parameters"]["step_length"], - stepTolerance=self.parameters["optimizer_parameters"]["step_tolerance"], - valueTolerance=self.parameters["optimizer_parameters"][ - "value_tolerance" - ], + elif self.parameters["optimizer_parameters"].get("type").lower() == "powell": + registration.SetOptimizerAsPowell( + numberOfIterations=self.parameters["optimizer_parameters"].get( + "iterations", 200 + ), + maximumLineIterations=self.parameters["optimizer_parameters"].get( + "maximum_line_iterations", 20 + ), + stepLength=self.parameters["optimizer_parameters"].get( + "step_length", 1.0 + ), + stepTolerance=self.parameters["optimizer_parameters"].get( + "step_tolerance", 0.001 + ), + valueTolerance=self.parameters["optimizer_parameters"].get( + "value_tolerance", 0.001 + ), ) - # R.SetOptimizerScalesFromJacobian() - R.SetOptimizerScalesFromPhysicalShift() + # registration.SetOptimizerScalesFromJacobian() + registration.SetOptimizerScalesFromPhysicalShift() - R.SetShrinkFactorsPerLevel(self.parameters["shrink_factors"]) - R.SetSmoothingSigmasPerLevel(self.parameters["smoothing_sigmas"]) - R.SmoothingSigmasAreSpecifiedInPhysicalUnitsOn() + registration.SetShrinkFactorsPerLevel( + self.parameters.get("shrink_factors", [8, 4, 2]) + ) + registration.SetSmoothingSigmasPerLevel( + self.parameters.get("smoothing_sigmas", [3, 2, 1]) + ) + registration.SmoothingSigmasAreSpecifiedInPhysicalUnitsOn() + + assert ( + self.parameters.get("transform", "") in self.available_transforms + ), f"`transform`needs to be set to one of the following: {self.available_transforms}" transform_function = self._get_transform_wrapper( self.parameters["transform"], dimension ) @@ -658,6 +734,7 @@ def _register_image_and_get_transform( # self.initialization_type.get(initialization.lower(), "geometry"), # ) + self.parameters["initialization"] = self.parameters.get("initialization", None) if self.parameters["initialization"] is not None: temp_moving = moving_image temp_initialization = self.parameters["initialization"].upper() @@ -678,17 +755,17 @@ def _register_image_and_get_transform( transform_function, eval("sitk.CenteredTransformInitializerFilter.%s" % (initializer_type)), ) - R.SetInitialTransform(final_transform, inPlace=False) + registration.SetInitialTransform(final_transform, inPlace=False) ## set the interpolator - all options: https://simpleitk.org/doxygen/latest/html/namespaceitk_1_1simple.html#a7cb1ef8bd02c669c02ea2f9f5aa374e5 # this should be linear to optimize results and computational efficacy - R.SetInterpolator(sitk.sitkLinear) + registration.SetInterpolator(sitk.sitkLinear) - # R.AddCommand(sitk.sitkIterationEvent, lambda: R) + # registration.AddCommand(sitk.sitkIterationEvent, lambda: R) self.logger.info("Starting registration.") output_transform = None for _ in range(self.parameters["attempts"]): try: - output_transform = R.Execute(target_image, moving_image) + output_transform = registration.Execute(target_image, moving_image) break except RuntimeError as e: self.logger.warning( @@ -696,11 +773,16 @@ def _register_image_and_get_transform( ) continue - if output_transform is None: - raise RuntimeError("Registration failed.") + assert output_transform is not None, "Registration failed." 
+
+        self.logger.info(
+            f"Final Optimizer Parameters:: convergence={registration.GetOptimizerConvergenceValue()}, iterations={registration.GetOptimizerIteration()}, metric={registration.GetMetricValue()}, stop condition={registration.GetOptimizerStopConditionDescription()}"
+        )
 
         registration_transform_sitk = output_transform
         # if user is requesting a rigid registration, convert the transform to a rigid transform
+        if isinstance(output_transform, sitk.CompositeTransform):
+            registration_transform_sitk = output_transform.GetNthTransform(0)
         if self.parameters["transform"] in ["euler", "versorrigid"]:
             try:
                 # Euler Transform used:
@@ -718,35 +800,4 @@
                 tmp.SetTranslation(registration_transform_sitk.GetTranslation())
                 tmp.SetCenter(registration_transform_sitk.GetCenter())
                 registration_transform_sitk = tmp
-        ## additional information
-        # print("Metric: ", R.MetricEvaluate(target_image, moving_image), flush=True)
-        # print(
-        #     "Optimizer stop condition: ",
-        #     R.GetOptimizerStopConditionDescription(),
-        #     flush=True,
-        # )
-        # print("Number of iterations: ", R.GetOptimizerIteration(), flush=True)
-        # print("Final metric value: ", R.GetMetricValue(), flush=True)
-
-        # if rigid_registration:
-        #     if target_image.GetDimension() == 2:
-        #         output_transform = eval(
-        #             "sitk.Euler%dDTransform(output_transform)"
-        #             % (target_image.GetDimension())
-        #         )
-        #     elif target_image.GetDimension() == 3:
-        #         output_transform = eval(
-        #             "sitk.Euler%dDTransform(output_transform)"
-        #             % (target_image.GetDimension())
-        #         )
-        #         # VersorRigid used: Transform from VersorRigid to Euler
-        #         output_transform = eval(
-        #             "sitk.VersorRigid%dDTransform(output_transform)"
-        #             % (target_image.GetDimension())
-        #         )
-        #         tmp = eval("sitk.Euler%dDTransform()" % (target_image.GetDimension()))
-        #         tmp.SetMatrix(output_transform.GetMatrix())
-        #         tmp.SetTranslation(output_transform.GetTranslation())
-        #         tmp.SetCenter(output_transform.GetCenter())
-        #         output_transform = tmp
         return registration_transform_sitk
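
Reviewer note (not part of the diff): with this change, most keys in `self.parameters` fall back to defaults through `dict.get`, so a sparse configuration is enough to drive a registration. A minimal sketch of the resulting config shape — the keys and default values below are taken directly from the hunks above, while how the dict is handed to the registration object depends on the surrounding class, which this diff does not show:

```python
# Sparse configuration exercising the new .get() fallbacks from this diff.
# Every key omitted here is filled in by a default visible in the hunks above.
parameters = {
    # nested "type" keys now select the metric and the optimizer
    "metric_parameters": {"type": "mattes", "histogram_bins": 32},
    "optimizer_parameters": {"type": "gradient_descent", "iterations": 100},
    "transform": "euler",  # validated against self.available_transforms
    # omitted keys and their new defaults:
    #   sampling_strategy    -> "random"
    #   sampling_percentage  -> 0.01
    #   shrink_factors       -> [8, 4, 2]
    #   smoothing_sigmas     -> [3, 2, 1]
    #   interpolator         -> "linear"
    #   initialization       -> None
    #   composite_transform  -> None
}
```

A few keys still bypass the fallbacks: `attempts` and the `amoeba` optimizer's `iterations`/`simplex_delta` are read with raw indexing, so they continue to require explicit values (a `KeyError` otherwise).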