diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 1c05996020a2..de6d902d87a9 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -156,10 +156,10 @@ def get_project_number(self, bucket):
 
     return self.bucket_to_project_number.get(bucket, None)
 
-  def get_bucket(self, bucket_name):
+  def get_bucket(self, bucket_name, **kwargs):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
+      return self.client.lookup_bucket(bucket_name, **kwargs)
     except NotFound:
       return None
 
@@ -529,6 +529,20 @@ def _updated_to_seconds(updated):
     return (
         time.mktime(updated.timetuple()) - time.timezone +
         updated.microsecond / 1000000.0)
 
+  def is_soft_delete_enabled(self, gcs_path):
+    try:
+      bucket_name, _ = parse_gcs_path(gcs_path)
+      # set retry timeout to 5 seconds when checking soft delete policy
+      bucket = self.get_bucket(bucket_name, retry=DEFAULT_RETRY.with_timeout(5))
+      if (bucket.soft_delete_policy is not None and
+          bucket.soft_delete_policy.retention_duration_seconds > 0):
+        return True
+    except Exception:
+      _LOGGER.warning(
+          "Unexpected error occurred when checking soft delete policy for %s" %
+          gcs_path)
+    return False
+
 class BeamBlobReader(BlobReader):
   def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index b17e0638d6b5..c1356b53095a 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -600,6 +600,21 @@ def test_create_default_bucket(
     self.assertEqual(
         request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0)
 
+  @mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket")
+  def test_is_soft_delete_enabled(self, mock_get_bucket):
+    bucket = mock.MagicMock()
+    mock_get_bucket.return_value = bucket
+
+    # soft delete policy enabled
+    bucket.soft_delete_policy.retention_duration_seconds = 1024
+    self.assertTrue(
+        self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))
+
+    # soft delete policy disabled
+    bucket.soft_delete_policy.retention_duration_seconds = 0
+    self.assertFalse(
+        self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0af32837a3fb..55fb2f1703fc 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -913,6 +913,24 @@ def _create_default_gcs_bucket(self):
     else:
       return None
 
+  # Log warning if soft delete policy is enabled in a gcs bucket
+  # that is specified in an argument.
+  def _warn_if_soft_delete_policy_enabled(self, arg_name):
+    gcs_path = getattr(self, arg_name, None)
+    try:
+      from apache_beam.io.gcp import gcsio
+      if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
+        _LOGGER.warning(
+            "Bucket specified in %s has soft-delete policy enabled."
+            " To avoid being billed for unnecessary storage costs, turn"
+            " off the soft delete feature on buckets that your Dataflow"
+            " jobs use for temporary and staging storage. For more"
+            " information, see"
+            " https://cloud.google.com/storage/docs/use-soft-delete"
+            "#remove-soft-delete-policy." % arg_name)
+    except ImportError:
+      _LOGGER.warning('Unable to check soft delete policy due to import error.')
+
   # If either temp or staging location has an issue, we use the valid one for
   # both locations. If both are bad we return an error.
   def _handle_temp_and_staging_locations(self, validator):
@@ -920,11 +938,15 @@ def _handle_temp_and_staging_locations(self, validator):
     staging_errors = validator.validate_gcs_path(self, 'staging_location')
     if temp_errors and not staging_errors:
       setattr(self, 'temp_location', getattr(self, 'staging_location'))
+      self._warn_if_soft_delete_policy_enabled('staging_location')
       return []
     elif staging_errors and not temp_errors:
       setattr(self, 'staging_location', getattr(self, 'temp_location'))
+      self._warn_if_soft_delete_policy_enabled('temp_location')
       return []
     elif not staging_errors and not temp_errors:
+      self._warn_if_soft_delete_policy_enabled('temp_location')
+      self._warn_if_soft_delete_policy_enabled('staging_location')
       return []
     # Both staging and temp locations are bad, try to use default bucket.
     else:
@@ -935,6 +957,8 @@ def _handle_temp_and_staging_locations(self, validator):
       else:
         setattr(self, 'temp_location', default_bucket)
         setattr(self, 'staging_location', default_bucket)
+        self._warn_if_soft_delete_policy_enabled('temp_location')
+        self._warn_if_soft_delete_policy_enabled('staging_location')
         return []
 
   def validate(self, validator):