Skip to content

Commit

Permalink
Merge pull request #18 from ENCODE-DCC/dev
Browse files Browse the repository at this point in the history
PIP-1380: fix for gcsuri lock permission
  • Loading branch information
leepc12 authored Sep 17, 2020
2 parents 66da3a1 + c5e3c45 commit 422db36
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
2 changes: 1 addition & 1 deletion autouri/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
from .s3uri import S3URI

__all__ = ["AbsPath", "AutoURI", "URIBase", "GCSURI", "HTTPURL", "S3URI"]
__version__ = "0.2.1"
__version__ = "0.2.2"
35 changes: 30 additions & 5 deletions autouri/gcsuri.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Check it with "gsutil versioning get gs://BUCKET-NAME"
https://cloud.google.com/storage/docs/object-versioning
"""
import json
import logging
import os
import time
Expand Down Expand Up @@ -32,6 +33,7 @@


ENV_VAR_GOOGLE_APPLICATION_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS"
GCS_TEMPORARY_HOLD_ERROR_MSG = "is under active Temporary hold"


def add_google_app_creds_to_env(service_account_key_file):
Expand Down Expand Up @@ -99,17 +101,41 @@ def acquire(self, timeout=None, poll_intervall=5.0):
super().acquire(timeout=timeout, poll_intervall=self._poll_interval)

def _acquire(self):
"""Try to acquire a lock.
Once successfully acquired, lock the .lock file temporarily by setting
blob.temporary_hold as True (similary to `gsutil retention temp set URI`).
This will be released in self._release().
Parse Forbidden error to check if it's raised from temporary hold.
It can also be raised from lack of write permission, which should be re-raised.
"""
u = GCSURI(self._lock_file, thread_id=self._thread_id)
try:
blob, bucket_obj = u.get_blob(new=True)
blob, _ = u.get_blob(new=True)
blob.upload_from_string("")
blob.temporary_hold = True
blob.patch()
self._lock_file_fd = id(self)
except (Forbidden, GatewayTimeout, NotFound, ServiceUnavailable):
pass

return None
except Forbidden as e:
err_msg = json.loads(e._response._content)["error"]["message"]
if GCS_TEMPORARY_HOLD_ERROR_MSG not in err_msg:
raise
logger.debug(
"Failed to acquire a file lock. "
"It's already locked by another process. "
"You need to wait until it's released. "
"Retrying until timeout. "
)

except (GatewayTimeout, NotFound, ServiceUnavailable) as e:
logger.debug(
"Failed to acquire a file lock. "
"Server is unavailable or busy? "
"Or too many requests? "
"Retrying until timeout. "
"{err}".format(err=str(e))
)

def _release(self):
u = GCSURI(self._lock_file, thread_id=self._thread_id)
Expand All @@ -136,7 +162,6 @@ def _release(self):
logger.error(error_msg)

time.sleep(self._retry_release_interval)
return None


class GCSURI(URIBase):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="autouri",
version="0.2.1",
version="0.2.2",
python_requires=">=3.6",
scripts=["bin/autouri"],
author="Jin wook Lee",
Expand Down

0 comments on commit 422db36

Please sign in to comment.