Cherrypick #31550 and #31602 onto release branch #31562

Merged
18 changes: 16 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
@@ -156,10 +156,10 @@ def get_project_number(self, bucket):

     return self.bucket_to_project_number.get(bucket, None)
 
-  def get_bucket(self, bucket_name):
+  def get_bucket(self, bucket_name, **kwargs):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
+      return self.client.lookup_bucket(bucket_name, **kwargs)
     except NotFound:
       return None

@@ -529,6 +529,20 @@ def _updated_to_seconds(updated):
         time.mktime(updated.timetuple()) - time.timezone +
         updated.microsecond / 1000000.0)
 
+  def is_soft_delete_enabled(self, gcs_path):
+    try:
+      bucket_name, _ = parse_gcs_path(gcs_path)
+      # set retry timeout to 5 seconds when checking soft delete policy
+      bucket = self.get_bucket(bucket_name, retry=DEFAULT_RETRY.with_timeout(5))
+      if (bucket.soft_delete_policy is not None and
+          bucket.soft_delete_policy.retention_duration_seconds > 0):
+        return True
+    except Exception:
+      _LOGGER.warning(
+          "Unexpected error occurred when checking soft delete policy for %s" %
+          gcs_path)
+    return False
 
 
 class BeamBlobReader(BlobReader):
   def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
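For orientation, a minimal sketch of how the forwarded keyword arguments and the new helper might be exercised from user code. The bucket name is hypothetical, and GcsIO() assumes default application credentials:

    from google.cloud.storage.retry import DEFAULT_RETRY

    from apache_beam.io.gcp import gcsio

    client = gcsio.GcsIO()
    # get_bucket now forwards keyword arguments such as retry straight
    # through to google.cloud.storage.Client.lookup_bucket.
    bucket = client.get_bucket('my-bucket', retry=DEFAULT_RETRY.with_timeout(5))
    print('bucket exists' if bucket is not None else 'bucket not found')
    # is_soft_delete_enabled returns True only when the bucket reports a
    # soft-delete policy with a nonzero retention duration.
    if client.is_soft_delete_enabled('gs://my-bucket/tmp'):
      print('soft delete enabled; expect a warning for temp/staging buckets')

Note that the helper deliberately catches all exceptions, logs a warning, and falls through to return False, so a transient bucket lookup failure never blocks pipeline construction.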
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -600,6 +600,21 @@ def test_create_default_bucket(
     self.assertEqual(
         request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0)
 
+  @mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket")
+  def test_is_soft_delete_enabled(self, mock_get_bucket):
+    bucket = mock.MagicMock()
+    mock_get_bucket.return_value = bucket
+
+    # soft delete policy enabled
+    bucket.soft_delete_policy.retention_duration_seconds = 1024
+    self.assertTrue(
+        self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))
+
+    # soft delete policy disabled
+    bucket.soft_delete_policy.retention_duration_seconds = 0
+    self.assertFalse(
+        self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))
 
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
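The test leans on the fact that mock.MagicMock auto-creates nested attributes, so bucket.soft_delete_policy.retention_duration_seconds can be assigned without building a real bucket or policy object. A self-contained sketch of the same pattern:

    from unittest import mock

    bucket = mock.MagicMock()
    # Attribute chains spring into existence on first access, so a single
    # assignment is enough to fake the soft-delete policy.
    bucket.soft_delete_policy.retention_duration_seconds = 1024
    assert bucket.soft_delete_policy.retention_duration_seconds == 1024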
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -913,18 +913,40 @@ def _create_default_gcs_bucket(self):
     else:
       return None
 
+  # Log warning if soft delete policy is enabled in a gcs bucket
+  # that is specified in an argument.
+  def _warn_if_soft_delete_policy_enabled(self, arg_name):
+    gcs_path = getattr(self, arg_name, None)
+    try:
+      from apache_beam.io.gcp import gcsio
+      if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
+        _LOGGER.warning(
+            "Bucket specified in %s has soft-delete policy enabled."
+            " To avoid being billed for unnecessary storage costs, turn"
+            " off the soft delete feature on buckets that your Dataflow"
+            " jobs use for temporary and staging storage. For more"
+            " information, see"
+            " https://cloud.google.com/storage/docs/use-soft-delete"
+            "#remove-soft-delete-policy." % arg_name)
+    except ImportError:
+      _LOGGER.warning('Unable to check soft delete policy due to import error.')
+
   # If either temp or staging location has an issue, we use the valid one for
   # both locations. If both are bad we return an error.
   def _handle_temp_and_staging_locations(self, validator):
     temp_errors = validator.validate_gcs_path(self, 'temp_location')
     staging_errors = validator.validate_gcs_path(self, 'staging_location')
     if temp_errors and not staging_errors:
       setattr(self, 'temp_location', getattr(self, 'staging_location'))
+      self._warn_if_soft_delete_policy_enabled('staging_location')
       return []
     elif staging_errors and not temp_errors:
       setattr(self, 'staging_location', getattr(self, 'temp_location'))
+      self._warn_if_soft_delete_policy_enabled('temp_location')
       return []
     elif not staging_errors and not temp_errors:
+      self._warn_if_soft_delete_policy_enabled('temp_location')
+      self._warn_if_soft_delete_policy_enabled('staging_location')
       return []
     # Both staging and temp locations are bad, try to use default bucket.
     else:
@@ -935,6 +957,8 @@ def _handle_temp_and_staging_locations(self, validator):
       else:
         setattr(self, 'temp_location', default_bucket)
         setattr(self, 'staging_location', default_bucket)
+        self._warn_if_soft_delete_policy_enabled('temp_location')
+        self._warn_if_soft_delete_policy_enabled('staging_location')
         return []
 
   def validate(self, validator):
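A short sketch of how the warning surfaces in practice; the bucket name is hypothetical, and the check runs during option validation (e.g., when a runner validates the options), not at construction time:

    from apache_beam.options.pipeline_options import PipelineOptions

    # If gs://my-bucket has a soft-delete policy with a nonzero retention
    # duration, validation logs the warning above for both locations.
    options = PipelineOptions([
        '--temp_location=gs://my-bucket/tmp',
        '--staging_location=gs://my-bucket/staging',
    ])

Because _warn_if_soft_delete_policy_enabled catches ImportError, environments without the GCP extras installed still validate cleanly and only log that the check was skipped.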