Skip to content

Commit

Permalink
Mock is_soft_delete_enabled only if gcsio can be loaded.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Sep 21, 2024
1 parent b92d0ec commit 3b7ce6b
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import _BeamArgumentParser
Expand All @@ -45,6 +44,12 @@

_LOGGER = logging.getLogger(__name__)

try:
import apache_beam.io.gcp.gcsio
has_gcsio = True
except ImportError:
has_gcsio = False


# Mock runners to use for validations.
class MockRunners(object):
Expand Down Expand Up @@ -712,17 +717,25 @@ def test_options_store_false_with_different_dest(self):
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")

def _check_errors(self, options, validator, expected):
if has_gcsio:
with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',
return_value=False):
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)
else:
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)

def test_validation_good_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
'--project=myproject',
'--staging_location=gs://beam/stg',
'--temp_location=gs://beam/tmp'
])
options.view_as(TestOptions).dry_run = True
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -735,10 +748,8 @@ def test_validation_bad_stg_good_temp(self):
'--staging_location=badGSpath',
'--temp_location=gs://beam/tmp'
])
options.view_as(TestOptions).dry_run = True
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/tmp")
self.assertEqual(
Expand All @@ -751,10 +762,8 @@ def test_validation_good_stg_bad_temp(self):
'--staging_location=gs://beam/stg',
'--temp_location=badGSpath'
])
options.view_as(TestOptions).dry_run = True
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -767,10 +776,8 @@ def test_validation_bad_stg_bad_temp_with_default(self):
'--staging_location=badGSpath',
'--temp_location=badGSpath'
])
options.view_as(TestOptions).dry_run = True
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://default/bucket")
self.assertEqual(
Expand All @@ -783,18 +790,16 @@ def test_validation_bad_stg_bad_temp_no_default(self):
'--staging_location=badGSpath',
'--temp_location=badGSpath'
])
options.view_as(TestOptions).dry_run = True
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(len(errors), 2, errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: temp_location.',
errors,
errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: staging_location.',
errors,
errors)
self._check_errors(
options,
validator,
[
'Invalid GCS path (badGSpath), given for the option: ' \
'temp_location.',
'Invalid GCS path (badGSpath), given for the option: ' \
'staging_location.'
])


if __name__ == '__main__':
Expand Down

0 comments on commit 3b7ce6b

Please sign in to comment.