Skip to content

Commit

Permalink
[GCSIO] Fix internal unit test failure (apache#32518)
Browse files Browse the repository at this point in the history
* Fix internal unit test failure.

* Minor refactor and add comment.

* Fix test failure in github action.

* Mock is_soft_delete_enabled only if gcsio can be loaded.

* Disable unused import lint. It is using in mock.

* Format
  • Loading branch information
shunping authored and reeba212 committed Dec 4, 2024
1 parent c1e993f commit 3f43922
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,11 @@ def _create_default_gcs_bucket(self):
# Log warning if soft delete policy is enabled in a gcs bucket
# that is specified in an argument.
def _warn_if_soft_delete_policy_enabled(self, arg_name):
# skip the check if it is in dry-run mode because the later step requires
# internet connection to access GCS
if self.view_as(TestOptions).dry_run:
return

gcs_path = getattr(self, arg_name, None)
try:
from apache_beam.io.gcp import gcsio
Expand Down
47 changes: 29 additions & 18 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@

_LOGGER = logging.getLogger(__name__)

try:
import apache_beam.io.gcp.gcsio # pylint: disable=unused-import
has_gcsio = True
except ImportError:
has_gcsio = False


# Mock runners to use for validations.
class MockRunners(object):
Expand Down Expand Up @@ -711,6 +717,16 @@ def test_options_store_false_with_different_dest(self):
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")

def _check_errors(self, options, validator, expected):
if has_gcsio:
with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',
return_value=False):
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)
else:
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)

def test_validation_good_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
Expand All @@ -719,8 +735,7 @@ def test_validation_good_stg_good_temp(self):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -734,8 +749,7 @@ def test_validation_bad_stg_good_temp(self):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/tmp")
self.assertEqual(
Expand All @@ -749,8 +763,7 @@ def test_validation_good_stg_bad_temp(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -764,8 +777,7 @@ def test_validation_bad_stg_bad_temp_with_default(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://default/bucket")
self.assertEqual(
Expand All @@ -779,16 +791,15 @@ def test_validation_bad_stg_bad_temp_no_default(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(len(errors), 2, errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: temp_location.',
errors,
errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: staging_location.',
errors,
errors)
self._check_errors(
options,
validator,
[
'Invalid GCS path (badGSpath), given for the option: ' \
'temp_location.',
'Invalid GCS path (badGSpath), given for the option: ' \
'staging_location.'
])


if __name__ == '__main__':
Expand Down

0 comments on commit 3f43922

Please sign in to comment.