Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCSIO] Fix internal unit test failure #32518

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,11 @@ def _create_default_gcs_bucket(self):
# Log warning if soft delete policy is enabled in a gcs bucket
# that is specified in an argument.
def _warn_if_soft_delete_policy_enabled(self, arg_name):
# Skip the check in dry-run mode because the later step requires an
# internet connection to access GCS.
if self.view_as(TestOptions).dry_run:
return

gcs_path = getattr(self, arg_name, None)
try:
from apache_beam.io.gcp import gcsio
Expand Down
47 changes: 29 additions & 18 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@

_LOGGER = logging.getLogger(__name__)

try:
import apache_beam.io.gcp.gcsio # pylint: disable=unused-import
has_gcsio = True
except ImportError:
has_gcsio = False


# Mock runners to use for validations.
class MockRunners(object):
Expand Down Expand Up @@ -711,6 +717,16 @@ def test_options_store_false_with_different_dest(self):
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")

def _check_errors(self, options, validator, expected):
  """Run temp/staging location handling and compare the returned errors.

  Calls ``options._handle_temp_and_staging_locations(validator)`` and asserts
  that the result equals ``expected``. When the module-level ``has_gcsio``
  flag is true, ``GcsIO.is_soft_delete_enabled`` is patched to return
  ``False`` for the duration of the call.
  """
  if not has_gcsio:
    # gcsio could not be imported, so there is nothing to patch.
    actual = options._handle_temp_and_staging_locations(validator)
  else:
    patch_target = 'apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled'
    with mock.patch(patch_target, return_value=False):
      actual = options._handle_temp_and_staging_locations(validator)
  self.assertEqual(actual, expected)

def test_validation_good_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
Expand All @@ -719,8 +735,7 @@ def test_validation_good_stg_good_temp(self):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -734,8 +749,7 @@ def test_validation_bad_stg_good_temp(self):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/tmp")
self.assertEqual(
Expand All @@ -749,8 +763,7 @@ def test_validation_good_stg_bad_temp(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
Expand All @@ -764,8 +777,7 @@ def test_validation_bad_stg_bad_temp_with_default(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, [])
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://default/bucket")
self.assertEqual(
Expand All @@ -779,16 +791,15 @@ def test_validation_bad_stg_bad_temp_no_default(self):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(len(errors), 2, errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: temp_location.',
errors,
errors)
self.assertIn(
'Invalid GCS path (badGSpath), given for the option: staging_location.',
errors,
errors)
self._check_errors(
options,
validator,
[
'Invalid GCS path (badGSpath), given for the option: ' \
'temp_location.',
'Invalid GCS path (badGSpath), given for the option: ' \
'staging_location.'
])


if __name__ == '__main__':
Expand Down
Loading