diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 307fa32d6a0f..28cf9504484d 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -33,7 +33,6 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions -from apache_beam.options.pipeline_options import TestOptions from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.options.pipeline_options import _BeamArgumentParser @@ -45,6 +44,12 @@ _LOGGER = logging.getLogger(__name__) +try: + import apache_beam.io.gcp.gcsio + has_gcsio = True +except ImportError: + has_gcsio = False + # Mock runners to use for validations. class MockRunners(object): @@ -712,6 +717,16 @@ def test_options_store_false_with_different_dest(self): "the dest and the flag name to the map " "_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py") + def _check_errors(self, options, validator, expected): + if has_gcsio: + with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled', + return_value=False): + errors = options._handle_temp_and_staging_locations(validator) + self.assertEqual(errors, expected) + else: + errors = options._handle_temp_and_staging_locations(validator) + self.assertEqual(errors, expected) + def test_validation_good_stg_good_temp(self): runner = MockRunners.DataflowRunner() options = GoogleCloudOptions([ @@ -719,10 +734,8 @@ def test_validation_good_stg_good_temp(self): '--staging_location=gs://beam/stg', '--temp_location=gs://beam/tmp' ]) - options.view_as(TestOptions).dry_run = True validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/stg") self.assertEqual( @@ -735,10 +748,8 @@ def test_validation_bad_stg_good_temp(self): '--staging_location=badGSpath', '--temp_location=gs://beam/tmp' ]) - options.view_as(TestOptions).dry_run = True validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/tmp") self.assertEqual( @@ -751,10 +762,8 @@ def test_validation_good_stg_bad_temp(self): '--staging_location=gs://beam/stg', '--temp_location=badGSpath' ]) - options.view_as(TestOptions).dry_run = True validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/stg") self.assertEqual( @@ -767,10 +776,8 @@ def test_validation_bad_stg_bad_temp_with_default(self): '--staging_location=badGSpath', '--temp_location=badGSpath' ]) - options.view_as(TestOptions).dry_run = True validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://default/bucket") self.assertEqual( @@ -783,18 +790,16 @@ def test_validation_bad_stg_bad_temp_no_default(self): '--staging_location=badGSpath', '--temp_location=badGSpath' ]) - options.view_as(TestOptions).dry_run = True validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(len(errors), 2, errors) - self.assertIn( - 'Invalid GCS path (badGSpath), given for the option: temp_location.', - errors, - errors) - self.assertIn( - 'Invalid GCS path (badGSpath), given for the option: staging_location.', - errors, - errors) + self._check_errors( + options, + validator, + [ + 'Invalid GCS path (badGSpath), given for the option: ' \ + 'temp_location.', + 'Invalid GCS path (badGSpath), given for the option: ' \ + 'staging_location.' + ]) if __name__ == '__main__':